## Project 2: Tracking User Activity
### Maria DiMedio
w205 Summer 2021

In [1]:
#load packages for pyspark and transformation
import json
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

import pyspark.sql.functions as F
from pyspark.sql.functions import countDistinct, isnan, when, count, col
from pyspark.sql.functions import udf, from_json, split, regexp_replace, avg, trim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from pyspark.sql import SparkSession, Row
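#name the session 'spark' so that it matches the spark.read and spark.sql calls used in the rest of the notebook
spark = SparkSession.builder.appName('Ops').getOrCreate()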

## Overview
In the following code, an unstructured data file containing information on assessments taken by our company's service will be published to Kafka, transformed and processed in Spark, and then landed in HDFS for future analysis. The steps taken to build this pipeline are annotated for future reference and replication by our Data Science team.

These assessment messages do not share a single schema, which limited how much processing could be done up front. Each message in Kafka records the type of test taken, the scores, the user ids, and timestamps, along with nested values. The nested values hold additional data on the number of attempts a user made on the test, the questions they got correct, got incorrect, or left unanswered, and some other fields. The questions asked differ from test to test and therefore do not follow the same data schema. For this reason, these fields have been left nested and can be extracted for further analysis on a test-by-test basis. Further explanation of this approach can be found in the report below.

In the following code blocks, I will also point to five business questions which can be answered by this data. Those will be:

1. How many people took the exam 'Introduction to Apache Spark'?
2. What is the most common and least common exam taken through our service?
3. How many of the assessments administered through our service are for certifications?
4. How many exams were administered with more than one attempt available?
5. How many assessments were completed by users where all of the questions were answered correctly?

### Set Up the Environment to Run Kafka, Spark, and Hadoop on the VM 

**First, I created a docker-compose.yml file containing the components needed to ingest the messages through Kafka, query and clean the ingested data in Spark, and land it in HDFS for further analysis and business use by Data Scientists. An example of that file is below:**

---
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    expose:
      - "2181"
      - "2888"
      - "32181"
      - "3888"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    expose:
      - "9092"
      - "29092"

  cloudera:
    image: midsw205/cdh-minimal:latest
    expose:
      - "8020" # nn
      - "50070" # nn http
      - "8888" # hue
    #ports:
    #- "8888:8888"

  spark:
    image: midsw205/spark-python:0.0.5
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205
    command: bash
    depends_on:
      - cloudera
    environment:
      HADOOP_NAMENODE: cloudera
    expose:
      - "7000" #jupyter notebook
    ports:
      - "7000:7000" # map instance:service

  mids:
    image: midsw205/base:latest
    stdin_open: true
    tty: true
    volumes:
      - ~/w205:/w205



**Then, I started the containers defined in that file, launching Spark alongside Kafka and HDFS. The following terminal commands were used:**

docker-compose up -d

docker-compose logs -f kafka

**Next, I created a topic 'assessments' to store the ingested messages. This keeps the messages for this data pipeline tied to their own topic, separate from other topics that store data not relevant to this project.**

docker-compose exec kafka \
  kafka-topics \
    --create \
    --topic assessments \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --zookeeper zookeeper:32181


**Next, I used kafkacat to publish the messages from the .json file to the topic. I produced these messages to the assessments topic to ensure this pipeline was correct and could be replicated for this specific stream of user data:**

docker-compose exec mids bash -c "cat /w205/project-2-med0521/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
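
The `jq '.[]' -c` step in the command above splits the file's top-level JSON array into one compact JSON object per line, so that each assessment becomes its own Kafka message. A minimal Python sketch of that transformation follows; the two sample records and the `to_kafka_lines` helper are hypothetical and are not part of the pipeline.

```python
import json

# Hypothetical stand-in for the nested JSON file: one top-level array of records.
raw_file_text = """
[
  {"exam_name": "Learning Git", "max_attempts": "1.0"},
  {"exam_name": "Learning SQL", "max_attempts": "1.0"}
]
"""


def to_kafka_lines(array_text):
    """Mimic `jq '.[]' -c`: return each array element as one compact JSON line."""
    records = json.loads(array_text)
    return [json.dumps(record, separators=(",", ":")) for record in records]


lines = to_kafka_lines(raw_file_text)
for line in lines:
    print(line)
```

Each printed line corresponds to one message that kafkacat would publish to the topic.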




### Read Messages from Kafka to Spark and Clean
The following code blocks are run to load the messages on assessments from kafka to spark to be transformed into a structured parquet file, to then land in HDFS for future use by the Data Science team.

In [8]:
#Load the data in from kafka to spark. Use the 'assessments' topic, and make sure that you are loading all messages (from earliest to latest)
raw_messages = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessments")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [9]:
#take a look at the raw schema
raw_messages.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
#save the data as strings to transform further
assessments = raw_messages.selectExpr("CAST(value AS STRING)")
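
Kafka hands the message payload to Spark as raw bytes, which is why the value column is cast to a string before any JSON parsing. The idea can be sketched in plain Python, assuming a UTF-8 encoded payload; the sample record below is hypothetical.

```python
import json

# Hypothetical Kafka record value: the payload arrives as raw bytes.
raw_value = b'{"exam_name": "Learning Git", "sequences": {"attempt": 1}}'

# Similar in spirit to CAST(value AS STRING): decode the bytes into text.
value_as_string = raw_value.decode("utf-8")

# Only after decoding can the text be parsed into a nested structure.
message = json.loads(value_as_string)
print(message["exam_name"], message["sequences"]["attempt"])
```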

In [12]:
#parse the JSON strings into columns: map over the rdd to pull out each row's value, then let spark.read.json infer the schema
#look at the extracted schema and take note of the fields that are still nested for further investigation
extracted_assessments1 = spark.read.json(assessments.rdd.map(lambda x: x.value))
extracted_assessments1.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- attempt: long (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- all_correct: boolean (nullable = true)
 |    |    |-- correct: long (nullable = true)
 |    |    |-- incomplete: long (nullable = true)
 |    |    |-- incorrect: long (nullable = true)
 |    |    |-- submitted: long (nullable = true)
 |    |    |-- total: long (nullable = true)
 |    |    |-- unanswered: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- questions: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- options: arra

In [13]:
#created a table in spark to be able to manipulate columns and extract those most relevant to our team.
extracted_assessments1.registerTempTable('assessments_table')

In [14]:
#each observation in the dataset refers to an assessment taken through the service by a user. There are 3280 observations.
extracted_assessments1.count()

3280

In [15]:
#EDA
#before proceeding, look at the id fields and check whether their distinct counts match the row count.
#the output shows 3242 distinct user_exam_id values against 3280 rows, a gap of 38 rows without a unique id
df1 = extracted_assessments1.select(countDistinct("user_exam_id"), countDistinct('base_exam_id'), countDistinct("keen_id"))
df1.show()

+----------------------------+----------------------------+-----------------------+
|count(DISTINCT user_exam_id)|count(DISTINCT base_exam_id)|count(DISTINCT keen_id)|
+----------------------------+----------------------------+-----------------------+
|                        3242|                         107|                   3242|
+----------------------------+----------------------------+-----------------------+
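
The gap between the row count and the distinct id count (3280 versus 3242 for user_exam_id) points to repeated ids. A small sketch of how such repeats can be identified is below, using plain Python and a hypothetical list of ids rather than the real column.

```python
from collections import Counter

# Hypothetical ids standing in for the user_exam_id column.
user_exam_ids = ["a1", "b2", "b2", "c3", "d4", "d4", "d4"]

id_counts = Counter(user_exam_ids)
total_rows = len(user_exam_ids)
distinct_ids = len(id_counts)

# Rows beyond the first occurrence of each id, the same gap as 3280 - 3242 above.
repeated_rows = total_rows - distinct_ids

# The ids that appear more than once, with how often each appears.
duplicated = {exam_id: n for exam_id, n in id_counts.items() if n > 1}

print(total_rows, distinct_ids, repeated_rows, duplicated)
```

Listing the duplicated ids this way makes it easier to decide whether repeats should be dropped or kept before aggregating.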



In [24]:
#Check the keen_id column for NaN values; the count is 0. Note that isnan flags NaN only, so null values would need a separate isNull check.
extracted_assessments1.select([count(when(isnan('keen_id'),True))]).show()


+---------------------------------------------+
|count(CASE WHEN isnan(keen_id) THEN true END)|
+---------------------------------------------+
|                                            0|
+---------------------------------------------+



In [19]:
#next, create a dataframe selecting the useful columns for our business questions from our assessments 
#pyspark dataframe using spark.sql

df = spark.sql("select assessments_table.base_exam_id, assessments_table.certification, assessments_table.exam_name, assessments_table.max_attempts, assessments_table.sequences from assessments_table")

In [20]:
df.show(5)

+--------------------+-------------+--------------------+------------+--------------------+
|        base_exam_id|certification|           exam_name|max_attempts|           sequences|
+--------------------+-------------+--------------------+------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...|         1.0|[1,[false,2,1,1,4...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...|         1.0|[1,[false,1,2,1,4...|
|4beeac16-bb83-4d5...|        false|The Principles of...|         1.0|[1,[false,3,0,1,4...|
|4beeac16-bb83-4d5...|        false|The Principles of...|         1.0|[1,[false,2,2,0,4...|
|6442707e-7488-11e...|        false|Introduction to B...|         1.0|[1,[false,3,0,1,4...|
+--------------------+-------------+--------------------+------------+--------------------+
only showing top 5 rows



In [21]:
#the size of this dataframe is 5 columns x 3280 rows
df.count()

3280

After extracting the columns needed to answer the business questions outlined above, I checked the size of the dataframe. There are now 5 columns and 3280 rows. Each row represents one assessment attempt recorded by our service.

As noted above, some of the observations in this dataset share an identifier: there are only 3242 distinct user_exam_id values and 107 distinct base_exam_id values across the 3280 rows. For that reason, when aggregating data from this Spark dataframe, take note of how those repeats are handled.

Next, extract the data fields from the sequences column and create new columns to include in a more detailed dataframe. First, a few exploratory sql statements are run on the nested data fields.

In [22]:
spark.sql("select assessments_table.sequences from assessments_table limit 10").show()

+--------------------+
|           sequences|
+--------------------+
|[1,[false,2,1,1,4...|
|[1,[false,1,2,1,4...|
|[1,[false,3,0,1,4...|
|[1,[false,2,2,0,4...|
|[1,[false,3,0,1,4...|
|[1,[true,5,0,0,5,...|
|[1,[true,1,0,0,1,...|
|[1,[true,5,0,0,5,...|
|[1,[true,4,0,0,4,...|
|[1,[false,0,1,0,1...|
+--------------------+



In [30]:
spark.sql("select assessments_table.sequences.questions from assessments_table").show(2)

+--------------------+
|           questions|
+--------------------+
|[[7a2ed6d3-f492-4...|
|[[95194331-ac43-4...|
+--------------------+
only showing top 2 rows



Given the nested structure of the 'counts' column, which summarizes the questions answered on each exam, further unnesting can be done to extract those values into individual columns. That was done using additional Spark SQL querying and saved to another dataframe called 'full_df' to indicate that it has additional data for the Data Science team.
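
The unnesting described here amounts to promoting the fields inside sequences.counts to top-level columns while leaving 'questions' nested. A minimal plain-Python sketch of that idea follows. The `flatten_attempt` helper, the `exam-001` id, and the empty questions list are hypothetical; the counts values mirror the first row shown in the outputs.

```python
# Field names taken from the 'counts' struct in the printed schema.
COUNT_FIELDS = [
    "all_correct", "correct", "incomplete", "incorrect",
    "submitted", "total", "unanswered",
]


def flatten_attempt(message):
    """Promote the nested counts fields to top-level keys; keep questions nested."""
    sequences = message.get("sequences") or {}
    counts = sequences.get("counts") or {}
    row = {
        "base_exam_id": message.get("base_exam_id"),
        "exam_name": message.get("exam_name"),
        "attempt": sequences.get("attempt"),
        "questions": sequences.get("questions"),
    }
    for field in COUNT_FIELDS:
        # Missing fields become None, similar to a null column value.
        row[field] = counts.get(field)
    return row


# Hypothetical message shaped like the schema above.
sample = {
    "base_exam_id": "exam-001",
    "exam_name": "Learning Git",
    "sequences": {
        "attempt": 1,
        "counts": {
            "all_correct": False, "correct": 2, "incomplete": 1,
            "incorrect": 1, "submitted": 4, "total": 4, "unanswered": 0,
        },
        "questions": [],
    },
}

flat = flatten_attempt(sample)
print(flat["correct"], flat["total"], flat["all_correct"])
```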

In [37]:
full_df = spark.sql("select assessments_table.base_exam_id, assessments_table.certification, assessments_table.exam_name, assessments_table.max_attempts, assessments_table.sequences.id, assessments_table.sequences.counts, assessments_table.sequences.questions, assessments_table.sequences.attempt, assessments_table.sequences.counts.all_correct, assessments_table.sequences.counts.correct, assessments_table.sequences.counts.incomplete, assessments_table.sequences.counts.incorrect, assessments_table.sequences.counts.submitted, assessments_table.sequences.counts.total, assessments_table.sequences.counts.unanswered from assessments_table")

In [38]:
full_df.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- id: string (nullable = true)
 |-- counts: struct (nullable = true)
 |    |-- all_correct: boolean (nullable = true)
 |    |-- correct: long (nullable = true)
 |    |-- incomplete: long (nullable = true)
 |    |-- incorrect: long (nullable = true)
 |    |-- submitted: long (nullable = true)
 |    |-- total: long (nullable = true)
 |    |-- unanswered: long (nullable = true)
 |-- questions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- options: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- at: string (nullable = true)
 |    |    |    |    |-- checked: boolean (nullable = true)
 |    |    |    |    |-- correct: boolean (nullable = true)
 |    |    |    |    |-- id:

In [39]:
full_df.show(5)

+--------------------+-------------+--------------------+------------+--------------------+-------------------+--------------------+-------+-----------+-------+----------+---------+---------+-----+----------+
|        base_exam_id|certification|           exam_name|max_attempts|                  id|             counts|           questions|attempt|all_correct|correct|incomplete|incorrect|submitted|total|unanswered|
+--------------------+-------------+--------------------+------------+--------------------+-------------------+--------------------+-------+-----------+-------+----------+---------+---------+-----+----------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...|         1.0|5b28a462-7a3b-42e...|[false,2,1,1,4,4,0]|[[7a2ed6d3-f492-4...|      1|      false|      2|         1|        1|        4|    4|         0|
|37f0a30a-7464-11e...|        false|Normal Forms and ...|         1.0|5b28a462-7a3b-42e...|[false,1,2,1,4,4,0]|[[95194331-ac43-4...|      1|      false|      1|    

Although it is out of the scope of this analysis, the 'questions' column contains nested information about the users' responses to the questions asked in the assessments. Notably, the questions and response data come in many different formats, each with a different schema, so this field is most useful if the Data Science team first separates this dataframe by exam type and then looks at the question schema for each specific exam. The column is kept in this nested format in later steps and saved in HDFS so that the Data Science team will have access to the nested structure.

For the next steps of analysis, the columns of interest are saved in both this expanded format and a focused format for the purposes of the key business questions listed above.

In [324]:
focused_df = full_df.select("base_exam_id", "certification", "exam_name", "max_attempts")

In [325]:
focused_df.show(5)

+--------------------+-------------+--------------------+------------+
|        base_exam_id|certification|           exam_name|max_attempts|
+--------------------+-------------+--------------------+------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...|         1.0|
|37f0a30a-7464-11e...|        false|Normal Forms and ...|         1.0|
|4beeac16-bb83-4d5...|        false|The Principles of...|         1.0|
|4beeac16-bb83-4d5...|        false|The Principles of...|         1.0|
|6442707e-7488-11e...|        false|Introduction to B...|         1.0|
+--------------------+-------------+--------------------+------------+
only showing top 5 rows



### Answering Business Questions - Spark Analysis
In the following code blocks, the five business questions posed above will be answered. This is an example of the types of analysis which the Data Science team can expect to be able to perform on this user data stream.

**1. How many people took the exam 'Introduction to Apache Spark'?**


In [40]:
spark.sql("select * from assessments_table where exam_name='Introduction to Apache Spark'").count()

9

In [41]:
spark.sql("select * from assessments_table").count()

3280

9 people took the exam 'Introduction to Apache Spark' through our service, out of the 3280 total assessments administered to users in this dataset. 

**2. What is the most common and least common exam taken through our service?**


In [42]:
spark.sql("select exam_name, count(exam_name) from assessments_table group by exam_name order by count(exam_name)").show()

+--------------------+----------------+
|           exam_name|count(exam_name)|
+--------------------+----------------+
|Learning to Visua...|               1|
|Nulls, Three-valu...|               1|
|Native Web Apps f...|               1|
|Operating Red Hat...|               1|
|The Closed World ...|               2|
|Client-Side Data ...|               2|
|Arduino Prototypi...|               2|
|Hibernate and JPA...|               2|
|Understanding the...|               2|
|What's New in Jav...|               2|
|Learning Spring P...|               2|
|Service Based Arc...|               3|
|Using Web Components|               3|
|Building Web Serv...|               3|
| Mastering Web Views|               3|
|Getting Ready for...|               3|
|Using Storytellin...|               4|
|       View Updating|               4|
|Modeling for Soft...|               5|
|An Introduction t...|               5|
+--------------------+----------------+
only showing top 20 rows



In [43]:
spark.sql("select exam_name, count(exam_name) from assessments_table group by exam_name order by count(exam_name) desc").show()

+--------------------+----------------+
|           exam_name|count(exam_name)|
+--------------------+----------------+
|        Learning Git|             394|
|Introduction to P...|             162|
|Introduction to J...|             158|
|Intermediate Pyth...|             158|
|Learning to Progr...|             128|
|Introduction to M...|             119|
|Software Architec...|             109|
|Beginning C# Prog...|              95|
|    Learning Eclipse|              85|
|Learning Apache M...|              80|
|Beginning Program...|              79|
|       Mastering Git|              77|
|Introduction to B...|              75|
|Advanced Machine ...|              67|
|Learning Linux Sy...|              59|
|JavaScript: The G...|              58|
|        Learning SQL|              57|
|Practical Java Pr...|              53|
|    HTML5 The Basics|              52|
|   Python Epiphanies|              51|
+--------------------+----------------+
only showing top 20 rows



From these two select statements, we can see that four exams are tied as the least common, each taken once: 'Nulls, Three-valued Logic and Missing Information', 'Learning to Visualize Data with D3.js', 'Native Web Apps for Android', and an exam whose name is truncated in the output as 'Operating Red Hat...'. The most popular exam is 'Learning Git', taken 394 times, which is more than twice as often as the next most common exam (162 times).
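
Because several exams share the lowest count, a single "least common" row from a sorted query can hide ties. One way to surface every tied exam is sketched below in plain Python; the list of exam names is a small hypothetical sample, not the real data.

```python
from collections import Counter

# Hypothetical sample of the exam_name column.
exam_names = [
    "Learning Git", "Learning Git", "Learning Git",
    "Learning SQL", "Learning SQL",
    "View Updating",
    "Mastering Git",
]

counts = Counter(exam_names)

# Most common exam and its count.
top_name, top_count = counts.most_common(1)[0]

# Every exam tied at the minimum count, sorted for a stable listing.
min_count = min(counts.values())
least_common = sorted(name for name, n in counts.items() if n == min_count)

print(top_name, top_count, min_count, least_common)
```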

**3. How many of the assessments administered through our service are for certifications?**

In [45]:
spark.sql("select exam_name, certification from assessments_table where certification = 'false'").count()

3148

In [46]:
spark.sql("select exam_name, certification from assessments_table where certification != false").count()

0

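No assessments in this dataset were flagged as administered for a certification: 3148 rows have certification = 'false', and no rows matched the opposite filter. The remaining 132 of the 3280 rows matched neither query, which is what SQL would return if their certification value were null, so they should be checked before being counted as non-certification exams. This would be an interesting piece of information for our potential customers, in order to target their marketing strategy and understand their customer base.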
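
The two filters above return 3148 and 0 rows out of 3280, so some rows satisfy neither condition. One possible explanation, which is an assumption and not confirmed by the outputs, is that those rows hold a null certification: in SQL, any comparison with null evaluates to null, and a WHERE clause drops such rows. The sketch below imitates that behaviour in plain Python with hypothetical helper functions and a hypothetical five-row column.

```python
def sql_equals(left, right):
    """Compare like SQL: the result is None (NULL) if either side is None."""
    if left is None or right is None:
        return None
    return left == right


def sql_not(value):
    """Negate like SQL: NOT NULL stays NULL (None)."""
    return None if value is None else not value


# Hypothetical certification column: three 'false' strings and two nulls.
certifications = ["false", "false", None, "false", None]

# A WHERE clause keeps a row only when its condition is exactly True.
equal_rows = [c for c in certifications if sql_equals(c, "false") is True]
not_equal_rows = [
    c for c in certifications if sql_not(sql_equals(c, "false")) is True
]
unmatched = len(certifications) - len(equal_rows) - len(not_equal_rows)

print(len(equal_rows), len(not_equal_rows), unmatched)
```

If this explanation holds, an explicit `is null` filter on the certification column would account for the missing rows.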

**4. How many exams were administered with more than one attempt available?**

In [48]:
spark.sql("select max_attempts, count(*) from assessments_table group by max_attempts").show()

+------------+--------+
|max_attempts|count(1)|
+------------+--------+
|         1.0|    3280|
+------------+--------+



None of the exams administered through our service gave the user more than one attempt. This is helpful for our customers to know as they create their assessments and formats.

**5. How many assessments were completed by users where all of the questions were answered correctly?**

In [52]:
spark.sql("select assessments_table.sequences.counts.all_correct from assessments_table where assessments_table.sequences.counts.all_correct = true").count()

841

Out of all the assessments taken through our service in this data stream, 841 were completed with all questions answered correctly.

### Land Parquet Files in Hadoop for Data Scientists
In the following code blocks, the message data captured in Kafka and transformed in Spark, now mostly un-nested, is saved as Parquet files and stored in HDFS. The two dataframes being saved are:

1. 'full_data_frame_final1': The 'full_df' dataframe containing un-nested primary columns, which the Data Science team can use to manipulate and extract further nested data in the 'questions' column.

2. 'focused_data_frame_final': The 'df' dataframe containing the focused columns that were used to answer key business questions in this report. It keeps the nested data in the single 'sequences' column rather than expanding it, for cases where the Data Science team does not need the additional fields for focused reporting to potential clients.

In [56]:
full_df.write.parquet("/tmp/full_data_frame_final1")

In [55]:
df.write.parquet("/tmp/focused_data_frame_final")

**Once the files had been written, the command below was run in the terminal to check that they had landed correctly in HDFS:**

docker-compose exec cloudera hadoop fs -ls /tmp/

**The following was the result, listing the HDFS files in the terminal:**

Found 4 items

drwxr-xr-x   - root   supergroup          0 2021-06-28 23:01 /tmp/focused_data_frame_final

drwxr-xr-x   - root   supergroup          0 2021-06-28 22:54 /tmp/full_data_frame_final1

drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn

drwx-wx-wx   - root   supergroup          0 2021-06-23 20:46 /tmp/hive

The files are now stored and can be used by the Data Science team!