# Project 2: Tracking User Activity (Part II)

This Jupyter notebook documents the step-by-step procedure used to consume the messages in Spark and run the transformations required to land the assessments data in the form and structure our clients need in order to query it.

The document is organized in three chapters:
- [Use Spark to transform the messages](#use_spark_to_transform_the_messages)
- [Query the data using Spark SQL](#query_the_data_using_spark_sql)
- [Land transformed data into HDFS](#land_transformed_data_into_HDFS)

<a id='use_spark_to_transform_the_messages'></a>
## 4. Use Spark to transform the messages

### 4.1 Consume the messages from Kafka into Spark

In [1]:
raw_assessments = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")  # Kafka broker address
    .option("subscribe", "assessments")                 # topic to consume
    .option("startingOffsets", "earliest")              # batch read from the first available offset...
    .option("endingOffsets", "latest")                  # ...up to the latest offset
    .load()
)

### 4.2 Cache to cut back on warnings

In [2]:
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

### 4.3 Print the schema of the data imported from Kafka

In [3]:
raw_assessments.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### 4.4 Explore the *raw_assessments* data structure

This is how the data was imported from Kafka into Spark. Each assessment (one JSON entry) became one row, with its content stored as raw bytes in the *value* field. The *key* field is null, and the remaining fields (*topic*, *partition*, *offset*, *timestamp*, and *timestampType*) are Kafka metadata that we don't need to keep in our queryable database.

In [4]:
raw_assessments.show()

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|assessments|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     4|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     5|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     6|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|assessments|        0|     7|1969-12-31 23:59:...|   

### 4.5 Select field *value* and cast it as string

In [5]:
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
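Casting the binary *value* column to a string decodes each row's bytes as text. As a plain-Python illustration (not the Spark code itself), the hex bytes visible at the start of every row in the `raw_assessments.show()` output decode to the opening characters of a JSON object, assuming UTF-8 encoding:

```python
# Hex bytes copied from the start of the *value* column in raw_assessments.show()
raw_prefix = bytes.fromhex("7B 22 6B 65 65")

# Decoding the bytes as UTF-8 text is, conceptually, what value.cast('string') does per row
decoded = raw_prefix.decode("utf-8")
print(decoded)  # {"kee
```

These characters are the beginning of `{"keen_timestamp"...`, which is what the string column shows after the cast.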

### 4.6 Explore the *assessments* data structure

We can now see clearly that each of our assessment entries is represented as a row in the *assessments* dataframe. At this stage, though, all the information is packed into a single string field, which makes it impossible to query individual fields. We need to transform the data before loading it into HDFS.

In [6]:
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



### 4.7 Get the first row of the table

In [7]:
assessments.collect()[0]

Row(value='{"keen_timestamp":"1516717442.735266","max_attempts":"1.0","started_at":"2018-01-23T14:23:19.082Z","base_exam_id":"37f0a30a-7464-11e6-aa92-a8667f27e5dc","user_exam_id":"6d4089e4-bde5-4a22-b65f-18bce9ab79c8","sequences":{"questions":[{"user_incomplete":true,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:24.670Z","id":"49c574b4-5c82-4ffd-9bd1-c3358faf850d","submitted":1,"correct":true},{"checked":true,"at":"2018-01-23T14:23:25.914Z","id":"f2528210-35c3-4320-acf3-9056567ea19f","submitted":1,"correct":true},{"checked":false,"correct":true,"id":"d1bf026f-554f-4543-bdd2-54dcf105b826"}],"user_submitted":true,"id":"7a2ed6d3-f492-49b3-b8aa-d080a8aad986","user_result":"missed_some"},{"user_incomplete":false,"user_correct":false,"options":[{"checked":true,"at":"2018-01-23T14:23:30.116Z","id":"a35d0e80-8c49-415d-b8cb-c21a02627e2b","submitted":1},{"checked":false,"correct":true,"id":"bccd6e2e-2cef-4c72-8bfa-317db0ac48bb"},{"checked":true,"at":"2018-01-23T14:23:41.

### 4.8 Provide the data schema to Spark to extract only the fields of interest

Because this JSON is deeply nested, with many levels, it is hard for Spark to reliably infer the schema from the data. We therefore provide the schema to Spark explicitly (a forced schema). We also take this opportunity to select only the fields our clients are most likely to want to analyse.

Accordingly, we keep only the following fields:
- **keen_id**: unique id identifying the assessment taken by a user, on a given exam, in a given timeframe
- **base_exam_id**: id of the exam (tracking the exams is very important to understand which of our exams are the most popular or most difficult, for example)
- **user_exam_id**: id of the user (tracking users is very important to understand user behaviour and enable personalization)
- **exam_name**: name of the assessment
- **certification**: certified vs. free assessments
- **started_at**: timestamp when the assessment was started (important for tracking metrics over time or understanding user behaviour in specific timeframes, e.g. weekdays vs. weekends)
- **max_attempts**: maximum number of attempts allowed
- **unanswered**: number of questions unanswered
- **incomplete**: number of questions not completed
- **incorrect**: number of questions answered incorrectly
- **correct**: number of questions answered correctly
- **total**: total number of questions in the exam

We will not include in our final database other timestamps or ids of lower interest, nor any details about individual questions and answer options. If our clients later need that information, we can read the data from Kafka again and save it in separate tables that share keys with our main assessments table.
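As a plain-Python sketch of what the forced schema achieves (not the code Spark runs internally): each JSON string is parsed, fields are looked up by name, fields missing from a record become null (`None`), and everything not listed in the schema, such as the per-question details under `sequences`, is dropped. The field lists mirror `final_schema`; `project_assessment` is a hypothetical helper, and the sample record is hypothetical and heavily abbreviated.

```python
import json

# Top-level fields kept from each assessment (mirrors final_schema)
TOP_LEVEL_FIELDS = ["keen_id", "base_exam_id", "user_exam_id", "exam_name",
                    "certification", "started_at", "max_attempts"]
# Fields kept from the nested sequences.counts object
COUNT_FIELDS = ["unanswered", "incomplete", "incorrect", "correct", "total"]


def project_assessment(raw_json: str) -> dict:
    """Keep only the fields of interest; missing keys become None, extra keys are dropped."""
    record = json.loads(raw_json)
    projected = {name: record.get(name) for name in TOP_LEVEL_FIELDS}
    counts = (record.get("sequences") or {}).get("counts") or {}
    projected["sequences"] = {"counts": {name: counts.get(name) for name in COUNT_FIELDS}}
    return projected


# Hypothetical, abbreviated record: "questions" is not in the schema and is discarded
sample = json.dumps({
    "keen_id": "hypothetical-id",
    "exam_name": "Hypothetical Exam",
    "max_attempts": "1.0",
    "sequences": {"questions": [{"id": "q1"}], "counts": {"correct": 2, "total": 4}},
})

result = project_assessment(sample)
print(result["sequences"]["counts"]["correct"])  # 2
```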

In [8]:
import json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [9]:
final_schema = StructType([StructField('keen_id', StringType(), True),
                     StructField('base_exam_id', StringType(), True),
                     StructField('user_exam_id', StringType(), True),
                     StructField('exam_name', StringType(), True),
                     StructField('certification', StringType(), True),
                     StructField('started_at', StringType(), True),
                     StructField('max_attempts', StringType(), True),
                     StructField('sequences', StructType([
                         StructField('counts', StructType([
                             StructField('unanswered', IntegerType(), True),
                             StructField('incomplete', IntegerType(), True),
                             StructField('incorrect', IntegerType(), True),
                             StructField('correct', IntegerType(), True),
                             StructField('total', IntegerType(), True),
                         ]))]))])

### 4.9 Unwrap the JSON by mapping json.loads over the RDD (forced schema)

In [10]:
focused_extracted_assessments = assessments.rdd.map(lambda x: json.loads(x.value)).toDF(schema=final_schema)

### 4.10 Check if JSON unwrapping worked

In [11]:
focused_extracted_assessments.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+------------+-------------+
|             keen_id|        base_exam_id|        user_exam_id|           exam_name|certification|          started_at|max_attempts|    sequences|
+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+------------+-------------+
|5a6745820eb8ab000...|37f0a30a-7464-11e...|6d4089e4-bde5-4a2...|Normal Forms and ...|        false|2018-01-23T14:23:...|         1.0|[[0,1,1,2,4]]|
|5a674541ab6b0a000...|37f0a30a-7464-11e...|2fec1534-b41f-441...|Normal Forms and ...|        false|2018-01-23T14:21:...|         1.0|[[0,2,1,1,4]]|
|5a67999d3ed3e3000...|4beeac16-bb83-4d5...|8edbc8a8-4d26-429...|The Principles of...|        false|2018-01-23T20:22:...|         1.0|[[0,0,1,3,4]]|
|5a6799694fc7c7000...|4beeac16-bb83-4d5...|c0ee680e-8892-4e6...|The Principles of...|        false|2018-01-23T20

### 4.11 Check out the new dataframe schema

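*Note:* I kept the *max_attempts* field as a string because my attempt to force it to an integer did not work. For the same reason, I kept the *started_at* field as a string after my attempt to force it to a timestamp failed. In the raw JSON, both values are stored as strings (e.g. `"max_attempts":"1.0"`).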
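A likely reason the forced types failed is that both values arrive as JSON strings, and PySpark's schema verification rejects a Python `str` for `IntegerType` or `TimestampType`. One workaround is to parse the strings explicitly, sketched here in plain Python. In Spark, the analogous step would probably be a cast after the DataFrame is built, for example `col("max_attempts").cast("double").cast("int")` and `to_timestamp("started_at")` from `pyspark.sql.functions`; this has not been run against this dataset.

```python
from datetime import datetime, timezone

# Raw string values as they appear in the first assessment (see the collect() output)
raw_max_attempts = "1.0"
raw_started_at = "2018-01-23T14:23:19.082Z"

# "1.0" is a float literal, so int("1.0") would raise ValueError;
# parse it as a float first, then convert to int
max_attempts = int(float(raw_max_attempts))

# ISO-8601 timestamp with milliseconds and a trailing "Z" meaning UTC
started_at = datetime.strptime(raw_started_at, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

print(max_attempts)            # 1
print(started_at.isoformat())  # 2018-01-23T14:23:19.082000+00:00
```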

In [12]:
focused_extracted_assessments.printSchema()

root
 |-- keen_id: string (nullable = true)
 |-- base_exam_id: string (nullable = true)
 |-- user_exam_id: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: struct (nullable = true)
 |    |-- counts: struct (nullable = true)
 |    |    |-- unanswered: integer (nullable = true)
 |    |    |-- incomplete: integer (nullable = true)
 |    |    |-- incorrect: integer (nullable = true)
 |    |    |-- correct: integer (nullable = true)
 |    |    |-- total: integer (nullable = true)



<a id='query_the_data_using_spark_sql'></a>
## 5. Query the data using Spark SQL

### 5.1 Register the data as a queryable temporary view

In [13]:
focused_extracted_assessments.createOrReplaceTempView('focused_assessments')

### 5.2 Using Spark SQL to query data

In this part of the report, I run a few queries to show our clients some of the questions they can answer with this table.

#### 5.2.1 How many assessments are in the dataset?

There are a total of 3,280 assessments in the dataset.

In [14]:
spark.sql("select count(keen_id) from focused_assessments").show()

+--------------+
|count(keen_id)|
+--------------+
|          3280|
+--------------+



#### 5.2.2 What are the most popular assessments in the dataset?

In [15]:
spark.sql("select exam_name, count(keen_id) as count from focused_assessments group by exam_name order by count desc").show()

+--------------------+-----+
|           exam_name|count|
+--------------------+-----+
|        Learning Git|  394|
|Introduction to P...|  162|
|Introduction to J...|  158|
|Intermediate Pyth...|  158|
|Learning to Progr...|  128|
|Introduction to M...|  119|
|Software Architec...|  109|
|Beginning C# Prog...|   95|
|    Learning Eclipse|   85|
|Learning Apache M...|   80|
|Beginning Program...|   79|
|       Mastering Git|   77|
|Introduction to B...|   75|
|Advanced Machine ...|   67|
|Learning Linux Sy...|   59|
|JavaScript: The G...|   58|
|        Learning SQL|   57|
|Practical Java Pr...|   53|
|    HTML5 The Basics|   52|
|   Python Epiphanies|   51|
+--------------------+-----+
only showing top 20 rows



#### 5.2.3 What are the most difficult assessments (lowest student performance)?

In [16]:
spark.sql("select exam_name, round(sum(sequences.counts.correct)/sum(sequences.counts.total), 2) as pct_correct from focused_assessments group by exam_name order by pct_correct").show()

+--------------------+-----------+
|           exam_name|pct_correct|
+--------------------+-----------+
|Example Exam For ...|       null|
|Client-Side Data ...|        0.2|
|       View Updating|       0.25|
|Native Web Apps f...|       0.25|
|Arduino Prototypi...|       0.33|
|Mastering Advance...|       0.36|
|           Nullology|       0.38|
|Building Web Serv...|       0.42|
| Mastering Web Views|       0.42|
|Web & Native Work...|       0.42|
|Cloud Computing W...|       0.43|
|         Offline Web|       0.44|
|Learning C# Best ...|       0.46|
|Design Patterns i...|       0.47|
|  Learning Java EE 7|       0.48|
|Software Architec...|       0.48|
|Data Visualizatio...|       0.49|
|Being a Better In...|        0.5|
|Hibernate and JPA...|        0.5|
|Learning iPython ...|        0.5|
+--------------------+-----------+
only showing top 20 rows
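Two details of this query are worth spelling out, sketched below in plain Python with hypothetical numbers. First, `sum(correct)/sum(total)` is a pooled ratio: attempts with more questions carry more weight than they would in a simple average of per-attempt scores. Second, the `null` for "Example Exam For ..." most likely means its counts are null or its total is zero; in Spark SQL's default (non-ANSI) mode both cases produce `null`, and an ascending `order by` places nulls first, which is why it tops the list. `safe_ratio` is a hypothetical helper that imitates that behaviour.

```python
# Hypothetical attempts at one exam, as (correct, total) pairs
attempts = [(2, 4), (9, 10)]

# What the query computes: sum(correct) / sum(total), pooling all questions
pooled = round(sum(c for c, _ in attempts) / sum(t for _, t in attempts), 2)

# Alternative: average the per-attempt ratios, weighting every attempt equally
mean_of_ratios = round(sum(c / t for c, t in attempts) / len(attempts), 2)

print(pooled, mean_of_ratios)  # 0.79 0.7


def safe_ratio(numerator, denominator):
    """Mimic Spark SQL's default (non-ANSI) division: a null operand or a zero divisor yields null."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return round(numerator / denominator, 2)


# Hypothetical per-exam sums of (correct, total)
results = {
    "Exam A": safe_ratio(3, 10),
    "Exam B": safe_ratio(None, None),  # e.g. an exam whose counts are all null
    "Exam C": safe_ratio(1, 5),
}

# Ascending order with nulls first, as Spark SQL sorts ORDER BY ... ASC by default
ordered = sorted(results.items(), key=lambda kv: (kv[1] is not None, kv[1] or 0))
print(ordered)  # [('Exam B', None), ('Exam C', 0.2), ('Exam A', 0.3)]
```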



#### 5.2.4 Are there any users taking more than one assessment?

In [17]:
spark.sql("select user_exam_id, count(keen_id) as count from focused_assessments group by user_exam_id order by count desc").show()

+--------------------+-----+
|        user_exam_id|count|
+--------------------+-----+
|1e325cc1-47a9-480...|    3|
|b7ac6d15-97e1-4e9...|    3|
|a244c11a-d890-4e3...|    3|
|3d63ec69-8d97-4f9...|    3|
|028ad26f-a89f-4a6...|    3|
|fa23b287-0d0a-468...|    3|
|00745aef-f3af-412...|    3|
|cdc5859d-b332-4fb...|    3|
|37cf5b0c-4807-421...|    3|
|ac80a11a-2e79-40e...|    3|
|c320d47f-60d4-49a...|    3|
|949aa36c-74c7-4fc...|    3|
|d4ab4aeb-1368-486...|    3|
|bd96cfbe-1532-4ba...|    3|
|a7e6fc04-245f-4e3...|    3|
|a45b5ee6-a4ed-4b1...|    3|
|66d91177-c436-4ee...|    3|
|6132da16-2c0c-436...|    3|
|c1eb4d4a-d6ef-43e...|    2|
|6e4889ab-5978-44b...|    2|
+--------------------+-----+
only showing top 20 rows
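The query above lists every user, including those with a single assessment. Adding `having count(keen_id) > 1` after the `group by` would keep only repeat users. The same group, count, and filter logic, as a plain-Python sketch with hypothetical ids:

```python
from collections import Counter

# Hypothetical user_exam_id values, one per assessment row
user_ids = ["u1", "u2", "u1", "u3", "u1", "u2"]

# group by user_exam_id + count(keen_id)
counts = Counter(user_ids)

# having count > 1, ordered by count descending
repeat_users = [(uid, n) for uid, n in counts.most_common() if n > 1]
print(repeat_users)  # [('u1', 3), ('u2', 2)]
```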



<a id='land_transformed_data_into_HDFS'></a>
## 6. Land transformed data into HDFS

### 6.1 Saving data in parquet format into HDFS

In [18]:
focused_extracted_assessments.write.parquet("/tmp/assessments")

### 6.2 Check data in HDFS tmp folder

In [49]:
!docker-compose exec cloudera hadoop fs -ls /tmp/

Found 3 items
drwxr-xr-x   - root   supergroup          0 2020-10-17 20:55 /tmp/assessments
drwxrwxrwt   - mapred mapred              0 2018-02-06 18:27 /tmp/hadoop-yarn
drwx-wx-wx   - root   supergroup          0 2020-10-17 20:52 /tmp/hive
