# MIDS W205 Fall 2021 Project 3
### Instructor: Shiraz Chakraverty
### Student: Ben Mok
### Team Members: Aastha Khanna, Ben Mok, Don Irwin, Theresa Kuruvilla
### Section Tuesday 6 P.M.
#### Date: 12/02/2021

## Introduction:

In this document we will explain the key components of project 3.

We will inspect the environment we set up.

We will inspect the code we run to create the pipeline and push data through it.

The video link below gives a walk-through of our entire project end-to-end.

In [5]:
from IPython.display import HTML
HTML('<a href="https://www.youtube.com/watch?v=TpS3rIrctBo" target="https://www.youtube.com/watch?v=TpS3rIrctBo"> Click on this text or the image below to view an explanation video</a>')


In [6]:
HTML('<a href="https://www.youtube.com/watch?v=TpS3rIrctBo" target="https://youtu.be/Mgce9pA9ASc"> <img src="https://tuneman7.github.io/video.png" border=0, width="20%">    </a>')

***
## Team Contribution To the Codebase:

The instructor has been granted read access to our collaboration GitHub repo and may review each team member's check-in history there to verify the assessment below:

https://github.com/mids-w205-chakraverty/project_3_team_2_tue_6_30

Aastha:  

Contributed to Hive table extraction and supported general development.  Also found a fix that saved the team many hours, and assisted Theresa in applying this fix to her environment.

Ben:  

Created the parameterized random synthetic event pitcher and the YML file, and demonstrated understanding of the entire pipeline end-to-end.

Theresa: 

Contributed to Flask API endpoint.

Don: 

Solution architect; contributed to streaming, queries, table schema, and automation.

Lise:  

Dropped the class.



***

## Building blocks of the pipeline

1. The YML File:

    This is our infrastructure-as-code file.  It specifies all the containers and their connections to one another.

2. Configuration Files:

    Configuration files such as "log4j.properties" are used for configuring Spark.

3. Flask API File:

    This Python file runs within the Flask HTTP server and provides the API endpoint of the pipeline.

4. Hive Table Definition Files

5. Spark Submit Program:

    This is a Python file which we submit through spark-submit; this makes the pipeline reproducible and extensible.

6. Synthetic Parameterized Event Generation

7. Hive Query Files

8. Jupyter Notebook File for Hive Reporting

9. Bash Script File
    

We will walk through each of these building blocks one by one.
    

### The YML File -- Our Infrastructure as Code Environment:

#### The YML file can be found at the following location:
./code_files/docker-compose.yml

Let us inspect the YML file; please read the comments at the top of it:

```yml
---
#  Don Irwin 12/02/2021
#  
#  Some notable changes include exposing drives on certain containers.
#  Additionally, for the confluentinc images we are no longer using the "latest" images.
#  This is because the latest images have changes in them which do not use "--zookeeper" flags
#  for topic creation from the command prompt.
#  
#  This changed on us during the work.
#  Hence we had to revert to version 5.3.1 in order for our code to continue to work.
#
#
version: '2'
services:
  redis:
    image: redis:latest
    expose:
      - "6379"
      
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:5.3.1
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"
    extra_hosts:
      - "moby:127.0.0.1"

  cloudera:
    image: midsw205/hadoop:0.0.2
    hostname: cloudera
    expose:
      - "8020" # nn
      - "8888" # hue
      - "9083" # hive thrift
      - "10000" # hive jdbc
      - "50070" # nn http
    ports:
      - "8888:8888"
      - "9093"
    extra_hosts:
      - "moby:127.0.0.1"
    volumes:
      - ~/w205:/w205      

  spark:
    image: midsw205/spark-python:0.0.6
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "8888"
    #ports:
    #  - "8888:8888"
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"
    command: bash

  presto:
    image: midsw205/presto:0.0.1
    hostname: presto
    volumes:
      - ~/w205:/w205
    expose:
      - "8080"
    ports:
      - "8082:8080" # Adding binding to local port 8082 for connection from notebooks; 8080 was in use
    environment:
      HIVE_THRIFTSERVER: cloudera:9083
    extra_hosts:
      - "moby:127.0.0.1"

  mids:
    image: midsw205/base:0.1.9
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    expose:
      - "5000"
    ports:
      - "5000:5000"
    extra_hosts:
      - "moby:127.0.0.1"
    command: jupyter notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root


```

### Configuration Files

####  The Spark log4j.properties file is available at the following location.

./code_files/log4j.properties

We are interested in this file because changing one setting in it suppresses PySpark's verbose warning messages.

The line we are interested in is below; it raises the root logging level to FATAL.

```bash

log4j.rootCategory=FATAL, console

```

We copy our modified log4j.properties file to our spark instance using the following command.

```bash

docker-compose exec spark bash -c "cp /w205/project-3-tuneman7/code_files/log4j.properties ./conf/log4j.properties"

```
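As an illustration of the setting described above, here is a minimal sketch of how the `log4j.rootCategory` line could be patched programmatically rather than by keeping a hand-edited copy of the file. The helper name `set_root_log_level` and the sample input text are hypothetical and not part of our code base; our pipeline simply copies the modified file into the container.

```python
import re


def set_root_log_level(properties_text, level="FATAL"):
    """Return properties_text with the level on the log4j.rootCategory line replaced.

    If no such line exists, one that logs to the console appender is appended.
    """
    pattern = re.compile(
        r"^([ \t]*log4j\.rootCategory[ \t]*=[ \t]*)[A-Za-z]+(.*)$",
        re.MULTILINE,
    )
    patched, count = pattern.subn(
        lambda m: m.group(1) + level + m.group(2), properties_text
    )
    if count == 0:
        patched = properties_text.rstrip("\n") + "\nlog4j.rootCategory={}, console\n".format(level)
    return patched


# Hypothetical starting content, used only for this illustration.
sample = "# Log everything to the console\nlog4j.rootCategory=INFO, console\n"
print(set_root_log_level(sample))
```

Only the level on the root line is rewritten; any other logger settings in the file are left untouched.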



### Flask API File:

####  The Flask API file is available at the following location:

./code_files/game_api.py

This file contains the Python code that runs within the Flask HTTP server. We start the server in the mids container with the following command:

```bash

docker-compose exec mids env FLASK_APP=/w205/project-3-tuneman7/code_files/game_api.py flask run >> log_file1.txt &

```

Let us inspect this file's contents:

```python

#!/usr/bin/env python
from __future__ import print_function
import json
import uuid
from kafka import KafkaProducer
from flask import Flask, request, session
from flask import jsonify
import sys
from multiprocessing import Value


counter = Value('i', 0)
app = Flask(__name__)

event_id = 0

def get_event_id():
    out = str(uuid.uuid4())
    return out

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

# The Flask app is already created above; connect the Kafka producer to the broker.
producer = KafkaProducer(bootstrap_servers='kafka:29092')

def log_event_parameters():
    args = request.args
    print(args, file=sys.stderr)
    print(request.args.to_dict(),file=sys.stderr)
    print(request.args.viewkeys(),file=sys.stderr)
    key_views = request.args.viewkeys()
    output_list = []
    output_dict = {}
    event_id = get_event_id()
    return_string = ""
    key_count = 0
    for key in request.args.viewkeys():
        my_dict = {}
        my_dict["event_id"] = event_id
        my_dict["parameter_name"] = key
        my_dict["parameter_value"] = request.args.get(key)       
        print(key,file=sys.stderr)
        print(request.args.get(key),file=sys.stderr)
        output_dict[key]=request.args.get(key)       
        output_list.append(my_dict)
        key_count = key_count +1
        producer.send("event_parameters", json.dumps(my_dict).encode())
    if key_count==0:
        my_dict = {}
        my_dict["event_id"] = event_id
        my_dict["parameter_name"] = "user"
        my_dict["parameter_value"] = "NONE"
        producer.send("event_parameters", json.dumps(my_dict).encode())
    return event_id

def log_to_kafka(topic, event):
    event_id = log_event_parameters()
    event.update(request.headers)
    event_id_dict={'event_id':event_id}
    event.update(event_id_dict)
    producer.send(topic, json.dumps(event).encode())


@app.route("/")
def default_response():
    default_event = {'event_type': 'default'}
    log_to_kafka('events', default_event)
    return "This is the default response!\n"


@app.route("/purchase_a_sword")
def purchase_a_sword():
    purchase_sword_event = {'event_type': 'purchase_sword'}
    log_to_kafka('events', purchase_sword_event)
    return "Sword Purchased!\n"

@app.route("/join_a_guild")
def join_guild():
    join_guild_event = {'event_type': 'join_guild'}
    log_to_kafka('events', join_guild_event)
    return "Joined Guild!\n"

@app.route("/leave_guild")
def leave_guild():
    leave_guild_event = {'event_type': 'leave_guild'}
    log_to_kafka('events', leave_guild_event)
    return "Left Guild!\n"

@app.route("/get_credit")
def get_credit():
    get_credit_event = {'event_type': 'get_credit'}
    log_to_kafka('events', get_credit_event)
    return "Received Credit!\n"

@app.route("/shutdown")
def shutdown():
    func = request.environ.get('werkzeug.server.shutdown')
    if func is None:
        raise RuntimeError('Not running with the Werkzeug Server')
    func()
    return ""


```
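The way `log_event_parameters` turns one request into several Kafka messages can be sketched without Flask or Kafka. The following is a simplified illustration, not the code we run: `flatten_parameters` is a hypothetical helper, the query string is represented as a plain dict, and the guild name is made up.

```python
import json
import uuid


def flatten_parameters(params, event_id=None):
    """Turn a dict of query-string parameters into one record per parameter.

    Every record carries the same event_id so that it can later be joined
    back to the main event. When there are no parameters at all, a single
    placeholder record with user "NONE" is produced, as in game_api.py.
    """
    event_id = event_id or str(uuid.uuid4())
    records = [
        {"event_id": event_id, "parameter_name": name, "parameter_value": value}
        for name, value in params.items()
    ]
    if not records:
        records.append(
            {"event_id": event_id, "parameter_name": "user", "parameter_value": "NONE"}
        )
    return event_id, records


# Illustrative request: /join_a_guild?user=ben&guild_name=Example Guild
event_id, records = flatten_parameters(
    {"user": "ben", "guild_name": "Example Guild"}, event_id="demo-id"
)
for record in records:
    # In the real API each record is sent to the "event_parameters" topic.
    print(json.dumps(record))
```

Storing parameters as name/value rows keyed by `event_id` means new parameters can be added to any endpoint without changing the table schema.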


***
### Hive Table Definition Files

####  The HQL file can be found at the following location:

./code_files/hive_table_creation.hql

This file is executed within our pipeline in the following way:

```bash
docker-compose exec cloudera hive -f /w205/project-3-tuneman7/code_files/hive_table_creation.hql 
```
Its contents are below:

```sql

create external table if not exists default.all_events (
    raw_event string,
    timestamp string,
    Accept string,
    Host string,
    User_Agent string,
    event_id string,
    event_type string
  )
  stored as parquet 
  location '/tmp/all_events'
  tblproperties ("parquet.compress"="SNAPPY");
  

create external table if not exists default.event_parameters (
    raw_event string,
    timestamp string,
    Accept string,
    Host string,
    User_Agent string,
    event_id string,
    parameter_name string,
    parameter_value string
    
  )
  stored as parquet 
  location '/tmp/event_parameters'
  tblproperties ("parquet.compress"="SNAPPY");

```


***
### Spark Submit Program

####  The Spark Submit program can be found at the following location:

./code_files/separate_events_stream_2.py

This file is executed within our pipeline in the following way:

```bash

docker-compose exec spark spark-submit /w205/project-3-tuneman7/code_files/separate_events_stream_2.py &

```
Its contents are below:

```python

#!/usr/bin/env python
"""Extract events from kafka and write them to hdfs
"""
import json,time
from pyspark.sql import SparkSession, Row
#from pyspark.sql.functions import udf
from pyspark.sql.functions import udf, from_json
from pyspark.sql.types import *

def general_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_id: string (nullable = true)
    |-- event_type: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_id", StringType(), True),
        StructField("event_type", StringType(), True),
    ])

def event_parameter_event_schema():
    """
    root
    |-- Accept: string (nullable = true)
    |-- Host: string (nullable = true)
    |-- User-Agent: string (nullable = true)
    |-- event_id: string (nullable = true)
    |-- parameter_name: string (nullable = true)
    |-- parameter_value: string (nullable = true)
    """
    return StructType([
        StructField("Accept", StringType(), True),
        StructField("Host", StringType(), True),
        StructField("User-Agent", StringType(), True),
        StructField("event_id", StringType(), True),
        StructField("parameter_name", StringType(), True),
        StructField("parameter_value", StringType(), True),
    ])




@udf('string')
def munge_event(event_as_json):
    event = json.loads(event_as_json)
    event['Host'] = "moe"
    event['Cache-Control'] = "no-cache"
    return json.dumps(event)

def main():
    """main
    """
    spark = SparkSession \
        .builder \
        .appName("ExtractEventsJob") \
        .getOrCreate()

    raw_events = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "events") \
        .load()
    
    raw_event_parameters = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:29092") \
        .option("subscribe", "event_parameters") \
        .load()

    event_parameters = raw_event_parameters \
        .select(raw_event_parameters.value.cast('string').alias('raw_event'),
                raw_event_parameters.timestamp.cast('string'),
                from_json(raw_event_parameters.value.cast('string'),
                          event_parameter_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')

    all_events = raw_events \
        .select(raw_events.value.cast('string').alias('raw_event'),
                raw_events.timestamp.cast('string'),
                from_json(raw_events.value.cast('string'),
                          general_event_schema()).alias('json')) \
        .select('raw_event', 'timestamp', 'json.*')    
    
    sink1 = event_parameters \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_event_parameters") \
        .option("path", "/tmp/event_parameters") \
        .trigger(processingTime="10 seconds") \
        .start()
        
    sink2 = all_events \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/tmp/checkpoints_for_all_events") \
        .option("path", "/tmp/all_events") \
        .trigger(processingTime="10 seconds") \
        .start()        
    

    sink1.awaitTermination()
    sink2.awaitTermination()

    
if __name__ == "__main__":
    main()


```
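To make the effect of applying a fixed schema to the raw Kafka value more concrete, here is a plain-Python approximation of what the `from_json(...)` and `select('raw_event', 'timestamp', 'json.*')` steps do for a single record. It is only a sketch: `project_event` is a hypothetical helper, the timestamp column is left out, and treating malformed JSON as all-null fields is an assumption made for this illustration.

```python
import json

# Field names taken from general_event_schema() above.
GENERAL_EVENT_FIELDS = ["Accept", "Host", "User-Agent", "event_id", "event_type"]


def project_event(raw_event, fields=GENERAL_EVENT_FIELDS):
    """Keep the raw string and project only the fields named in the schema.

    Fields missing from the JSON come back as None, and keys that are not
    in the schema are dropped from the projected row.
    """
    try:
        parsed = json.loads(raw_event)
    except ValueError:
        parsed = {}
    if not isinstance(parsed, dict):
        parsed = {}
    row = {"raw_event": raw_event}
    for field in fields:
        row[field] = parsed.get(field)
    return row


# Illustrative record; "extra" is not part of the schema and is dropped.
row = project_event('{"Host": "user2.att.com", "event_type": "join_guild", "extra": 1}')
print(row)
```

Because `raw_event` is kept alongside the projected columns, anything the schema drops can still be recovered from the stored raw string.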

***
### Synthetic Parameterized Event (Data) Generation

####  This program can be found at the following location:

./code_files/primative_event_pitcher_ab_2.py

This file is executed within our pipeline in the following way:

```bash

python primative_event_pitcher_ab_2.py >> log_event_pitcher.txt

```

Its contents are listed below:

```python

import sys, getopt,os,smtplib,time
from os.path import basename
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import COMMASPACE, formatdate
import subprocess
import random

def main():
    
    # The with-blocks close the files automatically.
    with open('guild_names.csv', encoding='ISO-8859-1') as g:
        guild_names = [row.split(',')[0].strip('\n') for row in g]
    with open('sword_types.csv',encoding='ISO-8859-1') as s:
        sword_types = [row.split(',')[0].strip('\n') for row in s]
    rand_int  = random.randint(5,110)
    thisdir   = os.getcwd()
    users     = ['ben', 'aastha', 'lise', 'theresa', 'don']
    events    = ['purchase_a_sword', 'join_a_guild', 'leave_guild', 'get_credit']
    
    
    flask_shutdown_command = "docker-compose exec mids curl http://localhost:5000/shutdown"

    print("pitching events")
    # Run 139 batches of ApacheBench requests (up to 40 requests per batch).
    i = 1
    
    while i < 140:
        i+=1
        e_randint = random.randint(0,len(events)-1)
        u_randint = random.randint(0,len(users)-1)
        g_randint = random.randint(0,len(guild_names)-1)
        s_randint = random.randint(0,len(sword_types)-1)
        b_randint = random.randint(1,40)  # at least one request; ab rejects -n 0
        base      = 'docker-compose exec mids ab -n {} -H "Host: user2.att.com" http://localhost:5000/'.format(b_randint)

        
        if events[e_randint] =='join_a_guild':
            line = base + 'join_a_guild"?user={}&guild_name={}"'
            line = line.format(users[u_randint],guild_names[g_randint])
        elif events[e_randint] =='leave_guild':
            line = base + 'leave_guild"?user={}&guild_name={}"'
            line = line.format(users[u_randint],guild_names[g_randint])
        elif events[e_randint] == 'purchase_a_sword':
            line = base + 'purchase_a_sword"?user={}&sword_type={}"'
            line = line.format(users[u_randint],sword_types[s_randint])
        else:
            line = base + 'get_credit"?user={}&guild_name={}"'
            line = line.format(users[u_randint],guild_names[g_randint])
        print(line)
        subprocess.call(line, shell=True)
        print("Press and HOLD CTRL+C to terminate early")
        
        

main()


```
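The pitcher above builds each ApacheBench command as a shell string. As a hedged alternative sketch, the same command could be assembled as an argument list with the query string URL-encoded, so that guild or sword names containing spaces or `&` do not break the URL. `build_ab_command` is a hypothetical helper and the guild name is made up; the defaults mirror the values used in our script.

```python
import shlex
from urllib.parse import urlencode


def build_ab_command(event, params, requests=1,
                     host_header="user2.att.com",
                     base_url="http://localhost:5000"):
    """Return the ApacheBench invocation as an argument list.

    The parameters are URL-encoded, and because the result is a list it can
    be passed to subprocess.call() without shell=True.
    """
    url = "{}/{}?{}".format(base_url, event, urlencode(params))
    return [
        "docker-compose", "exec", "mids",
        "ab", "-n", str(requests),
        "-H", "Host: " + host_header,
        url,
    ]


args = build_ab_command(
    "join_a_guild", {"user": "ben", "guild_name": "Example Guild"}, requests=5
)
# Print a copy-pasteable shell form of the command.
print(" ".join(shlex.quote(a) for a in args))
```

With this shape the call becomes `subprocess.call(args)`, and no manual quoting of the URL is needed.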


***
### Hive Query File

This is an HQL file which contains "canned" queries to answer questions about the state of the game / pipeline.

####  This file can be found at the following location:

./code_files/query_hive_tables.hql

This file is executed within our pipeline in the following way:

```bash

docker-compose exec presto presto --server presto:8080 --catalog hive --schema default -f /w205/project-3-tuneman7/code_files/query_hive_tables.hql 

```

Its contents are listed below:

```sql

select '-----------LOOK AT TOTAL EVENT COUNTS--------------------';
select
    event_type,
    count(event_type) as event_count
from 
    all_events
group by event_type;
select '-----------LOOK USER GUILD JOIN COUNT--------------------';
select 
    un.parameter_value as user_name,
    et.event_type as event,
    count(un.parameter_value) as guild_join_count
from 
    all_events et
join 
    event_parameters un
on 
    et.event_id = un.event_id
and 
    et.event_type = 'join_guild'
and 
    un.parameter_name = 'user'    
group by 
    un.parameter_value
    ,et.event_type
order by 
    count(un.parameter_value) desc limit 10;
select '-----------LOOK USER GUILD LEAVE COUNT--------------------';
select 
    un.parameter_value as user_name,
    et.event_type as event,
    count(un.parameter_value) as guild_leave_count
from 
    all_events et
join 
    event_parameters un
on 
    et.event_id = un.event_id
and 
    et.event_type = 'leave_guild'
and 
    un.parameter_name = 'user'    
group by 
    un.parameter_value
    ,et.event_type
order by 
    count(un.parameter_value) desc limit 10;
select '-----------LOOK USER GET CREDIT COUNT--------------------';
select 
    un.parameter_value as user_name,
    et.event_type as event,
    count(un.parameter_value) as get_credit_count
from 
    all_events et
join 
    event_parameters un
on 
    et.event_id = un.event_id
and 
    et.event_type = 'get_credit'
and 
    un.parameter_name = 'user'    
group by 
    un.parameter_value
    ,et.event_type
order by 
    count(un.parameter_value) desc limit 10;
select '-----------LOOK AT 10 MOST POPULAR SWORDS--------------------';
select 
    un.parameter_value as sword_name,
    et.event_type as event,
    count(un.parameter_value) as popular_sword_count
from 
    all_events et
join 
    event_parameters un
on 
    et.event_id = un.event_id
and 
    un.parameter_name = 'sword_type'
group by 
    un.parameter_value
    ,et.event_type
order by 
    count(un.parameter_value) desc limit 10;
select '-----------LOOK AT 10 MOST POPULAR GUILDS--------------------';
select 
    un.parameter_value as guild_name,
    et.event_type as event,
    count(un.parameter_value) as popular_guild_count
from 
    all_events et
join 
    event_parameters un
on 
    et.event_id = un.event_id
and 
    un.parameter_name = 'guild_name'
and 
    et.event_type = 'join_guild'
group by 
    un.parameter_value
    ,et.event_type
order by 
    count(un.parameter_value) desc limit 10;
select '-------------------------------';    

```
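The join pattern used by these queries (one row per event in `all_events`, one row per parameter in `event_parameters`, linked by `event_id`) can be tried out locally with an in-memory SQLite database. This is a toy sketch only: the rows are made-up illustrative data rather than pipeline output, the tables carry just the columns the query needs, and SQLite stands in for Presto.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table all_events (event_id text, event_type text);
create table event_parameters (event_id text, parameter_name text, parameter_value text);
""")

# Made-up events: two guild joins by one user and one sword purchase by another.
conn.executemany("insert into all_events values (?, ?)", [
    ("e1", "join_guild"),
    ("e2", "join_guild"),
    ("e3", "purchase_sword"),
])
conn.executemany("insert into event_parameters values (?, ?, ?)", [
    ("e1", "user", "ben"), ("e1", "guild_name", "Example Guild"),
    ("e2", "user", "ben"), ("e2", "guild_name", "Other Guild"),
    ("e3", "user", "don"), ("e3", "sword_type", "Example Sword"),
])

# Same shape as the "guild join count" query in query_hive_tables.hql.
rows = conn.execute("""
select
    un.parameter_value as user_name,
    et.event_type as event,
    count(un.parameter_value) as guild_join_count
from all_events et
join event_parameters un
  on et.event_id = un.event_id
 and et.event_type = 'join_guild'
 and un.parameter_name = 'user'
group by un.parameter_value, et.event_type
order by count(un.parameter_value) desc
limit 10
""").fetchall()
print(rows)
```

Filtering on `parameter_name` inside the join is what selects which parameter (user, guild name, or sword type) is being counted.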


***
### Jupyter Notebook for Hive Reporting

This is a Jupyter notebook which contains basic queries and answers to questions.

####  This file can be found at the following location:

./code_files/hive_reports.ipynb



***
### Bash Script for Running the Pipeline

This is the "glue" program that runs the pipeline end-to-end.

####  This file can be found at the following location:

./code_files/dd.sh

This program is executed in the following way:

```bash
. dd.sh
```

The contents of this file are below:

```bash

#mkdir ~/w205/spark-from-files/
#cd ~/w205/spark-from-files
#cp ~/w205/course-content/11-Storing-Data-III/docker-compose.yml .
# cp ~/w205/course-content/11-Storing-Data-III/*.py .
#bring up images
docker-compose up -d
echo "sleeping 30"
sleep 30
echo "looking at HDFS"

#look at hdfs
docker-compose exec cloudera hadoop fs -ls /tmp/

echo "creating topic"

docker-compose exec kafka kafka-topics --create --topic events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
sleep 3
docker-compose exec kafka kafka-topics --create --topic event_parameters --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
echo "sleeping"
sleep 4
rm -f log_file1.txt
echo " ">log_file1.txt

#spin up API endpoint flask container.
docker-compose exec mids env FLASK_APP=/w205/project-3-tuneman7/code_files/game_api.py flask run >> log_file1.txt &
echo "sleeping"
sleep 4
echo "copying config files over"
echo "docker-compose exec spark bash -c \"cp /w205/project-3-tuneman7/code_files/log4j.properties ./conf/log4j.properties\""
docker-compose exec spark bash -c "cp /w205/project-3-tuneman7/code_files/log4j.properties ./conf/log4j.properties"

#run spark submit
echo "doing the spark submit"
echo "docker-compose exec spark spark-submit /w205/project-3-tuneman7/code_files/separate_events_stream_2.py"
docker-compose exec spark spark-submit /w205/project-3-tuneman7/code_files/separate_events_stream_2.py &

#create hive tables
echo "docker-compose exec cloudera hive -f /w205/project-3-tuneman7/code_files/hive_table_creation.hql"
docker-compose exec cloudera hive -f /w205/project-3-tuneman7/code_files/hive_table_creation.hql 

#look at hive tables and checkpoint tables.
docker-compose exec cloudera hadoop fs -ls /tmp/


#Run synthetic parameterized event generation.
echo "python primative_event_pitcher_ab_2.py >> log_event_pitcher.txt"
echo "press and HOLD CTL+C to terminate:"
x=1
while [ $x -le 500 ]
do
  #Run the event pitcher
  python primative_event_pitcher_ab_2.py >> log_event_pitcher.txt
  #Cycle flask
  docker-compose exec mids curl http://localhost:5000/shutdown
  docker-compose exec mids env FLASK_APP=/w205/project-3-tuneman7/code_files/game_api.py flask run >> log_file1.txt &
  #Run the queries and see results
  docker-compose exec presto presto --server presto:8080 --catalog hive --schema default -f /w205/project-3-tuneman7/code_files/query_hive_tables.hql 
  sleep 2
  echo "press and HOLD CTL+C to terminate:"
done

#docker-compose down


```
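The script above relies on fixed `sleep` calls to give each container time to start. A possible refinement, sketched here as an assumption rather than something we implemented, is to poll a TCP port until it accepts connections. `wait_for_port` is a hypothetical helper. From the host, only the published ports in our docker-compose file (for example 5000 for Flask and 8082 for Presto) would be reachable this way.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Poll host:port until a TCP connection succeeds or timeout elapses.

    Returns True once a connection is accepted, False if the deadline
    passes first. An open port does not guarantee the service behind it
    has finished initializing.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 5000)` could be called before the event pitcher starts, in place of a fixed sleep after launching Flask.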



### Running the pipeline end-to-end (single cycle)

####  A single-cycle pipeline (one that does only one batch of apache-bench calls) is created below.

```bash

. run_demo_of_pipeline.sh

```

It is not possible to run this pipeline from within this notebook.  However, once it has been run, its output can be queried from here.

Below are some ad hoc query results:


In [1]:
!pip install pyhive



In [2]:
from pyhive import presto
import pandas as pd

presto_conn = presto.connect(
    host='0.0.0.0',
    port=8082 # Exposed Presto port (see docker compose file)
)

pd.read_sql_query("SHOW TABLES", presto_conn)

| | Table |
|---|---|
| 0 | all_events |
| 1 | event_parameters |


In [3]:
event_parameters = pd.read_sql_query("""
    select
        un.parameter_value as user_name,
        et.event_type as event,
        count(un.parameter_value) as guild_join_count
    from
        all_events et
    join
        event_parameters un
    on
        et.event_id = un.event_id
    and
        et.event_type = 'join_guild'
    and
        un.parameter_name = 'user'
    group by
        un.parameter_value
        ,et.event_type
    order by
        count(un.parameter_value) desc limit 10
""", presto_conn)
event_parameters.head()



| | user_name | event | guild_join_count |
|---|---|---|---|
| 0 | lise | join_guild | 2380 |
| 1 | don | join_guild | 2053 |
| 2 | ben | join_guild | 1863 |
| 3 | aastha | join_guild | 1800 |
| 4 | theresa | join_guild | 1761 |


In [4]:
event_parameters = pd.read_sql_query("""
    select
        un.parameter_value as sword_name,
        et.event_type as event,
        count(un.parameter_value) as popular_sword_count
    from
        all_events et
    join
        event_parameters un
    on
        et.event_id = un.event_id
    and
        un.parameter_name = 'sword_type'
    group by
        un.parameter_value
        ,et.event_type
    order by
        count(un.parameter_value) desc limit 10
""", presto_conn)
event_parameters.head()



| | sword_name | event | popular_sword_count |
|---|---|---|---|
| 0 | Sacrifice | purchase_sword | 225 |
| 1 | The Metallium Slither | purchase_sword | 222 |
| 2 | Justice | purchase_sword | 191 |
| 3 | Unhappy Ending | purchase_sword | 189 |
| 4 | Innocence End | purchase_sword | 176 |
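
The per-user count queries in this project differ only in the event type they filter on, so the query text could be generated rather than repeated. The following is a sketch under that assumption. `user_event_count_query` and `ALLOWED_EVENT_TYPES` are hypothetical names, and the output column is called `event_count` here instead of the per-query names used in our HQL file.

```python
# Event types emitted by game_api.py; anything else is rejected so that
# arbitrary text can never be spliced into the SQL string.
ALLOWED_EVENT_TYPES = {"default", "purchase_sword", "join_guild", "leave_guild", "get_credit"}


def user_event_count_query(event_type, limit=10):
    """Build the 'events per user' query for one event type."""
    if event_type not in ALLOWED_EVENT_TYPES:
        raise ValueError("unknown event type: {!r}".format(event_type))
    limit = int(limit)
    return (
        "select un.parameter_value as user_name, "
        "et.event_type as event, "
        "count(un.parameter_value) as event_count "
        "from all_events et "
        "join event_parameters un "
        "on et.event_id = un.event_id "
        "and et.event_type = '{event_type}' "
        "and un.parameter_name = 'user' "
        "group by un.parameter_value, et.event_type "
        "order by count(un.parameter_value) desc "
        "limit {limit}"
    ).format(event_type=event_type, limit=limit)


print(user_event_count_query("leave_guild"))
```

In the notebook, the result would be passed straight to pandas, for example `pd.read_sql_query(user_event_count_query("leave_guild"), presto_conn)`.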
