# MIDS W205 Project 2 Report - Ruby Han (Spring 2021)

## Summary
In this project, you work at an ed tech firm. You've created a service that delivers assessments, and now lots of different customers (e.g., Pearson) want to publish their assessments on it. You need to get ready for data scientists who work for these customers to run queries on the data.

### Business Questions and Answers:

**1) How many assessments are in the dataset?**

3280

**2) What's the name of your Kafka topic? How did you come up with that name?**

`assessments` - We've created a service to deliver assessments, so naming the topic `assessments` makes intuitive sense.

**3) How many people took Learning Git?**

394

## Data Pipeline

This section provides a high-level overview of the tools and platforms used to extract, query, and process the data.

- **Docker:** Provides contained environment to work in
- **Kafka:** Message broker/Ingests data in a topic (**assessments** in this case)
- **Spark:** Processes and reads data from Kafka in JSON format and extracts it
- **HDFS (Hadoop Distributed File System):** Stores data in a format that is retrievable by other data scientists to query the assessment data and answer business questions

The sections below walk through the commands used to spin up the pipeline and process the data.

## Linux Command Lines
### Outlines housekeeping and set-up of docker containers

### $\underline{\text{Perform below commands only once:}}$
- Copy in yml file
```
cp ~/w205/course-content/08-Querying-Data/docker-compose.yml .
```
- Download the assessments file
```
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
```

### $\underline{\text{Docker:}}$
- Change into project 2 directory
```
cd ~/w205/project-2-ruby-han/
```
- Bring up cluster and check
```
docker-compose up -d
```
- Check stray containers
```
docker-compose ps
```
- Remove stray containers
```
docker rm -f <NAMES>
```
- Do not remove below
```
0aacfde6644a   gcr.io/inverting-proxy/agent       "/bin/sh -c '/opt/bi…"   2 hours ago     Up 2 hours                                                                                          proxy-agent
```
- Check containers
```
docker ps -a
```
- Should output below if correct
```
CONTAINER ID   IMAGE                              COMMAND                  CREATED         STATUS         PORTS                                                                                NAMES
a7c92bf83ade   confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   5 minutes ago   Up 5 minutes   9092/tcp, 29092/tcp                                                                  project-2-ruby-han_kafka_1
2ead54715098   midsw205/spark-python:0.0.5        "docker-entrypoint.s…"   5 minutes ago   Up 5 minutes   0.0.0.0:8888->8888/tcp                                                               project-2-ruby-han_spark_1
24aa343687f5   midsw205/cdh-minimal:latest        "cdh_startup_script.…"   5 minutes ago   Up 5 minutes   8020/tcp, 8088/tcp, 8888/tcp, 9090/tcp, 11000/tcp, 11443/tcp, 19888/tcp, 50070/tcp   project-2-ruby-han_cloudera_1
20c60035ecd8   midsw205/base:latest               "/bin/bash"              5 minutes ago   Up 5 minutes   8888/tcp                                                                             project-2-ruby-han_mids_1
b5c8374014f9   confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   5 minutes ago   Up 5 minutes   2181/tcp, 2888/tcp, 3888/tcp, 32181/tcp                                              project-2-ruby-han_zookeeper_1
0aacfde6644a   gcr.io/inverting-proxy/agent       "/bin/sh -c '/opt/bi…"   2 hours ago     Up 2 hours                                                                                          proxy-agent
```
- To spin down cluster
```
docker-compose down
```
- Verify clusters are down
```
docker-compose ps
docker ps -a
```

### $\underline{\text{Connecting Jupyter Notebook to Pyspark:}}$
- Check docker-compose.yml file to ensure that Spark container has an expose section and ports section with below:
```
expose:
  - "8888"
ports:
  - "8888:8888"
```
- Execute a bash shell into Spark container
```
docker-compose exec spark bash
```
- Create symbolic link from Spark directory to /w205
```
ln -s /w205 w205
```
- Exit container
```
exit
```
- Use Jupyter Notebook for Pyspark kernel and set IP address to 0.0.0.0
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```
- Copy the URL and replace the IP address with the external IP address of the Google Cloud VM, then open Jupyter Notebook in Google Chrome in incognito mode
```
http://35.230.62.182:8888/?token=696f835d5d3612d431aeaaf8cf7af8ffebb15402cc49b7a9
```
- Check Hadoop is running correctly
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```
- Should output below if Hadoop is running correctly
```
Found 2 items
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2021-03-07 04:38 /tmp/hive
```

## Kafka
### Commands for Kafka to ingest data

- Create a topic named "assessments"
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
- Verify topic
```
docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181
```
- Expected output
```
Topic: assessments      PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: assessments      Partition: 0    Leader: 1       Replicas: 1     Isr: 1
```
- Kafkacat to produce test messages to "assessments" topic
```
docker-compose exec mids bash -c "cat /w205/project-2-ruby-han/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```
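
The `jq '.[]' -c` step in the command above splits the top-level JSON array into one compact JSON object per line, and kafkacat in producer mode then sends each line as a separate message. A minimal Python sketch of that splitting step is shown here; `raw_file_text` and `explode_json_array` are hypothetical names, and the two records are made-up stand-ins for the real file.

```python
import json

# Hypothetical stand-in for the nested JSON file: one top-level array of records.
raw_file_text = (
    '[{"keen_id": "a1", "exam_name": "Learning Git"}, '
    '{"keen_id": "b2", "exam_name": "Intro to Python"}]'
)


def explode_json_array(text):
    """Return one compact JSON string per element of a top-level JSON array."""
    records = json.loads(text)
    # separators=(",", ":") drops the spaces, similar to jq's compact (-c) output
    return [json.dumps(record, separators=(",", ":")) for record in records]


messages = explode_json_array(raw_file_text)
for message in messages:
    print(message)  # each printed line would become one Kafka message
```

Because each array element becomes its own message, the number of messages in the topic should match the number of records in the file.
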
- Alternative publish command that prints a confirmation once publishing succeeds (run only one of the two publish commands; running both writes every message to the topic twice)
```
docker-compose exec mids bash -c "cat /w205/project-2-ruby-han/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments && echo 'Produced messages.'"
```
- Kafkacat to read all messages (hard to read)
```
docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e"
```
- Alternatively, print the raw source file directly without going through Kafka (also hard to read)
```
docker-compose exec mids bash -c "cat /w205/project-2-ruby-han/assessment-attempts-20180128-121051-nested.json"
```
- Use jq to print each record of the source file on its own line (hard to read)
```
docker-compose exec mids bash -c "cat /w205/project-2-ruby-han/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c"
```
- Use jq to pretty-print the source file in a readable format
```
docker-compose exec mids bash -c "cat /w205/project-2-ruby-han/assessment-attempts-20180128-121051-nested.json | jq '.'"
```
- Kafkacat to consume messages and print the message count (`wc -l` counts lines, and each message is one line)
```
docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e" | wc -l
```

## Pyspark (Using Jupyter Notebook)
### Process data from Kafka

### Data Field Descriptions and Assumptions

This section describes data fields and assumptions made based on given information.

- **base_exam_id:** 
    - String type
    - Exam ID
    
- **certification:**
    - String type
    - Assumed as certification achieved or attempted
    
- **exam_name:**
    - String type
    - Name of exam
    - There is a mismatch between base_exam_id and exam_name
    - Misinterpretation of these fields could occur or there could be different versions of exams not recorded in the exam name
    - Regardless, remedying the mismatch is outside the scope of this project
    - This should be kept in mind when interpreting the results and analyses
    
- **keen_created_at:**
    - Timestamp type
    - Timestamp for when 'keen' is created
    - Unsure what 'keen' refers to

- **keen_id:**
    - String type
    - Assumed to be unique identifier of assessment-takers

- **keen_timestamp:**
    - Timestamp type
    - Second instance of timestamp
    - Unsure what 'keen' refers to

- **max_attempts:**
    - Integer type
    - Assumed to be number of maximum attempts allowed

- **sequences:**
    - Nested structure
    - Drilled down levels:    
        - **attempt:** number of attempts
        - **counts:** another level of nested structure
            - **all_correct:** all questions answered correctly or not
            - **correct:** number of correct answers
            - **incomplete:** number of incomplete questions
            - **incorrect:** number of incorrect answers
            - **submitted:** number of submitted answers
            - **total:** total number of attempted questions
            - **unanswered:** number of unanswered questions (unsure of the difference between this field vs incomplete)
        - **id:** ID (unsure what this unique identifier refers to)
        - **questions:** another level of nested structure
            - **id:** question id
            - **options:** another level of nested structure
                - **at:** start timestamp of attempt
                - **checked:** tracking checked question
                - **correct:** tracking correctly answered question
                - **id:** ID (unsure what this refers to)
                - **submitted:** tracking submitted question     
            - **user_correct:** whether user answered questions correctly or not
            - **user_incomplete:** whether the user left the question incomplete or not
            - **user_result:** user result
            - **user_submitted:** user submission

- **started_at:**
    - Timestamp type
    - Start timestamp of exam

- **user_exam_id:**
    - String type
    - User ID for exam attempt
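
One way to check the nesting described above is to list every key path in a record. The sketch below is a minimal, plain-Python illustration; `sample` is a hand-trimmed, partly hypothetical record modeled on the first message, and `key_paths` is a hypothetical helper, not part of the project code.

```python
# Hand-trimmed record modeled on the first message; most fields are omitted.
sample = {
    "keen_id": "5a6745820eb8ab00016be1f1",
    "sequences": {
        "attempt": 1,
        "counts": {"correct": 2, "total": 4},
        "questions": [
            {
                "id": "7a2ed6d3-f492-49b3-b8aa-d080a8aad986",
                "options": [{"checked": True, "correct": True}],
            },
        ],
    },
}


def key_paths(value, prefix=""):
    """Yield a dotted path for every leaf value; list elements are marked with []."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from key_paths(child, f"{prefix}.{key}" if prefix else key)
    elif isinstance(value, list):
        for child in value:
            yield from key_paths(child, prefix + "[]")
    else:
        yield prefix


paths = sorted(set(key_paths(sample)))
for path in paths:
    print(path)
```

Paths containing `[]` mark the multi-valued levels (`questions` and `options`), which are the ones that need a `flatMap()`-style unnesting later on.
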

In [1]:
# run spark
spark

In [2]:
sc

In [3]:
# process data from kafka using pyspark
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load()

In [4]:
# cache to reduce warnings
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [5]:
# print schema
raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# display messages
raw_assessments.show()

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessments|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     7|1969-12-31 23:59:...|   

In [7]:
# cast string type
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

In [8]:
# take a look
assessments

DataFrame[value: string]
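
The cast is needed because Kafka delivers each message value as raw bytes. The plain-Python sketch below shows roughly the same conversion for a single value, assuming the payload is UTF-8 encoded JSON. The full payload is a hypothetical stand-in, and completing the truncated sixth byte as `0x6E` is an assumption based on the `{"keen_timestamp"` prefix in the string output.

```python
import json

# First bytes of a message value as displayed by raw_assessments.show().
# The displayed output is truncated after "7B 22 6B 65 65 6"; the final
# byte 0x6E ("n") is an assumption.
prefix_bytes = bytes([0x7B, 0x22, 0x6B, 0x65, 0x65, 0x6E])
print(prefix_bytes.decode("utf-8"))  # {"keen

# Hypothetical complete payload standing in for one Kafka message value.
value_bytes = b'{"keen_id": "5a6745820eb8ab00016be1f1", "max_attempts": "1.0"}'

# Plain-Python analogue of casting the binary column to a string.
value_text = value_bytes.decode("utf-8")

# Once the value is a string, it can be parsed as JSON.
record = json.loads(value_text)
print(record["keen_id"])
```
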

In [9]:
# print schema
assessments.printSchema()

root
 |-- value: string (nullable = true)



In [10]:
# display messages
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



In [11]:
# truncate=False to show full column
assessments.show(truncate=False)

(Output omitted: a single full-width `value` column showing the complete JSON string of each message.)

In [12]:
# import json to unroll data
import json
from pyspark.sql import Row

In [13]:
# pull out first message
first_assessment = json.loads(assessments.select('value').take(1)[0].value)
first_assessment

{'base_exam_id': '37f0a30a-7464-11e6-aa92-a8667f27e5dc',
 'certification': 'false',
 'exam_name': 'Normal Forms and All That Jazz Master Class',
 'keen_created_at': '1516717442.735266',
 'keen_id': '5a6745820eb8ab00016be1f1',
 'keen_timestamp': '1516717442.735266',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
  'counts': {'all_correct': False,
   'correct': 2,
   'incomplete': 1,
   'incorrect': 1,
   'submitted': 4,
   'total': 4,
   'unanswered': 0},
  'id': '5b28a462-7a3b-42e0-b508-09f3906d1703',
  'questions': [{'id': '7a2ed6d3-f492-49b3-b8aa-d080a8aad986',
    'options': [{'at': '2018-01-23T14:23:24.670Z',
      'checked': True,
      'correct': True,
      'id': '49c574b4-5c82-4ffd-9bd1-c3358faf850d',
      'submitted': 1},
     {'at': '2018-01-23T14:23:25.914Z',
      'checked': True,
      'correct': True,
      'id': 'f2528210-35c3-4320-acf3-9056567ea19f',
      'submitted': 1},
     {'checked': False,
      'correct': True,
      'id': 'd1bf026f-554f-4543-bdd2-54dcf10

In [14]:
print(first_assessment['keen_id'])

5a6745820eb8ab00016be1f1
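
The `keen_created_at` and `keen_timestamp` values in the first message look like Unix epoch seconds stored as strings. That interpretation is an assumption, not something the dataset documents. Under that assumption, a minimal sketch for converting one value to a readable UTC time:

```python
from datetime import datetime, timezone

# Value copied from the first message shown above.
keen_timestamp = "1516717442.735266"

# Assumption: the string holds seconds since the Unix epoch.
parsed = datetime.fromtimestamp(float(keen_timestamp), tz=timezone.utc)
print(parsed.strftime("%Y-%m-%d %H:%M:%S %Z"))  # 2018-01-23 14:24:02 UTC
```

The result falls on the same day as, and less than a minute after, the `at` timestamps inside `sequences.questions.options` for the same message, which is consistent with the epoch-seconds reading.
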


In [15]:
# write assessments in current form to HDFS
assessments.write.mode("overwrite").parquet("/tmp/assessments")

- Go to terminal to see results
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```
- Expected output
```
Found 3 items
drwxr-xr-x   - root   supergroup          0 2021-03-07 06:30 /tmp/assessments
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2021-03-07 04:38 /tmp/hive
```
- Check out assessments
```
docker-compose exec cloudera hadoop fs -ls /tmp/assessments
```
- Expected output
```
Found 2 items
-rw-r--r--   1 root supergroup          0 2021-03-07 06:30 /tmp/assessments/_SUCCESS
-rw-r--r--   1 root supergroup    5019331 2021-03-07 06:30 /tmp/assessments/part-00000-e91a832d-1b09-4d40-a84c-9187b30a9c4d-c000.snappy.parquet
```

## Issues encountered when extracting the data

In [16]:
# so what did we actually write?
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



In [17]:
# the above isn't a very helpful structure
# use RDD to load json, unroll and save these to extracted assessments
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

In [18]:
# unrolled version
extracted_assessments.show()

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

- The `sequences` column is still nested: its values display as `Map(...)` rather than as flat columns

In [19]:
# print schema
extracted_assessments.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



- `sequences` is confirmed to be nested
- there are missing values in some JSON objects

In [20]:
# create 'assessments' table
extracted_assessments.registerTempTable('assessments')

In [21]:
# use SQL syntax to query data
spark.sql("select keen_id from assessments limit 10").show()

+--------------------+
|             keen_id|
+--------------------+
|5a6745820eb8ab000...|
|5a674541ab6b0a000...|
|5a67999d3ed3e3000...|
|5a6799694fc7c7000...|
|5a6791e824fccd000...|
|5a67a0b6852c2a000...|
|5a67b627cc80e6000...|
|5a67ac8cb0a5f4000...|
|5a67a9ba060087000...|
|5a67ac54411aed000...|
+--------------------+



In [22]:
# used dot notation with the [] operator to pull single item from a list
# note that sequences.questions is a list (multi-valued)
spark.sql("select keen_timestamp, sequences.questions[0].user_incomplete from assessments limit 10").show()

+------------------+-------------------------------------------------------+
|    keen_timestamp|sequences[questions] AS `questions`[0][user_incomplete]|
+------------------+-------------------------------------------------------+
| 1516717442.735266|                                                   true|
| 1516717377.639827|                                                  false|
| 1516738973.653394|                                                  false|
|1516738921.1137421|                                                  false|
| 1516737000.212122|                                                  false|
| 1516740790.309757|                                                  false|
|1516746279.3801291|                                                  false|
| 1516743820.305464|                                                  false|
|  1516743098.56811|                                                  false|
| 1516743764.813107|                                                  false|

### Nested multi-value as a dictionary
`sequences.id` is extracted by writing a custom transform function, applying it with `map()` to create a separate data frame, registering that data frame as a temp table, and using Spark SQL to join it to the outer nesting layer.

In [23]:
# function to unnest sequence.id
def my_lambda_sequences_id(x):
    raw_dict = json.loads(x.value)
    my_dict = {"keen_id" : raw_dict["keen_id"], "sequences_id" : raw_dict["sequences"]["id"]}
    return Row(**my_dict)

my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()

In [24]:
# print schema
my_sequences.printSchema()

root
 |-- keen_id: string (nullable = true)
 |-- sequences_id: string (nullable = true)



In [25]:
# display result
my_sequences.show()

+--------------------+--------------------+
|             keen_id|        sequences_id|
+--------------------+--------------------+
|5a6745820eb8ab000...|5b28a462-7a3b-42e...|
|5a674541ab6b0a000...|5b28a462-7a3b-42e...|
|5a67999d3ed3e3000...|b370a3aa-bf9e-4c1...|
|5a6799694fc7c7000...|b370a3aa-bf9e-4c1...|
|5a6791e824fccd000...|04a192c1-4f5c-4ac...|
|5a67a0b6852c2a000...|e7110aed-0d08-4cb...|
|5a67b627cc80e6000...|5251db24-2a6e-424...|
|5a67ac8cb0a5f4000...|066b5326-e547-4da...|
|5a67a9ba060087000...|8ac691f8-8c1a-403...|
|5a67ac54411aed000...|066b5326-e547-4da...|
|5a67ad9b2ff312000...|083844c5-772f-48d...|
|5a67b610baff90000...|5251db24-2a6e-424...|
|5a67ac9837b82b000...|b68128a9-6b50-41f...|
|5a67aaa4f21cc2000...|67457eec-4cad-416...|
|5a67ac46f7bce8000...|066b5326-e547-4da...|
|5a67aedaf34e85000...|7b754bca-91a1-4aa...|
|5a67aefef5e149000...|7b754bca-91a1-4aa...|
|5a67ae3f0c5f48000...|42a1e4c5-7a08-469...|
|5a67ad579d5057000...|d51a016b-0122-452...|
|5a67aae6753fd6000...|67457eec-4

In [26]:
# create table
my_sequences.registerTempTable('sequences')

In [27]:
# query top 10 result
spark.sql("select sequences_id from sequences limit 10").show()

+--------------------+
|        sequences_id|
+--------------------+
|5b28a462-7a3b-42e...|
|5b28a462-7a3b-42e...|
|b370a3aa-bf9e-4c1...|
|b370a3aa-bf9e-4c1...|
|04a192c1-4f5c-4ac...|
|e7110aed-0d08-4cb...|
|5251db24-2a6e-424...|
|066b5326-e547-4da...|
|8ac691f8-8c1a-403...|
|066b5326-e547-4da...|
+--------------------+



In [28]:
# join and flatten table
spark.sql("select a.keen_id, a.keen_timestamp, s.sequences_id from assessments a join sequences s on a.keen_id = s.keen_id limit 10").show()

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|        sequences_id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|8ac691f8-8c1a-403...|
|5a26ee9cbf5ce1000...|1512500892.4166169|9bd87823-4508-4e0...|
|5a29dcac74b662000...|1512692908.8423469|e7110aed-0d08-4cb...|
|5a2fdab0eabeda000...|1513085616.2275269|cd800e92-afc3-447...|
|5a30105020e9d4000...|1513099344.8624721|8ac691f8-8c1a-403...|
|5a3a6fc3f0a100000...| 1513779139.354213|e7110aed-0d08-4cb...|
|5a4e17fe08a892000...|1515067390.1336551|9abd5b51-6bd8-11e...|
|5a4f3c69cc6444000...| 1515142249.858722|083844c5-772f-48d...|
|5a51b21bd0480b000...| 1515303451.773272|e7110aed-0d08-4cb...|
|5a575a85329e1a000...| 1515674245.348099|25ca21fe-4dbb-446...|
+--------------------+------------------+--------------------+



### Nested multi-value as a list
Values from the list are extracted through writing a custom lambda transform function, creating a separate data frame, registering it as a temp table and using Spark SQL to join it to the outer nesting layer.

In [29]:
# function to unnest sequence.question
def my_lambda_questions(x):
    raw_dict = json.loads(x.value)
    my_list = []
    my_count = 0
    for l in raw_dict["sequences"]["questions"]:
        my_count += 1
        my_dict = {"keen_id" : raw_dict["keen_id"], "my_count" : my_count, "id" : l["id"]}
        my_list.append(Row(**my_dict))
    return my_list

my_questions = assessments.rdd.flatMap(my_lambda_questions).toDF()

In [30]:
# print schema
my_questions.printSchema()

root
 |-- id: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- my_count: long (nullable = true)



In [31]:
# display result
my_questions.show()

+--------------------+--------------------+--------+
|                  id|             keen_id|my_count|
+--------------------+--------------------+--------+
|7a2ed6d3-f492-49b...|5a6745820eb8ab000...|       1|
|bbed4358-999d-446...|5a6745820eb8ab000...|       2|
|e6ad8644-96b1-461...|5a6745820eb8ab000...|       3|
|95194331-ac43-454...|5a6745820eb8ab000...|       4|
|95194331-ac43-454...|5a674541ab6b0a000...|       1|
|bbed4358-999d-446...|5a674541ab6b0a000...|       2|
|e6ad8644-96b1-461...|5a674541ab6b0a000...|       3|
|7a2ed6d3-f492-49b...|5a674541ab6b0a000...|       4|
|b9ff2e88-cf9d-4bd...|5a67999d3ed3e3000...|       1|
|bec23e7b-4870-49f...|5a67999d3ed3e3000...|       2|
|1ba75b31-64a4-4bd...|5a67999d3ed3e3000...|       3|
|1f7c5def-904b-483...|5a67999d3ed3e3000...|       4|
|1f7c5def-904b-483...|5a6799694fc7c7000...|       1|
|bec23e7b-4870-49f...|5a6799694fc7c7000...|       2|
|1ba75b31-64a4-4bd...|5a6799694fc7c7000...|       3|
|b9ff2e88-cf9d-4bd...|5a6799694fc7c7000...|   

In [32]:
# create table
my_questions.registerTempTable('questions')

In [33]:
# query top 10 result
spark.sql("select id, my_count from questions limit 10").show()

+--------------------+--------+
|                  id|my_count|
+--------------------+--------+
|7a2ed6d3-f492-49b...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|95194331-ac43-454...|       4|
|95194331-ac43-454...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|7a2ed6d3-f492-49b...|       4|
|b9ff2e88-cf9d-4bd...|       1|
|bec23e7b-4870-49f...|       2|
+--------------------+--------+



In [34]:
# join and flatten table
spark.sql("select q.keen_id, a.keen_timestamp, q.id from assessments a join questions q on a.keen_id = q.keen_id limit 10").show()

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|                  id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|803fc93f-7eb2-412...|
|5a17a67efa1257000...|1511499390.3836269|f3cb88cc-5b79-41b...|
|5a17a67efa1257000...|1511499390.3836269|32fe7d8d-6d89-4db...|
|5a17a67efa1257000...|1511499390.3836269|5c34cf19-8cfd-4f5...|
|5a26ee9cbf5ce1000...|1512500892.4166169|0603e6f4-c3f9-4c2...|
|5a26ee9cbf5ce1000...|1512500892.4166169|26a06b88-2758-45b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|25b6effe-79b0-4c4...|
|5a26ee9cbf5ce1000...|1512500892.4166169|6de03a9b-2a78-46b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aaf39991-fa83-470...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aab2e817-73dc-4ff...|
+--------------------+------------------+--------------------+



### Handling missing data in JSON data
When unnesting the assessments JSON, referencing a dictionary key that is missing from some records raises an error for those records. One mitigation for these 'holes' in the data is to default missing values to zero, but the zeroes would skew aggregate statistics such as the average or standard deviation. To avoid this, the transform function returns a list (empty when a key is missing), and the Spark function `flatMap()` is used instead of `map()`. `flatMap()` removes one level of nesting, so records with missing keys contribute no rows at all.

For example, suppose A, B, C, and D are all dictionaries. The per-record results:

( (A), ( ), (B), ( ) , ( ), (C), (D), ( ) )

flat maps to:

(A, B, C, D)
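
The same behavior can be sketched in plain Python, with a list comprehension standing in for `map()` and `itertools.chain.from_iterable` standing in for `flatMap()`. The records and the `extract_counts` helper are hypothetical and only illustrate the pattern.

```python
from itertools import chain

# Hypothetical records: two contain the nested counts, one does not.
records = [
    {"sequences": {"counts": {"correct": 2, "total": 4}}},
    {"sequences": {}},
    {"sequences": {"counts": {"correct": 5, "total": 5}}},
]


def extract_counts(record):
    """Return a one-row list when both keys exist, otherwise an empty list."""
    counts = record.get("sequences", {}).get("counts", {})
    if "correct" in counts and "total" in counts:
        return [{"correct": counts["correct"], "total": counts["total"]}]
    return []


# Like map(): one result per record, so the empty list is kept.
mapped = [extract_counts(record) for record in records]

# Like flatMap(): the per-record lists are concatenated, so empties vanish.
flattened = list(chain.from_iterable(mapped))

print(mapped)
print(flattened)
```

Three records go in and only two rows come out, so the record without counts neither raises an error nor adds a zero that would distort an average.
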

In [35]:
# function to flatten nested structure
def my_lambda_correct_total(x):
    # load json
    raw_dict = json.loads(x.value)
    
    # initialize empty list
    my_list = []
    
    # if sequences in dictionary
    if "sequences" in raw_dict:
        
        # if counts are in sequences
        if "counts" in raw_dict["sequences"]:
            
            # if correct and total are in counts
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"]:
                    
                # extract the counts of correctly answered questions and total questions
                my_dict = {"correct": raw_dict["sequences"]["counts"]["correct"], 
                           "total": raw_dict["sequences"]["counts"]["total"]}
                
                # append row to list
                my_list.append(Row(**my_dict))
    
    # return list output
    return my_list

my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()

In [36]:
# print schema
my_correct_total.printSchema()

root
 |-- correct: long (nullable = true)
 |-- total: long (nullable = true)



In [37]:
# display result
my_correct_total.show()

+-------+-----+
|correct|total|
+-------+-----+
|      2|    4|
|      1|    4|
|      3|    4|
|      2|    4|
|      3|    4|
|      5|    5|
|      1|    1|
|      5|    5|
|      4|    4|
|      0|    5|
|      3|    4|
|      1|    1|
|      4|    6|
|      4|    6|
|      4|    5|
|      3|    4|
|      3|    4|
|      4|    4|
|      2|    4|
|      6|    6|
+-------+-----+
only showing top 20 rows



In [38]:
# create table
my_correct_total.registerTempTable('ct')

In [39]:
# query top 10 result
spark.sql("select * from ct limit 10").show()

+-------+-----+
|correct|total|
+-------+-----+
|      2|    4|
|      1|    4|
|      3|    4|
|      2|    4|
|      3|    4|
|      5|    5|
|      1|    1|
|      5|    5|
|      4|    4|
|      0|    5|
+-------+-----+



In [40]:
# query top 10 result
spark.sql("select correct / total as score from ct limit 10").show()

+-----+
|score|
+-----+
|  0.5|
| 0.25|
| 0.75|
|  0.5|
| 0.75|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
+-----+



In [41]:
# query average exam score
spark.sql("select avg(correct / total)*100 as avg_score from ct limit 10").show()

+-----------------+
|        avg_score|
+-----------------+
|62.65699745547047|
+-----------------+



In [42]:
# query exam score standard deviation
spark.sql("select stddev(correct / total) as standard_deviation from ct limit 10").show()

+-------------------+
| standard_deviation|
+-------------------+
|0.31086692286170553|
+-------------------+
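
Note that the average above is scaled to a percentage while the standard deviation is left as a fraction, so the two figures are on different scales (62.66 versus 0.31). The sketch below recomputes both statistics in plain Python for only the first ten `ct` rows displayed earlier and shows how zero-filling missing records would pull the mean down. The two zero-filled records are hypothetical. `statistics.stdev` computes the sample standard deviation, which is also what `stddev` computes in Spark SQL.

```python
from statistics import mean, stdev

# (correct, total) pairs copied from the first ten rows of the ct table.
rows = [(2, 4), (1, 4), (3, 4), (2, 4), (3, 4),
        (5, 5), (1, 1), (5, 5), (4, 4), (0, 5)]
scores = [correct / total for correct, total in rows]

# Mean over records that actually have counts.
clean_mean = mean(scores)

# Hypothetical: two records with missing counts defaulted to a score of zero.
zero_filled_mean = mean(scores + [0.0, 0.0])

# Sample standard deviation, kept as a fraction rather than a percentage.
score_stdev = stdev(scores)

print(clean_mean, zero_filled_mean, score_stdev)
```

These ten rows are only a sample, so the numbers will not match the full-dataset results above.
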



In [43]:
# write current form to HDFS
my_correct_total.write.mode("overwrite").parquet("/tmp/correct_total")

In [44]:
# write current form to HDFS
assessments.write.mode("overwrite").parquet("/tmp/assessments")

In [45]:
# write current form to HDFS
my_questions.write.mode("overwrite").parquet("/tmp/questions")

## Business Questions

Assumptions of the dataset:
- Each row is unique and there are no duplicates or erroneous data points
- The fields are aptly named and self-explanatory as described above

1) How many assessments are in the dataset?

In [46]:
num_assessments = spark.sql("select count(keen_id) as num_assessments from assessments")

In [47]:
num_assessments.show()

+---------------+
|num_assessments|
+---------------+
|           3280|
+---------------+



There are 3280 assessments. I used the `keen_id` field with the assumption that these are the profiles/instances created by assessment takers in order to record an assessment attempt.

2) What's the name of your Kafka topic? How did you come up with that name?

The name of my Kafka topic is `assessments`. It makes intuitive sense to name it `assessments` as we've created a service at our ed tech firm to deliver assessments for our customers. In addition, the raw data consist of assessment information.

3) How many people took Learning Git?

In [48]:
num_ppl_learning_git = spark.sql("select count(exam_name) as num_people from assessments where exam_name = 'Learning Git'")

In [49]:
num_ppl_learning_git.show()

+----------+
|num_people|
+----------+
|       394|
+----------+



There are 394 people who have taken 'Learning Git'. I queried this using the `exam_name` field, relying on the assumption above that each row is a unique assessment attempt.
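
The count above equals the number of people only if no one appears in more than one row. A minimal plain-Python sketch of the difference between counting rows and counting distinct takers follows. The `attempts` records are hypothetical, and treating `user_exam_id` as a per-person identifier is an assumption.

```python
# Hypothetical attempt records, not taken from the dataset.
attempts = [
    {"user_exam_id": "u1", "exam_name": "Learning Git"},
    {"user_exam_id": "u2", "exam_name": "Learning Git"},
    {"user_exam_id": "u2", "exam_name": "Learning Git"},  # repeated taker
    {"user_exam_id": "u3", "exam_name": "Intro to Python"},
]

git_rows = [a for a in attempts if a["exam_name"] == "Learning Git"]

# Counting rows, as the query above does.
row_count = len(git_rows)

# Counting distinct identifiers instead.
distinct_count = len({a["user_exam_id"] for a in git_rows})

print(row_count, distinct_count)
```

If the two counts differed on the real data, the distinct count would be the better answer to "how many people".
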

## Query and Store Business Question Answers

The query results above can be stored in HDFS as Parquet files, as shown in the examples below.

In [50]:
# write to Hadoop
num_assessments.write.mode("overwrite").parquet("/tmp/num_assessments")

In [51]:
# write to Hadoop
num_ppl_learning_git.write.mode("overwrite").parquet("/tmp/num_ppl_learning_git")

### Verify results in Hadoop using following commands:

- Verify all files
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```
- Verify specific file
```
docker-compose exec cloudera hadoop fs -ls /tmp/num_assessments
```