# Project 2: Tracking User Activity

### Linux Commands Used

#### Setting up my assignment by copying/downloading files
```
cp ~/w205/course-content/08-Querying-Data/docker-compose.yml .
curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp

```

#### Bringing up the cluster and verifying it's running without any stray containers
```
docker-compose up -d
docker-compose ps
docker ps -a
```

#### Creating a symbolic link
```
docker-compose exec spark bash
ln -s /w205 w205
exit
```

#### Running my Jupyter notebook in PySpark
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```

#### Checking that Hadoop (HDFS) is running
```
docker-compose exec cloudera hadoop fs -ls /tmp/
```

#### Creating a topic and then publishing the assessment data to that topic in Kafka
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
docker-compose exec mids bash -c "cat /w205/project-2-jcweaver/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```
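The `jq '.[]' -c` step in the pipeline above splits the file's top-level JSON array into one compact JSON object per line, so each assessment becomes its own Kafka message. A minimal Python sketch of that same unrolling is shown below; the function name `unroll_json_array` and the two-record sample are hypothetical and only serve as an illustration.

```python
import json


def unroll_json_array(text):
    """Return one compact JSON string per element of a top-level JSON array."""
    records = json.loads(text)
    # separators=(",", ":") drops the spaces, similar to jq's compact (-c) output
    return [json.dumps(record, separators=(",", ":")) for record in records]


# Hypothetical two-record sample, not taken from the real assessments file
sample = (
    '[{"exam_name": "Learning Git", "max_attempts": "1.0"}, '
    '{"exam_name": "View Updating", "max_attempts": "1.0"}]'
)

for line in unroll_json_array(sample):
    print(line)  # one message per line, as kafkacat -P would publish them
```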

#### Checking that the data is on the topic
```
docker-compose exec mids bash -c "kafkacat -C -b kafka:29092 -t assessments -o beginning -e"
```

#### Tearing down the cluster
```
docker-compose down
```

## Checking Spark freebie commands

In [1]:
spark

In [2]:
sc

## Setup

Before beginning to answer business questions with this data, we need to set it up for querying. This involves:
* Subscribing to the topic, assessments, we added to Kafka
* Extracting the binary JSON data from this topic into a string in a new dataframe
* Registering the dataframe as a temporary table to then use SQL to query against

### Subscribing to the topic

In [1]:
raw_assessments = spark.read.format("kafka").option("kafka.bootstrap.servers", "kafka:29092").option("subscribe","assessments").option("startingOffsets", "earliest").option("endingOffsets", "latest").load() 

raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [2]:
raw_assessments.printSchema() #This is the kafka schema

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Extracting the binary JSON data into a string in a new dataframe

In [104]:
#Extracting the binary json data into a string in a new dataframe
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
assessments.printSchema()

root
 |-- value: string (nullable = true)
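
Kafka hands back each message value as raw bytes, which is why the cast to string is needed before the JSON can be parsed. The plain-Python sketch below mirrors that step in spirit, assuming the bytes are UTF-8 encoded JSON; the sample message is made up for illustration.

```python
import json

# Hypothetical Kafka message value: a JSON document as raw UTF-8 bytes
raw_value = b'{"exam_name": "Learning Git", "sequences": {"counts": {"correct": 5, "total": 5}}}'

# bytes -> text, in the spirit of value.cast('string')
value_as_string = raw_value.decode("utf-8")

# Once the value is text, it can be parsed as JSON
record = json.loads(value_as_string)

print(type(value_as_string).__name__)
print(record["exam_name"])
print(record["sequences"]["counts"]["total"])
```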



In [8]:
#Checking there's stuff in the dataframe
assessments.show() 

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



### Unrolling the JSON into a new dataframe

In [4]:
import json

from pyspark.sql import Row

In [5]:
#First lambda transform to make a dataframe
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
extracted_assessments.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



In [6]:
#Looking at what's inside the dataframe I just made
extracted_assessments.show() #Map shows up if nested

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false

### Turning this dataframe into a temporary table

In [7]:
#Making this dataframe a temptable that I can query from
extracted_assessments.registerTempTable('assessments')

In [8]:
spark.sql("select keen_id from assessments limit 10").show()

+--------------------+
|             keen_id|
+--------------------+
|5a6745820eb8ab000...|
|5a674541ab6b0a000...|
|5a67999d3ed3e3000...|
|5a6799694fc7c7000...|
|5a6791e824fccd000...|
|5a67a0b6852c2a000...|
|5a67b627cc80e6000...|
|5a67ac8cb0a5f4000...|
|5a67a9ba060087000...|
|5a67ac54411aed000...|
+--------------------+



# Business Questions

In the following sections, I answer three business questions using Spark SQL against the temporary table created above and an additional temporary table created below.

* How many different exams were offered per course?
* How many exams had incompletely answered questions?
* What courses had the highest ratios of incompletes, corrects, and incorrects?

## How many different exams were offered per course?

First, let's look at some of the courses in this dataset. We can use the temporary table that I defined above.

In [51]:
spark.sql("select exam_name from assessments limit 20").show(truncate=False)

+--------------------------------------------+
|exam_name                                   |
+--------------------------------------------+
|Normal Forms and All That Jazz Master Class |
|Normal Forms and All That Jazz Master Class |
|The Principles of Microservices             |
|The Principles of Microservices             |
|Introduction to Big Data                    |
|Learning Git                                |
|Git Fundamentals for Web Developers         |
|Introduction to Python                      |
|Intermediate Python Programming             |
|Introduction to Python                      |
|A Practical Introduction to React.js        |
|Git Fundamentals for Web Developers         |
|Introduction to Modern Front-End Development|
|Python Epiphanies                           |
|Introduction to Python                      |
|Python Data Structures                      |
|Python Data Structures                      |
|Working with Algorithms in Python           |
|Learning iPy

We can see that some of them are repeated, so let's look at the distinct course names.

In [52]:
spark.sql("select DISTINCT exam_name from assessments limit 20").show(truncate=False)

+------------------------------------------------+
|exam_name                                       |
+------------------------------------------------+
|Learning Data Modeling                          |
|Networking for People Who Hate Networking       |
|Introduction to Java 8                          |
|Learning Apache Hadoop                          |
|Learning Spring Programming                     |
|Learning iPython Notebook                       |
|Introduction to Python                          |
|Learning C# Best Practices                      |
|Introduction to Architecting Amazon Web Services|
|A Practical Introduction to React.js            |
|I'm a Software Architect, Now What?             |
|Introduction to Big Data                        |
|View Updating                                   |
|Mastering Python - Networking and Security      |
|Intermediate C# Programming                     |
|Starting a Grails 3 Project                     |
|Introduction to Apache Spark  

Let's see how many distinct course names there are.

In [36]:
spark.sql("select COUNT(DISTINCT exam_name) from assessments").show()

+-------------------------+
|count(DISTINCT exam_name)|
+-------------------------+
|                      103|
+-------------------------+



Let's see how many distinct exam ids there are.

In [44]:
spark.sql("select COUNT(DISTINCT base_exam_id) from assessments").show()

+----------------------------+
|count(DISTINCT base_exam_id)|
+----------------------------+
|                         107|
+----------------------------+



There are a few more distinct exam ids than course names (107 versus 103), which means some courses gave more than one exam. Let's now see which courses those are.

In [109]:
spark.sql("select COUNT( DISTINCT base_exam_id) as count, exam_name from assessments GROUP BY exam_name ORDER BY count DESC").show(truncate=False)

+-----+----------------------------------------------------+
|count|exam_name                                           |
+-----+----------------------------------------------------+
|2    |Great Bash                                          |
|2    |Being a Better Introvert                            |
|2    |Introduction to Python                              |
|2    |Architectural Considerations for Hadoop Applications|
|1    |Introduction to Java 8                              |
|1    |Learning Spring Programming                         |
|1    |Learning Apache Hadoop                              |
|1    |Introduction to Architecting Amazon Web Services    |
|1    |Learning C# Best Practices                          |
|1    |Learning iPython Notebook                           |
|1    |Networking for People Who Hate Networking           |
|1    |JavaScript Templating                               |
|1    |A Practical Introduction to React.js                |
|1    |Learning DNS     

We can see that 4 courses (Introduction to Python, Great Bash, Being a Better Introvert, and Architectural Considerations for Hadoop Applications) each had 2 different exams. Next, I'll write this to HDFS in case we want to use it later.

In [110]:
exam_counts_per_course = spark.sql("select COUNT( DISTINCT base_exam_id) as count, exam_name from assessments GROUP BY exam_name ORDER BY count DESC")
exam_counts_per_course.write.parquet("/tmp/exam_counts_per_course")
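
For readers less familiar with SQL, the `COUNT(DISTINCT base_exam_id) ... GROUP BY exam_name ORDER BY count DESC` query can be mirrored in plain Python. This is only a sketch on made-up rows (the exam ids `exam-a`, `exam-b`, `exam-c` are hypothetical), not the notebook's actual data.

```python
from collections import defaultdict

# Hypothetical (exam_name, base_exam_id) pairs, one per assessment attempt
attempts = [
    ("Learning Git", "exam-a"),
    ("Learning Git", "exam-a"),
    ("Introduction to Python", "exam-b"),
    ("Introduction to Python", "exam-c"),
]


def distinct_exams_per_course(rows):
    """Count distinct exam ids per course, largest count first."""
    ids_by_course = defaultdict(set)
    for exam_name, base_exam_id in rows:
        # A set keeps each exam id once, which is the DISTINCT part
        ids_by_course[exam_name].add(base_exam_id)
    counts = {name: len(ids) for name, ids in ids_by_course.items()}
    # ORDER BY count DESC
    return sorted(counts.items(), key=lambda item: item[1], reverse=True)


print(distinct_exams_per_course(attempts))
# [('Introduction to Python', 2), ('Learning Git', 1)]
```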

## How many exams had incompletely answered questions?

First, I'll define a function, applied with flatMap below, that will help us unroll more of the JSON: the part that contains the counts of question responses.

In [86]:
def my_lambda_results_total(x):
    
    raw_dict = json.loads(x.value)
    my_list = []
    
    if "sequences" in raw_dict:
        
        if "counts" in raw_dict["sequences"]:
            
            if "correct" in raw_dict["sequences"]["counts"] and "total" in raw_dict["sequences"]["counts"] and "incomplete" in raw_dict["sequences"]["counts"] and "incorrect" in raw_dict["sequences"]["counts"] and "unanswered" in raw_dict["sequences"]["counts"] :
                    
                my_dict = {"correct": raw_dict["sequences"]["counts"]["correct"], 
                           "total": raw_dict["sequences"]["counts"]["total"],
                          "incomplete" : raw_dict["sequences"]["counts"]["incomplete"],
                          "incorrect" : raw_dict["sequences"]["counts"]["incorrect"],
                          "unanswered" : raw_dict["sequences"]["counts"]["unanswered"],
                          "exam_name" : raw_dict["exam_name"]}
                my_list.append(Row(**my_dict))
    
    return my_list
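
The function above returns a list with either one row or no rows, which is what lets `flatMap` silently skip messages that lack the counts. The sketch below shows that behaviour without Spark, under a few assumptions: it uses plain dictionaries instead of `Row`, a condensed helper named `extract_counts` (hypothetical, not the notebook's function), and three made-up messages.

```python
import json

COUNT_KEYS = ("correct", "total", "incomplete", "incorrect", "unanswered")


def extract_counts(json_text):
    """Return a one-element list with the counts, or [] if any piece is missing."""
    raw = json.loads(json_text)
    counts = raw.get("sequences", {}).get("counts", {})
    if not all(key in counts for key in COUNT_KEYS):
        return []
    row = {key: counts[key] for key in COUNT_KEYS}
    row["exam_name"] = raw["exam_name"]
    return [row]


# Hypothetical messages: one complete, one without counts, one without sequences
messages = [
    '{"exam_name": "Learning Git", "sequences": {"counts": {"correct": 5, '
    '"total": 5, "incomplete": 0, "incorrect": 0, "unanswered": 0}}}',
    '{"exam_name": "View Updating", "sequences": {}}',
    '{"exam_name": "Nullology"}',
]

# flatMap: map every message to a list, then concatenate the lists
rows = [row for message in messages for row in extract_counts(message)]
print(len(rows))
```

Returning an empty list rather than `None` means incomplete messages simply contribute nothing to the resulting dataframe.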

In [87]:
#Use the lambda function to create a dataframe
results_total = assessments.rdd.flatMap(my_lambda_results_total).toDF()

In [88]:
#Register the dataframe as a temptable to use for SQL
results_total.registerTempTable('results')

Let's look at the different counts we have for exam results.

In [89]:
spark.sql("select * from results limit 10").show()

+-------+--------------------+----------+---------+-----+----------+
|correct|           exam_name|incomplete|incorrect|total|unanswered|
+-------+--------------------+----------+---------+-----+----------+
|      2|Normal Forms and ...|         1|        1|    4|         0|
|      1|Normal Forms and ...|         2|        1|    4|         0|
|      3|The Principles of...|         0|        1|    4|         0|
|      2|The Principles of...|         2|        0|    4|         0|
|      3|Introduction to B...|         0|        1|    4|         0|
|      5|        Learning Git|         0|        0|    5|         0|
|      1|Git Fundamentals ...|         0|        0|    1|         0|
|      5|Introduction to P...|         0|        0|    5|         0|
|      4|Intermediate Pyth...|         0|        0|    4|         0|
|      0|Introduction to P...|         1|        0|    5|         4|
+-------+--------------------+----------+---------+-----+----------+



Let's look at the ratio of incompletely answered questions to total questions. We are most interested in the exams with a high ratio, meaning more of the questions were left incomplete. This could indicate that the questions are poorly worded, so students struggle to understand how to answer them fully. It could also mean the questions are difficult.

In [90]:
spark.sql("select COUNT(total), incomplete/total as incomplete_ratio from results GROUP BY incomplete_ratio ORDER BY incomplete_ratio DESC").show()

+------------+-------------------+
|count(total)|   incomplete_ratio|
+------------+-------------------+
|           4|                1.0|
|           1|                0.8|
|           8|               0.75|
|           1| 0.7142857142857143|
|          21| 0.6666666666666666|
|           8|                0.6|
|           4| 0.5714285714285714|
|          40|                0.5|
|          29|0.42857142857142855|
|          44|                0.4|
|          78| 0.3333333333333333|
|           1|                0.3|
|          38| 0.2857142857142857|
|         457|               0.25|
|         137|                0.2|
|          24|0.16666666666666666|
|          43|0.14285714285714285|
|          15|              0.125|
|           4|                0.1|
|        2318|                0.0|
+------------+-------------------+



We can see that there are 4 exam attempts where all the questions were marked as incomplete. Most of the ratios have fairly low counts, but 457 attempts had 25% of the questions answered incompletely and 137 attempts had 20% answered incompletely. These might be worth looking into further.

To investigate further, let's also print the total number of questions. When the total is larger, even a smaller ratio of incompletes can mean a lot of incompletely answered questions.

In [82]:
spark.sql("select COUNT(total), incomplete/total as incomplete_ratio, total from results GROUP BY incomplete_ratio, total ORDER BY incomplete_ratio DESC").show()

+------------+-------------------+-----+
|count(total)|   incomplete_ratio|total|
+------------+-------------------+-----+
|           1|                1.0|    4|
|           3|                1.0|    1|
|           1|                0.8|    5|
|           8|               0.75|    4|
|           1| 0.7142857142857143|    7|
|           1| 0.6666666666666666|    6|
|          20| 0.6666666666666666|    3|
|           8|                0.6|    5|
|           4| 0.5714285714285714|    7|
|           4|                0.5|    2|
|           1|                0.5|    8|
|          35|                0.5|    4|
|          29|0.42857142857142855|    7|
|          44|                0.4|    5|
|          13| 0.3333333333333333|    6|
|          65| 0.3333333333333333|    3|
|           1|                0.3|   10|
|          38| 0.2857142857142857|    7|
|         452|               0.25|    4|
|           5|               0.25|    8|
+------------+-------------------+-----+
only showing top

Most of the exam attempts with every question marked incomplete had only 1 question (3 of the 4), so it's not as big a deal that all of their questions were incomplete. It is worth looking into the one attempt that had 4 incompletely answered questions out of 4. Next, I'll write this to HDFS in case we want to use it later.

In [112]:
incomplete_ratios = spark.sql("select COUNT(total) as count, incomplete/total as incomplete_ratio, total from results GROUP BY incomplete_ratio, total ORDER BY incomplete_ratio DESC")
incomplete_ratios.write.parquet("/tmp/incomplete_ratios")
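
Grouping by both the ratio and the total matters because the same ratio can hide different absolute numbers of incomplete questions. The sketch below reproduces that grouping on a handful of made-up `(incomplete, total)` pairs; the values are hypothetical and chosen only to show two groups sharing the ratio 0.25.

```python
from collections import Counter

# Hypothetical (incomplete, total) pairs, one per exam attempt
results = [(1, 4), (1, 4), (2, 8), (1, 1), (0, 5)]

# GROUP BY incomplete_ratio, total with COUNT(total)
groups = Counter((incomplete / total, total) for incomplete, total in results)

# ORDER BY incomplete_ratio DESC
ordered = sorted(groups.items(), key=lambda item: item[0][0], reverse=True)

for (ratio, total), count in ordered:
    incomplete_questions = round(ratio * total)
    print(count, ratio, total, incomplete_questions)
```

Here the ratio 0.25 covers attempts with 1 incomplete question out of 4 and attempts with 2 out of 8, which a grouping on the ratio alone would merge.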

## What courses had the highest ratios of incompletes, corrects, and incorrects?

In [92]:
spark.sql("select COUNT(total), incomplete/total as incomplete_ratio, exam_name from results GROUP BY incomplete_ratio, exam_name ORDER BY incomplete_ratio DESC").show(truncate=False)

+------------+------------------+-----------------------------------------------------------------------+
|count(total)|incomplete_ratio  |exam_name                                                              |
+------------+------------------+-----------------------------------------------------------------------+
|2           |1.0               |View Updating                                                          |
|1           |1.0               |I'm a Software Architect, Now What?                                    |
|1           |1.0               |Introduction to Hadoop YARN                                            |
|1           |0.8               |Design Patterns in Java                                                |
|1           |0.75              |Building Web Services with Java                                        |
|3           |0.75              |Cloud Computing With AWS                                               |
|4           |0.75              |I'm a Softwar

We can see that the highest incomplete ratios came from the courses View Updating, I'm a Software Architect, Now What?, and Introduction to Hadoop YARN. These might be courses to avoid, given how hard their exam questions seem to be to answer completely. Keep in mind that each of these rows covers only 1 or 2 attempts.
Next, I'll write this to HDFS in case we want to use it later.

In [116]:
incomplete_ratios_by_exam = spark.sql("select COUNT(total) as count, incomplete/total as incomplete_ratio, exam_name from results GROUP BY incomplete_ratio, exam_name ORDER BY incomplete_ratio DESC")
incomplete_ratios_by_exam.write.parquet("/tmp/incomplete_ratios_by_exam")

Let's do the same for the courses with the highest correct ratios. This is probably an indicator of easier courses to take.

In [100]:
spark.sql("select COUNT(total) AS count, correct/total as correct_ratio, exam_name from results GROUP BY correct_ratio, exam_name ORDER BY correct_ratio DESC, count DESC").show(truncate=False)

+-----+-------------+-----------------------------------------------------------+
|count|correct_ratio|exam_name                                                  |
+-----+-------------+-----------------------------------------------------------+
|130  |1.0          |Learning Git                                               |
|94   |1.0          |Introduction to Java 8                                     |
|46   |1.0          |Introduction to Machine Learning                           |
|29   |1.0          |Beginning Programming with JavaScript                      |
|24   |1.0          |Advanced Machine Learning                                  |
|23   |1.0          |Intermediate Python Programming                            |
|21   |1.0          |Learning Apache Maven                                      |
|21   |1.0          |Git Fundamentals for Web Developers                        |
|21   |1.0          |Learning Eclipse                                           |
|20   |1.0      

Learning Git had 130 attempts with every question correct, and Introduction to Java 8 had 94. These seem to be the classes to take.

Next, I'll write this to HDFS to look at later.

In [114]:
correct_ratios_by_exam = spark.sql("select COUNT(total) AS count, correct/total as correct_ratio, exam_name from results GROUP BY correct_ratio, exam_name ORDER BY correct_ratio DESC, count DESC")
correct_ratios_by_exam.write.parquet("/tmp/correct_ratios_by_exam")
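
Because the query groups by both the ratio and the course, a single course can appear in several rows, once per distinct ratio, and the two-column ordering breaks ties on the ratio by count. A small Python sketch of the same grouping and ordering follows; the three-field sample rows are hypothetical.

```python
from collections import Counter

# Hypothetical (exam_name, correct, total) rows, one per exam attempt
results = [
    ("Learning Git", 5, 5),
    ("Learning Git", 5, 5),
    ("Learning Git", 0, 5),
    ("Introduction to Java 8", 4, 4),
]

# GROUP BY correct_ratio, exam_name with COUNT(total)
groups = Counter((name, correct / total) for name, correct, total in results)

# ORDER BY correct_ratio DESC, count DESC
ranked = sorted(
    ((count, ratio, name) for (name, ratio), count in groups.items()),
    key=lambda row: (-row[1], -row[0]),
)

for count, ratio, name in ranked:
    print(count, ratio, name)
```

In this sample Learning Git shows up both at the top (ratio 1.0) and at the bottom (ratio 0.0), so a high position in the table describes a group of attempts rather than the course as a whole.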

Lastly, let's look at courses with high ratios of incorrect answers.

In [103]:
spark.sql("select COUNT(total) AS count, incorrect/total as incorrect_ratio, exam_name from results GROUP BY incorrect_ratio, exam_name ORDER BY incorrect_ratio DESC, count DESC").show(truncate=False)

+-----+---------------+-----------------------------------------------------------+
|count|incorrect_ratio|exam_name                                                  |
+-----+---------------+-----------------------------------------------------------+
|7    |1.0            |Git Fundamentals for Web Developers                        |
|7    |1.0            |Learning Git                                               |
|5    |1.0            |An Introduction to d3.js: From Scattered to Scatterplot    |
|5    |1.0            |Software Architecture Fundamentals Beyond The Basics       |
|5    |1.0            |Nullology                                                  |
|4    |1.0            |Intermediate Python Programming                            |
|3    |1.0            |Introduction to Apache Hive                                |
|3    |1.0            |Being a Better Introvert                                   |
|3    |1.0            |Intermediate C# Programming                          

Git Fundamentals for Web Developers and Learning Git each had 7 attempts with every question incorrect. So even though we saw above that 130 attempts on Learning Git got every question right, 7 got them all wrong. This could be concerning and is worth investigating further.
Next, I'll write this to HDFS in case we want to use it later.

In [113]:
incorrect_ratios = spark.sql("select COUNT(total) AS count, incorrect/total as incorrect_ratio, exam_name from results GROUP BY incorrect_ratio, exam_name ORDER BY incorrect_ratio DESC, count DESC")
incorrect_ratios.write.parquet("/tmp/incorrect_ratios")