# Project 2: Tracking User Activity

This notebook serves as a step-by-step guide to building the pipeline, consuming messages in Spark, and running transformations to land the assessments data so that clients can query it.

### JSON file

**1. access the data by using curl and explore the data structure**

In [2]:
!curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 9096k  100 9096k    0     0  20.2M      0 --:--:-- --:--:-- --:--:-- 63.2M


In [3]:
!cat assessment-attempts-20180128-121051-nested.json | jq '.[0]'

{
  "keen_timestamp": "1516717442.735266",
  "max_attempts": "1.0",
  "started_at": "2018-01-23T14:23:19.082Z",
  "base_exam_id": "37f0a30a-7464-11e6-aa92-a8667f27e5dc",
  "user_exam_id": "6d4089e4-bde5-4a22-b65f-18bce9ab79c8",
  "sequences": {
    "questions": [
      {
        "user_incomplete": true,
        "user_correct": false,
        "options": [
          {
            "checked": true,
            "at": "2018-01-23T14:23:24.670Z",
            "id": 

### Kafka: publish and consume messages

**2. spin up the cluster using docker-compose (extra step: check the containers with ps)**

In [13]:
!docker-compose up -d

Creating network "project-2-redcarrott_default" with the default driver
Creating project-2-redcarrott_zookeeper_1 ... 
Creating project-2-redcarrott_mids_1      ... 
Creating project-2-redcarrott_cloudera_1  ... 
Creating project-2-redcarrott_kafka_1     ... done
Creating project-2-redcarrott_spark_1     ... done

In [14]:
!docker-compose ps

Name                              Command                     State   Ports
------------------------------------------------------------------------------------------------------------------
project-2-redcarrott_cloudera_1   cdh_startup_script.sh       Up      11000/tcp, 11443/tcp, 19888/tcp, 50070/tcp,
                                                                      8020/tcp, 8088/tcp, 8888/tcp, 9090/tcp
project-2-redcarrott_kafka_1      /etc/confluent/docker/run   Up      29092/tcp, 9092/tcp
project-2-redcarrott_mids_1       /bin/bash                   Up      8888/tcp
project-2-redcarrott_spark_1      docker-entrypoint.sh bash   Up      0.0.0.0:8888->8888/tcp,:::8888->8888/tcp
project-2-redcarrott_z   /et

**3. create the assessments topic and check that it was created properly**

In [17]:
!docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181

Created topic assessments.


In [18]:
!docker-compose exec kafka kafka-topics --describe --topic assessments --zookeeper zookeeper:32181

Topic: assessments	TopicId: z5m2o9BFQMGuofP_CULjyw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: assessments	Partition: 0	Leader: 1	Replicas: 1	Isr: 1


**4. publish messages to the assessments topic**

In [19]:
!docker-compose exec mids bash -c "cat /w205/project-2-redcarrott/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
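The *jq '.[]' -c* step is what turns the single JSON array into individual messages. It prints each array element as one compact line, and kafkacat in producer mode (*-P*) sends each newline-delimited line as a separate message by default. Below is a minimal Python sketch of the jq half of the pipe. *to_kafka_lines* is a hypothetical helper name.

```python
import json


def to_kafka_lines(json_text):
    """Mimic jq '.[]' -c: one compact JSON document per array element."""
    records = json.loads(json_text)
    # separators=(",", ":") drops the spaces, like jq's compact output
    return [json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in records]


# Joining with "\n" gives the stdin that kafkacat would split into messages:
# with open("assessment-attempts-20180128-121051-nested.json") as f:
#     print("\n".join(to_kafka_lines(f.read())))
```

Kafka appends every message it receives. Running this publish cell twice therefore puts every assessment on the topic twice.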

In [20]:
! docker-compose exec mids bash -c "ls /w205/"

annot_fpid.json  lp_data.csv	       redis-standalone
course-content	 project-2-redcarrott  spark-with-kafka
kafka		 redis-cluster	       spark-with-kafka-and-hdfs


### Spark

**5. launch a Spark session from the Jupyter notebook**

In [22]:
!docker-compose exec spark pyspark

Python 3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:09:58) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/04 22:02:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/04 22:03:05 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
21/11/04 22:03:06 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
21/11/04 22:03:08 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _

**6. transform messages**

6.1 consume messages from Kafka into Spark

In [None]:
# Batch-read the entire "assessments" topic, from the earliest to the latest offset
data = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessments")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [None]:
data.cache()

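*Note: once the first action runs, caching keeps the DataFrame in memory, so later actions reuse it instead of re-reading the topic from Kafka. This also cuts back on warnings.*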

6.2 look at the schema of the imported Kafka data

In [None]:
data.printSchema()

6.3 explore the structure of the raw *data* DataFrame

In [None]:
data.show()

6.4 select *value* and cast it to a string (Kafka delivers it as binary)

In [None]:
exam_data = data.select(data.value.cast('string'))

6.5 explore the structure of the *exam_data* DataFrame

In [None]:
exam_data.show()

6.6 see what the first row of the table looks like:

In [None]:
exam_data.first()

**7. convert JSON to a DataFrame**

In [None]:
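import json
import warnings

# Silence Python warnings, e.g. the deprecation notice PySpark 2.x emits when
# it infers a DataFrame schema from dicts (as toDF() does in step 7.1)
warnings.filterwarnings("ignore")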

7.1 each row's value is a JSON string that encodes a dictionary, so parse it with the json package and convert the result to a DataFrame

In [None]:
exam = exam_data.rdd.map(lambda x: json.loads(x.value)).toDF()
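Without Spark, the per-message work in 6.4 and 7.1 comes down to two steps. First, decode the binary Kafka value as UTF-8 text, which is essentially what casting *value* to string does. Second, parse that text with *json.loads*. The plain-Python sketch below covers those two steps. *parse_messages* is a hypothetical name. Spark also infers a schema before building the DataFrame, and this sketch does not attempt that.

```python
import json


def parse_messages(raw_values):
    """Decode raw Kafka message values (bytes) and parse each one as JSON."""
    parsed = []
    for raw in raw_values:
        text = raw.decode("utf-8")       # binary value -> string, as in value.cast('string')
        parsed.append(json.loads(text))  # string -> dict, as in the rdd.map(json.loads) step
    return parsed
```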

In [None]:
exam.printSchema()

7.2 the schema of the new DataFrame shows that the "sequences" column holds a nested structure of keys and values, while the other columns hold plain values.

In [None]:
exam.select('sequences').collect()[87]

7.3 looking at examples of what "sequences" contains, we see missing values. Since the data in this column is incomplete, we drop it.

In [None]:
exam = exam.drop('sequences')

In [None]:
exam.printSchema()
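Dropping *sequences* is justified by how incomplete it is, and that can be measured before the column is dropped. The sketch below works over parsed records, for example the output of *json.loads* on each message. *missing_rate* is a hypothetical helper. The key path in the usage comment is only an illustration taken from the sample record at the top of this notebook. It does not claim which sub-field actually has gaps.

```python
def missing_rate(records, path):
    """Fraction of records in which the nested key path is absent or null."""
    if not records:
        return 0.0
    missing = 0
    for record in records:
        node = record
        for key in path:
            # Count the record as missing as soon as a level is not a dict
            # or the key is absent / None
            if not isinstance(node, dict) or node.get(key) is None:
                missing += 1
                break
            node = node[key]
    return missing / len(records)


# Usage, e.g.: missing_rate(records, ("sequences", "questions"))
```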

**8. land the table in HDFS**

8.1 Now that the table is structured, write it to the Cloudera HDFS as Parquet.

In [None]:
exam.write.parquet("/tmp/exam")

8.2 register a temporary view of the table so that Spark SQL queries can answer our business questions.

In [None]:
exam.createOrReplaceTempView('exam_data')  # registerTempTable is deprecated since Spark 2.0

### SQL queries

**Q1: How many assessments are in the dataset?**

In [None]:
spark.sql('''select count(distinct(user_exam_id)) as num_of_assessments from exam_data''').show()

+------------------+
|num_of_assessments|
+------------------+
|              3242|
+------------------+
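The query uses *count(distinct user_exam_id)* rather than a plain row count for a reason. Kafka appends every published message, so if the publish cell in step 4 runs more than once, the topic (and therefore *exam_data*) holds duplicate rows. The small sketch below uses Python's built-in sqlite3 and made-up rows to show the difference. *assessment_counts* is a hypothetical helper, and SQLite stands in for Spark SQL here.

```python
import sqlite3


def assessment_counts(rows):
    """Return (row count, distinct user_exam_id count) for (user_exam_id, exam_name) rows."""
    conn = sqlite3.connect(":memory:")  # throwaway in-memory database
    try:
        conn.execute("create table exam_data (user_exam_id text, exam_name text)")
        conn.executemany("insert into exam_data values (?, ?)", rows)
        return conn.execute(
            "select count(*), count(distinct user_exam_id) from exam_data"
        ).fetchone()
    finally:
        conn.close()


# Made-up rows in which exam "e1" was published twice:
# assessment_counts([("e1", "Learning Git"), ("e2", "Learning Git"), ("e1", "Learning Git")])
# returns (3, 2)
```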

**Q2: How many people took Learning Git?**

In [None]:
spark.sql('''select count(distinct(user_exam_id)) as num_of_people_took_Learning_Git from exam_data where exam_name = 'Learning Git' ''').show()

+-------------------------------+
|num_of_people_took_Learning_Git|
+-------------------------------+
|                            390|
+-------------------------------+
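Q2 filters on a single course. A *group by* answers the same question for every course in one query. The sketch below runs it in SQLite on made-up rows. The same SQL text should also work in *spark.sql* against *exam_data*, although it has not been run there. *takers_per_course* is a hypothetical helper.

```python
import sqlite3

# Distinct exam attempts per course, most-taken course first
QUERY = """
select exam_name, count(distinct user_exam_id) as n
from exam_data
group by exam_name
order by n desc, exam_name
"""


def takers_per_course(rows):
    """Run QUERY over (user_exam_id, exam_name) rows in an in-memory SQLite table."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("create table exam_data (user_exam_id text, exam_name text)")
        conn.executemany("insert into exam_data values (?, ?)", rows)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()
```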

**Q3: How many courses are in the dataset?**

In [None]:
spark.sql('''select count(distinct exam_name) as num_of_courses from exam_data''').show()

+--------------+   
|num_of_courses|
+--------------+
|           103|
+--------------+

This is the end of Project 2. 