# Description of Data Pipeline for Education Assessment Service

## Summary
### In this document I discuss the mechanisms for ingesting, storing, transforming, and analyzing data in my education assessment service. This Jupyter notebook runs on Spark, so Spark commands execute natively in the notebook. Kafka and Hadoop commands are shown as screenshots and appear only in the initial pipeline setup, because they do not change in the modified pipeline that follows.

## Who this document is for
### This document is geared toward data scientists, who will be able to utilize this framework to access, transform, store, and query the data.

## Data Pipeline
### The data pipeline is composed of Kafka for data publishing and subscribing, Spark for data transformation and analysis, and Hadoop for storage.

## Contents
### (1) Prelude: Experimental Setup
### (2) Initial Pipeline Setup
### (3) Necessary Modifications to Pipeline Setup
### (4) Business Question 1
### (5) Business Question 2
### (6) Additional tools for Exploratory Data Analysis (EDA)


## 1. Prelude: Experimental Setup

### Necessary libraries to run this Jupyter notebook

In [54]:
import json
import pprint
from pyspark.sql import Row

### To illustrate this service, I use a moderately complex (nested) JSON file of assessment attempts as the input data.

In [13]:
# The bang (!) prefix runs a shell command from the notebook, as if on the Linux command line. Here, curl downloads the data.
! curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 9096k  100 9096k    0     0  10.0M      0 --:--:-- --:--:-- --:--:-- 10.0M


## 2. Initial Pipeline Setup

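### Spin up the containers: 
### After starting the containers (e.g., with `docker-compose up -d`), check HDFS with `docker-compose exec cloudera hadoop fs -ls /tmp/`. The results show the MapReduce entries and the supergroup group.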

![check_hadoop.png](attachment:check_hadoop.png)

### Create a topic: 
### Our topic is called assessments
![create_assessment_topic.png](attachment:create_assessment_topic.png)

### Produce messages to Kafka:
### Publish messages to the `assessments` topic using kafkacat
![produce_test_messages_with_kafkacat.png](attachment:produce_test_messages_with_kafkacat.png)
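The exact kafkacat command is in the screenshot. The downloaded file is a single JSON array, and a common pattern (an assumption here, for example piping the file through `jq '.[]' -c` before kafkacat) is to publish one compact JSON object per Kafka message. The sketch below reproduces that splitting step in plain Python; `to_kafka_lines` is a hypothetical helper, not part of the pipeline.

```python
import json

def to_kafka_lines(json_text):
    """Split a top-level JSON array into one compact JSON string per record.

    Each returned string is what one Kafka message value would contain,
    assuming one assessment record per message.
    """
    records = json.loads(json_text)
    # separators=(",", ":") drops the spaces, giving compact one-line output
    return [json.dumps(record, separators=(",", ":")) for record in records]

sample_json = '[{"keen_id": "a1", "max_attempts": "1.0"}, {"keen_id": "b2", "max_attempts": "1.0"}]'
for line in to_kafka_lines(sample_json):
    print(line)
```

One message per record is what lets each Spark row later hold exactly one assessment.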

### Read data into Spark:
### Spin up PySpark to read the data from Kafka. For this example, the Spark analysis is run in this notebook.
![spin_up_spark.png](attachment:spin_up_spark.png)

In [38]:
## Batch-read every message in the assessments topic (earliest to latest offset)
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "assessments").option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest").load()


In [39]:
## Cache raw_assessments to keep the warnings down
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [41]:
## Checking out the schema in raw_assessments
raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [79]:
## Kafka delivers values as binary; cast them to strings so the JSON text can be parsed
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
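Kafka stores message values as raw bytes, which is why `value` is `binary` in the schema above; casting to `string` in Spark interprets those bytes as UTF-8 text. The plain-Python analogue below is only an illustration of that step, not the Spark code path, and the record is a trimmed example.

```python
import json

# A Kafka message value arrives as bytes (binary in the Spark schema)
raw_value = b'{"keen_id": "5a6745820eb8ab00016be1f1", "max_attempts": "1.0"}'

# Decoding as UTF-8 is the analogue of value.cast('string') in Spark
text_value = raw_value.decode("utf-8")

# The decoded string can then be parsed as JSON
record = json.loads(text_value)
print(record["keen_id"])
```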

### Storing data in Hadoop:
### Write the DataFrame from PySpark to HDFS as Parquet

In [46]:
assessments.write.parquet("/tmp/assessments_2")

### A screenshot from a second terminal confirms that the write to HDFS succeeded
![hadoop_results_success.png](attachment:hadoop_results_success.png)

### Here is a view of what we wrote to HDFS. Each row holds one raw JSON string in a single `value` column, so the result is not immediately usable and needs further transformation.

In [49]:
## Display the DataFrame that was written to HDFS
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



## 3. Necessary Modifications to Pipeline Setup

### Although the pipeline works, the data in HDFS is not informative for analysis, because every field of an assessment is packed into one JSON string. The method below unpacks those strings into a more informative dataset.

In [55]:
## First, apply a lambda transform that parses each JSON string and turns its top-level fields into columns
extracted_assessment_fields = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
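Each top-level key of a record becomes a column, while nested objects such as `sequences` stay packed in a single column. In PySpark 2.x, `Row(**kwargs)` also sorts field names alphabetically (Spark 3.0 stopped doing this for Python 3.6+), which is why the schema below lists columns alphabetically rather than in file order. A plain-Python sketch of the per-record transform; `extract_fields` is a hypothetical name and the record is trimmed.

```python
import json

def extract_fields(json_text):
    """Plain-Python analogue of lambda x: Row(**json.loads(x.value)).

    Top-level keys become column names; nested objects such as "sequences"
    stay as one nested value in a single column.
    """
    record = json.loads(json_text)
    # PySpark 2.x sorts Row fields by name when built from keyword arguments
    return {key: record[key] for key in sorted(record)}

row = extract_fields('{"keen_id": "5a67", "exam_name": "Normal Forms", "sequences": {"attempt": 1}}')
print(list(row))
```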

In [52]:
## Check what the schema looks like now
extracted_assessment_fields.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



In [74]:
## Here is the much more informative result of the show command
extracted_assessment_fields.show()

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

### Use a temporary table for queries

In [81]:
## To query the assessments with Spark SQL, register the DataFrame as a temporary view
## (createOrReplaceTempView supersedes registerTempTable, which is deprecated since Spark 2.0)
extracted_assessment_fields.createOrReplaceTempView('assessments')

In [82]:
## Example: select keen_id values from the assessments temp table
spark.sql("select keen_id from assessments limit 10").show()

+--------------------+
|             keen_id|
+--------------------+
|5a6745820eb8ab000...|
|5a674541ab6b0a000...|
|5a67999d3ed3e3000...|
|5a6799694fc7c7000...|
|5a6791e824fccd000...|
|5a67a0b6852c2a000...|
|5a67b627cc80e6000...|
|5a67ac8cb0a5f4000...|
|5a67a9ba060087000...|
|5a67ac54411aed000...|
+--------------------+



In [59]:
## Query a nested field: the user_incomplete flag of the first question in each attempt
spark.sql("select keen_timestamp, sequences.questions[0].user_incomplete from assessments limit 10").show()

+------------------+-------------------------------------------------------+
|    keen_timestamp|sequences[questions] AS `questions`[0][user_incomplete]|
+------------------+-------------------------------------------------------+
| 1516717442.735266|                                                   true|
| 1516717377.639827|                                                  false|
| 1516738973.653394|                                                  false|
|1516738921.1137421|                                                  false|
| 1516737000.212122|                                                  false|
| 1516740790.309757|                                                  false|
|1516746279.3801291|                                                  false|
| 1516743820.305464|                                                  false|
|  1516743098.56811|                                                  false|
| 1516743764.813107|                                                  false|
+------------------+-------------------------------------------------------+

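The expression `sequences.questions[0].user_incomplete` navigates the nesting one step at a time: the `questions` entry of `sequences`, the first element of that list, then its `user_incomplete` key. The same lookup in plain Python, on a trimmed version of the first record that keeps only the keys the query touches:

```python
# Trimmed first record: only the keys needed for the lookup are kept
record = {
    "keen_timestamp": "1516717442.735266",
    "sequences": {
        "questions": [
            {"id": "7a2ed6d3-f492-49b3-b8aa-d080a8aad986", "user_incomplete": True},
        ]
    },
}

# Python analogue of sequences.questions[0].user_incomplete in Spark SQL
first_question = record["sequences"]["questions"][0]
# .get returns None when the key is missing, similar to a SQL NULL
print(first_question.get("user_incomplete"))
```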
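## 4. Business Question 1
### How complex is your data structure? With a multi-level JSON file, be careful when querying the data. For example, the query below fails. There is no column named `sequence` (the column is `sequences`), and even with the correct name the ID is not directly reachable: `id` is a key inside the nested `sequences` dictionary, and the schema Spark inferred for `sequences` (a map of strings to arrays of boolean maps, per the printSchema output above) does not match a string value like `id`.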
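One way to answer "how complex is the data?" before writing queries is to measure how deeply it nests. A sketch under that framing; `max_depth` is a hypothetical helper that could be run on a parsed record such as `json_data[0]` from Section 6.

```python
def max_depth(obj):
    """Return how many container levels (dicts/lists) a parsed JSON value has.

    A scalar has depth 0; {"a": 1} has depth 1; {"a": {"b": [1]}} has depth 3.
    """
    if isinstance(obj, dict):
        return 1 + max((max_depth(v) for v in obj.values()), default=0)
    if isinstance(obj, list):
        return 1 + max((max_depth(v) for v in obj), default=0)
    return 0

flat = {"keen_id": "5a67", "max_attempts": "1.0"}
nested = {"sequences": {"questions": [{"options": [{"checked": True}]}]}}
print(max_depth(flat), max_depth(nested))
```

A depth of 1 means plain column access works; anything deeper calls for the extraction techniques below.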

In [61]:
spark.sql("select sequence.id from assessments limit 10").show()

AnalysisException: "cannot resolve '`sequence.id`' given input columns: [exam_name, sequences, user_exam_id, base_exam_id, started_at, certification, keen_id, keen_created_at, max_attempts, keen_timestamp]; line 1 pos 7;\n'GlobalLimit 10\n+- 'LocalLimit 10\n   +- 'Project ['sequence.id]\n      +- SubqueryAlias assessments\n         +- LogicalRDD [base_exam_id#482, certification#483, exam_name#484, keen_created_at#485, keen_id#486, keen_timestamp#487, max_attempts#488, sequences#489, started_at#490, user_exam_id#491]\n"

### To extract the sequence ID, apply the lambda transform below to the raw JSON. It builds a separate DataFrame of `keen_id` and `sequences_id` and registers it as a temp table; Spark SQL then joins that table back to the top-level `assessments` table on `keen_id`. See below:

In [62]:
## Lambda function to extract the sequence ID field

def my_lambda_sequences_id(x):
    # Parse the raw JSON string; keep the join key and the nested sequence ID
    raw_dict = json.loads(x.value)
    my_dict = {"keen_id": raw_dict["keen_id"], "sequences_id": raw_dict["sequences"]["id"]}
    return Row(**my_dict)

my_sequences = assessments.rdd.map(my_lambda_sequences_id).toDF()

my_sequences.createOrReplaceTempView('sequences')


In [63]:
## Showing sequence ids extracted
spark.sql("select sequences_id from sequences limit 10").show()

+--------------------+
|        sequences_id|
+--------------------+
|5b28a462-7a3b-42e...|
|5b28a462-7a3b-42e...|
|b370a3aa-bf9e-4c1...|
|b370a3aa-bf9e-4c1...|
|04a192c1-4f5c-4ac...|
|e7110aed-0d08-4cb...|
|5251db24-2a6e-424...|
|066b5326-e547-4da...|
|8ac691f8-8c1a-403...|
|066b5326-e547-4da...|
+--------------------+



In [65]:
# Showing keen_id, keen_timestamp, and sequences_id
spark.sql("select a.keen_id, a.keen_timestamp, s.sequences_id from assessments a join sequences s on a.keen_id = s.keen_id limit 10").show()

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|        sequences_id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|8ac691f8-8c1a-403...|
|5a26ee9cbf5ce1000...|1512500892.4166169|9bd87823-4508-4e0...|
|5a29dcac74b662000...|1512692908.8423469|e7110aed-0d08-4cb...|
|5a2fdab0eabeda000...|1513085616.2275269|cd800e92-afc3-447...|
|5a30105020e9d4000...|1513099344.8624721|8ac691f8-8c1a-403...|
|5a3a6fc3f0a100000...| 1513779139.354213|e7110aed-0d08-4cb...|
|5a4e17fe08a892000...|1515067390.1336551|9abd5b51-6bd8-11e...|
|5a4f3c69cc6444000...| 1515142249.858722|083844c5-772f-48d...|
|5a51b21bd0480b000...| 1515303451.773272|e7110aed-0d08-4cb...|
|5a575a85329e1a000...| 1515674245.348099|25ca21fe-4dbb-446...|
+--------------------+------------------+--------------------+
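Conceptually, the join above is a lookup keyed on `keen_id`. A plain-Python sketch of that inner join, using made-up rows with hypothetical IDs; it assumes each `keen_id` appears once in the sequences table, which holds when `my_lambda_sequences_id` emits one row per record and `keen_id` values are unique.

```python
# Hypothetical top-level rows and extracted sequence IDs
assessment_rows = [
    {"keen_id": "k1", "keen_timestamp": "1516717442.735266"},
    {"keen_id": "k2", "keen_timestamp": "1516717377.639827"},
    {"keen_id": "k3", "keen_timestamp": "1516738973.653394"},
]
sequence_rows = [
    {"keen_id": "k1", "sequences_id": "seq-A"},
    {"keen_id": "k2", "sequences_id": "seq-B"},
]

# Build a lookup on the join key, then keep only rows that have a match (inner join)
sequence_by_keen_id = {row["keen_id"]: row["sequences_id"] for row in sequence_rows}
joined = [
    (a["keen_id"], a["keen_timestamp"], sequence_by_keen_id[a["keen_id"]])
    for a in assessment_rows
    if a["keen_id"] in sequence_by_keen_id
]
print(joined)
```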



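### The function below uses flatMap with a custom lambda transform to pull out every element of a nested list: it emits one row per question in `sequences.questions`, numbered by its position within the attempt.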

In [66]:
def my_lambda_questions(x):
    # Emit one Row per question; my_count is the question's 1-based position in the list
    raw_dict = json.loads(x.value)
    my_list = []
    my_count = 0
    for question in raw_dict["sequences"]["questions"]:
        my_count += 1
        my_dict = {"keen_id": raw_dict["keen_id"], "my_count": my_count, "id": question["id"]}
        my_list.append(Row(**my_dict))
    return my_list

## flatMap (not map) flattens each record's list of Rows into a single DataFrame
my_questions = assessments.rdd.flatMap(my_lambda_questions).toDF()

my_questions.createOrReplaceTempView('questions')
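`flatMap` differs from `map` in that each input record can yield zero or more output rows: `my_lambda_questions` returns a list, and Spark concatenates those lists. A plain-Python sketch of the same flattening, with illustrative records and `enumerate(..., start=1)` in place of the manual counter; `question_rows` is a hypothetical name.

```python
from itertools import chain

def question_rows(record):
    """One output row per question; my_count is the 1-based position in the list."""
    return [
        {"keen_id": record["keen_id"], "my_count": position, "id": question["id"]}
        for position, question in enumerate(record["sequences"]["questions"], start=1)
    ]

records = [
    {"keen_id": "k1", "sequences": {"questions": [{"id": "q-a"}, {"id": "q-b"}]}},
    {"keen_id": "k2", "sequences": {"questions": [{"id": "q-c"}]}},
]

# map-style: one list per record
nested_lists = [question_rows(r) for r in records]
# flatMap-style: all rows concatenated into one sequence
flattened = list(chain.from_iterable(nested_lists))
print(len(nested_lists), len(flattened))
```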



In [67]:
spark.sql("select id, my_count from questions limit 10").show()

+--------------------+--------+
|                  id|my_count|
+--------------------+--------+
|7a2ed6d3-f492-49b...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|95194331-ac43-454...|       4|
|95194331-ac43-454...|       1|
|bbed4358-999d-446...|       2|
|e6ad8644-96b1-461...|       3|
|7a2ed6d3-f492-49b...|       4|
|b9ff2e88-cf9d-4bd...|       1|
|bec23e7b-4870-49f...|       2|
+--------------------+--------+



In [68]:
spark.sql("select q.keen_id, a.keen_timestamp, q.id from assessments a join questions q on a.keen_id = q.keen_id limit 10").show()

+--------------------+------------------+--------------------+
|             keen_id|    keen_timestamp|                  id|
+--------------------+------------------+--------------------+
|5a17a67efa1257000...|1511499390.3836269|803fc93f-7eb2-412...|
|5a17a67efa1257000...|1511499390.3836269|f3cb88cc-5b79-41b...|
|5a17a67efa1257000...|1511499390.3836269|32fe7d8d-6d89-4db...|
|5a17a67efa1257000...|1511499390.3836269|5c34cf19-8cfd-4f5...|
|5a26ee9cbf5ce1000...|1512500892.4166169|0603e6f4-c3f9-4c2...|
|5a26ee9cbf5ce1000...|1512500892.4166169|26a06b88-2758-45b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|25b6effe-79b0-4c4...|
|5a26ee9cbf5ce1000...|1512500892.4166169|6de03a9b-2a78-46b...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aaf39991-fa83-470...|
|5a26ee9cbf5ce1000...|1512500892.4166169|aab2e817-73dc-4ff...|
+--------------------+------------------+--------------------+



## 5. Business Question 2
### Is there a chance your assessments have missing data? If so, consider a function like the one below. It adds a layer of checks over the nested dictionary: it confirms that `sequences`, `sequences.counts`, and both `correct` and `total` are present before reading them, and emits a row only when they are. Records missing any of these keys are skipped instead of raising a KeyError.

In [95]:
def my_lambda_correct_total(x):
    """Return [Row(correct, total)] when both counts are present, else an empty list."""
    raw_dict = json.loads(x.value)
    my_list = []

    # Check each nesting level before indexing into it
    if "sequences" in raw_dict:
        if "counts" in raw_dict["sequences"]:
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"]:
                my_dict = {"correct": raw_dict["sequences"]["counts"]["correct"],
                           "total": raw_dict["sequences"]["counts"]["total"]}
                my_list.append(Row(**my_dict))

    return my_list

## flatMap drops records that return an empty list, so incomplete records never reach the table
my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()

my_correct_total.createOrReplaceTempView('ct')
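The nested `if` checks can also be written with `dict.get` and empty-dict fallbacks, which short-circuit to an empty result when any level is missing. A sketch of such a helper (`safe_correct_total` is a hypothetical name) that returns plain dicts instead of Spark `Row`s so it runs without Spark; the `or {}` fallbacks also cover levels that are present but `null`, which the original checks would not.

```python
import json

def safe_correct_total(json_text):
    """Return [{"correct": c, "total": t}] if both counts exist, else []."""
    raw_dict = json.loads(json_text)
    # Fall back to {} when a level is missing or null
    counts = (raw_dict.get("sequences") or {}).get("counts") or {}
    if "correct" in counts and "total" in counts:
        return [{"correct": counts["correct"], "total": counts["total"]}]
    return []

print(safe_correct_total('{"sequences": {"counts": {"correct": 2, "total": 4}}}'))
print(safe_correct_total('{"keen_id": "no-sequences"}'))
```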

In [96]:
## Show a sample of correct/total pairs; the last row has 0 correct out of 5
spark.sql("select * from ct limit 10").show()

+-------+-----+
|correct|total|
+-------+-----+
|      2|    4|
|      1|    4|
|      3|    4|
|      2|    4|
|      3|    4|
|      5|    5|
|      1|    1|
|      5|    5|
|      4|    4|
|      0|    5|
+-------+-----+



In [97]:
## Per-attempt score (correct / total) from the ct table built by my_lambda_correct_total; Spark SQL's / returns a double even for integer columns
spark.sql("select correct / total as score from ct limit 10").show()

+-----+
|score|
+-----+
|  0.5|
| 0.25|
| 0.75|
|  0.5|
| 0.75|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
+-----+



In [98]:
## Average score (as a percentage) over every row in the ct table built by my_lambda_correct_total
## LIMIT would only cap the one-row result, not the rows being averaged, so it is not needed
spark.sql("select avg(correct / total)*100 as avg_score from ct").show()

+-----------------+
|        avg_score|
+-----------------+
|62.65699745547047|
+-----------------+



In [99]:
## Sample standard deviation of the score over every row in ct (Spark SQL's stddev is stddev_samp)
spark.sql("select stddev(correct / total) as standard_deviation from ct").show()

+-------------------+
| standard_deviation|
+-------------------+
|0.31086692286170553|
+-------------------+
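Spark SQL's `stddev` is the sample standard deviation (an alias of `stddev_samp`), which corresponds to Python's `statistics.stdev` rather than `statistics.pstdev`. The check below uses only the ten scores displayed by the limit-10 query above, so its results are not expected to match the full-table figures of about 62.66 and 0.311.

```python
import statistics

# The ten scores shown by the limit-10 query above (a sample, not the full ct table)
scores = [0.5, 0.25, 0.75, 0.5, 0.75, 1.0, 1.0, 1.0, 1.0, 0.0]

avg_score = statistics.mean(scores) * 100  # same form as avg(correct / total)*100
sample_sd = statistics.stdev(scores)       # divides by n - 1, like Spark's stddev
population_sd = statistics.pstdev(scores)  # divides by n, like Spark's stddev_pop
print(round(avg_score, 2), round(sample_sd, 4), round(population_sd, 4))
```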



## 6. Additional tools for Exploratory Data Analysis (EDA)
### The next four cells load the downloaded JSON file into a Python object (a list of assessment records) so we can manipulate it.

In [5]:
f = open("assessment-attempts-20180128-121051-nested.json", "r")

In [6]:
s = f.read()

In [7]:
json_data = json.loads(s)

In [8]:
f.close()
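The four cells above can be collapsed into one `with` block using `json.load`, which closes the file even if parsing fails. A sketch; `load_json_file` is a hypothetical helper, demonstrated on a small temporary file standing in for the downloaded dataset.

```python
import json
import os
import tempfile

def load_json_file(path):
    """Read and parse a JSON file; the with block closes the file even on errors."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

# Demo with a small temporary file standing in for the downloaded dataset
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    tmp.write('[{"keen_id": "5a6745820eb8ab00016be1f1"}]')
    demo_path = tmp.name

demo_data = load_json_file(demo_path)
os.remove(demo_path)
print(type(demo_data).__name__, len(demo_data))
```

In the notebook this would be `json_data = load_json_file("assessment-attempts-20180128-121051-nested.json")`.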

In [78]:
p = pprint.PrettyPrinter(indent=1)

### The one-line command below uses the pprint module to display the first full record.

In [10]:
p.pprint(json_data[0])

{'base_exam_id': '37f0a30a-7464-11e6-aa92-a8667f27e5dc',
 'certification': 'false',
 'exam_name': 'Normal Forms and All That Jazz Master Class',
 'keen_created_at': '1516717442.735266',
 'keen_id': '5a6745820eb8ab00016be1f1',
 'keen_timestamp': '1516717442.735266',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
               'counts': {'all_correct': False,
                          'correct': 2,
                          'incomplete': 1,
                          'incorrect': 1,
                          'submitted': 4,
                          'total': 4,
                          'unanswered': 0},
               'id': '5b28a462-7a3b-42e0-b508-09f3906d1703',
               'questions': [{'id': '7a2ed6d3-f492-49b3-b8aa-d080a8aad986',
                              'options': [{'at': '2018-01-23T14:23:24.670Z',
                                           'checked': True,
                                           'correct': True,
                                           'id': '

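### The function below recursively walks a parsed JSON object, printing each dictionary key with its nesting level (L0, L1, ...) and each leaf value. List elements print as [i] and also count as a level, which is why the keys inside `questions` appear at L3.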

In [11]:
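def recursive_walk_json_object(j, level):
    """Print every key, list index, and leaf value of j, indented by nesting level."""
    level += 1

    if isinstance(j, dict):
        # Visit keys in sorted order and recurse into each value
        for k in sorted(j.keys()):
            print("  " * level + "L" + str(level), k)
            recursive_walk_json_object(j[k], level)

    elif isinstance(j, list):
        # Print each element's index, then recurse into the element
        for (i, l) in enumerate(j):
            print("  " * level + " [" + str(i) + "]")
            recursive_walk_json_object(l, level)

    else:
        # Scalars (strings, numbers, booleans, None) are leaves
        print("  " * level + " value:", j)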

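A variation on the walk above, sketched under the assumption that full key paths are more useful than indentation when writing queries: `leaf_paths` (a hypothetical helper) yields each leaf's path in the same dotted-and-bracketed form as the earlier Spark SQL expression `sequences.questions[0].user_incomplete`. Whether a given path resolves in Spark still depends on the inferred schema.

```python
def leaf_paths(obj, prefix=""):
    """Yield (path, value) for every leaf, e.g. ("sequences.counts.correct", 2)."""
    if isinstance(obj, dict):
        for key in sorted(obj):
            child = f"{prefix}.{key}" if prefix else key
            yield from leaf_paths(obj[key], child)
    elif isinstance(obj, list):
        # List elements are addressed by index, as in questions[0]
        for i, item in enumerate(obj):
            yield from leaf_paths(item, f"{prefix}[{i}]")
    else:
        yield prefix, obj

demo_record = {"keen_id": "5a67",
               "sequences": {"counts": {"correct": 2, "total": 4},
                             "questions": [{"id": "7a2e"}]}}
for path, value in leaf_paths(demo_record):
    print(path, value)
```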
### Example of using recursive walk function

In [77]:
# Walk the first record; starting at level -1 makes the top-level keys print as L0.
recursive_walk_json_object(json_data[0], -1)

L0 base_exam_id
   value: 37f0a30a-7464-11e6-aa92-a8667f27e5dc
L0 certification
   value: false
L0 exam_name
   value: Normal Forms and All That Jazz Master Class
L0 keen_created_at
   value: 1516717442.735266
L0 keen_id
   value: 5a6745820eb8ab00016be1f1
L0 keen_timestamp
   value: 1516717442.735266
L0 max_attempts
   value: 1.0
L0 sequences
  L1 attempt
     value: 1
  L1 counts
    L2 all_correct
       value: False
    L2 correct
       value: 2
    L2 incomplete
       value: 1
    L2 incorrect
       value: 1
    L2 submitted
       value: 4
    L2 total
       value: 4
    L2 unanswered
       value: 0
  L1 id
     value: 5b28a462-7a3b-42e0-b508-09f3906d1703
  L1 questions
     [0]
      L3 id
         value: 7a2ed6d3-f492-49b3-b8aa-d080a8aad986
      L3 options
         [0]
          L5 at
             value: 2018-01-23T14:23:24.670Z
          L5 checked
             value: True
          L5 correct
             value: True
          L5 id
             value: 49c574b4-5c82-4ffd