# W205 Project 3 Notebook

### Initial Command Line Work

We first need to clone in our git repository for project 3. We navigate to our w205 directory and use the following command
```
git clone https://github.com/mids-w205-crook/project-3-isaacvernon1
```

We can then navigate into this directory and copy in the initial files for docker-compose.yml and game_api.py. The changes made to these files can be found by looking at the respective files
```
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml ~/w205/project-3-isaacvernon1
cp ~/w205/course-content/12-Querying-Data-II/*.py ~/w205/project-3-isaacvernon1
```

Now that everything we need has been imported, we can go ahead and bring up our cluster and check that it is brought up.
```
docker-compose up -d
docker-compose ps
docker ps -a
```

We can then create a Kafka topic "events" that we will log our app server events to.
```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

Now that we have our Kafka topic, we will start up our flask instance server (API server)
```
docker-compose exec mids env FLASK_APP=/w205/project-3-isaacvernon1/game_api.py flask run --host 0.0.0.0
```

With our API (Flask) server set up, we can now use Apache Bench to generate some test data for our pipeline.
```
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild
```

We can now create a symbolic link to the w205 directory to allow us to run this notebook
```
docker-compose exec spark bash
ln -s /w205 w205
exit
```

We can then create a notebook instance with the following
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```

### Pyspark Code In Notebook

We first need to import a couple of python packages to run our code

In [8]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

Next, we create a user defined function "is_purchase" which returns True if the event type is "purchase_sword" and False if not.

In [9]:
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

We can then read in the raw events from our Kafka topic events (This code will read all of the events in the topic).

In [10]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

Next, we can filter our the events that were purchase events by using our previously defined function

In [11]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

Finally, we can then convert these purchase events into a readable dataframe, with the schema and the output shown below (This is only with the static tests, though it would work with the later infinite apache bench test command as well)

In [12]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [13]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [14]:
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|us

Now that we have a table for our purchase events, we can then write this to HDFS as parquet.

In [15]:
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

We can also reproduce this pipeline for the other events by modifying the User Defined Function from earlier

In [16]:
@udf('boolean')
def is_guild(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'join_guild':
        return True
    return False

guild_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_guild('raw'))
        
extracted_guild_events = guild_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()
        
extracted_guild_events.show()

+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|guild_name|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild| 

In [17]:
extracted_guild_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/guild')

We can then verify that these wrote to HDFS properly with the following commands
```
docker-compose exec cloudera hadoop fs -ls /tmp/
docker-compose exec cloudera hadoop fs -ls /tmp/purchases
docker-compose exec cloudera hadoop fs -ls /tmp/guild
```

Now that we have stored our tables in HDFS, we can read them back in as well

In [18]:
purchases = spark.read.parquet('/tmp/purchases')
guilds = spark.read.parquet('/tmp/guild')

In [19]:
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|us

In [20]:
guilds.show()

+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|guild_name|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild| 

We can now register these as tables in SQL, which will allow us to query from them with spark sql

In [21]:
purchases.registerTempTable('purchases')
guilds.registerTempTable('guilds')

In [22]:
purchases_query = spark.sql("select * from purchases where Host = 'user1.comcast.com'")
purchases_query.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|  falchion|2020-12-04 23:44:...|
|   */*|us

In [23]:
guilds_query = spark.sql("select * from guilds where guild_name = 'mage'")
guilds_query.show()

+------+-----------------+---------------+----------+----------+--------------------+
|Accept|             Host|     User-Agent|event_type|guild_name|           timestamp|
+------+-----------------+---------------+----------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild|      mage|2020-12-04 23:45:...|
|   */*|user1.comcast.com|ApacheBench/2.3|join_guild| 

We can then convert these SQL tables into Pandas Dataframes.

In [24]:
df = purchases_query.toPandas()
df.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,sword_type,timestamp
count,10,10,10,10,10,10
unique,1,1,1,1,1,10
top,*/*,user1.comcast.com,ApacheBench/2.3,purchase_sword,falchion,2020-12-04 23:44:58.04
freq,10,10,10,10,10,1


In [25]:
dat = guilds_query.toPandas()
dat.describe()

Unnamed: 0,Accept,Host,User-Agent,event_type,guild_name,timestamp
count,20,20,20,20,20,20
unique,1,2,1,1,1,20
top,*/*,user1.comcast.com,ApacheBench/2.3,join_guild,mage,2020-12-05 00:06:50.383
freq,20,10,20,20,20,1


