### Creating the Final Table

This section walks through the PySpark code used to build tables that can be queried with SQL and written to HDFS. Explanations are given as inline comments in the code.

In [1]:
# Do necessary imports
import json
from pyspark.sql import Row

# Batch-read the "assessments" topic from Kafka, covering everything from the
# earliest offset up to the latest offset
raw_assessments = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessments")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [2]:
# Cache the Kafka batch (done to cut back on warnings in later cells).
# The call is the last expression of the cell, so the DataFrame schema is
# displayed as the cell output.
raw_assessments.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [3]:
# The Kafka `value` column arrives as binary; keep only that column and
# cast it to a string so the JSON payload can be parsed later
assessments = raw_assessments.select(
    raw_assessments["value"].cast("string")
)

In [4]:
# Visualize what we have currently: one JSON string per row
# (show() prints only the top 20 rows, with long values truncated)
assessments.show()

+--------------------+
|               value|
+--------------------+
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
|{"keen_timestamp"...|
+--------------------+
only showing top 20 rows



Business Questions:

1) How many people took each exam? -- Create unrolled table, can answer overall size, etc

2) What is the average and standard deviation of scores on all exams? -- Create Scores/Questions Table

3) For a particular user_exam_id (using '6d4089e4-bde5-4a22-b65f-18bce9ab79c8' as an example), what are the IDs of the questions asked, and what is the sequence ID? -- Create Questions table with sequence id, user_exam_id, and question ids

In [41]:
# Question 1 Work

# Unroll each JSON message into a Row keyed by its top-level fields.
# The "sequences" column stays nested, so don't use this table for anything
# inside it. Best for questions only involving the outermost layer of the
# JSON: base_exam_id to user_exam_id level.
unroll_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0
unroll_assessments.createOrReplaceTempView('assessments')

# Get how many assessments, count by assessment
spark.sql("select count(*) from assessments").show()
spark.sql("select count(*), exam_name from assessments group by exam_name").show()

# To summarize how many people took each exam, find the most-taken and the
# least-taken exam
spark.sql("select count(*), exam_name from assessments group by exam_name order by count(*) desc limit 1").show()
spark.sql("select count(*), exam_name from assessments group by exam_name order by count(*) asc limit 1").show()

# Write to HDFS. Overwrite mode keeps this cell re-runnable: the default save
# mode raises an error when /tmp/assessments already exists.
unroll_assessments.write.mode("overwrite").parquet("/tmp/assessments")

+--------+
|count(1)|
+--------+
|    3280|
+--------+

+--------+--------------------+
|count(1)|           exam_name|
+--------+--------------------+
|       9|Learning Data Mod...|
|      15|Networking for Pe...|
|     158|Introduction to J...|
|      16|Learning Apache H...|
|       2|Learning Spring P...|
|      17|Learning iPython ...|
|     162|Introduction to P...|
|      35|Learning C# Best ...|
|      14|Introduction to A...|
|       9|A Practical Intro...|
|      15|I'm a Software Ar...|
|      75|Introduction to B...|
|       4|       View Updating|
|      25|Mastering Python ...|
|      43|Intermediate C# P...|
|       5|Starting a Grails...|
|       9|Introduction to A...|
|      21|JavaScript Templa...|
|      10|Being a Better In...|
|      34|Mastering Advance...|
+--------+--------------------+
only showing top 20 rows

+--------+------------+
|count(1)|   exam_name|
+--------+------------+
|     394|Learning Git|
+--------+------------+

+--------+-------------------

In [42]:
# Question 2 Work
# We remove those without values for correct, total from our table
def lambda_scores(x):
    """Turn one assessment message into zero or one score rows.

    Intended for use with ``flatMap``, so messages that cannot be scored
    simply contribute no rows.

    Args:
        x: A record whose ``value`` attribute holds one assessment as a JSON
            string.

    Returns:
        A list containing a single ``Row`` with ``exam_name``, ``correct``,
        ``total`` and ``percent`` (``100 * correct / total``), or an empty
        list when the message has no usable ``sequences.counts`` values.
    """
    raw_dict = json.loads(x.value)

    # Walk down to sequences -> counts, bailing out as soon as a level is missing
    sequences = raw_dict.get("sequences") if isinstance(raw_dict, dict) else None
    counts = sequences.get("counts") if isinstance(sequences, dict) else None
    if not isinstance(counts, dict):
        return []

    correct = counts.get("correct")
    total = counts.get("total")
    # Remove records without values for correct/total. A total of 0 is
    # skipped as well, because the percentage would be undefined and the
    # division would raise ZeroDivisionError.
    if correct is None or not total:
        return []

    return [Row(exam_name=raw_dict.get("exam_name"),
                correct=correct,
                total=total,
                # 100.0 keeps the result a float under any division semantics
                percent=100.0 * correct / total)]

