# Spark Process

In [1]:
import json
import pandas as pd
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import warnings

In [2]:
# Batch read (spark.read, not readStream) of the "assessment" Kafka topic from
# the broker at kafka:29092, bounded by the earliest and latest offsets.
raw_assessment = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessment")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [3]:
# cache() hands back the DataFrame itself, which is why this cell echoes the
# DataFrame repr as its output. Presumably cached so that later cells reuse
# the Kafka batch instead of re-reading the topic -- TODO confirm still needed.
raw_assessment.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [4]:
# Print the column layout of the raw Kafka batch; the message payload used by
# the following cells lives in the binary `value` column.
raw_assessment.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# The Kafka payload arrives as binary; keep only that column, cast to string,
# and preview a few rows to confirm it holds JSON text.
payload_text = raw_assessment["value"].cast("string")
assessment = raw_assessment.select(payload_text)
assessment.show(n=4)

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 4 rows



In [6]:
# Persist the string payloads as parquet under /tmp/assessment.
# The writer's default save mode raises an error when the target path already
# exists, which breaks this cell on every re-run (e.g. Restart & Run All).
# Overwriting keeps the cell idempotent: each run replaces the previous
# snapshot with the current earliest-to-latest batch read from Kafka.
assessment.write.mode("overwrite").parquet("/tmp/assessment")

In [7]:
# Load the stored parquet snapshot back into a DataFrame and preview it
# (20 rows is show()'s default row count).
assessment_parquet_path = '/tmp/assessment'
read_assessment = spark.read.parquet(assessment_parquet_path)
read_assessment.show(20)

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



Parsing JSON

In [8]:
# Parse each stored JSON string into a Python dict, then let Spark infer a
# DataFrame schema from those dicts.
# NOTE(review): the nested `sequences` object shows up as a Map column in the
# preview output; confirm that is the intended shape for downstream queries.
parsed_records = read_assessment.rdd.map(lambda row: json.loads(row["value"]))
extracted_assessment = parsed_records.toDF()



In [9]:
# Peek at two parsed rows to see which columns the JSON conversion produced.
extracted_assessment.show(2)

+--------------------+-------------+--------------------+-----------------+--------------------+-----------------+------------+--------------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|  keen_created_at|             keen_id|   keen_timestamp|max_attempts|           sequences|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+-----------------+--------------------+-----------------+------------+--------------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...|1516717442.735266|5a6745820eb8ab000...|1516717442.735266|         1.0|Map(questions -> ...|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...|1516717377.639827|5a674541ab6b0a000...|1516717377.639827|         1.0|Map(questions -> ...|2018-01-23T14:21:...|2fec1534-b41f-441...|
+--------------------+-------------+---------

In [10]:
# Expose the parsed DataFrame to Spark SQL as the temporary view `assessment`
# so the question cells can query it with spark.sql(...).
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it also makes re-running this cell safe because an existing
# view of the same name is simply replaced.
extracted_assessment.createOrReplaceTempView('assessment')

Question 1 - What were the top 5 exams and how many people took them?

In [11]:
# Count the rows of the `assessment` view per exam_name and keep the five
# largest groups.
# NOTE(review): this counts assessment records, not distinct people; if one
# person can appear more than once for an exam the numbers are attempts --
# confirm against the data.
top_exams_query = "select exam_name, count(*) from assessment group by exam_name order by count(1) DESC LIMIT 5"
top_exams = spark.sql(top_exams_query)
top_exams.show()

+--------------------+--------+
|           exam_name|count(1)|
+--------------------+--------+
|        Learning Git|     394|
|Introduction to P...|     162|
|Introduction to J...|     158|
|Intermediate Pyth...|     158|
|Learning to Progr...|     128|
+--------------------+--------+

