# Project 3
 _Josh Archer_

# First part: work in the command line
_The Jupyter notebook work is in the second half of this document._

## Setup

- Copy in the YAML file (Week 13). In it, comment out the `ports` entry for 8888 under cloudera and uncomment it under spark.

```
cp ~/w205/course-content/13-Understanding-Data/docker-compose.yml ~/w205/project-3-jearcher
```

- Copy in `game_api.py`

```
cp ~/w205/course-content/13-Understanding-Data/game_api.py ~/w205/project-3-jearcher 
```

## Starting Spark with Jupyter

- Open a bash shell in the Spark container (the cluster must already be running; see the spin-up step further down)

```
docker-compose exec spark bash
```

- In the Spark container, create a symbolic link to `/w205`, then exit
```
ln -s /w205 w205
exit
```

- Run the enhanced version of the pyspark command line to target a Jupyter notebook
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```

- Jupyter notebook URL from the Dec 5 session (the token changes each time the notebook server starts)

http://34.83.253.127:8888/?token=adac487a3a7124a3ee5a4957a64f981715a0f36a4c05eff7

## Edit game API to add events and metadata

- Edit `game_api.py`: add the `buy_sword` and `join_guild` events, and add metadata

```
vi game_api.py
```

- Example of a new event with metadata

```
@app.route("/buy_a_sword")
def buy_a_sword():
    buy_sword_event = {'event_type': 'buy_sword',
            'sword_condition': 'rustyAF'}
    log_to_kafka('events', buy_sword_event)
    return "Sword Bought!\n"
```
- Save edits 
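
The `join_guild` event can be edited in the same way. As a minimal, framework-free sketch of how an event dictionary with metadata might be combined with request headers before it is sent to Kafka: `build_event`, the `guild_name` field, and the sample headers below are hypothetical and are not taken from `game_api.py`.

```python
import json


def build_event(event, headers):
    """Merge request headers into an event and encode it as JSON bytes.

    Hypothetical helper: it mimics what a log_to_kafka-style function
    might do before handing the payload to a Kafka producer.
    """
    payload = dict(event)      # copy so the caller's dict is not mutated
    payload.update(headers)    # header fields become top-level event fields
    return json.dumps(payload).encode("utf-8")


# Assumed example event; 'guild_name' is an illustrative metadata field.
join_guild_event = {"event_type": "join_guild", "guild_name": "example_guild"}
sample_headers = {"Host": "user1.comcast.com", "Accept": "*/*"}

encoded = build_event(join_guild_event, sample_headers)
decoded = json.loads(encoded.decode("utf-8"))
print(decoded["event_type"], decoded["Host"])
```

Keeping the metadata in the event dictionary means it travels with the event into Kafka and shows up later as its own column.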

## Spin up the cluster and start the Flask app

- Spin up the cluster

```
docker-compose up -d
```

- Check that everything came up and that nothing extra is running

```
docker-compose ps
```

- Create the `events` topic
```
docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```

- Start the **Flask app** and keep it running in its own terminal window

```
docker-compose exec mids env FLASK_APP=/w205/project-3-jearcher/game_api.py flask run --host 0.0.0.0
```

## Generate events using Apache Bench

- Generate events using Apache Bench (each command sends 10 requests, so each line generates 10 events)

**user 1**
    
```
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/join_guild
```

**user 2**
     
```
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/buy_a_sword
docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/join_guild
```
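
The eight commands above are every combination of two hosts and four endpoints. As a sketch only, the same list could be generated in Python; `ab_command` is a hypothetical helper, and the hosts and endpoints are copied from the commands above.

```python
from itertools import product

# Hosts and endpoints copied from the Apache Bench commands above.
hosts = ["user1.comcast.com", "user2.att.com"]
endpoints = ["", "purchase_a_sword", "buy_a_sword", "join_guild"]


def ab_command(host, endpoint, requests=10):
    """Build one Apache Bench invocation as an argument list."""
    return [
        "docker-compose", "exec", "mids",
        "ab", "-n", str(requests),
        "-H", "Host: {}".format(host),
        "http://localhost:5000/{}".format(endpoint),
    ]


commands = [ab_command(h, e) for h, e in product(hosts, endpoints)]
for cmd in commands:
    # Printed for inspection only; the Host header would need quoting
    # if the line were pasted into a shell.
    print(" ".join(cmd))
```

Each argument list could be passed to `subprocess.run` instead of being printed, which avoids shell quoting altogether.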

### Infinite loop for Apache Bench
- After Week 13, stream events by running the Apache Bench command in an infinite loop (leave each loop open in its own window)

- Purchase Sword (User1)

```
while true; do docker-compose exec mids ab -n 10 -H "Host: user1.comcast.com" http://localhost:5000/purchase_a_sword; sleep 5; done
```

- Purchase Sword (User2)


```
while true; do docker-compose exec mids ab -n 10 -H "Host: user2.att.com" http://localhost:5000/purchase_a_sword; sleep 5; done
```


- Look at the events (more should arrive every few seconds)

```
docker-compose exec mids kafkacat -C -b kafka:29092 -t events -o beginning -e
```

- Check what the stream wrote to Hadoop (the streaming write from the Week 13 notebook cells must be running)

```
docker-compose exec cloudera hadoop fs -ls /tmp/sword_purchases
```

## Using Hive

```
docker-compose exec cloudera hive
```

- Create an external table over the streamed sword purchases

```
create external table if not exists default.sword_purchases (Accept string, Host string, User_Agent string, event_type string, timestamp string) stored as parquet location '/tmp/sword_purchases'  tblproperties ("parquet.compress"="SNAPPY");
```

- In Hive, take a look at the table

```
> show tables; 
> exit;
```

## Query from Presto

```
docker-compose exec presto presto --server presto:8080 --catalog hive --schema default
```

- In Presto

```
> show tables;
> select * from sword_purchases;
```

- Check the count a couple of times; it should increase every few seconds
```
> select count(*) from sword_purchases;
```

## Shutting down cluster

```
docker-compose down
docker-compose ps
docker ps -a
```

# Second part: work in the Jupyter notebook
## After Week 11

In [71]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [72]:
# User-defined function: overwrite Host and add a Cache-Control field in each event
@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)
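
The transformation inside `munge_event` can be checked without Spark. The sketch below repeats the same logic as a plain function; `munge_event_plain` and the sample payload are illustrative assumptions, not part of the notebook.

```python
import json


def munge_event_plain(event_as_json):
    """Same transformation as munge_event, without the Spark UDF wrapper."""
    event = json.loads(event_as_json)
    event["Host"] = "moe"                 # overwrite the original host
    event["Cache-Control"] = "no-cache"   # add a new field
    return json.dumps(event)


# Assumed sample payload shaped like the events produced by Apache Bench.
sample = json.dumps({"Host": "user1.comcast.com", "event_type": "purchase_sword"})
munged = json.loads(munge_event_plain(sample))
print(munged)
```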


In [73]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [74]:
raw_events.show()

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 48 6F 73 7...|events|        0|     0|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     1|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     2|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     3|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     4|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     5|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     6|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0|     7|2020-12-08 02:36:...|            0|
|null|[7B 22 48 6F 73 7...|events|        0

In [75]:
munged_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .withColumn('munged', munge_event('raw'))

In [76]:
munged_events.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|raw                                                                                                                                        |timestamp              |munged                                                                                                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Host": "user1.comcast.com", "sword

### We can't call `.show()` on the following due to schema issues that will be fixed in Week 12

In [77]:
extracted_events = munged_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.munged))) \
        .toDF()
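
One plausible source of the schema issue is that different event types carry different fields, so the rows built from them do not share one layout. The sketch below illustrates this with plain Python; the two sample payloads are assumptions assembled from field names that appear in this document.

```python
import json

# Illustrative payloads; field names follow the events shown in this document.
samples = [
    '{"Host": "user1.comcast.com", "event_type": "purchase_sword", "sword_type": "one_handed"}',
    '{"Host": "user1.comcast.com", "event_type": "buy_sword", "sword_condition": "rustyAF"}',
]

# Collect the set of keys each event type would contribute to a Row.
keys_by_type = {}
for raw in samples:
    event = json.loads(raw)
    keys_by_type[event["event_type"]] = frozenset(event)

distinct_layouts = set(keys_by_type.values())
print(len(distinct_layouts))
for event_type, keys in sorted(keys_by_type.items()):
    print(event_type, sorted(keys))
```

Filtering to a single event type before building rows, as the Week 12 cells do, gives every row the same set of fields.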

## After Week 12

##### Following code from https://github.com/mids-w205-crook/course-content/blob/master/12-Querying-Data-II/filtered_writes.py

In [106]:
import json
from pyspark.sql import Row
from pyspark.sql.functions import udf

In [107]:
# User-defined function: keep only purchase_sword events
@udf('boolean')
def is_purchase(event_as_json):
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False
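
The filter can also be exercised outside Spark. The variant below is a sketch, not the notebook's code: it uses `dict.get` so that an event with no `event_type` key is treated as a non-purchase instead of raising `KeyError`, which is an assumption about the desired behaviour. `is_purchase_plain` and the sample events are hypothetical.

```python
import json


def is_purchase_plain(event_as_json):
    """Return True only for purchase_sword events."""
    event = json.loads(event_as_json)
    return event.get("event_type") == "purchase_sword"


sample_events = [
    '{"event_type": "purchase_sword", "sword_type": "one_handed"}',
    '{"event_type": "join_guild"}',
    '{"Host": "user1.comcast.com"}',  # no event_type at all
]
purchases_only = [raw for raw in sample_events if is_purchase_plain(raw)]
print(len(purchases_only))
```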

In [108]:
raw_events = spark \
        .read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .option("startingOffsets", "earliest") \
        .option("endingOffsets", "latest") \
        .load()

In [109]:
purchase_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw'),
                raw_events.timestamp.cast('string')) \
        .filter(is_purchase('raw'))

In [110]:
extracted_purchase_events = purchase_events \
        .rdd \
        .map(lambda r: Row(timestamp=r.timestamp, **json.loads(r.raw))) \
        .toDF()

In [111]:
extracted_purchase_events.printSchema()

root
 |-- Accept: string (nullable = true)
 |-- Host: string (nullable = true)
 |-- User-Agent: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- sword_type: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [112]:
extracted_purchase_events.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|us

In [113]:
# Write the purchase events to /tmp/purchases as Parquet
extracted_purchase_events \
        .write \
        .mode('overwrite') \
        .parquet('/tmp/purchases')

#### PySpark code: https://github.com/kevin-crook-ucb/ucb_w205_crook_supplement/blob/master/2020_Fall/synch_12.md

In [114]:
# Read back the Parquet files just written
purchases = spark.read.parquet('/tmp/purchases')

In [115]:
purchases.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|us

In [116]:
purchases.registerTempTable('purchases')

In [117]:
purchases_by_example2 = spark.sql("select * from purchases where Host = 'user1.comcast.com'")

In [118]:
purchases_by_example2.show()

+------+-----------------+---------------+--------------+----------+--------------------+
|Accept|             Host|     User-Agent|    event_type|sword_type|           timestamp|
+------+-----------------+---------------+--------------+----------+--------------------+
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|user1.comcast.com|ApacheBench/2.3|purchase_sword|one_handed|2020-12-08 02:36:...|
|   */*|us

In [119]:
df = purchases_by_example2.toPandas()

In [120]:
df.describe()

| | Accept | Host | User-Agent | event_type | sword_type | timestamp |
|---|---|---|---|---|---|---|
| count | 2150 | 2150 | 2150 | 2150 | 2150 | 2150 |
| unique | 1 | 1 | 1 | 1 | 1 | 2150 |
| top | `*/*` | user1.comcast.com | ApacheBench/2.3 | purchase_sword | one_handed | 2020-12-08 03:35:01.726 |
| freq | 2150 | 2150 | 2150 | 2150 | 2150 | 1 |


## Simple Spark SQL analytics on Spark Dataframe in Memory

- How many times did user 1 Purchase a Sword?

In [121]:
purchases_user1 = spark.sql("select count(*) from purchases where Host = 'user1.comcast.com'")
purchases_user1.show()

+--------+
|count(1)|
+--------+
|    2150|
+--------+



- How many times did user 2 Purchase a Sword?

In [122]:
purchases_user2 = spark.sql("select count(*) from purchases where Host = 'user2.att.com'")
purchases_user2.show()

+--------+
|count(1)|
+--------+
|     130|
+--------+



- How many total sword purchases? (Since there are only 2 users, this should be the sum of the previous two queries.)

In [123]:
purchases_total = spark.sql("select count(*) from purchases")
purchases_total.show()

+--------+
|count(1)|
+--------+
|    2280|
+--------+
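
The consistency check described above can be written out directly. The counts are copied from the query outputs in this notebook.

```python
# Counts copied from the per-user query outputs above.
counts_by_host = {"user1.comcast.com": 2150, "user2.att.com": 130}
reported_total = 2280  # result of the unfiltered count(*) query

computed_total = sum(counts_by_host.values())
print(computed_total == reported_total)
```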



## After Week 13
#### Following code from: https://github.com/mids-w205-crook/course-content/blob/master/13-Understanding-Data/write_swords_stream.py

In [124]:
import json
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import StructType, StructField, StringType

In [125]:
# Schema to impose on the event JSON (the timestamp column comes from the Kafka message, not from this schema)
def purchase_sword_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_type: string (nullable = true)
    |-- timestamp: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_type", StringType(), True),
    ])
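
As a rough, Spark-free emulation of what parsing with an explicit schema does: only the declared fields are kept, and a declared field missing from the JSON comes back empty. `project_to_schema` is a hypothetical helper, and the sample payload is an assumption; the field names are copied from `purchase_sword_event_schema()`.

```python
import json

# Field names copied from purchase_sword_event_schema().
SCHEMA_FIELDS = ["Accept", "Host", "User-Agent", "event_type"]


def project_to_schema(event_as_json, fields=SCHEMA_FIELDS):
    """Keep only the declared fields; absent fields become None."""
    event = json.loads(event_as_json)
    return {name: event.get(name) for name in fields}


# Assumed payload: it has an extra field (sword_type) and lacks Accept.
raw = '{"Host": "user1.comcast.com", "event_type": "purchase_sword", "sword_type": "one_handed"}'
row = project_to_schema(raw)
print(row)
```

This is why `sword_type` does not appear among the columns written by the stream: the schema does not declare it.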

In [126]:
# User-defined function
@udf('boolean')
def is_sword_purchase(event_as_json):
    """udf for filtering events
    """
    event = json.loads(event_as_json)
    if event['event_type'] == 'purchase_sword':
        return True
    return False

In [127]:
# Read raw events as a stream (readStream) instead of a one-time batch read
raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()

In [128]:
# Filtering first, then imposing schema
sword_purchases = raw_events \
        .filter(is_sword_purchase(raw_events.value.cast('string'))) \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          purchase_sword_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

In [139]:
# The .start() call at the end launches the streaming query
sink = sword_purchases \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_sword_purchases") \
        .option("path", "/tmp/sword_purchases") \
        .trigger(processingTime="10 seconds") \
        .start()

In [138]:
# Run this only when you want to stop the stream
sink.stop()