# Project 2: Tracking User Activity

## Download Assessment Dataset
```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
```

## Spin up the cluster
```
docker-compose up -d
```

## Check out Hadoop
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```
Look at the `/tmp/` directory and check that the output we want to write is not already there.

## Create topic `assessments`
```
docker-compose exec kafka \
  kafka-topics \
    --create \
    --topic assessments \
    --partitions 1 \
    --replication-factor 1 \
    --if-not-exists \
    --zookeeper zookeeper:32181
```

## Use kafkacat to produce messages to the `assessments` topic
```
docker-compose exec mids \
  bash -c "cat /w205/project-2-rochelleli/assessment-attempts-20180128-121051-nested.json \
    | jq '.[]' -c \
    | kafkacat -P -b kafka:29092 -t assessments"
```
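The `jq '.[]' -c` step turns the single top-level JSON array in the downloaded file into one compact JSON object per line, so kafkacat publishes each assessment as its own message. A minimal stdlib Python sketch of that transformation, assuming (as the `jq` filter implies) that the file holds one JSON array; the function name `explode_json_array` and the sample records are hypothetical:

```python
import json

def explode_json_array(text):
    """Mimic `jq '.[]' -c`: return one compact JSON string per array element."""
    records = json.loads(text)  # the whole file parses to a list of objects
    # separators=(",", ":") drops the spaces, like jq's compact (-c) output
    return [json.dumps(r, separators=(",", ":")) for r in records]

# Hypothetical two-record file, loosely shaped like the assessment data
sample = '[{"exam_name": "Learning Git", "attempt": 1}, {"exam_name": "Mastering Git", "attempt": 2}]'
for line in explode_json_array(sample):
    print(line)  # each printed line would become one Kafka message
```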

## Spin up a Jupyter notebook instance with a PySpark kernel using the `spark` container
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8890 --ip 0.0.0.0 --allow-root --notebook-dir=/w205/' pyspark
```

## Import Packages and Start Spark Session

In [1]:
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# start Spark Session
from pyspark.sql import SparkSession
app_name = "project_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext
```

## Read Messages from Kafka

In [2]:
```python
raw_assessments = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:29092") \
  .option("subscribe","assessments") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
```

## Cache this to cut back on warnings later

In [3]:
```python
raw_assessments.cache()
```

```
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
```

## See the Schema

In [4]:
```python
raw_assessments.printSchema()
```

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```

The `'key'` and `'value'` columns hold binary data, so they will have to be cast to strings.
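Kafka stores message payloads as raw bytes, and casting them to a string decodes those bytes as text. As a quick sanity check, the first bytes shown in hex in the `value` column below (`7B 22 6B 65 65`) can be decoded in plain Python, assuming UTF-8 encoding; this only illustrates the decoding and is not how Spark implements the cast:

```python
# The leading bytes of each `value` cell, as displayed in hex by show()
raw = bytes.fromhex("7B 22 6B 65 65")
text = raw.decode("utf-8")
print(text)  # prints {"kee, the start of a JSON object
```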

## Show Raw Messages

In [5]:
```python
raw_assessments.show()
```

```
+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessments|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     7|1969-12-31 23:59:...|   
```

The `'key'` column is `null` for every message, since kafkacat published the messages without keys.

## Cast Values as Strings and Show

In [6]:
```python
assessments = raw_assessments.selectExpr("CAST(value AS STRING)")
assessments.printSchema()
assessments.show()
```

```
root
 |-- value: string (nullable = true)

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows
```

Each value is now a JSON string, but all of its fields are still packed into a single column. Next step: parse the JSON and promote its fields to DataFrame columns.
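Parsing each JSON string and turning its top-level keys into columns is the idea behind `Row(**json.loads(x.value))` in the next cell. A plain-Python sketch of the same pivot, using hypothetical abbreviated records (the helper `to_columns` is illustrative only); note that nested objects such as `sequences` stay as single dict values at this stage:

```python
import json

def to_columns(json_lines):
    """Parse JSON strings and pivot their top-level keys into columns."""
    rows = [json.loads(line) for line in json_lines]
    # Union of keys across all rows, in alphabetical order
    keys = sorted({k for row in rows for k in row})
    # A key missing from a row becomes None, much like a nullable column
    return {k: [row.get(k) for row in rows] for k in keys}

lines = [
    '{"exam_name": "Learning Git", "sequences": {"attempt": 1}}',
    '{"exam_name": "Mastering Git", "started_at": "2018-01-23"}',
]
columns = to_columns(lines)
print(columns["exam_name"])   # ['Learning Git', 'Mastering Git']
print(columns["started_at"])  # [None, '2018-01-23']
```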

## Extract Data

In [7]:
```python
import json
from pyspark.sql import Row

# Create row objects from the json fields
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
extracted_assessments.show()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
```

### See the Schema

In [8]:
```python
extracted_assessments.printSchema()
```

```
root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
```

The schema shows that the JSON is nested: `sequences` is a map whose values are arrays of maps. We will have to flatten the `sequences` column.
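Flattening turns each nested path into its own column, which is what the `sequences.counts.correct`-style field references do in the SQL queries that follow. As a stdlib sketch of the idea, the hypothetical `flatten` helper below walks a nested dict and emits dotted keys; the sample record is hypothetical and abbreviated (no `questions` list):

```python
def flatten(record, prefix=""):
    """Flatten nested dicts into a single dict with dotted keys."""
    flat = {}
    for key, value in record.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, extending the dotted path
            flat.update(flatten(value, path))
        else:
            # Leaves (including lists) are kept as-is
            flat[path] = value
    return flat

record = {
    "exam_name": "Learning Git",
    "sequences": {"attempt": 1, "counts": {"correct": 5, "total": 5}},
}
print(flatten(record))
# {'exam_name': 'Learning Git', 'sequences.attempt': 1,
#  'sequences.counts.correct': 5, 'sequences.counts.total': 5}
```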

### Use SparkSQL

#### Create a Spark Temporary View

In [9]:
```python
extracted_assessments.createOrReplaceTempView('assessments')
```

#### Create DataFrames from Queries

In [10]:
```python
spark.sql("select base_exam_id, exam_name, sequences, sequences.questions, started_at from assessments limit 15").show()
```

```
+--------------------+--------------------+--------------------+--------------------+--------------------+
|        base_exam_id|           exam_name|           sequences|           questions|          started_at|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|Normal Forms and ...|Map(questions -> ...|[Map(user_incompl...|2018-01-23T14:23:...|
|37f0a30a-7464-11e...|Normal Forms and ...|Map(questions -> ...|[Map(user_incompl...|2018-01-23T14:21:...|
|4beeac16-bb83-4d5...|The Principles of...|Map(questions -> ...|[Map(user_incompl...|2018-01-23T20:22:...|
|4beeac16-bb83-4d5...|The Principles of...|Map(questions -> ...|[Map(user_incompl...|2018-01-23T20:21:...|
|6442707e-7488-11e...|Introduction to B...|Map(questions -> ...|[Map(user_incompl...|2018-01-23T19:48:...|
|8b4488de-43a5-4ff...|        Learning Git|Map(questions -> ...|[Map(user_incompl...|2018-01-23T20:51:...|
|e1f07fac-5566-4fd...|Git Fundamental
```

In [11]:
```python
# look into the sequences columns
spark.sql("select base_exam_id, exam_name, max_attempts, sequences, sequences.questions, sequences.counts from assessments limit 15").show()
```

```
+--------------------+--------------------+------------+--------------------+--------------------+------+
|        base_exam_id|           exam_name|max_attempts|           sequences|           questions|counts|
+--------------------+--------------------+------------+--------------------+--------------------+------+
|37f0a30a-7464-11e...|Normal Forms and ...|         1.0|Map(questions -> ...|[Map(user_incompl...|  null|
|37f0a30a-7464-11e...|Normal Forms and ...|         1.0|Map(questions -> ...|[Map(user_incompl...|  null|
|4beeac16-bb83-4d5...|The Principles of...|         1.0|Map(questions -> ...|[Map(user_incompl...|  null|
|4beeac16-bb83-4d5...|The Principles of...|         1.0|Map(questions -> ...|[Map(user_incompl...|  null|
|6442707e-7488-11e...|Introduction to B...|         1.0|Map(questions -> ...|[Map(user_incompl...|  null|
|8b4488de-43a5-4ff...|        Learning Git|         1.0|Map(questions -> ...|[Map(user_incompl...|  null|
|e1f07fac-5566-4fd...|Git Fundamentals ...|   
```

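#### Construct Schema Manually by Referencing the JSON file

The `counts` column above is `null` for every row. Most likely this is because the inferred schema types `sequences` as a map whose values are arrays of maps, so the nested `counts` object does not fit that type. To reach the nested fields, we define the schema by hand instead, using the JSON file as a reference.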

In [12]:
```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
```

In [13]:
```python
new_Schema = StructType([
    StructField('base_exam_id', StringType(), True),
    StructField('certification', StringType(), True),
    StructField('exam_name', StringType(), True),
    StructField('keen_created_at', StringType(), True),
    StructField('keen_id', StringType(), True),
    StructField('keen_timestamp', StringType(), True),
    StructField('max_attempts', StringType(), True),
    StructField('sequences', StructType([
        StructField('attempt', IntegerType(), True),
        StructField('counts', StructType([
            StructField('incomplete', IntegerType(), True),
            StructField('submitted', IntegerType(), True),
            StructField('incorrect', IntegerType(), True),
            StructField('all_correct', StringType(), True),
            StructField('correct', IntegerType(), True),
            StructField('total', IntegerType(), True),
            StructField('unanswered', IntegerType(), True),
        ])),
    ])),
    StructField('started_at', StringType(), True),
    StructField('user_exam_id', StringType(), True),
])
```
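Roughly speaking, with an explicit `StructType` a nested JSON object such as `sequences` is matched against the declared fields by name: keys the schema does not declare (such as `questions`) are dropped, and declared keys missing from a record come back as null. A rough stdlib analogue, assuming a nested dict of field names stands in for the `StructType`; the `SCHEMA` layout and `apply_schema` helper are hypothetical, and type conversion is left out:

```python
# Hypothetical stand-in for a StructType: field name -> None (leaf) or nested dict
SCHEMA = {
    "exam_name": None,
    "sequences": {
        "attempt": None,
        "counts": {"correct": None, "total": None, "unanswered": None},
    },
}

def apply_schema(record, schema):
    """Keep only declared fields, in schema order; missing fields become None."""
    out = {}
    for name, sub_schema in schema.items():
        value = record.get(name) if isinstance(record, dict) else None
        if sub_schema is not None and value is not None:
            value = apply_schema(value, sub_schema)  # recurse into nested structs
        out[name] = value
    return out

record = {
    "exam_name": "Learning Git",
    "sequences": {"attempt": 1, "questions": [], "counts": {"correct": 5, "total": 5}},
}
print(apply_schema(record, SCHEMA))
```

In this sketch `questions` disappears because `SCHEMA` does not declare it, and `unanswered` comes back as `None` because the record lacks it.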

In [14]:
```python
new_extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF(schema=new_Schema)
new_extracted_assessments.show()
new_extracted_assessments.printSchema()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|[1,[1,4,1,false,2...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|[1,[2,4,1,false,1...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
```

#### Create a Spark Temporary View

In [15]:
```python
new_extracted_assessments.createOrReplaceTempView('assessments')
```

#### Expand Sequences Column

In [16]:
```python
spark.sql("""
    select exam_name,
           sequences.attempt,
           sequences.counts.incomplete,
           sequences.counts.submitted,
           sequences.counts.incorrect,
           sequences.counts.all_correct,
           sequences.counts.correct,
           sequences.counts.total,
           sequences.counts.unanswered
    from assessments limit 15
""").show()
```

```
+--------------------+-------+----------+---------+---------+-----------+-------+-----+----------+
|           exam_name|attempt|incomplete|submitted|incorrect|all_correct|correct|total|unanswered|
+--------------------+-------+----------+---------+---------+-----------+-------+-----+----------+
|Normal Forms and ...|      1|         1|        4|        1|      false|      2|    4|         0|
|Normal Forms and ...|      1|         2|        4|        1|      false|      1|    4|         0|
|The Principles of...|      1|         0|        4|        1|      false|      3|    4|         0|
|The Principles of...|      1|         2|        4|        0|      false|      2|    4|         0|
|Introduction to B...|      1|         0|        4|        1|      false|      3|    4|         0|
|        Learning Git|      1|         0|        5|        0|       true|      5|    5|         0|
|Git Fundamentals ...|      1|         0|        1|        0|       true|      1|    1|         0|
|Introduct
```

##### Question 1: How many assessments are in this dataset?

In [17]:
```python
spark.sql("select count(distinct exam_name) from assessments").show()
```

```
+-------------------------+
|count(DISTINCT exam_name)|
+-------------------------+
|                      103|
+-------------------------+
```

In [18]:
```python
spark.sql("select count(distinct base_exam_id) from assessments").show()
```

```
+----------------------------+
|count(DISTINCT base_exam_id)|
+----------------------------+
|                         107|
+----------------------------+
```

The number of unique `'base_exam_id'` values (107) is greater than the number of unique `'exam_name'` values (103). Next, check the number of unique (`'exam_name'`, `'base_exam_id'`) pairs.

In [19]:
```python
spark.sql("select count(distinct exam_name, base_exam_id) from assessments").show()
```

```
+---------------------------------------+
|count(DISTINCT exam_name, base_exam_id)|
+---------------------------------------+
|                                    107|
+---------------------------------------+
```

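The number of unique (`'exam_name'`, `'base_exam_id'`) pairs equals the number of unique `'base_exam_id'` values, so every `'base_exam_id'` maps to exactly one `'exam_name'`, while a few exam names are shared by more than one ID. Counting by ID, there are **107** assessments in this dataset.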
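The reasoning behind these three counts can be checked on a toy example: if the number of distinct (name, ID) pairs equals the number of distinct IDs, each ID carries exactly one name, and if there are fewer distinct names than IDs, some names are shared. A stdlib sketch with hypothetical IDs:

```python
# Hypothetical (exam_name, base_exam_id) records: "Learning Git" appears under two IDs
records = [
    ("Learning Git", "id-1"),
    ("Learning Git", "id-1"),
    ("Learning Git", "id-2"),
    ("Mastering Git", "id-3"),
]

n_names = len({name for name, _ in records})       # count(distinct exam_name)
n_ids = len({exam_id for _, exam_id in records})   # count(distinct base_exam_id)
n_pairs = len(set(records))                        # count(distinct exam_name, base_exam_id)

print(n_names, n_ids, n_pairs)  # 2 3 3
# n_pairs == n_ids: every ID carries exactly one name
# n_names < n_ids: at least one name is shared by more than one ID
```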

##### Question 2: What is the least common exam taken? And the most common?

In [20]:
```python
spark.sql("select exam_name, count(exam_name) as total_count from assessments group by exam_name order by total_count desc").show()
```

```
+--------------------+-----------+
|           exam_name|total_count|
+--------------------+-----------+
|        Learning Git|        394|
|Introduction to P...|        162|
|Introduction to J...|        158|
|Intermediate Pyth...|        158|
|Learning to Progr...|        128|
|Introduction to M...|        119|
|Software Architec...|        109|
|Beginning C# Prog...|         95|
|    Learning Eclipse|         85|
|Learning Apache M...|         80|
|Beginning Program...|         79|
|       Mastering Git|         77|
|Introduction to B...|         75|
|Advanced Machine ...|         67|
|Learning Linux Sy...|         59|
|JavaScript: The G...|         58|
|        Learning SQL|         57|
|Practical Java Pr...|         53|
|    HTML5 The Basics|         52|
|   Python Epiphanies|         51|
+--------------------+-----------+
only showing top 20 rows
```

In [21]:
```python
spark.sql("select exam_name, count(exam_name) as total_count from assessments group by exam_name order by total_count ASC").show()
```

```
+--------------------+-----------+
|           exam_name|total_count|
+--------------------+-----------+
|Nulls, Three-valu...|          1|
|Native Web Apps f...|          1|
|Learning to Visua...|          1|
|Operating Red Hat...|          1|
|The Closed World ...|          2|
|Client-Side Data ...|          2|
|Arduino Prototypi...|          2|
|Understanding the...|          2|
|Hibernate and JPA...|          2|
|Learning Spring P...|          2|
|What's New in Jav...|          2|
|Building Web Serv...|          3|
|Using Web Components|          3|
|Service Based Arc...|          3|
| Mastering Web Views|          3|
|Getting Ready for...|          3|
|Using Storytellin...|          4|
|       View Updating|          4|
|Modeling for Soft...|          5|
|Example Exam For ...|          5|
+--------------------+-----------+
only showing top 20 rows
```

The most common exam is `'Learning Git'`. The least common exams, each taken only once, are `'Nulls, Three-valu...'`, `'Native Web Apps f...'`, `'Learning to Visua...'`, and `'Operating Red Hat...'` (names are truncated in the output above).
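Because several exams tie for the lowest count, reading only the first row of the ascending query can hide some of them. A stdlib sketch of the same group-and-count logic that keeps every exam tied for the maximum and the minimum, using hypothetical data (the helper name `most_and_least_common` is illustrative only):

```python
from collections import Counter

def most_and_least_common(exam_names):
    """Return (most common, least common) exam names, keeping ties."""
    counts = Counter(exam_names)  # like GROUP BY exam_name with COUNT(exam_name)
    hi, lo = max(counts.values()), min(counts.values())
    most = sorted(name for name, c in counts.items() if c == hi)
    least = sorted(name for name, c in counts.items() if c == lo)
    return most, least

taken = ["Learning Git"] * 3 + ["Mastering Git"] * 2 + ["View Updating", "Learning SQL"]
print(most_and_least_common(taken))
# (['Learning Git'], ['Learning SQL', 'View Updating'])
```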

##### Question 3: How many times was the Learning Git exam taken?

From the query in Question 2, the `'Learning Git'` exam was taken **394** times.

## Write to HDFS

### Write the extracted data to a Parquet file

In [None]:
```python
new_extracted_assessments.write.parquet("/tmp/new_extracted_assessments")
```

### Fully flattened table based on new_Schema

In [23]:
```python
# No LIMIT clause here, so the full table (not a 15-row preview) is written to HDFS
full_flatten_assessment = spark.sql("""
    select base_exam_id, certification, exam_name, keen_created_at,
           keen_id, keen_timestamp, max_attempts, sequences.attempt,
           sequences.counts.incomplete, sequences.counts.submitted,
           sequences.counts.incorrect, sequences.counts.all_correct,
           sequences.counts.correct, sequences.counts.total,
           sequences.counts.unanswered, started_at, user_exam_id
    from assessments
""")
```

In [24]:
```python
full_flatten_assessment.write.parquet("/tmp/full_flatten_assessment")
```

### Check that the files landed in HDFS

```
(base) jupyter@python-20210105-192601:~/w205/project-2-rochelleli$ docker-compose exec cloudera hadoop fs -ls /tmp/
Found 4 items
drwxr-xr-x   - root   supergroup          0 2021-03-07 08:58 /tmp/full_flatten_assessment
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2021-03-07 01:08 /tmp/hive
drwxr-xr-x   - root   supergroup          0 2021-03-07 08:47 /tmp/new_extracted_assessments
```

In [28]:
```python
read_new_extracted_assessments = spark.read.parquet("/tmp/new_extracted_assessments")
read_new_extracted_assessments.show()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|[1,[1,4,1,false,2...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|[1,[2,4,1,false,1...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false
```

In [29]:
```python
read_full_flatten_assessment = spark.read.parquet("/tmp/full_flatten_assessment")
read_full_flatten_assessment.show()
```

```
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+-------+----------+---------+---------+-----------+-------+-----+----------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|attempt|incomplete|submitted|incorrect|all_correct|correct|total|unanswered|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+-------+----------+---------+---------+-----------+-------+-----+----------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|      1|         1|        4|        1|      false|      2|    4|         0|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30
```