## Project 2: Tracking User Activity
#### Vaibhav Beohar - Spring w205 - Section 4
==============================================

This project takes the perspective of a hypothetical employee at an ed-tech firm and does the following:

- creates a topic called "assessments" on Kafka
- publishes assessments taken by various users of the ed-tech firm
- allows data scientists working at the ed-tech firm's customers to run queries on the assessments topic

The approach taken by the project is as follows:
- Create a Kafka topic, assessments, to which assessment data is published and from which it is consumed
- Use Spark as the execution engine for ETL: Python (pyspark) as the programming language, Spark SQL for in-memory querying, and massively parallel RDDs (Resilient Distributed Datasets), which keep data in memory transparently and retain it on disk as required
- Finally, save the results as Parquet files in HDFS, the base file system for Hadoop

## Spin up a Jupyter Notebook with a pyspark kernel (instead of the pyspark command line)

Make sure that, in the docker-compose.yml file, the Spark container has an expose section and a ports section with the entries shown below. Also make sure that a Cloudera container, if present, does not map port 8888, which would cause a conflict.

When opening the notebook, replace the 0.0.0.0 IP address in the URL generated by the pyspark kernel with the external IP address of the Google Cloud virtual machine.

    expose:
      - "8888"
    ports:
      - "8888:8888"

    docker-compose up -d

    docker-compose exec spark bash
    ln -s /w205 w205

    exit
    
    docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark

    http://0.0.0.0:8888/?token=99c8b790f17f5b6bf6597177ddd7eaa0bc8b1c6d093c5f3c

    http://35.230.105.251:8888/?token=99c8b790f17f5b6bf6597177ddd7eaa0bc8b1c6d093c5f3c

    curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
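
The host swap between the two notebook URLs above can also be done programmatically. The following is a minimal sketch rather than part of the original workflow: the helper name externalize_notebook_url is hypothetical, and the URL and IP address are the ones shown above.

```python
from urllib.parse import urlsplit, urlunsplit


def externalize_notebook_url(url, external_ip):
    """Swap the host in a Jupyter URL, keeping the port, path, and token."""
    parts = urlsplit(url)
    # Rebuild the network location with the same port but the reachable host.
    netloc = f"{external_ip}:{parts.port}" if parts.port else external_ip
    return urlunsplit(parts._replace(netloc=netloc))


local_url = "http://0.0.0.0:8888/?token=99c8b790f17f5b6bf6597177ddd7eaa0bc8b1c6d093c5f3c"
external_url = externalize_notebook_url(local_url, "35.230.105.251")
print(external_url)
```

Only the host changes; the token in the query string is carried over untouched.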



## Create the Kafka topic assessments

    docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
    

## Write the assessments json object to the Kafka topic assessments
    docker-compose exec mids bash -c "cat /w205/project-2-vbeohar/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
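
In the pipeline above, jq '.[]' -c unpacks the top-level JSON array into one compact JSON object per line, and kafkacat publishes each line as a separate message. The sketch below reproduces only the unpacking step in plain Python to show what ends up in each message. The function name explode_json_array and the sample records are hypothetical.

```python
import json


def explode_json_array(text):
    """Return one compact JSON string per element of a top-level JSON array."""
    records = json.loads(text)
    # separators=(",", ":") drops the whitespace, similar to jq's -c flag.
    return [json.dumps(record, separators=(",", ":")) for record in records]


# Hypothetical two-record file contents, not taken from the real dataset.
sample = '[{"keen_id": "a1", "exam_name": "Exam A"}, {"keen_id": "b2", "exam_name": "Exam B"}]'
messages = explode_json_array(sample)
for message in messages:
    print(message)
```

Each printed line corresponds to one Kafka message, which is why the notebook can later call json.loads on every value individually.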

In [84]:
import json

from pyspark.sql import Row

In [85]:
spark

In [86]:
sc

In [87]:
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load() 

In [88]:
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [89]:
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

In [90]:
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

In [91]:
extracted_assessments.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
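
The schema above gives sequences a single map type with one value type, while later cells in this notebook read a string (id), a list (questions), and a dictionary (counts) from it. The plain-Python sketch below shows that mix of types for one message. The field names come from the notebook; the values are made up for illustration.

```python
import json

# Hypothetical message: field names come from the notebook, values are made up.
message = json.dumps({
    "keen_id": "k1",
    "exam_name": "Exam A",
    "sequences": {
        "id": "s1",
        "questions": [{"id": "q1", "user_incomplete": False}],
        "counts": {"correct": 1, "total": 1},
    },
})

record = json.loads(message)
# Map each key under "sequences" to the Python type of its value.
value_types = {key: type(value).__name__ for key, value in record["sequences"].items()}
print(value_types)
```

Because the values under sequences do not share one type, fields that do not fit the inferred value type are better extracted with custom functions, as the later sections do.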



In [92]:
extracted_assessments.registerTempTable('assessments')

In [93]:
spark.sql("select keen_id from assessments limit 10").show()

+--------------------+
|             keen_id|
+--------------------+
|5a6745820eb8ab000...|
|5a674541ab6b0a000...|
|5a67999d3ed3e3000...|
|5a6799694fc7c7000...|
|5a6791e824fccd000...|
|5a67a0b6852c2a000...|
|5a67b627cc80e6000...|
|5a67ac8cb0a5f4000...|
|5a67a9ba060087000...|
|5a67ac54411aed000...|
+--------------------+



In [94]:
spark.sql("select keen_timestamp, sequences.questions[0].user_incomplete from assessments limit 10").show()

+------------------+-------------------------------------------------------+
|    keen_timestamp|sequences[questions] AS `questions`[0][user_incomplete]|
+------------------+-------------------------------------------------------+
| 1516717442.735266|                                                   true|
| 1516717377.639827|                                                  false|
| 1516738973.653394|                                                  false|
|1516738921.1137421|                                                  false|
| 1516737000.212122|                                                  false|
| 1516740790.309757|                                                  false|
|1516746279.3801291|                                                  false|
| 1516743820.305464|                                                  false|
|  1516743098.56811|                                                  false|
| 1516743764.813107|                                                  false|
+------------------+-------------------------------------------------------+
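
The keen_timestamp column is stored as a string. Judging by the magnitude of the values, they look like Unix epoch seconds, although the source does not confirm this. Under that assumption, a value can be turned into a readable UTC datetime as sketched below; the helper name parse_keen_timestamp is hypothetical.

```python
from datetime import datetime, timezone


def parse_keen_timestamp(raw):
    """Convert a string of epoch seconds into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(float(raw), tz=timezone.utc)


# First keen_timestamp value from the output above.
parsed = parse_keen_timestamp("1516717442.735266")
print(parsed.isoformat())
```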

In [96]:
spark.sql("select sequences.abc123 from assessments limit 10").show()


+------+
|abc123|
+------+
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
+------+



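###### The command below does not work

The table has no column named sequence (the column is sequences), so Spark SQL cannot resolve it. The inferred schema above also types the values of sequences as arrays of boolean maps, which likely leaves no place for a string-valued id.

>> spark.sql("select sequence.id from assessments limit 10").show()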


#### Nested multi-valued as a dictionary

In [97]:
def my_lambda_sequences_id(x):
    raw_dict = json.loads(x.value)
    my_dict = {"keen_id" : raw_dict["keen_id"], "sequences_id" : raw_dict["sequences"]["id"]}
    return Row(**my_dict)


In [98]:

my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()

my_sequences.registerTempTable('sequences')

In [99]:

spark.sql("select sequences_id from sequences limit 10").show()

spark.sql("select a.keen_id, a.keen_timestamp, s.sequences_id from assessments a join sequences s on a.keen_id = s.keen_id limit 10").show()

+--------------------+
|        sequences_id|
+--------------------+
|5b28a462-7a3b-42e...|
|5b28a462-7a3b-42e...|
|b370a3aa-bf9e-4c1...|
|b370a3aa-bf9e-4c1...|
|04a192c1-4f5c-4ac...|
|e7110aed-0d08-4cb...|
|5251db24-2a6e-424...|
|066b5326-e547-4da...|
|8ac691f8-8c1a-403...|
|066b5326-e547-4da...|
+--------------------+

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|        sequences_id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|8ac691f8-8c1a-403...|
|5a26ee9cbf5ce1000...|1512500892.4166169|9bd87823-4508-4e0...|
|5a29dcac74b662000...|1512692908.8423469|e7110aed-0d08-4cb...|
|5a2fdab0eabeda000...|1513085616.2275269|cd800e92-afc3-447...|
|5a30105020e9d4000...|1513099344.8624721|8ac691f8-8c1a-403...|
|5a3a6fc3f0a100000...| 1513779139.354213|e7110aed-0d08-4cb...|
|5a4e17fe08a892000...|1515067390.1336551|9abd5b51-6bd8-11e...|
|5a4f3c69cc6444000...| 1515142249.858722|083844

#### Nested multi-valued as a list

In [100]:
def my_lambda_questions(x):
    raw_dict = json.loads(x.value)
    my_list = []
    my_count = 0
    for l in raw_dict["sequences"]["questions"]:
        my_count += 1
        my_dict = {"keen_id" : raw_dict["keen_id"], "my_count" : my_count, "id" : l["id"]}
        my_list.append(Row(**my_dict))
    return my_list


In [101]:

my_questions = assessments.rdd.flatMap(my_lambda_questions).toDF()

my_questions.registerTempTable('questions')
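
The cell above uses flatMap rather than map because my_lambda_questions returns a list of rows per message. The plain-Python sketch below illustrates the difference without Spark: mapping keeps one nested list per message, while flattening yields one row per question. The function question_rows and the sample messages are hypothetical, and plain dictionaries stand in for Row objects.

```python
import json
from itertools import chain


def question_rows(message):
    """Return one dict per question in a single JSON-encoded assessment."""
    raw = json.loads(message)
    # enumerate(..., start=1) plays the role of the manual my_count counter.
    return [
        {"keen_id": raw["keen_id"], "my_count": position, "id": question["id"]}
        for position, question in enumerate(raw["sequences"]["questions"], start=1)
    ]


# Hypothetical messages; the ids are invented for illustration.
messages = [
    json.dumps({"keen_id": "k1", "sequences": {"questions": [{"id": "q1"}, {"id": "q2"}]}}),
    json.dumps({"keen_id": "k2", "sequences": {"questions": [{"id": "q3"}]}}),
]

mapped = [question_rows(m) for m in messages]  # like rdd.map: one list per message
flattened = list(chain.from_iterable(mapped))  # like rdd.flatMap: one flat list of rows
print(len(mapped), len(flattened))
```

The counter restarts at 1 for every message, which matches the my_count column restarting in the notebook output.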

In [102]:

spark.sql("select id, my_count from questions limit 10").show()

spark.sql("select q.keen_id, a.keen_timestamp, q.id from assessments a join questions q on a.keen_id = q.keen_id limit 10").show()

+--------------------+--------+
|                  id|my_count|
+--------------------+--------+
|7a2ed6d3-f492-49b...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|95194331-ac43-454...|       4|
|95194331-ac43-454...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|7a2ed6d3-f492-49b...|       4|
|b9ff2e88-cf9d-4bd...|       1|
|bec23e7b-4870-49f...|       2|
+--------------------+--------+

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|                  id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|803fc93f-7eb2-412...|
|5a17a67efa1257000...|1511499390.3836269|f3cb88cc-5b79-41b...|
|5a17a67efa1257000...|1511499390.3836269|32fe7d8d-6d89-4db...|
|5a17a67efa1257000...|1511499390.3836269|5c34cf19-8cfd-4f5...|
|5a26ee9cbf5ce1000...|1512500892.4166169|0603e6f4-c3f9-4c2...|
|5a26ee9cbf5ce1000...|1512500892.4166169|26a06b

#### How to handle "holes" in JSON data

In [103]:
def my_lambda_correct_total(x):
    
    raw_dict = json.loads(x.value)
    my_list = []
    
    if "sequences" in raw_dict:
        
        if "counts" in raw_dict["sequences"]:
            
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"]:
                    
                my_dict = {"correct": raw_dict["sequences"]["counts"]["correct"], 
                           "total": raw_dict["sequences"]["counts"]["total"]}
                my_list.append(Row(**my_dict))
    
    return my_list
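
The nested membership checks above can also be written with chained dict.get calls. The following is a sketch of that alternative, not the notebook's own code: the function name correct_total is hypothetical, and it returns plain dictionaries instead of Row objects so that it runs without Spark.

```python
import json


def correct_total(message):
    """Return [{'correct': ..., 'total': ...}], or [] when any level is missing."""
    raw = json.loads(message)
    # Each .get falls back to an empty dict, so a missing level cannot raise KeyError.
    counts = raw.get("sequences", {}).get("counts", {})
    if "correct" in counts and "total" in counts:
        return [{"correct": counts["correct"], "total": counts["total"]}]
    return []


# Hypothetical messages covering a complete record and two kinds of "holes".
complete = json.dumps({"sequences": {"counts": {"correct": 2, "total": 4}}})
no_counts = json.dumps({"sequences": {}})
no_sequences = json.dumps({"keen_id": "k1"})

print(correct_total(complete), correct_total(no_counts), correct_total(no_sequences))
```

Returning an empty list for incomplete records is what lets flatMap drop them silently, since an empty list contributes no rows.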


In [104]:

my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()

my_correct_total.registerTempTable('ct')


In [105]:

spark.sql("select * from ct limit 10").show()

spark.sql("select correct / total as score from ct limit 10").show()

spark.sql("select avg(correct / total)*100 as avg_score from ct limit 10").show()

spark.sql("select stddev(correct / total) as standard_deviation from ct limit 10").show()

+-------+-----+
|correct|total|
+-------+-----+
|      2|    4|
|      1|    4|
|      3|    4|
|      2|    4|
|      3|    4|
|      5|    5|
|      1|    1|
|      5|    5|
|      4|    4|
|      0|    5|
+-------+-----+

+-----+
|score|
+-----+
|  0.5|
| 0.25|
| 0.75|
|  0.5|
| 0.75|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
+-----+

+-----------------+
|        avg_score|
+-----------------+
|62.65699745547047|
+-----------------+

+-------------------+
| standard_deviation|
+-------------------+
|0.31086692286170553|
+-------------------+
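
As a sanity check on the arithmetic, the same score, average, and standard deviation can be computed in plain Python for the ten rows displayed above. This is only a sketch over those ten rows, so the results differ from the full-dataset figures. Spark SQL's stddev is the sample standard deviation, which corresponds to statistics.stdev.

```python
from statistics import mean, stdev

# The ten (correct, total) pairs displayed above.
pairs = [(2, 4), (1, 4), (3, 4), (2, 4), (3, 4), (5, 5), (1, 1), (5, 5), (4, 4), (0, 5)]

scores = [correct / total for correct, total in pairs]
avg_score = mean(scores) * 100  # mirrors avg(correct / total) * 100
spread = stdev(scores)          # sample standard deviation, like Spark SQL's stddev

print(scores)
print(round(avg_score, 2), round(spread, 4))
```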



#### Writing data frame to Parquet

In [83]:
my_correct_total.write.parquet("/tmp/correct_total2")

#### Business queries

###### How many assessments are in the dataset?

In [106]:
spark.sql("select count(*) from assessments").show()

+--------+
|count(1)|
+--------+
|    3280|
+--------+



###### What are the least common courses taken?

In [79]:
spark.sql("select exam_name, count(*) from assessments group by exam_name order by 2 limit 10").show()


+--------------------+--------+
|           exam_name|count(1)|
+--------------------+--------+
|Nulls, Three-valu...|       1|
|Learning to Visua...|       1|
|Native Web Apps f...|       1|
|Operating Red Hat...|       1|
|Client-Side Data ...|       2|
|Arduino Prototypi...|       2|
|What's New in Jav...|       2|
|Learning Spring P...|       2|
|Hibernate and JPA...|       2|
|Understanding the...|       2|
+--------------------+--------+
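
The group by, order by, and limit steps of the query above can be illustrated in plain Python with collections.Counter. This is a sketch of the same logic on a handful of made-up exam names, not a replacement for the Spark query.

```python
from collections import Counter

# Hypothetical exam names; the real values come from the exam_name column.
exam_names = ["Exam A", "Exam B", "Exam A", "Exam C", "Exam A", "Exam B"]

counts = Counter(exam_names)
# Sort ascending by count, like "order by 2", then keep the first rows, like "limit".
least_common = sorted(counts.items(), key=lambda item: item[1])[:2]
print(least_common)
```

As in the query output, several exams can share the lowest count, so the order among tied rows carries no meaning.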



In [70]:
assessments.write.parquet("/tmp/assessments2")