## W205 Project 2: Pyspark Cluster
#### Author: Kevin Xuan

Now we have entered the pyspark container. Inside this container, we will flatten the messy data into a structured table and remove unnecessary data, so that we or other data scientists can analyze it without having to deal with unstructured or unnecessary fields. First we read the data stored in the Kafka topic `student-assessment`.

### Read and Understand Data

In [1]:
# Read data that is stored in the topic student-assessment
data = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:29092") \
  .option("subscribe","student-assessment") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load() 

To understand the command: we are again retrieving data from another container, so we connect to Kafka on port `29092`. Since the data is stored in the Kafka topic `student-assessment`, we pass that name to the `subscribe` option. Finally, `startingOffsets` and `endingOffsets` define the range of messages to read; setting them to `earliest` and `latest` reads every message currently in the topic.

In [5]:
# Cache this to Remove Warning messages which will appear later
data.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [14]:
# Print Schema for this dataset
data.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Display the first 5 rows for the data
data.show(5)

+----+--------------------+------------------+---------+------+--------------------+-------------+
| key|               value|             topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------------------+---------+------+--------------------+-------------+
|null|[7B 22 6B 65 65 6...|student-assessment|        0|     0|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|student-assessment|        0|     1|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|student-assessment|        0|     2|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|student-assessment|        0|     3|1969-12-31 23:59:...|            0|
|null|[7B 22 6B 65 65 6...|student-assessment|        0|     4|1969-12-31 23:59:...|            0|
+----+--------------------+------------------+---------+------+--------------------+-------------+
only showing top 5 rows



We are only interested in the "value" column. Its contents are stored as binary, so we cast the column to a string to make the text readable. To do so, we perform the command below:

In [6]:
# Convert "value" column to string format
exam_data = data.select(data.value.cast('string'))

In [None]:
# Display the first 5 rows of the data
exam_data.show(5)
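
The bracketed hex in the "value" column is the raw bytes of each Kafka message. As a small plain-Python illustration (outside Spark), the five complete bytes visible in the truncated output above decode to the opening of a JSON object, which is why the cast to string makes the column readable. Only the bytes fully shown in that output are used here.

```python
# The first five complete bytes shown in the truncated "value" column above.
visible_hex = "7B 22 6B 65 65"

# bytes.fromhex skips the spaces; decode the raw bytes as UTF-8 text.
decoded_prefix = bytes.fromhex(visible_hex).decode("utf-8")

print(decoded_prefix)
```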

After converting the "value" column to string format, we see that each row holds a JSON object (a dictionary) with many keys and values. To turn the data into a structured table with one column per key, we parse the JSON in each row and convert the result to a dataframe.

### Convert json column to dataframe

In [9]:
# import json python package
import json
import warnings
warnings.filterwarnings("ignore")

In [10]:
# We see that each row is a dictionary, so for each row we load the dictionary using json package
# and convert to dataframe.
exam = exam_data.rdd.map(lambda x: json.loads(x.value)).toDF()

In [None]:
# Display first 5 rows of the data
exam.show(5)

In [20]:
# Print the schema of the new dataframe
exam.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- sequences: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: map (containsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: boolean (valueContainsNull = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



By looking at the schema of the exam dataset, we see that only the "sequences" column still consists of keys and values, while all other columns contain plain values. If we remove the "sequences" column, our data becomes a structured table. Hence, we will evaluate the "sequences" column to see whether it provides important information. If not, we can simply remove it, which saves us a lot of work. We will first look at the "sequences" value of some of the rows.

In [12]:
# Example 1: sequence of the first assessment
exam.select('sequences').collect()[0]

Row(sequences={'questions': [{'options': None, 'user_correct': False, 'user_incomplete': True, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}], 'id': None, 'attempt': None, 'counts': None})

In [14]:
# Example 2: sequence of the 88th assessment
exam.select('sequences').collect()[87]

Row(sequences={'questions': [{'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': True, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': False, 'user_incomplete': False, 'id': None, 'user_result': None, 

In [13]:
# Example 3: sequence of the 100th assessment 
exam.select('sequences').collect()[99]

Row(sequences={'questions': [{'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}, {'options': None, 'user_correct': True, 'user_incomplete': False, 'id': None, 'user_result': None, 'user_submitted': True}], 'id': None, 'attempt': None, 'counts': None})

From the 3 examples above, we can see that much of the data under the "sequences" column is missing. For instance, although we know that a student answered a multiple choice question correctly (that is, `user_correct = True`), we do not know which question it was (that is, `id = None`). With this information, we cannot derive which questions are easy and which are challenging. Moreover, the information under the "sequences" column does not answer the business questions asked later, and therefore we can drop this column.
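
As a rough check of this reasoning, the sketch below summarizes the questions from Example 1, reduced to the keys discussed above. The helper `summarize_sequences` is hypothetical (not part of the original notebook), and it inspects a single row only; confirming the pattern for the whole dataset would require applying it to every row.

```python
# Questions from Example 1 above, reduced to the keys discussed in the text.
example_sequences = {
    "questions": [
        {"id": None, "user_correct": False, "user_incomplete": True},
        {"id": None, "user_correct": False, "user_incomplete": False},
        {"id": None, "user_correct": True, "user_incomplete": False},
        {"id": None, "user_correct": True, "user_incomplete": False},
    ],
    "id": None,
    "attempt": None,
    "counts": None,
}


def summarize_sequences(sequences):
    """Hypothetical helper: count how much usable detail one row carries."""
    questions = sequences.get("questions") or []
    return {
        "num_questions": len(questions),
        "num_correct": sum(1 for q in questions if q.get("user_correct")),
        "num_with_id": sum(1 for q in questions if q.get("id") is not None),
    }


summary = summarize_sequences(example_sequences)
print(summary)
```

With no question carrying an `id`, the correct/incorrect flags cannot be tied back to specific questions.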

In [15]:
# Since the "sequences" column does not help answer our business questions, we can remove the column from the table
exam = exam.drop('sequences')

Now we have removed the column, and we look at the schema of the data again.

In [17]:
exam.printSchema()

root
 |-- base_exam_id: string (nullable = true)
 |-- certification: string (nullable = true)
 |-- exam_name: string (nullable = true)
 |-- keen_created_at: string (nullable = true)
 |-- keen_id: string (nullable = true)
 |-- keen_timestamp: string (nullable = true)
 |-- max_attempts: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- user_exam_id: string (nullable = true)



By looking at the schema of the data, we see that every column now holds plain values, which indicates that we have obtained a structured table in which each cell contains a single value rather than a list or dictionary. Now we view the first 5 rows of the data to get a sense of what our data looks like.

In [34]:
# Display the first 5 rows of the data
exam.show(5)

+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+
|        base_exam_id|certification|           exam_name|   keen_created_at|             keen_id|    keen_timestamp|max_attempts|          started_at|        user_exam_id|
+--------------------+-------------+--------------------+------------------+--------------------+------------------+------------+--------------------+--------------------+
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717442.735266|5a6745820eb8ab000...| 1516717442.735266|         1.0|2018-01-23T14:23:...|6d4089e4-bde5-4a2...|
|37f0a30a-7464-11e...|        false|Normal Forms and ...| 1516717377.639827|5a674541ab6b0a000...| 1516717377.639827|         1.0|2018-01-23T14:21:...|2fec1534-b41f-441...|
|4beeac16-bb83-4d5...|        false|The Principles of...| 1516738973.653394|5a67999d3ed3e3000...| 1516738973.653394|         1.0|2018-01-23T
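
Every remaining column is a string, including numeric-looking ones such as `keen_created_at`. The sketch below converts the first row's `keen_created_at` to a date, assuming (this is not stated in the data) that the value is seconds since the Unix epoch in UTC. The result falls on the same day and hour as that row's `started_at`, which is consistent with the assumption.

```python
from datetime import datetime, timezone

# keen_created_at of the first row shown above (stored as a string).
keen_created_at = "1516717442.735266"

# Assumption: the value is seconds since the Unix epoch, interpreted in UTC.
created = datetime.fromtimestamp(float(keen_created_at), tz=timezone.utc)

print(created.isoformat())
```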

Now that we have a structured table, we can land it in HDFS as Parquet files under `/tmp/exam`. To do so, we run the following command:

In [18]:
# Land table to HDFS
exam.write.parquet("/tmp/exam")

We have successfully saved the `exam` table to Cloudera HDFS at `/tmp/exam`. Now we register the table as a temporary view named `exam_data` so that we can write Spark SQL queries to find business insights about the data and answer our business questions.

In [19]:
# Register the table as a temporary view named "exam_data"
exam.registerTempTable('exam_data')

### Questions

#### Question 1: How many assessments are in the dataset?

In [20]:
spark.sql('''
    select count(distinct(user_exam_id)) as num_of_assessments
    from exam_data'''
    ).show()

+------------------+
|num_of_assessments|
+------------------+
|              3242|
+------------------+



Answer: There are 3,242 assessments in the dataset.

#### Question 2: What's the name of your Kafka topic? How did you come up with that name?

Answer: The name of my Kafka topic is `student-assessment`. I came up with that name because the data revolves around exams and assessments. Since I see that all these assessments are machine learning related, the people taking the tests are most likely students. Hence I named the topic `student-assessment`.

#### Question 3: How many people took Learning Git?

In [21]:
spark.sql('''
    select count(distinct(user_exam_id)) as num_of_people_took_Learning_Git
    from exam_data
    where exam_name = 'Learning Git'
    ''').show()

+-------------------------------+
|num_of_people_took_Learning_Git|
+-------------------------------+
|                            390|
+-------------------------------+



Answer: 390 people took Learning Git.

#### Question 4: What is the least common course taken? And the most common?

In [59]:
# least common course taken
spark.sql('''
    select exam_name, count(distinct(user_exam_id)) as num_of_students
    from exam_data
    group by exam_name
    order by count(exam_name)
    limit 1
    ''').collect()

[Row(exam_name='Learning to Visualize Data with D3.js', num_of_students=1)]

In [60]:
# most common course
# use collect to show entire name of the field
spark.sql('''
    select exam_name, count(distinct(user_exam_id)) as num_of_students
    from exam_data
    group by exam_name
    order by count(exam_name) DESC
    limit 1
    ''').collect()

[Row(exam_name='Learning Git', num_of_students=390)]

Answer: The least common course taken is "Learning to Visualize Data with D3.js". The most common course taken is "Learning Git".
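
Because `limit 1` returns a single row, a tie at the minimum or maximum would show only one of the tied courses. The plain-Python sketch below illustrates one way to list every course that shares the extreme value. The course names and counts are hypothetical and are not taken from the dataset.

```python
from collections import Counter

# Hypothetical course counts, made up to illustrate a tie at the minimum.
students_per_course = Counter({
    "Course A": 390,
    "Course B": 12,
    "Course C": 1,
    "Course D": 1,
})

fewest = min(students_per_course.values())
most = max(students_per_course.values())

# Collect every course that shares the extreme value, not just the first one.
least_common = sorted(c for c, n in students_per_course.items() if n == fewest)
most_common = sorted(c for c, n in students_per_course.items() if n == most)

print(least_common)
print(most_common)
```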

#### Question 5: How many courses are there in the dataset?

In [23]:
spark.sql('''
    select count(distinct exam_name) as num_of_courses
    from exam_data
    ''').show()

+--------------+
|num_of_courses|
+--------------+
|           103|
+--------------+



Answer: There are 103 different courses in the dataset.

#### Question 6: What's the earliest and latest date for an assessment?

In [39]:
spark.sql('''
    select year(started_at) as year, month(started_at) as month, day(started_at) as day
    from exam_data
    where started_at = (select min(started_at) as min from exam_data)
    ''').show()

+----+-----+---+
|year|month|day|
+----+-----+---+
|2017|   11| 21|
+----+-----+---+



In [40]:
spark.sql('''
    select year(started_at) as year, month(started_at) as month, day(started_at) as day
    from exam_data
    where started_at = (select max(started_at) as max from exam_data)
    ''').show()

+----+-----+---+
|year|month|day|
+----+-----+---+
|2018|    1| 28|
+----+-----+---+



Answer: The earliest exam was on November 21st, 2017, and the latest exam was on January 28th, 2018.
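
Note that `started_at` is stored as a string, so `min` and `max` in the queries above compare text rather than dates. This still gives the right rows when every value uses the same ISO 8601 layout and time zone, because such strings sort chronologically. A plain-Python sketch with hypothetical timestamps (the exact layout of the real values is an assumption, since the output above is truncated):

```python
from datetime import datetime

# Hypothetical started_at strings; the layout is assumed, not confirmed.
started_at = [
    "2018-01-23T14:23:19.082Z",
    "2017-11-21T00:00:00.000Z",
    "2018-01-28T19:17:59.021Z",
]


def parse(ts):
    """Parse an ISO 8601 string of the assumed layout into a datetime."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")


# Plain string comparison, as the SQL queries do on a string column.
earliest = min(started_at)
latest = max(started_at)

# Comparison on parsed datetimes picks the same values.
same_order = (earliest == min(started_at, key=parse)
              and latest == max(started_at, key=parse))

print(parse(earliest).date(), parse(latest).date(), same_order)
```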

#### Question 7: How many assessments have failed certification?

In [47]:
spark.sql('''
    select certification, count(distinct(user_exam_id)) as num_of_students
    from exam_data
    group by certification
    ''').show()

+-------------+----------------------------+
|certification|count(DISTINCT user_exam_id)|
+-------------+----------------------------+
|         null|                         132|
|        false|                        3110|
+-------------+----------------------------+



Answer: We see that 3,110 of the 3,242 assessments have `certification = false`, and the other 132 assessments have empty values. If the "certification" column represents whether a student passed or failed the assessment, then the dataset only records assessments that students failed. This would be extremely important information, especially if our target variable is whether a student will pass the exam or not.