## Project 2: Tracking User Activity
- ### Class: W205 - Summer 2021
- ### Author: Matt Pribadi

----

 *Before this notebook could be run, the infrastructure for the data pipeline had to be set up in an organized fashion.
 This was done in a series of steps (see the history file for the complete commands) so that the data is available for analysis and can be presented in a legible format.*
 
#### Setting up the data repositories:
 - A separate folder was created to hold the Docker container configuration
 - The Docker environment runs the following services:
   - **Kafka**: To produce and consume messages
   - **Cloudera**: A widely used distribution of Hadoop, used here for its distributed file system (HDFS) to store the messages from the queue and to perform queries
   - **Spark**: To perform advanced SQL queries and exploratory data analysis using the pyspark package
 
#### Producing and Consuming data
 - The file was brought in through a *curl* command and is a highly nested JSON file
 - Docker-compose was used to spin up the container
 - The data was pulled into the pipeline using *kafkacat* and is queried in this notebook

#### Summary of Data
 - The data is a heavily nested JSON file: a list of records, one per assessment, where each record is a set of key-value pairs
 - Within each assessment, there is data associated with the assessment such as
   - Unique Identifier
   - Exam Name
   - Timestamps
   - Number of Questions
   - Number of Correct/Incorrect responses
   - and more
 - Multiple queries are required to extract the datasets of interest for this topic
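To make the nesting concrete, the sketch below pulls the fields listed above out of one record. The sample is a trimmed copy of the first record pretty-printed later in this notebook (`json_data[0]`), keeping only the keys used here. The helper name `summarize_assessment` is hypothetical, and treating `keen_id` as the per-record unique identifier is an assumption.

```python
import json

# Trimmed copy of the first record pretty-printed in this notebook (json_data[0]);
# only the keys used below are kept.
sample = json.loads("""
{"keen_id": "5a6745820eb8ab00016be1f1",
 "exam_name": "Normal Forms and All That Jazz Master Class",
 "keen_timestamp": "1516717442.735266",
 "sequences": {"attempt": 1,
               "counts": {"correct": 2, "incorrect": 1, "incomplete": 1,
                          "total": 4, "unanswered": 0}}}
""")


def summarize_assessment(record):
    """Hypothetical helper: flatten the fields of interest from one nested record."""
    # Missing levels fall back to empty dicts, so absent counts become None
    counts = record.get("sequences", {}).get("counts", {})
    return {
        "id": record.get("keen_id"),  # assumed to identify the individual record
        "exam_name": record.get("exam_name"),
        "timestamp": float(record["keen_timestamp"]),  # stored as a string in the file
        "total": counts.get("total"),
        "correct": counts.get("correct"),
        "incorrect": counts.get("incorrect"),
    }


print(summarize_assessment(sample))
```

Using `.get` on each nesting level keeps the helper from raising on records that lack a `sequences` or `counts` block.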

### Setting up the Python Notebook

```python
# Importing libraries for reading and displaying data
import pandas as pd
import json
import pprint
from pyspark.sql import Row
```

```python
p = pprint.PrettyPrinter(indent=1)

# Read and parse the nested JSON file; the with-block closes the file automatically
with open("assessment-attempts-20180128-121051-nested.json", "r") as file:
    json_data = json.load(file)
```

### Understanding the nesting of the data

```python
len(json_data)
```

```
3280
```

```python
# pprint sorts dictionary keys alphabetically, so the order may not match the file order
p.pprint(json_data[0])
```

```
{'base_exam_id': '37f0a30a-7464-11e6-aa92-a8667f27e5dc',
 'certification': 'false',
 'exam_name': 'Normal Forms and All That Jazz Master Class',
 'keen_created_at': '1516717442.735266',
 'keen_id': '5a6745820eb8ab00016be1f1',
 'keen_timestamp': '1516717442.735266',
 'max_attempts': '1.0',
 'sequences': {'attempt': 1,
               'counts': {'all_correct': False,
                          'correct': 2,
                          'incomplete': 1,
                          'incorrect': 1,
                          'submitted': 4,
                          'total': 4,
                          'unanswered': 0},
               'id': '5b28a462-7a3b-42e0-b508-09f3906d1703',
               'questions': [{'id': '7a2ed6d3-f492-49b3-b8aa-d080a8aad986',
                              'options': [{'at': '2018-01-23T14:23:24.670Z',
                                           'checked': True,
                                           'correct': True,
                                           'id': '
```

```python
def recursive_walk_json_object(j, level):
    """Recursively walk through a JSON object to explore its structure.

    Dictionary keys are visited in alphabetical order to match the pretty print above.
    Each nested dict value or list element is printed one level deeper.
    """
    level += 1

    if isinstance(j, dict):
        # Print each key with its level, then descend into its value
        for k in sorted(j):
            print("   " * level + "L" + str(level), k)
            recursive_walk_json_object(j[k], level)
    elif isinstance(j, list):
        # Print each list index, then descend into the element
        for i, item in enumerate(j):
            print("  " * level + "  [" + str(i) + "]")
            recursive_walk_json_object(item, level)
    else:
        # Leaf value: print it at the current indentation
        print("   " * level + " value:", j)
```

```python
recursive_walk_json_object(json_data[0], -1)
```

```
L0 base_exam_id
    value: 37f0a30a-7464-11e6-aa92-a8667f27e5dc
L0 certification
    value: false
L0 exam_name
    value: Normal Forms and All That Jazz Master Class
L0 keen_created_at
    value: 1516717442.735266
L0 keen_id
    value: 5a6745820eb8ab00016be1f1
L0 keen_timestamp
    value: 1516717442.735266
L0 max_attempts
    value: 1.0
L0 sequences
   L1 attempt
       value: 1
   L1 counts
      L2 all_correct
          value: False
      L2 correct
          value: 2
      L2 incomplete
          value: 1
      L2 incorrect
          value: 1
      L2 submitted
          value: 4
      L2 total
          value: 4
      L2 unanswered
          value: 0
   L1 id
       value: 5b28a462-7a3b-42e0-b508-09f3906d1703
   L1 questions
      [0]
         L3 id
             value: 7a2ed6d3-f492-49b3-b8aa-d080a8aad986
         L3 options
          [0]
               L5 at
                   value: 2018-01-23T14:23:24.670Z
               L5 checked
                   value: True
               L
```
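The printed walk grows long quickly. A more compact view, sketched below under the assumption that only the set of keys matters (not their values), collects each distinct key path once and collapses every list index into `[]`. The function name `key_paths` and the small record are hypothetical; the record is only shaped like `json_data[0]`.

```python
def key_paths(obj, prefix=""):
    """Return the set of dotted key paths in a JSON-like object.

    List elements are collapsed into a single '[]' step, so every question
    or option contributes the same paths regardless of its position.
    """
    paths = set()
    if isinstance(obj, dict):
        for key, value in obj.items():
            path = f"{prefix}.{key}" if prefix else key
            paths.add(path)
            paths |= key_paths(value, path)
    elif isinstance(obj, list):
        for item in obj:
            paths |= key_paths(item, prefix + "[]")
    return paths


# Small record shaped like json_data[0] (values trimmed for illustration).
record = {
    "exam_name": "Normal Forms and All That Jazz Master Class",
    "sequences": {
        "counts": {"correct": 2, "total": 4},
        "questions": [{"id": "q1", "options": [{"checked": True}]},
                      {"id": "q2", "options": []}],
    },
}
for path in sorted(key_paths(record)):
    print(path)
```

Taking the union over all records, e.g. `set().union(*(key_paths(r) for r in json_data))`, would list every key that appears in at least one assessment, which helps spot optional fields.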

### Pulling in data from Kafka

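```python
# Batch-read every message currently in the "assessments" Kafka topic.
# `spark` is the SparkSession provided by the pyspark notebook environment.
raw_assessments = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:29092") \
    .option("subscribe", "assessments") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
```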

```python
# Keep the raw messages in memory, since they are reused by several queries below
raw_assessments.cache()
```

```
DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]
```

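```python
# Kafka delivers message values as bytes; cast them to strings holding the raw JSON
assessments = raw_assessments.select(raw_assessments.value.cast('string'))

# Parse each JSON message into a Row and let Spark infer a DataFrame schema
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
```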

```python
extracted_assessments
```

```
DataFrame[base_exam_id: string, certification: string, exam_name: string, keen_created_at: string, keen_id: string, keen_timestamp: string, max_attempts: string, sequences: map<string,array<map<string,boolean>>>, started_at: string, user_exam_id: string]
```

### Adding a table to be queried

```python
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
extracted_assessments.createOrReplaceTempView('assessments')
```

#### Question 1: How many total assessments are there?

```python
spark.sql("select count(exam_name) from assessments").show()
```

```
+----------------+
|count(exam_name)|
+----------------+
|            3280|
+----------------+
```

There are 3280 assessments in this data table.

#### Question 2: How many times was "Learning Git" taken?

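```python
# LIKE without wildcards is an exact match, so = expresses the same filter
spark.sql("select count(exam_name) from assessments where exam_name = 'Learning Git'").show()
```

```
+----------------+
|count(exam_name)|
+----------------+
|             394|
+----------------+
```

There were 394 *Learning Git* assessments in this dataset. This counts assessment attempts rather than distinct people, so a student who took the exam more than once would be counted each time.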
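Because the same file was also loaded locally as `json_data` earlier in this notebook, both Spark counts can be sanity-checked without Spark. This is a sketch assuming the Kafka topic holds exactly the records in that file; the toy list below stands in for `json_data`.

```python
# Stand-in for json_data; in the notebook, use the list loaded from the JSON file.
records = [
    {"exam_name": "Learning Git"},
    {"exam_name": "Learning Git"},
    {"exam_name": "Mastering Git"},
    {"exam_name": None},
]

# COUNT(exam_name) skips NULLs, so only records with a non-null exam_name are counted.
total_assessments = sum(1 for r in records if r.get("exam_name") is not None)

# The exam-name filter is an exact, case-sensitive match.
learning_git = sum(1 for r in records if r.get("exam_name") == "Learning Git")

print(total_assessments, learning_git)
```

Run against the real `json_data`, these counts would be expected to match the Spark results above (3280 and 394) if the topic and the file contain the same records.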

#### Question 3: What is the least common course taken? And the most common?

```python
spark.sql("""
    SELECT exam_name, count(exam_name) AS CountOf
    FROM assessments
    GROUP BY exam_name
    ORDER BY CountOf DESC
""").show()
```

```
+--------------------+-------+
|           exam_name|CountOf|
+--------------------+-------+
|        Learning Git|    394|
|Introduction to P...|    162|
|Introduction to J...|    158|
|Intermediate Pyth...|    158|
|Learning to Progr...|    128|
|Introduction to M...|    119|
|Software Architec...|    109|
|Beginning C# Prog...|     95|
|    Learning Eclipse|     85|
|Learning Apache M...|     80|
|Beginning Program...|     79|
|       Mastering Git|     77|
|Introduction to B...|     75|
|Advanced Machine ...|     67|
|Learning Linux Sy...|     59|
|JavaScript: The G...|     58|
|        Learning SQL|     57|
|Practical Java Pr...|     53|
|    HTML5 The Basics|     52|
|   Python Epiphanies|     51|
+--------------------+-------+
only showing top 20 rows
```



```python
spark.sql("""
    SELECT exam_name, count(exam_name) AS CountOf
    FROM assessments
    GROUP BY exam_name
    ORDER BY CountOf
""").show()
```

```
+--------------------+-------+
|           exam_name|CountOf|
+--------------------+-------+
|Learning to Visua...|      1|
|Native Web Apps f...|      1|
|Nulls, Three-valu...|      1|
|Operating Red Hat...|      1|
|The Closed World ...|      2|
|Client-Side Data ...|      2|
|Arduino Prototypi...|      2|
|Understanding the...|      2|
|Hibernate and JPA...|      2|
|What's New in Jav...|      2|
|Learning Spring P...|      2|
| Mastering Web Views|      3|
|Using Web Components|      3|
|Service Based Arc...|      3|
|Getting Ready for...|      3|
|Building Web Serv...|      3|
|       View Updating|      4|
|Using Storytellin...|      4|
|An Introduction t...|      5|
|Example Exam For ...|      5|
+--------------------+-------+
only showing top 20 rows
```



```python
# Same aggregation, collected into a pandas DataFrame for filtering
df = spark.sql("""
    SELECT exam_name, count(exam_name) AS CountOf
    FROM assessments
    GROUP BY exam_name
    ORDER BY CountOf
""").toPandas()
```

**The most popular class was** *Learning Git*, **with 394 assessments taken.**

**The classes with the fewest assessments were:**

```python
df[df['CountOf'] == 1]
```

|   | exam_name | CountOf |
|---|-----------|---------|
| 0 | Learning to Visualize Data with D3.js | 1 |
| 1 | Nulls, Three-valued Logic and Missing Information | 1 |
| 2 | Native Web Apps for Android | 1 |
| 3 | Operating Red Hat Enterprise Linux Servers | 1 |
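The two sorted queries and the pandas filter above can be reproduced in one pass with `collections.Counter`, which also makes ties at the bottom of the ranking explicit. This is a sketch on a toy list of exam names, not the real data.

```python
from collections import Counter

# Toy exam names standing in for [r["exam_name"] for r in json_data].
exam_names = (["Learning Git"] * 3 + ["Mastering Git"] * 2
              + ["View Updating", "Learning SQL"])

counts = Counter(exam_names)

# Most common exam (the first row of ORDER BY CountOf DESC).
most_common_name, most_common_count = counts.most_common(1)[0]

# Every exam tied at the minimum count, instead of hard-coding CountOf == 1.
min_count = min(counts.values())
least_common = sorted(name for name, c in counts.items() if c == min_count)

print(most_common_name, most_common_count)
print(least_common)
```

Comparing against `min(counts.values())` rather than the literal `1` keeps the filter correct if the smallest count is ever larger than one; the pandas equivalent would be `df[df['CountOf'] == df['CountOf'].min()]`.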


#### Project Question: Which assessments had the highest proportion of INCOMPLETE responses?

```python
def my_lambda_correct_total(x):
    """Extract the exam name and the incomplete/total question counts from one message.

    Returns a one-element list, or an empty list when the counts are missing,
    so that it can be used with flatMap.
    """
    raw_dict = json.loads(x.value)
    # Fall back to empty dicts so records without sequences/counts are skipped
    counts = raw_dict.get("sequences", {}).get("counts", {})

    if "incomplete" in counts and "total" in counts:
        return [Row(exam_name=raw_dict["exam_name"],
                    incomplete=counts["incomplete"],
                    total=counts["total"])]
    return []
```

```python
# flatMap flattens the returned lists, so records without counts are dropped
my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()
```
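`flatMap` is used here instead of `map` because `my_lambda_correct_total` returns a list that is empty when a record lacks the counts, so those records disappear rather than producing nulls. The same behavior can be sketched in plain Python, with `itertools.chain.from_iterable` playing the role of `flatMap`. The function `extract` is a hypothetical stand-in for the lambda, and the messages are toy data.

```python
import json
from itertools import chain


def extract(message):
    """Hypothetical stand-in for my_lambda_correct_total: return [] or [one dict]."""
    record = json.loads(message)
    counts = record.get("sequences", {}).get("counts", {})
    if "incomplete" in counts and "total" in counts:
        return [{"exam_name": record["exam_name"],
                 "incomplete": counts["incomplete"],
                 "total": counts["total"]}]
    return []


messages = [
    '{"exam_name": "A", "sequences": {"counts": {"incomplete": 1, "total": 4}}}',
    '{"exam_name": "B", "sequences": {}}',  # no counts: dropped
    '{"exam_name": "C", "sequences": {"counts": {"incomplete": 0, "total": 5}}}',
]

# flatMap = map, then flatten one level; empty lists contribute nothing.
rows = list(chain.from_iterable(extract(m) for m in messages))
print(rows)
```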

```python
# Register as a temporary view for Spark SQL (replaces the deprecated registerTempTable)
my_correct_total.createOrReplaceTempView('ct')
```

```python
spark.sql("select * from ct").show()
```

```
+--------------------+----------+-----+
|           exam_name|incomplete|total|
+--------------------+----------+-----+
|Normal Forms and ...|         1|    4|
|Normal Forms and ...|         2|    4|
|The Principles of...|         0|    4|
|The Principles of...|         2|    4|
|Introduction to B...|         0|    4|
|        Learning Git|         0|    5|
|Git Fundamentals ...|         0|    1|
|Introduction to P...|         0|    5|
|Intermediate Pyth...|         0|    4|
|Introduction to P...|         1|    5|
|A Practical Intro...|         1|    4|
|Git Fundamentals ...|         0|    1|
|Introduction to M...|         1|    6|
|   Python Epiphanies|         0|    6|
|Introduction to P...|         1|    5|
|Python Data Struc...|         0|    4|
|Python Data Struc...|         0|    4|
|Working with Algo...|         0|    4|
|Learning iPython ...|         0|    4|
|   Python Epiphanies|         0|    6|
+--------------------+----------+-----+
only showing top 20 rows
```



```python
# Per-attempt incomplete ratio, rounded to two decimals, counted per exam and ratio
df_incomplete = spark.sql("""
    SELECT exam_name, count(exam_name), CAST(incomplete/total AS DECIMAL(18,2)) AS per_inc
    FROM ct
    GROUP BY exam_name, per_inc
    ORDER BY per_inc DESC
""").toPandas()
```
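Because this query groups by both `exam_name` and the rounded ratio `per_inc`, each output row counts the attempts of one exam that share the same ratio; it is not a per-exam average. The grouping can be mirrored in plain Python as below, on toy rows shaped like the `ct` table (the values are illustrative, not from the data). Python's `round` approximates the `DECIMAL(18,2)` cast; the two could differ on values exactly halfway between two hundredths.

```python
from collections import Counter

# Toy rows shaped like the ct table: (exam_name, incomplete, total).
ct_rows = [
    ("View Updating", 4, 4),
    ("View Updating", 4, 4),
    ("View Updating", 0, 4),
    ("Learning Git", 1, 4),
    ("Learning Git", 0, 5),
]

# GROUP BY exam_name, per_inc: count attempts per (exam, rounded ratio) pair.
grouped = Counter((name, round(inc / total, 2)) for name, inc, total in ct_rows)

# ORDER BY per_inc DESC
for (name, per_inc), n in sorted(grouped.items(), key=lambda kv: kv[0][1], reverse=True):
    print(name, n, per_inc)
```

Here "View Updating" yields two rows (ratio 1.0 with two attempts, ratio 0.0 with one), which is why a single exam can appear several times in the query result.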

```python
df_incomplete.head()
```

|   | exam_name | count(exam_name) | per_inc |
|---|-----------|------------------|---------|
| 0 | I'm a Software Architect, Now What? | 1 | 1.0 |
| 1 | Introduction to Hadoop YARN | 1 | 1.0 |
| 2 | View Updating | 2 | 1.0 |
| 3 | Design Patterns in Java | 1 | 0.8 |
| 4 | Building Web Services with Java | 1 | 0.75 |


This query highlights assessments that students start but whose responses end up marked incomplete. A `per_inc` of 1.0 means every question in the attempt was counted as incomplete, and `count(exam_name)` gives the number of attempts of that exam with that ratio. These assessments may be too difficult, poorly scoped, or have some other issue that prevents students from completing them. When evaluating which assessments to keep or revamp, this would therefore be a good starting list, specifically the three exams with at least one fully incomplete attempt:

 - I'm a Software Architect, Now What?
 - Introduction to Hadoop YARN
 - View Updating
 
One limitation is that these exams were taken very few times, so the results could reflect issues with just one or two students.
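One way to address this limitation, sketched below, is to pool each exam's counts (total incomplete questions divided by total questions across all attempts) and rank only exams with some minimum number of attempts. The threshold `MIN_ATTEMPTS = 5` and the toy rows are illustrative assumptions, not values derived from the data. In Spark SQL, the same idea would likely use `SUM(incomplete) / SUM(total)` with a `HAVING COUNT(*) >= ...` clause on the `ct` view.

```python
from collections import defaultdict

MIN_ATTEMPTS = 5  # hypothetical cut-off; choose based on the data

# Toy rows shaped like the ct table: (exam_name, incomplete, total).
ct_rows = (
    [("View Updating", 4, 4)] * 2
    + [("View Updating", 0, 4)] * 2
    + [("Learning Git", 1, 5)] * 4
    + [("Learning Git", 0, 5)] * 2
)

# exam -> [attempts, incomplete questions, total questions]
totals = defaultdict(lambda: [0, 0, 0])
for name, incomplete, total in ct_rows:
    t = totals[name]
    t[0] += 1
    t[1] += incomplete
    t[2] += total

# Pooled incomplete rate per exam, skipping exams with too few attempts
# (and any exam with zero questions, to avoid dividing by zero).
ranking = sorted(
    ((name, inc / q, n) for name, (n, inc, q) in totals.items()
     if n >= MIN_ATTEMPTS and q),
    key=lambda item: item[1],
    reverse=True,
)
print(ranking)
```

With these toy rows, "View Updating" (four attempts) is excluded even though half of its attempts were fully incomplete, which is the kind of small-sample result the threshold is meant to hold back.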