### Simple Analytics

Now that we have SQL tables of subsets of our events, we can run queries and perform simple analytics. We will answer the following questions with queries.

- What is the most popular Guild?
- How many join guilds events do we see?
- How many swords did user 1 purchase?

To find the most popular guild, we can run the following query against the guilds table

In [26]:
guilds_query = spark.sql("select count(*), guild_name from guilds group by guild_name")
guilds_query.show()

+--------+----------+
|count(1)|guild_name|
+--------+----------+
|      20|      mage|
+--------+----------+



We find that the Mage guild is the most popular (and only) guild in our events with 20 join_guild occurences. We can then query the guilds table again to determine how many guild events we have total.

In [27]:
guilds_query = spark.sql("select count(*) from guilds")
guilds_query.show()

+--------+
|count(1)|
+--------+
|      20|
+--------+



Overall, we find that there are 20 join_guild events overall. Next, we can query the purchases table to see how many swords user 1 purchased.

In [28]:
purchases_query = spark.sql("select count(*) from purchases where Host = 'user1.comcast.com'")
purchases_query.show()

+--------+
|count(1)|
+--------+
|      10|
+--------+



We find that User 1 has purchased 10 swords. Next, we want to modify our pipeline to allow for stream processing. First, we will need to import some new packages.

### Stream Processing

In [29]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

We can now define a schema for our purchase sword events and create a new User Defined Function that also filters out purchase sword events.

In [30]:
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

In [31]:
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

Before we start the stream processing pipeline, we will want to create infinite tests. This can be done with the following apache bench command (Will generate 10 purchase sword events every 5 seconds until manually stopped)
```
while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done
```

We can now create our stream processing pipeline using the following (Will write a new parquet file to the sword_purchases directory every 10 seconds).

In [32]:
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [33]:
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [34]:
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

If we want to check the functionality of our pipeline, we can continue to run the following command and see how our number of files increases
```
docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases
```

When we want to stop writing files to HDFS, we can simply run the following line of code

In [35]:
sink.stop()

Now, if we want to want to look at our tables, we can use Hive and Presto. First we can create and external table for schema on read in Hive
```
docker-compose exec cloudera hive
```

Once we are in Hive, we can run the following command to create the external table in the sword_purchases directory of HDFS
```
create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");
```

We can then run the following code to check if it created our table and to look at the table it created

```
hive> show tables;
OK
sword_purchases
```
This is the result we should see if our table was properly created. We can then look at the output we should have from describing the table

```
hive> desc sword_purchases;
OK
accept                  string                                      
host                    string                                      
user_agent              string                                      
event_type              string                                      
timestamp               string
```

We can then use the following to exit Hive
```
exit;
```

Now, we can see if our data in the HDFS is queryable through Presto. We can first start Presto
```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

We can then run the following to see the tables we have (and we find that we have the sword_purchases table)
```
show tables;
```

We can now query the sword_purchases table to make sure we can do analysis in Presto
```
select count(*) from sword_purchases;
```
The result of the query is below. We can see that when we ran this query, there were 200 sword purchases.
```
presto:default> select count(*) from sword_purchases;
 _col0 
-------
   200 
(1 row)
```

We can also run one more query to be sure
```
select * from sword_purchases limit 5;
```

And below is the output
```
-------------------------------------------------------------------------------------------------------------------------------------------+------------------------
 {"Host": "user1.comcast.com", "sword_type": "falchion", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 00:32:35.069
 {"Host": "user1.comcast.com", "sword_type": "falchion", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 00:32:35.073
 {"Host": "user1.comcast.com", "sword_type": "falchion", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 00:32:35.075
 {"Host": "user1.comcast.com", "sword_type": "falchion", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 00:32:35.079
 {"Host": "user1.comcast.com", "sword_type": "falchion", "event_type": "purchase_sword", "Accept": "*/*", "User-Agent": "ApacheBench/2.3"} | 2020-12-05 00:32:35.083
(5 rows)
```

We can then stop presto by using the exit command
```
exit;
```

Now that we have finished all parts of the project, we can go ahead and stop our Flask Server (And our notebook server) with CTRL C. We can then bring down our cluster and check to ensure that it went down properly with 
```
docker-compose down
docker-compose ps
docker ps -a
```