# Project 2: Tracking User Activity

## W205 Section 4 - Kevin Crook

## Prepared by Kevin Hartman - 3/10/2020


#### Problem Statement

- You work at an ed tech firm. You've created a service that delivers assessments, and now lots of different customers (e.g., Pearson) want to publish their assessments on it. You need to get ready for data scientists who work for these customers to run queries on the data.

#### Tasks

Prepare the infrastructure to land the data in the form and structure it needs in order to be queried. You will need to:

- Publish and consume messages with Kafka
- Use Spark to transform the messages
- Use Spark to transform the messages so that you can land them in HDFS

#### Deliverables
 - The `docker-compose.yml` used for spinning up the pipeline
 - The history of the console (e.g., `history > (user-name)-history.txt`)
 - A notebook containing the results of sample query executions in Spark
   - Include answers to 1 to 3 basic business questions for demonstration
   - Also include lines relevant to the pipeline initialization process for reproducibility
 - Additional files and scripts used for processing

### Perform pre-step

This notebook is designed to be run from a pyspark container.

To set up your environment, follow the instructions in the [README-Setup](README-Setup.md) markdown file.

The setup scripts perform the following tasks:


1. Launch the containers (performed in `project2-stack.sh`).
```
docker-compose -f docker-compose.yml up
```
2. Create the Kafka topic for the assessment data (performed in `startup.sh`).
```
docker-compose exec kafka kafka-topics --create --topic assessments --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181
```
3. Create a symbolic link to `/w205` in the Spark container's working directory (so the notebook can reach the project files).
```
docker-compose exec spark ln -s /w205 w205
```
4. Download the assessment data (a JSON file).
```
mkdir data
curl -L -o data/assessment-attempts-20180128-121051-nested.json https://goo.gl/ME6hjp
```
5. Publish the data onto the Kafka topic created in step 2.
```
docker-compose exec mids bash -c "cat /w205/project-2-kevin-hartman/data/assessment-attempts-20180128-121051-nested.json | jq '.[]' -c | kafkacat -P -b kafka:29092 -t assessments"
```
6. Launch our Jupyter Notebook through pyspark (so we have access to the pyspark environment in this notebook).
```
docker-compose exec spark env PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port 8888 --ip 0.0.0.0 --allow-root' pyspark
```
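Step 5 relies on `jq '.[]' -c` to turn the downloaded JSON array into one compact JSON object per line, so that kafkacat publishes each assessment as its own message. A minimal stdlib sketch of that transformation, using a hypothetical two-record sample (the function name `to_kafka_lines` is illustrative only):

```python
import json


def to_kafka_lines(json_array_text):
    """Split a JSON array into one compact JSON document per line,
    mirroring what `jq '.[]' -c` pipes into kafkacat (one message per line)."""
    records = json.loads(json_array_text)
    # separators=(",", ":") drops the spaces json.dumps adds by default,
    # matching jq's compact (-c) style; ensure_ascii=False keeps UTF-8 text as-is
    return [json.dumps(record, separators=(",", ":"), ensure_ascii=False)
            for record in records]


# Hypothetical two-record file; the real file holds the full assessment list
sample = '[{"exam_name": "Learning Git", "keen_id": "a1"}, {"exam_name": "Mastering Git", "keen_id": "b2"}]'
for line in to_kafka_lines(sample):
    print(line)
```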

### Confirm access to environment

After following the steps in the README-Setup, the environment should be ready to go. We'll confirm by evaluating the `spark` session and `sc` context objects.


```python
spark
```

```python
sc
```

### Create access to the assessments topic

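Now we'll set up a DataFrame that reads from the Kafka topic `assessments`. The topic was created by the setup script and holds the records published from the file containing the list of assessments. The `startingOffsets` and `endingOffsets` options make this a one-off batch read of every message currently in the topic.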

```python
raw_assessments = (spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "assessments")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load())
raw_assessments.cache()  # after the first action, reuse the data instead of re-reading Kafka
raw_assessments.printSchema()
```

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```


The `value` column holds the assessment payload as binary JSON. Let's cast it to a string, parse each message into a row, and register a temp table for easy querying.

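```python
# Cast the binary Kafka payload to a JSON string
assessments = raw_assessments.select(raw_assessments.value.cast('string'))
```

```python
import json

from pyspark.sql import Row

# Parse each JSON message; its top-level keys become the DataFrame's columns
extracted_assessments = assessments.rdd.map(lambda x: Row(**json.loads(x.value))).toDF()
```

```python
# createOrReplaceTempView supersedes the deprecated registerTempTable (Spark 2.0+)
extracted_assessments.createOrReplaceTempView('assessments')
```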
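Each Kafka message arrives as raw bytes in the `value` column; the cast to `string` and `json.loads` turn it into a dictionary whose top-level keys become columns when passed to `Row(**...)`. A stdlib-only sketch of that per-message step on a hypothetical payload (no Spark required):

```python
import json

# Hypothetical message payload as Kafka stores it: UTF-8 encoded bytes
raw_value = b'{"keen_id": "a1", "exam_name": "Learning Git", "sequences": {"counts": {"correct": 2, "total": 4}}}'

# Counterpart of raw_assessments.value.cast('string')
value_str = raw_value.decode("utf-8")

# Counterpart of json.loads(x.value) inside the map; Row(**record) would
# turn each top-level key into a column
record = json.loads(value_str)

print(sorted(record.keys()))  # ['exam_name', 'keen_id', 'sequences']
# Nested objects such as "sequences" stay nested inside one value
# rather than being flattened into separate columns
print(record["sequences"]["counts"])  # {'correct': 2, 'total': 4}
```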

### Run some queries

Let's test it by seeing how many assessments there are.

```python
spark.sql("select count(*) from assessments").show()
```

```
+--------+
|count(1)|
+--------+
|    3280|
+--------+
```



How many courses?

```python
spark.sql("select count(*) from (select distinct base_exam_id from assessments)").show()
```

```
+--------+
|count(1)|
+--------+
|     107|
+--------+
```
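The nested `select distinct` above counts the unique `base_exam_id` values; Spark SQL also accepts the shorter `count(distinct base_exam_id)`. The same logic in plain Python, over hypothetical records:

```python
# Hypothetical records; only the field used by the query is shown
records = [
    {"base_exam_id": "e1"},
    {"base_exam_id": "e2"},
    {"base_exam_id": "e1"},  # a second attempt at the same exam
]

# select count(*) from (select distinct base_exam_id from assessments)
num_courses = len({r["base_exam_id"] for r in records})
print(num_courses)  # 2
```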



How many times was the course *Introduction to Python* taken?

```python
spark.sql("select exam_name as course, count(*) as num_takers from assessments where exam_name like 'Introduction to Python' group by exam_name").show()
```

```
+--------------------+----------+
|              course|num_takers|
+--------------------+----------+
|Introduction to P...|       162|
+--------------------+----------+
```



What is the *least* popular course?

```python
spark.sql("select exam_name as course, count(*) as num_takers from assessments group by exam_name order by num_takers asc limit 1").show()
```

```
+--------------------+----------+
|              course|num_takers|
+--------------------+----------+
|Learning to Visua...|         1|
+--------------------+----------+
```
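`order by num_takers asc limit 1` returns exactly one row, so if several courses are tied for the lowest count, the one shown is arbitrary and the others are hidden. Whether such ties exist here is not shown above; in Spark SQL, comparing each course's count against the minimum count instead of using `limit 1` would return all of them. A minimal tie-aware sketch over hypothetical course names:

```python
from collections import Counter

# Hypothetical exam_name values, one per assessment record
exam_names = ["Learning Git", "Learning Git", "Course A", "Course B"]

counts = Counter(exam_names)
fewest = min(counts.values())

# Every course sharing the minimum count, not just the first one found
least_popular = sorted(name for name, n in counts.items() if n == fewest)
print(fewest, least_popular)  # 1 ['Course A', 'Course B']
```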



What is the *most* popular course?

```python
spark.sql("select exam_name as course, count(*) as num_takers from assessments group by exam_name order by num_takers desc limit 1").show()
```

```
+------------+----------+
|      course|num_takers|
+------------+----------+
|Learning Git|       394|
+------------+----------+
```



What is the average score and total number of exams taken (including retries) for the 20 most popular courses?
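Here the average score is 100 × the mean of `correct / total` across attempts, so every attempt counts equally regardless of how many questions it had; it is not the pooled share of all questions answered correctly. A small sketch with hypothetical counts shows that the two can differ:

```python
# Hypothetical (correct, total) pairs for three attempts at one exam
attempts = [(1, 4), (4, 4), (2, 2)]

# What the query computes: avg(correct / total) * 100, i.e. the mean of
# per-attempt percentages, each attempt weighted equally
mean_of_ratios = sum(c / t for c, t in attempts) / len(attempts) * 100

# A different metric: pooled percentage of all questions answered correctly,
# which weights attempts by their number of questions
ratio_of_sums = sum(c for c, _ in attempts) / sum(t for _, t in attempts) * 100

print(round(mean_of_ratios, 2), round(ratio_of_sums, 2))  # 75.0 70.0
```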

First we'll define a function that pulls the `correct` and `total` question counts out of each assessment, so the query can compute average scores from them.

```python
def my_lambda_correct_total(x):
    """Return [Row(keen_id, correct, total)] for an assessment message that has
    sequences.counts.correct and .total, or [] so that flatMap skips it."""
    raw_dict = json.loads(x.value)
    my_list = []

    if "sequences" in raw_dict and "counts" in raw_dict["sequences"]:
        counts = raw_dict["sequences"]["counts"]
        if "correct" in counts and "total" in counts:
            my_dict = {"keen_id": raw_dict["keen_id"],
                       "correct": counts["correct"],
                       "total": counts["total"]}
            my_list.append(Row(**my_dict))

    return my_list

# flatMap concatenates the per-message lists, dropping messages that returned []
my_correct_total = assessments.rdd.flatMap(my_lambda_correct_total).toDF()

my_correct_total.createOrReplaceTempView('ct')
```
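Because `my_lambda_correct_total` returns a list, `flatMap` drops records that lack `sequences.counts` instead of producing null rows. A stdlib-only sketch of the same logic on two hypothetical messages (`extract_correct_total` is an illustrative name that returns plain dicts in place of `Row` objects):

```python
import json


def extract_correct_total(value):
    """Stdlib-only mirror of my_lambda_correct_total: a one-item list for
    records carrying sequences.counts.correct and .total, else []."""
    raw = json.loads(value)
    counts = raw.get("sequences", {}).get("counts", {})
    if "correct" in counts and "total" in counts:
        return [{"keen_id": raw["keen_id"],
                 "correct": counts["correct"],
                 "total": counts["total"]}]
    return []


# Hypothetical messages: one complete, one missing the counts sub-document
complete = '{"keen_id": "a1", "sequences": {"counts": {"correct": 3, "total": 4}}}'
partial = '{"keen_id": "b2", "sequences": {}}'

# Like flatMap: concatenate per-record lists, so records yielding [] vanish
rows = [row for msg in (complete, partial) for row in extract_correct_total(msg)]
print(rows)  # [{'keen_id': 'a1', 'correct': 3, 'total': 4}]
```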

Now we'll execute the query.

```python
spark.sql("""
    select a.exam_name as exam,
           count(ct.correct) as num_takers,
           avg(ct.correct / ct.total) * 100 as avg_score
    from assessments a join ct on a.keen_id = ct.keen_id
    group by a.exam_name
    order by num_takers desc
    limit 20
""").show()
```

```
+--------------------+----------+------------------+
|                exam|num_takers|         avg_score|
+--------------------+----------+------------------+
|        Learning Git|       406| 68.27586206896554|
|Introduction to P...|       162| 56.66666666666664|
|Intermediate Pyth...|       162| 50.92592592592593|
|Introduction to J...|       158| 87.59493670886073|
|Beginning C# Prog...|       131|56.297709923664115|
|Learning to Progr...|       128| 54.46428571428572|
|Introduction to M...|       119| 68.69747899159664|
|Software Architec...|       109|47.935779816513765|
|    Learning Eclipse|        85| 70.58823529411762|
|Introduction to B...|        81| 64.50617283950618|
|Learning Apache M...|        80|           60.9375|
|Beginning Program...|        79| 76.58227848101265|
|       Mastering Git|        77| 58.76623376623377|
|Advanced Machine ...|        67| 72.38805970149254|
|Learning Linux Sy...|        59| 55.50847457627118|
|JavaScript: The G...|        58| 65.517241379
```
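The joined count for *Learning Git* (406) is higher than the 394 records counted for it earlier. Since `my_lambda_correct_total` emits at most one row per record, that can only happen if some `keen_id` values appear in more than one record: an inner join pairs every matching row on each side, so repeated keys multiply, and both `num_takers` and `avg_score` may be skewed. If those repeats turn out to be duplicate messages (an assumption worth checking against the data), deduplicating both sides before the join, for example with the DataFrame method `dropDuplicates(['keen_id'])`, would give one joined row per attempt. A stdlib sketch with hypothetical rows:

```python
# Hypothetical rows keyed by keen_id; "k1" appears twice on each side,
# as it would if the same assessment had been published twice
assessment_rows = [("k1", "Learning Git"), ("k1", "Learning Git"), ("k2", "Learning Git")]
ct_rows = [("k1", 3, 4), ("k1", 3, 4), ("k2", 1, 2)]


def inner_join(left, right):
    """Inner join on keen_id (first tuple element): one output row per matching pair."""
    return [(exam, correct, total)
            for a_id, exam in left
            for c_id, correct, total in right
            if a_id == c_id]


joined = inner_join(assessment_rows, ct_rows)
print(len(assessment_rows), len(joined))  # 3 5

# dict.fromkeys keeps the first copy of each exact duplicate row, in order
deduped = inner_join(list(dict.fromkeys(assessment_rows)), list(dict.fromkeys(ct_rows)))
print(len(deduped))  # 2
```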

### Save the extracted data

Finally, we'll save the extracted data to HDFS in Parquet format.

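```python
# Spark's default save mode raises an error if the path already exists;
# use .write.mode("overwrite").parquet(...) to rerun this cell
extracted_assessments.write.parquet("/tmp/extracted_assessments")
```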

To confirm the files were saved, run the following command from your console:

```
docker-compose exec cloudera hadoop fs -ls /tmp/extracted_assessments
```

The results should look something like this:

```
Found 2 items
-rw-r--r--   1 root supergroup          0 2020-03-10 02:42 /tmp/extracted_assessments/_SUCCESS
-rw-r--r--   1 root supergroup     345388 2020-03-10 02:42 /tmp/extracted_assessments/part-00000-b996162c-e9d8-4393-a0e4-2c8045181937-c000.snappy.parquet
```
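To script this confirmation rather than eyeball it, the listing can be checked for the `_SUCCESS` marker that Spark's Hadoop output committer writes when a job completes, plus a non-empty Parquet part file. A minimal sketch, assuming the 8-column listing format shown above and paths without spaces (`write_succeeded` is a hypothetical helper):

```python
def write_succeeded(ls_output):
    """Return True if a `hadoop fs -ls` listing contains a _SUCCESS marker
    and at least one non-empty .parquet part file."""
    has_success = has_data = False
    for line in ls_output.splitlines():
        fields = line.split()
        # Entries have 8 columns: permissions, replication, owner, group,
        # size, date, time, path; other lines (e.g. "Found 2 items") are skipped
        if len(fields) != 8:
            continue
        size, path = int(fields[4]), fields[7]
        if path.endswith("/_SUCCESS"):
            has_success = True
        elif path.endswith(".parquet") and size > 0:
            has_data = True
    return has_success and has_data


listing = """Found 2 items
-rw-r--r--   1 root supergroup          0 2020-03-10 02:42 /tmp/extracted_assessments/_SUCCESS
-rw-r--r--   1 root supergroup     345388 2020-03-10 02:42 /tmp/extracted_assessments/part-00000-b996162c-e9d8-4393-a0e4-2c8045181937-c000.snappy.parquet"""
print(write_succeeded(listing))  # True
```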


### Close the notebook and shutdown the containers

To exit this report, halt the kernel on this notebook and follow the container shutdown instructions on the [README-Setup](README-Setup.md) page.

For convenience, the script to shut down the containers is:

```
./bin/shutdown.sh
```

It runs the following command:

```
docker-compose -f docker-compose.yml down
```