# **Tracking User Activity on EdTech Assessments**
### Sophia Skowronski | Project 2
### Summer 2020 | MIDS w205 | Fundamentals of Data Engineering

## Summary

An EdTech firm built a tool that delivers data from online-learning assessments. That data has been prepared here so customers can run additional queries and publish their results. The annotations below walk step by step through how the data was written to, streamed through, and consumed from Kafka, transformed with Spark, and stored in the Hadoop Distributed File System (HDFS).

## Annotations

#### Open the directory
`cd w205/project-2-sophiaski/`

#### Copy the docker-compose file that defines a Spark stack with Kafka and HDFS
`cp ~/w205/course-content/08-Querying-Data/docker-compose.yml .`
- I opened additional ports in `docker-compose.yml` to allow interaction with the Spark service. Jupyter Notebook uses custom port 8850 so that it does not conflict with the default port 8888 on the Google Cloud Platform instance.

#### Download the assessments dataset
`curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp`

#### Check that the docker-compose file and the JSON file are in the directory
`ls`

#### Spin up the cluster in detached mode, running the containers in the background
`docker-compose up -d`

#### Create a topic named "assess" with a single partition and a single replica
`docker-compose exec kafka kafka-topics --create --topic assess --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181`
- There is only one partition, so Kafka preserves the order of all messages, and Spark will consume them in the same order in which they were committed
- With a replication factor of 1, the partition's only replica is hosted by its leader broker, so there is no redundancy. If that broker fails, the topic becomes unavailable and committed messages can be lost. That trade-off is acceptable for a single-broker development cluster
- `--if-not-exists` makes the command create the topic only if it does not already exist
- `--zookeeper zookeeper:32181` points the tool at ZooKeeper's client port, where Kafka keeps its cluster metadata, including the topic configuration

#### Describe the new topic to see which broker is doing what
`docker-compose exec kafka kafka-topics --describe --topic assess --zookeeper zookeeper:32181`
- The output confirms the single partition and the replication factor of 1. The `Isr` (in-sync replicas) value shows that the one replica of our only partition is in sync with the leader.

#### Check out the individual messages in the json file
`docker-compose exec mids bash -c "cat /w205/project-2-sophiaski/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c"`

#### Check out the message count in the json file: 3280
`docker-compose exec mids bash -c "cat /w205/project-2-sophiaski/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | wc -l"`

#### From the MIDS container, pipe the 3280 messages into "kafkacat" in producer mode (`-P`). It publishes each line of standard input as one message to the "assess" topic on the specified Kafka broker (`-b kafka:29092`).
`docker-compose exec mids bash -c "cat /w205/project-2-sophiaski/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assess && echo 'Produced 3280 messages.'"`
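The `jq '.[]' -c` stage is what turns the file's single top-level JSON array into one compact JSON object per line. `kafkacat -P` then treats each line as one message. The following is a minimal stdlib sketch of that transformation, for readers who want to reproduce it outside the shell. The helper name `json_array_to_lines` is hypothetical, and the output only approximates jq's formatting.

```python
import json
from typing import List


def json_array_to_lines(text: str) -> List[str]:
    """Split a top-level JSON array into one compact JSON string per element.

    Approximates `jq '.[]' -c`: each returned string would be one line on
    stdin, and kafkacat -P publishes each line as one Kafka message.
    """
    records = json.loads(text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # separators=(",", ":") drops the spaces json.dumps adds by default, like jq -c;
    # ensure_ascii=False keeps non-ASCII characters as-is instead of \u escapes.
    return [json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in records]


if __name__ == "__main__":
    # Hypothetical two-record array; the real file has 3280 records.
    sample = '[{"exam_name": "Learning Git", "max_attempts": "1.0"}, {"exam_name": "Beginning C# Programming"}]'
    lines = json_array_to_lines(sample)
    print(len(lines))  # 2
    print(lines[0])    # {"exam_name":"Learning Git","max_attempts":"1.0"}
```

Counting the returned lines corresponds to the `wc -l` check above.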


#### From the Kafka container, use the "kafka-console-consumer" tool to read the first 10 messages of the "assess" topic from the specified Kafka broker and print them to standard output.
`docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic assess --from-beginning --max-messages 10`

#### Spin up a PySpark process in Jupyter Notebook. Open another browser tab and paste the URL shared in the output, replacing `0.0.0.0` with the external IP of the Google Cloud instance.
`docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8850 --ip 0.0.0.0 --allow-root --notebook-dir=/w205' pyspark`

## Transforming the messages with Spark

In [1]:
```python
# Import the json module for parsing the messages and the PySpark Row class for building DataFrame rows.
import json
from pyspark.sql import Row
```

In [2]:
```python
# Create a batch Kafka source that subscribes to the "assess" topic and reads
# every message from the earliest to the latest available offset.
raw_assess = spark.read.format("kafka")\
    .option("kafka.bootstrap.servers", "kafka:29092")\
    .option("subscribe","assess")\
    .option("startingOffsets", "earliest")\
    .option("endingOffsets", "latest")\
    .load()

# Cache raw_assess to cut back on warnings later
raw_assess.cache()

# See the schema, the first message, and the total count
raw_assess.printSchema()
raw_assess.show(1)
print("Number of rows:",raw_assess.count())
```

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assess|        0|     0|1969-12-31 23:59:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 1 row

Number of rows: 3280
```
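The `value` column holds the raw message bytes, which Spark displays as hex. Decoding the first few bytes shown in the table confirms that they are the start of the JSON text, the same `{"keen_timestamp"...` that the original file begins with. This also explains why the next step can hand `x.value` straight to `json.loads`. Here is a small stdlib sketch of that; the full message value in it is a shortened, hypothetical stand-in.

```python
import json

# First five bytes of the `value` column shown above: [7B 22 6B 65 65 ...]
prefix = bytes.fromhex("7B 22 6B 65 65")
print(prefix.decode("utf-8"))  # {"kee

# Hypothetical, shortened message value in the compact form that jq -c produces.
value = b'{"keen_timestamp":"1516717442.735266","max_attempts":"1.0"}'
record = json.loads(value)  # json.loads accepts bytes and bytearray (Python 3.6+)
print(record["max_attempts"])  # 1.0
```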


#### Now, extract the data by promoting the JSON fields into real DataFrame columns.

In [3]:
```python
# Parse each message's JSON value into a Row and convert the resulting RDD of
# Rows into a DataFrame; Spark infers the column types from the Rows.
assessDF = raw_assess.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

# See the schema
assessDF.printSchema()

# The schema shows that the "sequences" column contains nested objects.
```

```
root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
```


In [4]:
```python
# What does the first message look like?
assessDF.take(1)
```

```
[Row(base_exam_id='37f0a30a-7464-11e6-aa92-a8667f27e5dc', certification='false', exam_name='Normal Forms and All That Jazz Master Class', keen_created_at='1516717442.735266', keen_id='5a6745820eb8ab00016be1f1', keen_timestamp='1516717442.735266', max_attempts='1.0', sequences={'questions': [{'options': None, 'user_correct': False, 'user_incomplete': True, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}], 'id': None, 'attempt': None, 'counts': None}, started_at='2018-01-23T14:23:19.082Z', user_exam_id='6d4089e4-bde5-4a22-b65f-18bce9ab79c8')]
```

In [5]:
```python
# For comparison, what does the original first message look like?
with open('assessment-attempts-20180128-121051-nested.json', 'r') as f:
    df = json.load(f)
print(json.dumps(df[0])) # Turn json object into encoded string for printed output
```

```
{"keen_timestamp": "1516717442.735266", "max_attempts": "1.0", "started_at": "2018-01-23T14:23:19.082Z", "base_exam_id": "37f0a30a-7464-11e6-aa92-a8667f27e5dc", "user_exam_id": "6d4089e4-bde5-4a22-b65f-18bce9ab79c8", "sequences": {"questions": [{"user_incomplete": true, "user_correct": false, "options": [{"checked": true, "at": "2018-01-23T14:23:24.670Z", "id": "49c574b4-5c82-4ffd-9bd1-c3358faf850d", "submitted": 1, "correct": true}, {"checked": true, "at": "2018-01-23T14:23:25.914Z", "id": "f2528210-35c3-4320-acf3-9056567ea19f", "submitted": 1, "correct": true}, {"checked": false, "correct": true, "id": "d1bf026f-554f-4543-bdd2-54dcf105b826"}], "user_submitted": true, "id": "7a2ed6d3-f492-49b3-b8aa-d080a8aad986", "user_result": "missed_some"}, {"user_incomplete": false, "user_correct": false, "options": [{"checked": true, "at": "2018-01-23T14:23:30.116Z", "id": "a35d0e80-8c49-415d-b8cb-c21a02627e2b", "submitted": 1}, {"checked": false, "correct": true, "id": "bccd6e2e-2cef-4c72-8bfa-3
```

#### Note on "sequences" and the nested JSON objects within the encoded messages

Using: `raw_assess.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()`

This `map` call passes one Kafka row at a time to a lambda function. The lambda applies `json.loads` to the row's `value` column, which holds the raw JSON text of the message. `json.loads` converts that text into a Python dictionary, and `Row(**...)` turns the dictionary's key-value pairs into named fields. `toDF()` then converts the resulting RDD of Rows into a Spark DataFrame.

As a result of this transformation, the key-value pairs nested inside the `sequences` dictionary are not properly decoded. The results show null values for everything in the `sequences` column except the Boolean fields of each question: `user_correct`, `user_incomplete`, and `user_submitted`. Fields such as `id`, `options`, and `user_result` come out as null.

Specifically, the value for `questions` within `sequences` is an array, and so is the value for `options` within each question. Because the schema was inferred from Python dictionaries, `sequences` was typed as a `map` whose innermost values must all be `boolean` (see the schema above). Any string, number, or array value in those maps cannot be represented and is replaced with null. Extracting this information properly would require changing how the JSON is parsed in the map function, or declaring an explicit schema for the nested fields.
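Outside Spark, the nested structure survives a plain `json.loads`. This makes it easy to see what a correctly flattened result should contain. Below is a minimal stdlib sketch that walks `sequences.questions` in one parsed message and emits one flat record per question. The helper `flatten_questions` and its output fields are hypothetical choices made for illustration. The sample is a trimmed excerpt of the first message printed in In [5]: only its first question is kept, and each option is reduced to `checked` and `id`.

```python
import json
from typing import Any, Dict, List

# Trimmed excerpt of the first message printed in In [5].
SAMPLE = """
{"user_exam_id": "6d4089e4-bde5-4a22-b65f-18bce9ab79c8",
 "sequences": {"questions": [
   {"user_incomplete": true, "user_correct": false,
    "options": [{"checked": true, "id": "49c574b4-5c82-4ffd-9bd1-c3358faf850d"},
                {"checked": true, "id": "f2528210-35c3-4320-acf3-9056567ea19f"},
                {"checked": false, "id": "d1bf026f-554f-4543-bdd2-54dcf105b826"}],
    "user_submitted": true, "id": "7a2ed6d3-f492-49b3-b8aa-d080a8aad986",
    "user_result": "missed_some"}]}}
"""


def flatten_questions(assessment: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: one flat record per entry in sequences.questions.

    Keeps the string and array-derived fields that the map-typed column nulled
    out. Missing keys or null values are treated as empty.
    """
    questions = (assessment.get("sequences") or {}).get("questions") or []
    rows = []
    for q in questions:
        options = q.get("options") or []
        rows.append({
            "user_exam_id": assessment.get("user_exam_id"),
            "question_id": q.get("id"),
            "user_result": q.get("user_result"),
            "user_correct": q.get("user_correct"),
            "num_options": len(options),
            "num_checked": sum(1 for o in options if o.get("checked")),
        })
    return rows


rows = flatten_questions(json.loads(SAMPLE))
print(rows[0]["user_result"], rows[0]["num_options"], rows[0]["num_checked"])  # missed_some 3 2
```

Inside Spark, a comparable result could probably be obtained by letting Spark's own JSON reader infer struct types (for example `spark.read.json` on an RDD of the message strings) and then calling `pyspark.sql.functions.explode` on the questions array. That variant has not been run against this cluster.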

## Querying the Data with SparkSQL

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame. Here are some example questions that would be useful for data scientists to know about the streamed data.

In [6]:
```python
# Register the "assessDF" DataFrame as a SQL temporary view
assessDF.createOrReplaceTempView('assess')
```

#### 1. How many assessments are in the dataset?

In total, there were 3280 assessments recorded in the dataset, counting the individual messages streamed via Kafka.

In [7]:
```python
# Number of assessments
spark.sql("select count(*) as total_assessments from assess").show()
```

```
+-----------------+
|total_assessments|
+-----------------+
|             3280|
+-----------------+
```



#### 2. How many users took only one assessment?

There are 3222 `user_exam_id` values that appear in exactly one assessment.

In [8]:
```python
# Number of people that took only one assessment
spark.sql("select count(distinct user_exam_id) as only_one_assessment from (select user_exam_id, count(*) as counts from assess group by user_exam_id) where counts = 1").show()
```

```
+-------------------+
|only_one_assessment|
+-------------------+
|               3222|
+-------------------+
```
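The subquery-plus-`where` pattern above is equivalent to filtering groups with `HAVING`. Each group already has a distinct `user_exam_id`, so a plain `count(*)` in the outer query gives the same number as `count(distinct user_exam_id)`. The sketch below runs that formulation against a tiny in-memory SQLite table, so it can be tried without the cluster. The rows are hypothetical placeholders. It assumes the same SQL runs unchanged in Spark SQL, since both support `GROUP BY ... HAVING` and aliased subqueries in `FROM`.

```python
import sqlite3

# Hypothetical toy rows standing in for the `assess` view: (user_exam_id, exam_name).
rows = [
    ("u1", "Learning Git"),
    ("u2", "Learning Git"),
    ("u2", "Learning Git"),  # u2 retook the exam
    ("u3", "Beginning C# Programming"),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table assess (user_exam_id text, exam_name text)")
conn.executemany("insert into assess values (?, ?)", rows)

# Keep only the groups with exactly one row, then count those groups.
query = """
select count(*) as only_one_assessment
from (select user_exam_id
      from assess
      group by user_exam_id
      having count(*) = 1) as singles
"""
only_one = conn.execute(query).fetchone()[0]
print(only_one)  # 2 (u1 and u3)
conn.close()
```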



#### 3. Were there individuals that retook an exam?

There are 20 instances of `user_exam_id` that repeated an exam, totalling 58 assessments.

In [9]:
```python
# Display of 20 users that retook the same exam
spark.sql("select user_exam_id, exam_name, counts from (select user_exam_id, exam_name, count(*) as counts from assess group by user_exam_id, exam_name) where counts > 1 order by exam_name").show(assessDF.count(),False)
```

```
+------------------------------------+-------------------------------------------------------+------+
|user_exam_id                        |exam_name                                              |counts|
+------------------------------------+-------------------------------------------------------+------+
|d4ab4aeb-1368-4866-bc5e-7eee69fd1608|An Introduction to d3.js: From Scattered to Scatterplot|3     |
|c320d47f-60d4-49a5-9d6c-67e947979bf0|Beginning C# Programming                               |3     |
|00745aef-f3af-4127-855c-afc3b6ef4011|Beginning C# Programming                               |3     |
|37cf5b0c-4807-4214-8426-fb1731b57700|Beginning C# Programming                               |3     |
|a244c11a-d890-4e3e-893d-d17c5ce2ad05|Beginning C# Programming                               |3     |
|6132da16-2c0c-436c-9c48-43b8bafe0978|Beginning C# Programming                               |3     |
|66d91177-c436-4ee1-b0b0-daa960e1b2d0|Beginning C# Programming                    
```
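The figures in questions 1 to 3 reconcile. The 3222 single-attempt `user_exam_id` values plus the 58 attempts from the 20 repeated ones give 3222 + 58 = 3280, the total message count. This assumes each `user_exam_id` belongs to a single exam, since question 3 groups by both columns. Below is a stdlib sketch of that bookkeeping on hypothetical IDs, with `collections.Counter` standing in for the SQL `group by`. The helper name `attempt_breakdown` is made up for illustration.

```python
from collections import Counter
from typing import Iterable, Tuple


def attempt_breakdown(user_exam_ids: Iterable[str]) -> Tuple[int, int, int, int]:
    """Return (total attempts, single-attempt ids, repeated ids, attempts by repeated ids)."""
    counts = Counter(user_exam_ids)  # like: group by user_exam_id, count(*)
    total = sum(counts.values())
    singles = sum(1 for c in counts.values() if c == 1)
    repeated = [c for c in counts.values() if c > 1]
    return total, singles, len(repeated), sum(repeated)


# Hypothetical IDs: "b" appears 3 times and "d" twice.
total, singles, n_repeated, repeat_attempts = attempt_breakdown(["a", "b", "b", "b", "c", "d", "d"])
print(total, singles, n_repeated, repeat_attempts)  # 7 2 2 5
assert singles + repeat_attempts == total  # same identity as 3222 + 58 == 3280
```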

#### 4. What's the name of your Kafka topic? How did you come up with that name?

The Kafka topic is named "assess", shorthand for "assessments." I chose it for readability and ease of understanding. The name is semantically tied to the content of the data, which consists of individual assessments. It also has a simple structure: all lowercase, with no special characters.

#### 5. How many people took "Learning Git"?

390 individuals took the "Learning Git" assessment. There were 394 total assessments because 2 individuals took it 3 times each, which added 4 extra attempts.

In [10]:
```python
# Count distinct user_exam_id for "Learning Git"
spark.sql("select count(distinct user_exam_id) as distinct_learning_git from (select exam_name, user_exam_id, count(*) as counts from assess where exam_name='Learning Git' group by exam_name, user_exam_id)").show(assessDF.count(),False)
```

```
+---------------------+
|distinct_learning_git|
+---------------------+
|390                  |
+---------------------+
```
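The nested query above can be collapsed into a single aggregate that reports both totals at once. The following sketch runs it against an in-memory SQLite table with hypothetical rows. On the real `assess` view, the same `select` would be expected to return 394 attempts and 390 distinct takers, matching the figures above. It assumes the SQL carries over to Spark SQL unchanged, except that `spark.sql` would take the exam name inline as a string literal, as in the query above, rather than through SQLite's `?` placeholder.

```python
import sqlite3

# Hypothetical toy rows standing in for the `assess` view: (user_exam_id, exam_name).
rows = [
    ("u1", "Learning Git"),
    ("u2", "Learning Git"),
    ("u2", "Learning Git"),
    ("u2", "Learning Git"),
    ("u3", "Beginning C# Programming"),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table assess (user_exam_id text, exam_name text)")
conn.executemany("insert into assess values (?, ?)", rows)

# Total attempts and distinct test-takers for one exam, in a single query.
attempts, distinct_takers = conn.execute(
    """
    select count(*) as attempts, count(distinct user_exam_id) as distinct_takers
    from assess
    where exam_name = ?
    """,
    ("Learning Git",),
).fetchone()
print(attempts, distinct_takers)  # 4 2
conn.close()
```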



#### 6. What is the least common course taken? And the most common?

The least common courses returned below, each taken only once, were `Native Web Apps for Android`, `Learning to Visualize Data with D3.js`, `Nulls, Three-valued Logic and Missing Information`, and `Operating Red Hat Enterprise Linux Servers`. Because the query stops at `limit 4`, any other course tied at a single attempt would not appear in this output.

The most common course was `Learning Git`, with 394 assessments.

In [11]:
```python
# Least common courses
spark.sql("select exam_name, count(*) as counts from assess group by exam_name order by counts asc limit 4").show(assessDF.count(),False)
```

```
+-------------------------------------------------+------+
|exam_name                                        |counts|
+-------------------------------------------------+------+
|Native Web Apps for Android                      |1     |
|Learning to Visualize Data with D3.js            |1     |
|Nulls, Three-valued Logic and Missing Information|1     |
|Operating Red Hat Enterprise Linux Servers       |1     |
+-------------------------------------------------+------+
```
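A fixed `limit` can hide ties. A tie-safe alternative compares each course's count against the minimum count computed in a subquery, so every course at the minimum is returned. Swapping `min` for `max` gives the most common courses the same way. The sketch below checks the pattern against an in-memory SQLite table with hypothetical exam names. Spark SQL supports uncorrelated scalar subqueries, so the query is assumed to carry over, but it has only been exercised against SQLite here.

```python
import sqlite3

# Hypothetical toy rows: exam names only; "Exam A" and "Exam B" tie for fewest attempts.
exams = ["Learning Git", "Learning Git", "Learning Git", "Exam A", "Exam B", "Exam C", "Exam C"]

conn = sqlite3.connect(":memory:")
conn.execute("create table assess (exam_name text)")
conn.executemany("insert into assess values (?)", [(e,) for e in exams])

# Return every exam whose attempt count equals the minimum, however many there are.
query = """
select exam_name, count(*) as counts
from assess
group by exam_name
having count(*) = (select min(c)
                   from (select count(*) as c from assess group by exam_name) as per_exam)
order by exam_name
"""
least_common = conn.execute(query).fetchall()
print(least_common)  # [('Exam A', 1), ('Exam B', 1)]
conn.close()
```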



In [12]:
```python
# Most common course
spark.sql("select exam_name, count(*) as counts from assess group by exam_name order by counts desc limit 1").show(assessDF.count(),False)
```

```
+------------+------+
|exam_name   |counts|
+------------+------+
|Learning Git|394   |
+------------+------+
```



#### 7. Is there any test data in this dataset?

Yes, there are 5 assessments where `base_exam_id = example-id` and `exam_name = Example Exam For Development and Testing oh yeahsdf`.

In [13]:
```python
# Look for development/testing records
spark.sql("select base_exam_id, exam_name, count(*) as counts from assess where base_exam_id='example-id' group by base_exam_id, exam_name").show(assessDF.count(),False)
```

```
+------------+---------------------------------------------------+------+
|base_exam_id|exam_name                                          |counts|
+------------+---------------------------------------------------+------+
|example-id  |Example Exam For Development and Testing oh yeahsdf|5     |
+------------+---------------------------------------------------+------+
```



#### 8. What is the most popular hour to start exams?

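The most popular time to start an assessment was 14:00 to 14:59 (2:00 to 2:59 PM), with 224 assessments started in that hour. The raw `started_at` values are recorded in UTC (note the trailing `Z`), and `hour()` reports hours in the Spark session's time zone. The result therefore reads as UTC only if the session is set to UTC.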

In [14]:
```python
from pyspark.sql.functions import hour

# Add an "hour" column extracted from started_at and register the result as a new view
assessDF.withColumn("hour", hour("started_at")).createOrReplaceTempView('assessTIME')
spark.sql("select hour, count(*) as assess_count from assessTIME group by hour order by assess_count desc limit 1").show(assessDF.count(),False)
```

```
+----+------------+
|hour|assess_count|
+----+------------+
|14  |224         |
+----+------------+
```
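The same hour-of-day extraction can be sketched in plain Python. This makes the time-zone handling explicit: `started_at` is parsed as UTC, and the hours are tallied with `Counter.most_common`. The sketch assumes every value has fractional seconds and a trailing `Z`, as the sample does. The first timestamp is the `started_at` of the first message shown earlier; the other two are hypothetical.

```python
from collections import Counter
from datetime import datetime, timezone


def start_hour_utc(started_at: str) -> int:
    """Hour of day (0-23, UTC) from a value like 2018-01-23T14:23:19.082Z."""
    dt = datetime.strptime(started_at, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
    return dt.hour


started = [
    "2018-01-23T14:23:19.082Z",  # from the first message
    "2018-01-24T14:05:00.000Z",  # hypothetical
    "2018-01-24T09:30:12.500Z",  # hypothetical
]
# most_common(1) returns a one-element list of (hour, count), like "order by ... desc limit 1".
top_hour, n = Counter(start_hour_utc(s) for s in started).most_common(1)[0]
print(top_hour, n)  # 14 2
```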



## Store assessments in HDFS

We now save the raw assessment messages to HDFS as Parquet files, with the Kafka `key` and `value` columns cast to strings. Parquet stores the DataFrame's schema alongside the data.

In [7]:
```python
# Cast the binary key and value columns to strings
assess_strings = raw_assess.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# See the schema
assess_strings.printSchema()

# Write this to HDFS as Parquet
assess_strings.write.parquet("/tmp/assessments")
```

```
root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
```


#### Check out the results (in another terminal window)
`docker-compose exec cloudera hadoop fs -ls /tmp/`

`docker-compose exec cloudera hadoop fs -ls -h /tmp/assessments/`

#### Spin down the cluster
`docker-compose down`

#### Write terminal history to file
`history > sophiaski-history.txt`