# HackerRank Lead Data Engineer Interview

## ETL Pipeline Development

During this interview, we aim to evaluate several skills:
1. Transform data using SQL
2. Manage dataframes with PySpark or similar technologies
3. Identify and troubleshoot inconsistencies in data

## Problem Statement

While you solve a test on the HackerRank platform, the platform collects clickstream data on certain user actions. For example, we receive ping data when you run code, submit code, or view different questions. In this notebook, you will manipulate a similar set of clickstream data to extract certain features.

## Import

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Create Spark Session

In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("etl_pipeline_development") \
                    .enableHiveSupport() \
                    .getOrCreate()

## Questions

### Relevant Tables

\* **Perform any cleaning, exploratory analysis, and/or visualizations for the provided data as needed.**

There are two `csv` files we will use for this exercise:
1. `ping_events.csv`
2. `company_candidates.csv`

### Question 1:

`ping_events` schema:

|  column name  |  data type  |
|---------------|-------------|
|  attempt_id   |  int        |
|  event_id     |  int        |
|  inserttime   |  datetime   |
|  metadata     |  string     |

In the `metadata` column, `qno` represents the question number within the test and `question_id` represents the question id stored in our database.

Sample rows from `ping_events`:

|  attempt_id  |  event_id  |  inserttime  |  metadata                      |
|:------------:|:----------:|:------------:|:-------------------------------|
|      15      |  2         |  05:40       | {"question_id": 101, "qno": 1} |
|      15      |  2         |  05:45       | {"question_id": 103, "qno": 3} |
|      15      |  3         |  05:46       | {"question_id": 204, "qno": 2} |
|      15      |  3         |  05:50       | {"question_id": 101, "qno": 1} |
|      15      |  1         |  05:55       | {}                             |
|      16      |  3         |  06:20       | {"question_id": 103, "qno": 2} |
|      16      |  3         |  06:50       | {"question_id": 101, "qno": 1} |
|      16      |  1         |  07:10       | {}                             |

Write executable PySpark SQL that groups the rows by `attempt_id` and collects each row's `event_id`, `inserttime`, and `metadata` as a tuple into an array column named `data`:

\* **Note that the data is in array form under one `attempt_id`**

|   attempt_id   |   data                                       |
|:--------------:|:---------------------------------------------|
|       15       | [                                            |
|                |  (2, 05:40, {"question_id": 101, "qno": 1}), |
|                |  (2, 05:45, {"question_id": 103, "qno": 3}), |
|                |  (3, 05:46, {"question_id": 204, "qno": 2}), |
|                |  (3, 05:50, {"question_id": 101, "qno": 1}), |
|                |  (1, 05:55, {})                              |
|                | ]                                            |
|       16       | [                                            |
|                |  (3, 06:20, {"question_id": 103, "qno": 2}), |
|                |  (3, 06:50, {"question_id": 101, "qno": 1}), |
|                |  (1, 07:10, {})                              |
|                | ]                                            |
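
To make the target shape concrete, the grouping can be mimicked in plain Python on the sample rows. This is only an illustration of the structure the query should produce, not the PySpark answer; the `rows` list and the helper name `group_by_attempt` are assumptions made for the example.

```python
from collections import defaultdict

# Sample rows from the problem statement:
# (attempt_id, event_id, inserttime, metadata)
rows = [
    (15, 2, "05:40", '{"question_id": 101, "qno": 1}'),
    (15, 2, "05:45", '{"question_id": 103, "qno": 3}'),
    (15, 3, "05:46", '{"question_id": 204, "qno": 2}'),
    (15, 3, "05:50", '{"question_id": 101, "qno": 1}'),
    (15, 1, "05:55", "{}"),
    (16, 3, "06:20", '{"question_id": 103, "qno": 2}'),
    (16, 3, "06:50", '{"question_id": 101, "qno": 1}'),
    (16, 1, "07:10", "{}"),
]


def group_by_attempt(rows):
    """Collect (event_id, inserttime, metadata) tuples per attempt_id."""
    grouped = defaultdict(list)
    for attempt_id, event_id, inserttime, metadata in rows:
        grouped[attempt_id].append((event_id, inserttime, metadata))
    return dict(grouped)


grouped = group_by_attempt(rows)
print(len(grouped[15]), len(grouped[16]))  # 5 3
```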

### Solution:

In [7]:
ping_events = spark.read.csv("./ping_events.csv", header=True)

In [8]:
ping_events.show()

+----------+--------+-------------------+--------------------+
|attempt_id|event_id|         inserttime|            metadata|
+----------+--------+-------------------+--------------------+
|         1|       1|12/31/2013 05:32:33|                  {}|
|         1|       2|12/31/2013 05:32:33|                  {}|
|         1|       3|12/31/2013 05:32:46|{'qid': '1455', '...|
|         1|       6|12/31/2013 05:33:33|                  {}|
|         1|       6|12/31/2013 05:34:33|                  {}|
|         1|       6|12/31/2013 05:35:33|                  {}|
|         1|       6|12/31/2013 05:36:34|                  {}|
|         1|       6|12/31/2013 05:37:34|                  {}|
|         1|       3|12/31/2013 05:37:53|{'qid': '117086',...|
|         1|       2|12/31/2013 05:38:07|                  {}|
|         1|       3|12/31/2013 05:38:09|{'qid': '117086',...|
|         1|       3|12/31/2013 05:38:15|{'qid': '1455', '...|
|         1|       6|12/31/2013 05:38:33|              

In [9]:
ping_events.createOrReplaceTempView("ping_events")

In [27]:
collected_data = spark.sql("""
    select
        int(attempt_id) as attempt_id,
        collect_list(struct(int(event_id) as event_id, inserttime, metadata)) as data
    from ping_events
    group by attempt_id
    order by int(attempt_id)
""")

In [28]:
collected_data.show()

+----------+--------------------+
|attempt_id|                data|
+----------+--------------------+
|         1|[[1,12/31/2013 05...|
|         2|[[1,12/31/2013 05...|
|         3|    [[null,null,{}]]|
|         4|    [[null,null,{}]]|
|         5|[[1,01/12/2014 04...|
|         6|    [[null,null,{}]]|
|         7|    [[null,null,{}]]|
|         8|[[6,01/18/2014 05...|
|         9|[[1,01/21/2014 12...|
|        10|    [[null,null,{}]]|
|        11|    [[null,null,{}]]|
|        12|[[1,02/13/2014 14...|
|        13|[[1,02/19/2014 09...|
|        14|    [[null,null,{}]]|
|        15|[[1,03/05/2014 12...|
|        16|[[6,03/05/2014 16...|
|        17|[[1,03/07/2014 13...|
|        18|[[1,03/13/2014 13...|
|        19|[[1,03/26/2014 16...|
|        20|[[1,04/04/2014 04...|
+----------+--------------------+
only showing top 20 rows



In [29]:
collected_data.printSchema()

root
 |-- attempt_id: integer (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- event_id: integer (nullable = true)
 |    |    |-- inserttime: string (nullable = true)
 |    |    |-- metadata: string (nullable = true)



In [30]:
pd_data = collected_data.toPandas()

In [31]:
pd_data

|     | attempt_id | data                                               |
|----:|:----------:|:---------------------------------------------------|
|   0 |     1      | [(1, 12/31/2013 05:32:33, {}), (2, 12/31/2013 ...  |
|   1 |     2      | [(1, 12/31/2013 05:37:14, {}), (2, 12/31/2013 ...  |
|   2 |     3      | [(None, None, {})]                                 |
|   3 |     4      | [(None, None, {})]                                 |
|   4 |     5      | [(1, 01/12/2014 04:34:55, {}), (2, 01/12/2014 ...  |
|   5 |     6      | [(None, None, {})]                                 |
|   6 |     7      | [(None, None, {})]                                 |
|   7 |     8      | [(6, 01/18/2014 05:13:06, {}), (6, 01/18/2014 ...  |
|   8 |     9      | [(1, 01/21/2014 12:24:18, {}), (2, 01/21/2014 ...  |
|   9 |     10     | [(None, None, {})]                                 |


In [33]:
pd_data.to_csv("./collected_data.csv", header=True, index=False)

### Question 2:

Using PySpark, process the resulting table above to determine the time spent in seconds on each attempt. Do not use Pandas to perform this transformation.

Each `inserttime` marks a user action. We calculate `time_spent` as the difference between the latest and the earliest ping `inserttime` of an attempt. For attempt 15, the candidate first looked at `qno 1` at 05:40 and ended the test at 05:55, so the user spent 15 minutes, or 900 seconds, on the test.

Using this method, the above table should be transformed into:

|   attempt_id   |   test_time_spent   |
|:--------------:|:--------------------|
|     15         | 900                 |
|     16         | 3000                |
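
The two values in this table can be checked by hand with the `datetime` module. The snippet below is a small worked example on the sample timestamps only; the helper name `elapsed_seconds` and the `%H:%M` format are assumptions that match the abbreviated sample rows, not the real file.

```python
from datetime import datetime


def elapsed_seconds(first, last, fmt="%H:%M"):
    """Seconds between two timestamps written in the sample's HH:MM form."""
    delta = datetime.strptime(last, fmt) - datetime.strptime(first, fmt)
    return delta.total_seconds()


print(elapsed_seconds("05:40", "05:55"))  # attempt 15 -> 900.0
print(elapsed_seconds("06:20", "07:10"))  # attempt 16 -> 3000.0
```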

### Notes

- You may need more imports for this section. We have included a few to help you out!
- The `datetime` module will be very helpful here: https://docs.python.org/2/library/datetime.html

In [127]:
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType, BooleanType, FloatType, ArrayType, StructType, StructField, DateType, TimestampType
from pyspark.sql.functions import col, first, last, asc, desc, mean, avg, count, explode, udf

from datetime import datetime, timedelta
import json

### Solution:

In [148]:
collected_data.rdd.take(1)

[Row(attempt_id=1, data=[Row(event_id=1, inserttime='12/31/2013 05:32:33', metadata='{}'), Row(event_id=2, inserttime='12/31/2013 05:32:33', metadata='{}'), Row(event_id=3, inserttime='12/31/2013 05:32:46', metadata="{'qid': '1455', 'qno': '1'}"), Row(event_id=6, inserttime='12/31/2013 05:33:33', metadata='{}'), Row(event_id=6, inserttime='12/31/2013 05:34:33', metadata='{}'), Row(event_id=6, inserttime='12/31/2013 05:35:33', metadata='{}'), Row(event_id=6, inserttime='12/31/2013 05:36:34', metadata='{}'), Row(event_id=6, inserttime='12/31/2013 05:37:34', metadata='{}'), Row(event_id=3, inserttime='12/31/2013 05:37:53', metadata="{'qid': '117086', 'qno': '2'}"), Row(event_id=2, inserttime='12/31/2013 05:38:07', metadata='{}'), Row(event_id=3, inserttime='12/31/2013 05:38:09', metadata="{'qid': '117086', 'qno': '2'}"), Row(event_id=3, inserttime='12/31/2013 05:38:15', metadata="{'qid': '1455', 'qno': '1'}"), Row(event_id=6, inserttime='12/31/2013 05:38:33', metadata='{}'), Row(event_id=
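
The `metadata` strings in the output above are single-quoted and use the key `qid`, so they are not valid JSON and `json.loads` would reject them. A possible way to read them, sketched here with the standard-library `ast.literal_eval`, is shown below; the helper name `parse_metadata` and the choice to return an empty dict for unreadable values are assumptions.

```python
import ast


def parse_metadata(raw):
    """Parse a metadata string into a dict; return {} for null or malformed input."""
    if raw is None:
        return {}
    try:
        value = ast.literal_eval(raw)
    except (ValueError, SyntaxError, TypeError):
        return {}
    # Guard against strings that parse to something other than a dict.
    return value if isinstance(value, dict) else {}


print(parse_metadata("{'qid': '1455', 'qno': '1'}"))  # {'qid': '1455', 'qno': '1'}
print(parse_metadata("{}"))                           # {}
```

A function like this could be applied to each element of `data` inside an RDD `map`, or wrapped in a `udf`.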

In [160]:
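def process_data(row):
    """Return the seconds between the first and last ping of one attempt."""
    attempt_id = row['attempt_id']
    data = row['data']
    fmt = '%m/%d/%Y %H:%M:%S'
    # Fallback used when a timestamp is missing or malformed; when both ends
    # fall back to this value, the time spent is 0.
    fallback = datetime(2018, 1, 1, 12, 0, 0)

    try:
        starttime = datetime.strptime(data[0]['inserttime'], fmt)
    except (TypeError, ValueError, IndexError):
        starttime = fallback

    try:
        endtime = datetime.strptime(data[-1]['inserttime'], fmt)
    except (TypeError, ValueError, IndexError):
        endtime = fallback

    # Assumes `data` is in chronological order, which collect_list does not guarantee.
    time_spent = (endtime - starttime).total_seconds()

    return [Row(attempt_id=attempt_id, test_time_spent=time_spent)]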

In [161]:
solve_times_rdd = collected_data.rdd.flatMap(lambda x: process_data(x))

In [162]:
solve_times = spark.createDataFrame(solve_times_rdd)

In [163]:
solve_times.show()

+----------+---------------+
|attempt_id|test_time_spent|
+----------+---------------+
|         1|         5341.0|
|         2|         5399.0|
|         3|            0.0|
|         4|            0.0|
|         5|         5401.0|
|         6|            0.0|
|         7|            0.0|
|         8|          -59.0|
|         9|           61.0|
|        10|            0.0|
|        11|            0.0|
|        12|         1431.0|
|        13|         4989.0|
|        14|            0.0|
|        15|         1840.0|
|        16|         3749.0|
|        17|         5399.0|
|        18|         5393.0|
|        19|         5226.0|
|        20|         5392.0|
+----------+---------------+
only showing top 20 rows
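
The negative value for attempt 8 may indicate that the collected array is not in chronological order, since `collect_list` does not guarantee ordering after a shuffle, and the `0.0` rows come from attempts whose timestamps are null. A possible alternative, sketched in plain Python, takes the earliest and latest parseable timestamps instead of the first and last elements and returns `None` when nothing can be parsed. The helper name `span_seconds` is hypothetical.

```python
from datetime import datetime

FMT = "%m/%d/%Y %H:%M:%S"


def span_seconds(inserttimes, fmt=FMT):
    """Seconds between the earliest and latest parseable timestamp, or None."""
    parsed = []
    for raw in inserttimes:
        try:
            parsed.append(datetime.strptime(raw, fmt))
        except (TypeError, ValueError):
            continue  # skip null or malformed timestamps
    if not parsed:
        return None
    return (max(parsed) - min(parsed)).total_seconds()


# Out-of-order input with one null value.
print(span_seconds(["12/31/2013 05:38:07", "12/31/2013 05:32:33", None]))  # 334.0
print(span_seconds([None]))  # None
```

Inside `process_data`, this could be called on `[e['inserttime'] for e in row['data']]`, which keeps attempts without usable timestamps distinguishable from attempts that really lasted zero seconds.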



In [169]:
st_pandas = solve_times.toPandas()

In [171]:
st_pandas.to_csv("attempt_solve_times.csv", header=True, index=False)

### Question 3:

We want to create a table with `company_id` as well:

|   company_id   |   attempt_id   |   time_spent   |
|:--------------:|:--------------:|:---------------|
|   1            |     15         | 900            |
|   2            |     16         | 3000           |

To get the `company_id` associated with each `attempt_id`, you can use `company_candidates.csv`. After you generate the above table, please store it as an interim table called `attempt_times`. 

`company_candidates` schema:

|  column name  |  data type  |
|---------------|-------------|
|  attempt_id   |  int        |
|  company_id   |  int        |

### Solution:

In [154]:
company_candidates = spark.read.csv("./company_candidates.csv", header=True)

In [155]:
company_candidates.show()

+----------+----------+
|attempt_id|company_id|
+----------+----------+
|         1|         1|
|         2|         2|
|         3|         4|
|         4|         4|
|         5|         1|
|         6|         3|
|         7|         2|
|         8|         4|
|         9|         1|
|        10|         3|
|        11|         2|
|        12|         3|
|        13|         4|
|        14|         3|
|        15|         3|
|        16|         1|
|        17|         3|
|        18|         3|
|        19|         1|
|        20|         4|
+----------+----------+
only showing top 20 rows



In [165]:
solve_times.createOrReplaceTempView("solve_times")
company_candidates.createOrReplaceTempView("company_candidates")

In [166]:
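candidates_test_times = spark.sql("""
    select
        cc.*,
        st.test_time_spent
    from company_candidates cc
    left join solve_times st on cc.attempt_id=st.attempt_id
    order by st.attempt_id
""")
# Register the result as the interim table `attempt_times` requested above.
candidates_test_times.createOrReplaceTempView("attempt_times")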

In [168]:
candidates_test_times.show()

+----------+----------+---------------+
|attempt_id|company_id|test_time_spent|
+----------+----------+---------------+
|         1|         1|         5341.0|
|         2|         2|         5399.0|
|         3|         4|            0.0|
|         4|         4|            0.0|
|         5|         1|         5401.0|
|         6|         3|            0.0|
|         7|         2|            0.0|
|         8|         4|          -59.0|
|         9|         1|           61.0|
|        10|         3|            0.0|
|        11|         2|            0.0|
|        12|         3|         1431.0|
|        13|         4|         4989.0|
|        14|         3|            0.0|
|        15|         3|         1840.0|
|        16|         1|         3749.0|
|        17|         3|         5399.0|
|        18|         3|         5393.0|
|        19|         1|         5226.0|
|        20|         4|         5392.0|
+----------+----------+---------------+
only showing top 20 rows



### Bonus:

Using PySpark, process the resulting table above to determine the time spent on each question. The result for each attempt should be a dictionary of the form `{question_id: time_in_seconds}`. Do not use Pandas to perform this transformation.

Each `inserttime` marks a user action. We calculate `time_spent` from the difference between consecutive ping `inserttime`s. For attempt 15, the candidate first looked at `qno 1` at 05:40 and then at `qno 3` at 05:45, so at that point the user had spent 5 minutes on `qno 1`.

Using this method, the above table should be transformed into the following (for brevity, this illustration is keyed by `qno` and shows minutes):

|   attempt_id   |   time_spent              |
|:--------------:|:--------------------------|
|     15         | {"1": 10, "2": 4, "3": 1} |
|     16         | {"1": 20, "2": 30}        |

### Solution:
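
One possible approach, offered as a sketch and not as the notebook's recorded answer, is to attribute the time between two consecutive pings to the question viewed most recently. The plain-Python helper below illustrates that rule on the sample rows for attempt 15. The function name `time_per_question`, the `%H:%M` format, and the assumption that metadata has already been parsed into dicts are all choices made for the example.

```python
from collections import defaultdict
from datetime import datetime


def time_per_question(events, key="question_id", fmt="%H:%M"):
    """Sum seconds per question from a list of (inserttime, metadata_dict) pings.

    The interval between consecutive pings is credited to the question
    viewed most recently; pings without `key` keep the current question.
    """
    ordered = sorted(
        ((datetime.strptime(t, fmt), meta) for t, meta in events),
        key=lambda pair: pair[0],  # sort by time only; dicts are not orderable
    )
    totals = defaultdict(float)
    current = None
    for (t0, meta), (t1, _) in zip(ordered, ordered[1:]):
        if key in meta:
            current = meta[key]
        if current is not None:
            totals[current] += (t1 - t0).total_seconds()
    return dict(totals)


attempt_15 = [
    ("05:40", {"question_id": 101, "qno": 1}),
    ("05:45", {"question_id": 103, "qno": 3}),
    ("05:46", {"question_id": 204, "qno": 2}),
    ("05:50", {"question_id": 101, "qno": 1}),
    ("05:55", {}),
]
print(time_per_question(attempt_15))  # {101: 600.0, 103: 60.0, 204: 240.0}
```

In the notebook, a function like this could be applied per row with `collected_data.rdd.map(...)`, using the real timestamp format `'%m/%d/%Y %H:%M:%S'` and the metadata key found in the file.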