# One row per assessment that has usable correct/total counts
my_scores = assessments.rdd.flatMap(lambda_scores).toDF()

# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0
my_scores.createOrReplaceTempView('scores')

# Show table produced
spark.sql("select * from scores limit 10").show()

# Get sd, avg of scores to answer Q2
spark.sql("select avg(percent) from scores").show()
spark.sql("select stddev(percent) from scores").show()

# Show the number of each score for perspective
spark.sql("select count(*), percent from scores group by percent order by percent asc").show()

# Show percents for each exam
spark.sql("select avg(percent), exam_name from scores group by exam_name order by exam_name asc").show()

# Write to HDFS. Overwrite mode keeps this cell re-runnable: the default save
# mode raises an error when /tmp/scores already exists.
my_scores.write.mode("overwrite").parquet("/tmp/scores")

+-------+--------------------+-------+-----+
|correct|           exam_name|percent|total|
+-------+--------------------+-------+-----+
|      2|Normal Forms and ...|   50.0|    4|
|      1|Normal Forms and ...|   25.0|    4|
|      3|The Principles of...|   75.0|    4|
|      2|The Principles of...|   50.0|    4|
|      3|Introduction to B...|   75.0|    4|
|      5|        Learning Git|  100.0|    5|
|      1|Git Fundamentals ...|  100.0|    1|
|      5|Introduction to P...|  100.0|    5|
|      4|Intermediate Pyth...|  100.0|    4|
|      0|Introduction to P...|    0.0|    5|
+-------+--------------------+-------+-----+

+------------------+
|      avg(percent)|
+------------------+
|62.656997455470844|
+------------------+

+--------------------+
|stddev_samp(percent)|
+--------------------+
|  31.086692286170475|
+--------------------+

+--------+------------------+
|count(1)|           percent|
+--------+------------------+
|     239|               0.0|
|       3|              10.

In [55]:
# Question 3 Work
def lambda_questions(x):
    """Turn one assessment message into one row per question asked.

    Intended for use with ``flatMap``, so a message without a
    ``sequences.questions`` list contributes no rows instead of failing
    the whole job with a ``KeyError``.

    Args:
        x: A record whose ``value`` attribute holds one assessment as a JSON
            string.

    Returns:
        A list of ``Row`` objects with ``user_exam_id``, ``exam_name``,
        ``question_number`` (1-based position of the question in the
        message), ``question_id`` and ``sequence_id``.
    """
    raw_dict = json.loads(x.value)

    sequences = raw_dict.get("sequences") if isinstance(raw_dict, dict) else None
    if not isinstance(sequences, dict):
        return []

    # A missing or null question list is treated the same as an empty one
    questions = sequences.get("questions") or []

    return [
        Row(user_exam_id=raw_dict.get("user_exam_id"),
            exam_name=raw_dict.get("exam_name"),
            question_number=number,
            question_id=question.get("id"),
            sequence_id=sequences.get("id"))
        for number, question in enumerate(questions, start=1)
    ]

# One row per question asked in each assessment
my_questions = assessments.rdd.flatMap(lambda_questions).toDF()

# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0
my_questions.createOrReplaceTempView('questions')

# Look at table created
spark.sql("select * from questions limit 10").show()

# Answer Q3: every question (with its sequence id) for the example user_exam_id
spark.sql("select * from questions where user_exam_id == '6d4089e4-bde5-4a22-b65f-18bce9ab79c8'").show()

# Write to HDFS. Overwrite mode keeps this cell re-runnable: the default save
# mode raises an error when /tmp/questions already exists.
my_questions.write.mode("overwrite").parquet("/tmp/questions")

