# Education Technology Firm Business Analysis
### Ziling Huang

## Introduction
The goal of this report is to build a data pipeline using client assessment data. The data will be used to provide insights to the data scientists at our client companies to support product strategy and development decisions.

## Contents
#### 1. Step-by-Step Approach to Building the Pipeline   

#### 2. Business Questions

#### 3. Data Exploration : Steps and Assumptions

#### 4. Data Insights : Conclusion and Recommendations

#### 5. Data Issues

### 1. Step-by-Step Approach to Building the Pipeline  

Event logs from client assessments are stored in a nested JSON file called "assessment-attempts-20180128-121051-nested.json". I built a pipeline that publishes each assessment record as a separate message to a Kafka topic, from which the messages can then be consumed. Next, the data is read into PySpark and transformed into a format that is easy to query with SQL.

<b> Step 1 </b> : Spin up a Docker container cluster by running the following from the command line:  

`docker-compose up -d`  

This spins up a container cluster using the configuration in the docker-compose.yml file; the -d flag runs it in the background. The yml file defines the services and their ports: Zookeeper (coordination service for Kafka), Kafka (message broker), Cloudera Hadoop (HDFS for data storage) and Spark (data transformation), plus the mids container used to run jq and kafkacat in Step 5.

<b> Step 2 </b> : Check the Kafka logs to make sure there are no errors (-f keeps following the log output):

`docker-compose logs -f kafka`

<b> Step 3 </b> : Create a Kafka topic named "assessments", to which the assessment messages will be published and from which Spark will read them:

`docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181`


<b> Step 4 </b> : Use curl to download the JSON file and save it to the skh folder (-L follows redirects and -o sets the output file name):

`curl -L -o assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp`


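<b> Step 5 </b>: Read the JSON file, use jq to split the top-level JSON array into one compact JSON object per line, and pipe the lines to kafkacat in producer mode (-P), which publishes each line as a message to the assessments topic on the broker at kafka:29092: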

`docker-compose exec mids bash -c "cat /w205/project-2-zilingh8/skh/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"`

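To make the jq step concrete, the plain-Python sketch below does roughly what `jq '.[]' -c` does here: it parses a top-level JSON array and emits each element as one compact JSON line, and kafkacat in producer mode then sends each line as a separate message. The helper name `to_json_lines` and the two sample records are hypothetical; real records carry many more fields.

```python
import json

def to_json_lines(json_array_text):
    """Rough equivalent of `jq '.[]' -c`: one compact JSON object per array element."""
    records = json.loads(json_array_text)
    # separators=(",", ":") drops the spaces json.dumps adds by default,
    # similar to jq's compact (-c) output
    return [json.dumps(record, separators=(",", ":")) for record in records]

# Hypothetical two-record sample shaped like the assessments file
sample = (
    '[{"exam_name": "Learning Git", "user_exam_id": "x1"},'
    ' {"exam_name": "Learning DNS", "user_exam_id": "x2"}]'
)

lines = to_json_lines(sample)
for line in lines:
    print(line)  # each printed line would become one Kafka message
```

Because kafkacat splits its input on newlines by default, producing exactly one line per record is what makes each assessment a separate message.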
 
<b> Step 6 </b>: Start a PySpark shell in the spark container to read the messages from Kafka and transform the data:

 `docker-compose exec spark pyspark`
 
<b> Step 7 </b> : Read all messages on the assessments topic, from the earliest to the latest offset, into a Spark DataFrame:

`raw_assess=spark.read.format("kafka").option("kafka.bootstrap.servers","kafka:29092").option("subscribe","assessments").option("startingOffsets","earliest").option("endingOffsets","latest").load()`
 
 
<b> Step 8 </b> : Cache the DataFrame to suppress warnings. Spark evaluates lazily, so without a cache each later action would read the topic from Kafka again. Note that cache() is itself lazy: the data is stored the first time an action such as show() or count() runs:

`raw_assess.cache()`
 
<b> Step 9 </b> : Review the data schema :  

`raw_assess.printSchema()`

<b> Step 10 </b> : Select the value column, which holds the Kafka message payload as binary, and cast it to a string:

`assess_s= raw_assess.select(raw_assess.value.cast('string'))`

<b> Step 11 </b> : Write the data to HDFS in parquet format:

`assess_s.write.parquet("/tmp/assessments")`

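<b> Step 12 </b> : Reopen standard output with UTF-8 encoding so that non-ASCII characters in the data can be printed without encoding errors, and import the json module:

```python
import sys
# Reopen stdout as UTF-8 (line-buffered) so non-ASCII characters print without encoding errors
sys.stdout = open(sys.stdout.fileno(), mode='w', encoding='utf8', buffering=1)
import json
```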

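<b> Step 13 </b>: Import Row from Spark SQL, convert the DataFrame into an RDD (resilient distributed dataset), apply a map that parses each JSON string with json.loads, and convert the result back into a DataFrame:

```python
from pyspark.sql import Row

# Parse each message string into a Python dictionary, then turn the RDD back into a DataFrame
extracted_assess = assess_s.rdd.map(lambda x: json.loads(x.value)).toDF()
extracted_assess.show()
```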

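Steps 10 and 13 can be pictured per record in plain Python: each Kafka message value is a bytes payload, casting it to a string decodes it, and json.loads turns the string into a dictionary whose keys become DataFrame columns. The sketch below uses hypothetical payloads and ordinary lists instead of an RDD, so it only illustrates the per-record transformation, not Spark's distributed execution.

```python
import json

# Hypothetical Kafka message values (Kafka stores the payload as bytes)
raw_values = [
    b'{"exam_name": "Learning Git", "user_exam_id": "x1", "started_at": "2017-11-21T00:42:00"}',
    b'{"exam_name": "Learning DNS", "user_exam_id": "x2", "started_at": "2018-01-28T19:17:00"}',
]

# Step 10 equivalent: cast the binary value to a string
string_values = [value.decode("utf-8") for value in raw_values]

# Step 13 equivalent: parse each JSON string into a dictionary
records = [json.loads(s) for s in string_values]

# Column names a DataFrame built from these dictionaries would expose
columns = sorted(set().union(*(record.keys() for record in records)))
print(columns)  # ['exam_name', 'started_at', 'user_exam_id']
```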



### 2. Business Questions

The goal of this report is to answer a few business questions, namely:

1. What constitutes an assessment and how many assessments are there in this dataset?
2. What is the total number of unique exam types available on the portal?
3. What are the top 5 most popular exams?
4. What are the bottom 5 least popular exams?
5. What time period does the dataset cover?

### 3.a Data Exploration : Steps

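<b>Step 1 </b>: Register the extracted data as a temporary table so it can be queried with Spark SQL during exploration (registerTempTable is deprecated since Spark 2.0 in favor of createOrReplaceTempView, but still works):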

`extracted_assess.registerTempTable('assess_temptab')`

<b> Step 2 </b> : Use Spark SQL to query the data and understand the relationship between the IDs and exam names, and what constitutes one assessment:

`spark.sql("select base_exam_id, exam_name, user_exam_id, keen_id from assess_temptab limit 10").show()`

Count the records to get the number of assessments taken on the portal. The answer is 3,280 assessment records over this time period:

`spark.sql("select count(user_exam_id) from assess_temptab").show()`

|count(user_exam_id)|
|-------------------|
|               3280|



Check whether each user_exam_id identifies a unique assessment. There are 3,242 distinct user_exam_ids, fewer than the 3,280 total records:
 
 `spark.sql("select count(distinct(user_exam_id)) from assess_temptab").show()`
                                                 
|count(DISTINCT user_exam_id)|
|----------------------------|
|                        3242|

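The comparison between the two counts can be sketched in plain Python: the number of surplus records is the total count minus the distinct count, which for this dataset is 3,280 - 3,242 = 38. The short list of IDs below is hypothetical.

```python
# Hypothetical user_exam_id values; "x1" appears three times, like the repeats examined below
user_exam_ids = ["x1", "x1", "x1", "x2", "x3"]

total_records = len(user_exam_ids)        # like count(user_exam_id)
distinct_ids = len(set(user_exam_ids))    # like count(distinct user_exam_id)
surplus_records = total_records - distinct_ids

print(total_records, distinct_ids, surplus_records)  # 5 3 2

# Applied to the counts reported above
print(3280 - 3242)  # 38
```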

Find the user_exam_ids that appear more than once and examine them:


```python
d1 = spark.sql("select exam_name, user_exam_id, count(user_exam_id) as count_uaid from assess_temptab GROUP BY exam_name, user_exam_id")
d1.registerTempTable('d2')
spark.sql("select * from d2 where count_uaid > 1").show()
```
                         
|           exam_name|        user_exam_id|count_uaid|
|--------------------|--------------------|----------|
|Learning C# Best ...|3d63ec69-8d97-4f9...|         3|
|Learning C# Best ...|a45b5ee6-a4ed-4b1...|         3|
|Learning C# Desig...|fa23b287-0d0a-468...|         3|
|An Introduction t...|d4ab4aeb-1368-486...|         3|
|Intermediate C# P...|028ad26f-a89f-4a6...|         3|
|        Learning DNS|bd96cfbe-1532-4ba...|         3|
|Intermediate Pyth...|6e4889ab-5978-44b...|         2|
|Beginning C# Prog...|a244c11a-d890-4e3...|         3|
|Introduction to B...|b7ac6d15-97e1-4e9...|         3|
|Beginning C# Prog...|00745aef-f3af-412...|         3|
|Intermediate Pyth...|c1eb4d4a-d6ef-43e...|         2|
|Learning C# Best ...|ac80a11a-2e79-40e...|         3|
|Beginning C# Prog...|66d91177-c436-4ee...|         3|
|Learning C# Desig...|1e325cc1-47a9-480...|         3|
|        Learning Git|a7e6fc04-245f-4e3...|         3|
|Beginning C# Prog...|c320d47f-60d4-49a...|         3|
|Beginning C# Prog...|37cf5b0c-4807-421...|         3|
|Intermediate C# P...|949aa36c-74c7-4fc...|         3|
|Beginning C# Prog...|6132da16-2c0c-436...|         3|
|        Learning Git|cdc5859d-b332-4fb...|         3|

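The same duplicate check can be sketched with collections.Counter: group rows by (exam_name, user_exam_id), count them, and keep the groups seen more than once. In Spark SQL the two queries above could also be collapsed into one with `HAVING count(user_exam_id) > 1`. The rows below are hypothetical.

```python
from collections import Counter

# Hypothetical (exam_name, user_exam_id) pairs standing in for rows of assess_temptab
rows = [
    ("Learning Git", "x1"),
    ("Learning Git", "x1"),
    ("Learning Git", "x1"),
    ("Learning DNS", "x2"),
    ("Intermediate Python", "x3"),
    ("Intermediate Python", "x3"),
]

# GROUP BY exam_name, user_exam_id with a count per group
pair_counts = Counter(rows)

# Keep only the groups that occur more than once (count_uaid > 1)
duplicates = {pair: n for pair, n in pair_counts.items() if n > 1}
for (exam_name, user_exam_id), n in duplicates.items():
    print(exam_name, user_exam_id, n)
```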

The data does not contain a unique record identifier, which is a problem because there could be duplicate records. The assumptions this requires are discussed in Section 3.b below.

<b> Step 3 </b> : Use Spark SQL to count the unique exam names (courses):

`spark.sql("select count(distinct(exam_name)) from assess_temptab").show()`

The answer is 103 unique exam names.





<b> Step 4 </b> : Find the most popular exams and least popular exams.

Count the records per exam name and save the result as a temporary table:

```python
# Count the records per exam name (GROUP BY already makes exam_name distinct)
t1 = spark.sql("select exam_name, count(exam_name) as count from assess_temptab GROUP BY exam_name")
t1.registerTempTable('t2')
```

Show this table in descending order :

`spark.sql("select exam_name,count from t2 ORDER BY count desc").show()`

                                                  
|           exam_name|count|
|--------------------|-----|
|        Learning Git|  394|
|Introduction to P...|  162|
|Introduction to J...|  158|
|Intermediate Pyth...|  158|
|Learning to Progr...|  128|


Show in ascending order:

`spark.sql("select exam_name,count from t2 ORDER BY count").show()`

                                                   
|           exam_name|count|
|--------------------|-----|
|Learning to Visua...|    1|
|Native Web Apps f...|    1|
|Nulls, Three-valu...|    1|
|Operating Red Hat...|    1|
|The Closed World ...|    2|

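The ranking logic behind the two queries above can be sketched with collections.Counter: count each exam, take the first n by descending count, and the first n by ascending count. The exam names and counts below are hypothetical, and n is set to 2 for the small sample; n = 5 gives the top-5 and bottom-5 lists used in this report.

```python
from collections import Counter

# Hypothetical exam_name column
exam_names = ["Exam A"] * 5 + ["Exam B"] * 3 + ["Exam C"] * 2 + ["Exam D"]

counts = Counter(exam_names)  # like GROUP BY exam_name with count(exam_name)
n = 2

top = counts.most_common(n)                                    # ORDER BY count DESC
bottom = sorted(counts.items(), key=lambda item: item[1])[:n]  # ORDER BY count ASC

print(top)     # [('Exam A', 5), ('Exam B', 3)]
print(bottom)  # [('Exam D', 1), ('Exam C', 2)]
```

One caveat applies to both versions: ties, such as the two exams with 158 records in the descending table, come back in no guaranteed order from ORDER BY unless a second sort key (for example exam_name) is added.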

Show how many times people took the foundational course "Learning Git":

`spark.sql("select exam_name, count from t2 WHERE exam_name='Learning Git'").show()`
                                                         
|   exam_name|count|
|------------|-----|
|Learning Git|  394|



<b> Step 5 </b> : Check the date range covered by the dataset:
 
```python
# Cast started_at from string to timestamp so min and max compare chronologically
c2 = extracted_assess.select(extracted_assess.started_at.cast('timestamp'))
c2.registerTempTable('c3')
spark.sql("select min(started_at), max(started_at) from c3").show()
```
 

|     min(started_at)|     max(started_at)|
|--------------------|--------------------|
|2017-11-21 00:42:...|2018-01-28 19:17:...|

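The same range check can be sketched with the standard datetime module: parse each started_at string into a datetime, then take the minimum and maximum and their difference. The timestamp strings below are hypothetical (the real values are truncated in the output above), and the sketch assumes an ISO-8601 format with a trailing "Z".

```python
from datetime import datetime

# Hypothetical started_at strings in ISO-8601 format
started_at = [
    "2017-11-21T00:42:10.000Z",
    "2018-01-28T19:17:05.000Z",
    "2017-12-15T08:30:00.000Z",
]

def parse(ts):
    # Drop the trailing "Z" (UTC marker) so fromisoformat accepts the string
    return datetime.fromisoformat(ts.rstrip("Z"))

timestamps = [parse(ts) for ts in started_at]
earliest, latest = min(timestamps), max(timestamps)
span_days = (latest - earliest).days

print(earliest, latest)
print(span_days)  # 68
```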

 


### 3.b Data Exploration : Assumptions

<b> Assumption </b> : There were only 3,242 assessments taken over the period from Nov 21, 2017 to Jan 28, 2018.

Explanation : There are 3,280 records in the dataset but no unique identifier for each assessment taken. The maximum number of attempts is 1 for every record, so each user_exam_id should appear only once, and we have to assume that repeated user_exam_ids are duplicate records. On that assumption there are 38 duplicate records in the data (3,280 records minus 3,242 unique user_exam_ids).


### 4. Data Insights : Conclusion and Recommendations

1. The most popular courses are foundational courses such as Learning Git, Introduction to Python, Intermediate Python, Introduction to Java 8 and Learning to Program with R. More resources could be devoted to marketing and developing these courses to improve the curriculum for users. Their popularity also suggests that the portal's primary users are beginner or intermediate learners of data science tools and subjects.

2. The least popular courses are Learning to Visualize Data with D3; Native Web Apps for Android; Nulls, Three-valued Logic and Missing Information; Operating Red Hat Enterprise Linux Servers; and The Closed World Assumption. The reasons for their low demand should be investigated, for example through customer surveys, looking at the type of customer served, marketing, the quality of the exam and course, and any improvements made. If demand cannot be raised, the course should be shut down and its resources shifted to new opportunities.

3. There were 103 courses on the portal, so on average each course generated about 31 user assessments (3,242 / 103 ≈ 31.5) over the roughly two-month period. Courses below this average can be considered below average performers, and courses above it above average. A better metric could be devised with data on the length and complexity of each course, since both could also affect the number of assessments taken over a given period.

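The benchmark in point 3 can be sketched in a few lines of Python: divide the assessment count by the number of courses, then flag courses whose count falls below that average. The 394 and 1 counts come from the tables in Section 3; "Hypothetical Course X" and its count are made up for illustration.

```python
# Figures reported in this document
distinct_assessments = 3242   # after removing the 38 duplicate records
total_records = 3280
num_courses = 103

avg_per_course = distinct_assessments / num_courses
print(round(avg_per_course, 1))               # 31.5
print(round(total_records / num_courses, 1))  # 31.8 if duplicates are kept

# Per-course counts flagged against the average (Course X is hypothetical)
course_counts = {
    "Learning Git": 394,
    "Operating Red Hat Enterprise Linux Servers": 1,
    "Hypothetical Course X": 40,
}
below_average = sorted(name for name, n in course_counts.items() if n < avg_per_course)
print(below_average)  # ['Operating Red Hat Enterprise Linux Servers']
```

Because a few very popular courses such as Learning Git pull the mean upward, the median count per course may be a more robust benchmark than the mean.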

### 5. Data Issues

A few data issues were identified during the exploration process. Namely:

1. Lack of a unique record identifier, and hence possible duplicate assessment records (see the explanation and assumption under Section 3: Data Exploration).

2. Errors when unnesting the sequences field of the data table, which contains arrays nested at multiple levels.

I attempted to retrieve the data nested in arrays under the count field to find out how users perform on these exams on average, specifically by dividing the number of correctly answered questions per user by the total number of questions per assessment.

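The intended metric, correctly answered questions divided by total questions, can be sketched in plain Python once the counts are reachable. The record layout below, with a `counts` dictionary holding `correct` and `total`, is a hypothetical simplification and not the dataset's confirmed schema; assessments with no recorded questions are skipped to avoid dividing by zero.

```python
# Hypothetical, simplified assessment records (field names are assumptions)
assessments = [
    {"user_exam_id": "x1", "counts": {"correct": 4, "total": 5}},
    {"user_exam_id": "x2", "counts": {"correct": 1, "total": 4}},
    {"user_exam_id": "x3", "counts": {"correct": 0, "total": 0}},  # no questions recorded
]

def score(record):
    """Share of correctly answered questions, or None if there were no questions."""
    counts = record["counts"]
    if counts["total"] == 0:
        return None
    return counts["correct"] / counts["total"]

# Average score over the assessments that have at least one question
scores = [s for s in (score(r) for r in assessments) if s is not None]
average_score = sum(scores) / len(scores)
print(round(average_score, 3))  # 0.525
```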
The structure of the sequences field is as follows:

`Map: [Questions] String, Array(Map(String, Boolean)); [Count] String, Array(String)`

An attempt to unnest this field with the explode function in PySpark SQL did not work, because more than one of the field's components contains arrays, at several levels of nesting. When explode is applied, the exploded items that still contain arrays lose the granular array data beneath them.

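```python
from pyspark.sql.functions import explode

# Explode the sequences map into one row per key/value pair, shown next to the original map
extracted_assess.select(extracted_assess.sequences, explode(extracted_assess.sequences)).show(truncate=False)

# Explode level by level: map entries, then the array under each key, then the maps inside it
a1 = extracted_assess.select(explode(extracted_assess.sequences))
a2 = a1.select(a1.key, explode(a1.value))
a3 = a2.select(explode(a2.col))
```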

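One way to avoid losing the link between inner arrays and their parents is to flatten one level at a time while carrying the parent identifiers along with every child row. The plain-Python sketch below shows that idea on a hypothetical, simplified `sequences` value in which each question holds a list of option dictionaries; the field names (`questions`, `id`, `options`, `correct`) are assumptions, not the dataset's confirmed schema. In PySpark the analogous step would be to keep an identifier such as user_exam_id in each `select` next to the `explode` call, so every exploded row still knows which assessment it came from.

```python
# Hypothetical, simplified record; field names are assumptions
record = {
    "user_exam_id": "x1",
    "sequences": {
        "questions": [
            {"id": "q1", "options": [{"id": "o1", "correct": True}, {"id": "o2", "correct": False}]},
            {"id": "q2", "options": [{"id": "o3", "correct": True}]},
        ]
    },
}

def flatten_options(rec):
    """Yield one flat row per answer option, keeping the parent ids on every row."""
    for question in rec["sequences"]["questions"]:   # first level: questions
        for option in question["options"]:           # second level: options
            yield {
                "user_exam_id": rec["user_exam_id"],
                "question_id": question["id"],
                "option_id": option["id"],
                "correct": option["correct"],
            }

rows = list(flatten_options(record))
for row in rows:
    print(row)
```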