+--------------------+--------------------+---------------+--------------------+--------------------+
|           exam_name|         question_id|question_number|         sequence_id|        user_exam_id|
+--------------------+--------------------+---------------+--------------------+--------------------+
|Normal Forms and ...|7a2ed6d3-f492-49b...|              1|5b28a462-7a3b-42e...|6d4089e4-bde5-4a2...|
|Normal Forms and ...|bbed4358-999d-446...|              2|5b28a462-7a3b-42e...|6d4089e4-bde5-4a2...|
|Normal Forms and ...|e6ad8644-96b1-461...|              3|5b28a462-7a3b-42e...|6d4089e4-bde5-4a2...|
|Normal Forms and ...|95194331-ac43-454...|              4|5b28a462-7a3b-42e...|6d4089e4-bde5-4a2...|
|Normal Forms and ...|95194331-ac43-454...|              1|5b28a462-7a3b-42e...|2fec1534-b41f-441...|
|Normal Forms and ...|bbed4358-999d-446...|              2|5b28a462-7a3b-42e...|2fec1534-b41f-441...|
|Normal Forms and ...|e6ad8644-96b1-461...|              3|5b28a462-7a3b-42e...|2f

In [48]:
# Create function to do all transforms, get to final dataframe, for after easy parts to finish


# This was additional for fun/curiosity, not part of the actual project
def lambda_transform(x):
    """Turn one assessment message into a single flat summary row.

    Intended for use with ``flatMap``. Every message yields exactly one row;
    fields that are absent from the message are emitted as ``None`` rather
    than raising ``KeyError``.

    Args:
        x: A record whose ``value`` attribute holds one assessment as a JSON
            string.

    Returns:
        A one-element list holding a ``Row`` with ``correct``, ``total``
        (both read from ``sequences.counts``) and ``base_exam_id``.
    """
    raw_dict = json.loads(x.value)

    sequences = raw_dict.get("sequences")
    counts = sequences.get("counts") if isinstance(sequences, dict) else None
    if not isinstance(counts, dict):
        counts = {}

    # Try to create final table without merges, else just keep and merge on keen_id
    return [Row(correct=counts.get("correct"),
                total=counts.get("total"),
                base_exam_id=raw_dict.get("base_exam_id"))]

In [51]:
# Build the exploratory table: one row per assessment message
my_final_table = assessments.rdd.flatMap(lambda_transform).toDF()

# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0
my_final_table.createOrReplaceTempView('finaltable')

In [52]:
# Preview the first 10 rows of the 'finaltable' temp table
spark.sql("select * from finaltable limit 10").show()

+--------------------+-------+-----+
|        base_exam_id|correct|total|
+--------------------+-------+-----+
|37f0a30a-7464-11e...|      2|    4|
|37f0a30a-7464-11e...|      1|    4|
|4beeac16-bb83-4d5...|      3|    4|
|4beeac16-bb83-4d5...|      2|    4|
|6442707e-7488-11e...|      3|    4|
|8b4488de-43a5-4ff...|      5|    5|
|e1f07fac-5566-4fd...|      1|    1|
|7e2e0b53-a7ba-458...|      5|    5|
|1a233da8-e6e5-48a...|      4|    4|
|7e2e0b53-a7ba-458...|      0|    5|
+--------------------+-------+-----+



In [44]:
# Parse each JSON message into a Row keyed by its top-level fields
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()

# NOTE: this re-points the 'assessments' view at this DataFrame, replacing any
# view already registered under that name.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0
extracted_assessments.createOrReplaceTempView('assessments')

# List the exam name of each assessment (show() prints the top 20 rows)
spark.sql("select exam_name from assessments").show()

+--------------------+
|           exam_name|
+--------------------+
|Normal Forms and ...|
|Normal Forms and ...|
|The Principles of...|
|The Principles of...|
|Introduction to B...|
|        Learning Git|
|Git Fundamentals ...|
|Introduction to P...|
|Intermediate Pyth...|
|Introduction to P...|
|A Practical Intro...|
|Git Fundamentals ...|
|Introduction to M...|
|   Python Epiphanies|
|Introduction to P...|
|Python Data Struc...|
|Python Data Struc...|
|Working with Algo...|
|Learning iPython ...|
|   Python Epiphanies|
+--------------------+
only showing top 20 rows

