# IST 718: Big Data Analytics

- Professor: Willard Williamson <wewillia@syr.edu>
- Faculty Assistant: Palaniappan Muthukkaruppan

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers from your classmates.  Short code snippets are allowed from the internet.  Any code is allowed from the class text books or class provided code.__
- Please do not change the file names. The FAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and FAs will use __additional__ tests for your answer. Think about cases where your code should still run correctly, even if it passes all the tests you see.**
- Before submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).

In [1]:
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
from pyspark.sql import Row
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Warning: Use Spark 1.6 (RDDs) exclusively when asked to do so, and Spark 2.0 (dataframes) only in the last question. Do not use Pandas at all in this assignment.

# Part 2: Data cleaning and basic analyses

In this part, you will learn to read data from non-standard formats, clean data, and produce some basic analysis of it.

We will use Spark 1.6 (the `sparkContext` stored in the variable `sc`) to load text files from which we will extract features that are predictive of a target value. Unfortunately, the data is stored in a non-standard format where each line contains the customer index, the feature index, and the value of that feature for that customer. Similarly, each line of the target file contains the customer index and the target value. We will load these files into two RDDs:

In [2]:
# Adjust the following lines as needed to read the data files
# If running on databricks, you will need to upload the data to databricks and then
# adjust the file path as demonstrated in class.
import os
import pandas as pd

db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")

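def get_training_filename(data_file_name):
    grading_env = os.getenv("GRADING_RUNTIME_ENV")

    if db_env is not None:
        # running on Databricks: uploaded files live under /FileStore/tables
        full_path_name = "/FileStore/tables/%s" % data_file_name
    elif grading_env is not None:
        # grading environment: files live under the directory named by the variable
        full_path_name = "%s/%s" % (grading_env, data_file_name)
    else:
        # local run: use the file name as given
        full_path_name = data_file_name

    return full_path_name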
  
raw_features_rdd = sc.textFile(get_training_filename('hw2_dataset_features.txt'))
raw_target_rdd = sc.textFile(get_training_filename('hw2_dataset_targets.txt'))

One issue with the data is that some features and targets were not transmitted correctly. When this happened, the text `ERROR` or `ERROR TRANSFERRING` replaced the line. If you look at the first 10 values of the features and target RDDs, you will see these types of lines:

```python
raw_features_rdd.take(10)
```

```
['<customer_feature customer_id=0 feature_id=0>-0.57</customer_feature>',
 '<customer_feature customer_id=0 feature_id=1>-0.38</customer_feature>',
 '<customer_feature customer_id=0 feature_id=2>0.00</customer_feature>',
 '<customer_feature customer_id=0 feature_id=3>-0.07</customer_feature>',
 '<customer_feature customer_id=0 feature_id=4>-0.28</customer_feature>',
 '<customer_feature customer_id=0 feature_id=5>-0.79</customer_feature>',
 'ERROR',
 '<customer_feature customer_id=0 feature_id=7>0.28</customer_feature>',
 '<customer_feature customer_id=0 feature_id=8>1.65</customer_feature>',
 '<customer_feature customer_id=0 feature_id=9>0.57</customer_feature>']
```

```python
raw_target_rdd.take(10)
```

```
['<customer_target customer_id=0>-252.49</customer_target>',
 '<customer_target customer_id=1>36.67</customer_target>',
 '<customer_target customer_id=2>138.02</customer_target>',
 '<customer_target customer_id=3>-429.54</customer_target>',
 '<customer_target customer_id=4>-18.23</customer_target>',
 '<customer_target customer_id=5>-5.52</customer_target>',
 '<customer_target customer_id=6>-31.96</customer_target>',
 'ERROR TRANSFERRING',
 'ERROR TRANSFERRING',
 '<customer_target customer_id=9>-111.88</customer_target>']
```

In [3]:
# try it yourself: 
raw_features_rdd.take(10)
raw_target_rdd.take(10)

['<customer_target customer_id=0>-252.49</customer_target>',
 '<customer_target customer_id=1>36.67</customer_target>',
 '<customer_target customer_id=2>138.02</customer_target>',
 '<customer_target customer_id=3>-429.54</customer_target>',
 '<customer_target customer_id=4>-18.23</customer_target>',
 '<customer_target customer_id=5>-5.52</customer_target>',
 '<customer_target customer_id=6>-31.96</customer_target>',
 'ERROR TRANSFERRING',
 'ERROR TRANSFERRING',
 '<customer_target customer_id=9>-111.88</customer_target>']

## Question 2.1 (10 pts):

Filter out the lines that contain errors and store the remaining lines in `raw_features2_rdd` and `raw_target2_rdd`, respectively.
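As a plain-Python sketch of the filtering predicate (no Spark needed; `is_clean` and `sample_lines` are hypothetical names, not part of the assignment), note that both error markers contain the substring `ERROR`, so a single membership test covers them:

```python
# Hypothetical sample lines mirroring the formats shown above.
sample_lines = [
    '<customer_feature customer_id=0 feature_id=5>-0.79</customer_feature>',
    'ERROR',
    '<customer_target customer_id=6>-31.96</customer_target>',
    'ERROR TRANSFERRING',
]

def is_clean(line):
    """Return True for lines that do not carry a transmission error marker."""
    return 'ERROR' not in line

# Keep only the lines that pass the predicate.
clean_lines = [line for line in sample_lines if is_clean(line)]
print(clean_lines)
```

A predicate of this shape is the kind of function that can be passed to an RDD's `filter` method.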

In [4]:
# create raw_features2_rdd and raw_target2_rdd below
raw_features2_rdd = raw_features_rdd.filter(lambda x: 'ERROR' not in x)

raw_target2_rdd = raw_target_rdd.filter(lambda x: 'ERROR TRANSFERRING' not in x)

In [5]:
# check that things work
print(raw_features2_rdd.count())
print(raw_target2_rdd.count())

95036
8968


In [6]:
"""10 pts: Check that the lines are properly discarded"""
assert raw_features2_rdd.count() == 95036
assert raw_target2_rdd.count() == 8968

## Question 2.2 (10 pts):
You will further process `raw_features2_rdd` to create a key-value RDD of the following form: the key is the customer index as an integer, and the value is a dictionary whose key is a string `f_0`, `f_1`, ..., `f_9` for feature index 0, 1, ..., 9, respectively, and whose value is the feature value as a floating point number.

Define a function `map_features2` that performs such key-value pair creation.

In [7]:
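def map_features2(line):
    # split on whitespace:
    # ['<customer_feature', 'customer_id=N', 'feature_id=K>VALUE</customer_feature>']
    x = line.split()
    # 'customer_id=' is 12 characters; the single-digit feature index sits at position 11
    # of the last token, and '</customer_feature>' takes up its final 19 characters
    return [int(x[1][12:]), {'f_'+x[2][11]: float(x[2][13:-19])}]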

For example, for the input element:

`'<customer_feature customer_id=4 feature_id=0>-0.79</customer_feature>'`

it should generate
```python
[4, {'f_0': -0.79}]
```
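Fixed character offsets work here because the feature index is always a single digit. As a hedged alternative sketch (the names `FEATURE_PATTERN` and `parse_feature_line` are hypothetical, and the line layout is assumed from the sample output shown earlier), a regular expression can extract the same three fields without depending on their widths:

```python
import re

# Assumed line layout, taken from the sample lines in this notebook.
FEATURE_PATTERN = re.compile(
    r'<customer_feature customer_id=(\d+) feature_id=(\d+)>(.*?)</customer_feature>'
)

def parse_feature_line(line):
    """Hypothetical regex-based parser returning [customer_index, {'f_K': value}]."""
    match = FEATURE_PATTERN.fullmatch(line.strip())
    if match is None:
        raise ValueError('unexpected line format: %r' % line)
    customer_id, feature_id, value = match.groups()
    return [int(customer_id), {'f_' + feature_id: float(value)}]

print(parse_feature_line(
    '<customer_feature customer_id=4 feature_id=0>-0.79</customer_feature>'
))
```

The trade-off is a little more code in exchange for an explicit error on malformed lines instead of a silent mis-slice.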

In [8]:
# test it here
raw_features2_rdd.\
    map(map_features2).\
    take(10)

[[0, {'f_0': -0.57}],
 [0, {'f_1': -0.38}],
 [0, {'f_2': 0.0}],
 [0, {'f_3': -0.07}],
 [0, {'f_4': -0.28}],
 [0, {'f_5': -0.79}],
 [0, {'f_7': 0.28}],
 [0, {'f_8': 1.65}],
 [0, {'f_9': 0.57}],
 [1, {'f_0': 0.5}]]

In [9]:
"""10 pts: Check that map_features2 applied to raw_features2_rdd produces the right key and value types"""
# key is an integer
np.testing.assert_equal(type(raw_features2_rdd.map(map_features2).first()[0]), int)
# value is a dictionary
np.testing.assert_equal(type(raw_features2_rdd.map(map_features2).first()[1]), dict)

## Question 2.3 (5 pts):

You will create a function `map_target2` that will be applied to `raw_target2_rdd`. This function will create a key-value pair where the key is the customer index as an integer and the value is the floating point representation of the target. Assign the resulting RDD to `raw_target3_rdd`.

In [10]:
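def map_target2(line):
    # split on whitespace: ['<customer_target', 'customer_id=N>VALUE</customer_target>']
    x = line.split()
    # split the second token at '>': ['customer_id=N', 'VALUE</customer_target', '']
    y = x[1].split('>')
    # drop the 12-character 'customer_id=' prefix and the 17-character '</customer_target' suffix
    return (int(y[0][12:]), float(y[1][0:-17]))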

# make the assignment here
raw_target3_rdd = raw_target2_rdd.map(map_target2)

A sample of results:

```python
raw_target2_rdd.map(map_target2).sortByKey().take(5)
```

```
[(0, -252.49), (1, 36.67), (2, 138.02), (3, -429.54), (4, -18.23)]
```
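The same key-value extraction can also be written by splitting on delimiters rather than counting characters. This is only a sketch under the assumption that every clean line follows the layout shown in the samples; `parse_target_line` is a hypothetical name:

```python
def parse_target_line(line):
    """Hypothetical delimiter-based parser returning (customer_index, target)."""
    # Everything before the first '>' is the opening tag; the rest holds the value.
    head, _, rest = line.partition('>')
    # The customer index follows the last '=' in the opening tag.
    customer_id = head.rpartition('=')[2]
    # The value runs up to the '<' that starts the closing tag.
    value = rest.partition('<')[0]
    return (int(customer_id), float(value))

print(parse_target_line('<customer_target customer_id=3>-429.54</customer_target>'))
```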

In [11]:
# try it yourself
raw_target2_rdd.map(map_target2).sortByKey().take(5)

[(0, -252.49), (1, 36.67), (2, 138.02), (3, -429.54), (4, -18.23)]

In [12]:
"""5 pts: Check that raw_target3_rdd contains the right values"""
# check types
np.testing.assert_equal(type(raw_target3_rdd.keys().first()), int)
np.testing.assert_equal(type(raw_target3_rdd.values().first()), float)
# the sum of all targets
np.testing.assert_approx_equal(raw_target3_rdd.values().sum(), -179351.71, significant=1)

## Question 2.4 (10 pts):

In this question, you will use map-reduce to produce an RDD of key-value pairs where the key is the customer index and the value is a dictionary with all the features and values associated with that customer. Notice that the map part of the map-reduce is already defined by `map_features2` on `raw_features2_rdd`. Therefore, define the proper `reduce_features2` function to produce the desired results. Create an RDD named `raw_features3_rdd` with the results:

In [13]:
def reduce_features2(v1, v2):
    
    return {**v1, **v2}

# Apply mapreduce to produce the raw_features3_rdd from raw_features2_rdd
raw_features3_rdd = raw_features2_rdd.map(map_features2).reduceByKey(reduce_features2)

Running the map reduce should produce the following example result:
```python
raw_features3_rdd.sortByKey().take(2)
```

```console
[(0,
  {'f_0': -0.57,
   'f_1': -0.38,
   'f_2': 0.0,
   'f_3': -0.07,
   'f_4': -0.28,
   'f_5': -0.79,
   'f_7': 0.28,
   'f_8': 1.65,
   'f_9': 0.57}),
 (1,
  {'f_0': 0.5,
   'f_1': 0.8,
   'f_2': -0.49,
   'f_3': 0.25,
   'f_4': 0.37,
   'f_5': 0.73,
   'f_6': -0.43,
   'f_7': 0.89,
   'f_8': -1.85,
   'f_9': -0.44})]
```
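To see why a dictionary merge is a suitable reduce function, the reduce-by-key step can be imitated locally in plain Python. This is a sketch, not the Spark implementation: `merge_feature_dicts` and the toy `pairs` list are hypothetical, and the grouping loop merely stands in for what `reduceByKey` does across partitions.

```python
from collections import defaultdict
from functools import reduce

def merge_feature_dicts(v1, v2):
    """Combine two partial feature dictionaries without mutating either input."""
    merged = dict(v1)
    merged.update(v2)
    return merged

# Hypothetical mapped pairs, in the [customer_index, {'f_K': value}] form.
pairs = [
    [0, {'f_0': -0.57}],
    [1, {'f_0': 0.5}],
    [0, {'f_1': -0.38}],
    [1, {'f_1': 0.8}],
]

# Local stand-in for reduce-by-key: group values by key, then fold each group.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

result = {key: reduce(merge_feature_dicts, values) for key, values in grouped.items()}
print(result)
```

Because each feature key appears at most once per customer, the merge gives the same result regardless of the order in which partial dictionaries are combined, which is the property a reduce function needs.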

In [14]:
"""10 pts: Check that raw_features3_rdd has the correct format and values. There could be hidden tests!"""
# key is an integer
np.testing.assert_equal(type(raw_features3_rdd.first()[0]), int)
# value is a dictionary
np.testing.assert_equal(type(raw_features3_rdd.first()[1]), dict)

## Question 2.5 (20 pts):

Join the two RDDs `raw_target3_rdd` and `raw_features3_rdd` to create an RDD with elements of the form

`[customer_index, (target, feature_dict)]`

where `target` comes from `raw_target3_rdd`, and `feature_dict` is the dictionary with features from `raw_features3_rdd`.

Assign to `rdd_complete` a filtered version of the join for customers who have all 10 features and a target.

Finally, map each entry of `rdd_complete` to a `Row` object, producing an RDD of rows (`rdd_complete_rows`) in the following format:

`Row(customer_index,  f_0,  f_1,  f_2,  f_3,  f_4,  f_5,  f_6,  f_7,  f_8,  f_9, target)`

For example, from `rdd_complete`:

```python
rdd_complete.sortByKey().first()
```
should return
```
(1,
 (36.67,
  {'f_0': 0.5,
   'f_1': 0.8,
   'f_2': -0.49,
   'f_3': 0.25,
   'f_4': 0.37,
   'f_5': 0.73,
   'f_6': -0.43,
   'f_7': 0.89,
   'f_8': -1.85,
   'f_9': -0.44}))
```
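The join, the completeness filter, and the flattening into one record per customer can be traced on toy data in plain Python. Everything below is a hypothetical sketch: the dictionaries `targets` and `features` stand in for the two RDDs, and `has_all_features` and `to_flat_record` are illustrative names rather than required ones.

```python
# Hypothetical toy data; the real values come from the two RDDs.
FEATURE_NAMES = ['f_%d' % i for i in range(10)]
targets = {1: 36.67, 2: 138.02, 7: -5.52}
features = {
    1: {name: 0.5 for name in FEATURE_NAMES},      # complete
    2: {name: 0.5 for name in FEATURE_NAMES[:9]},  # f_9 is missing
    5: {name: 0.5 for name in FEATURE_NAMES},      # complete, but has no target
}

# Inner join: keep only customer indices present on both sides.
joined = [
    (key, (targets[key], features[key]))
    for key in sorted(targets.keys() & features.keys())
]

def has_all_features(entry):
    """True when the feature dictionary holds every one of f_0 ... f_9."""
    _, (_, feature_dict) = entry
    return all(name in feature_dict for name in FEATURE_NAMES)

def to_flat_record(entry):
    """Flatten (index, (target, features)) into an ordered column-to-value mapping."""
    customer_index, (target, feature_dict) = entry
    record = {'customer_index': customer_index}
    for name in FEATURE_NAMES:
        record[name] = feature_dict[name]
    record['target'] = target
    return record

complete = [entry for entry in joined if has_all_features(entry)]
records = [to_flat_record(entry) for entry in complete]
print(records)
```

Customer 2 survives the join but is dropped by the filter, while customers 5 and 7 never make it through the join because each appears on only one side.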

In [15]:
# create rdd_complete here
# the inner join already guarantees a target; a target of exactly 0.0 is still valid data
rdd_complete = raw_target3_rdd.join(raw_features3_rdd).filter(lambda x: (len(x[1][1]) == 10) and (x[1][0] is not None))

In [16]:
"""10 pts: Test if `rdd_complete` has the right data. Remember that there could be hidden tests!"""
# number of elements expected
np.testing.assert_equal(rdd_complete.count(), 5379)

In [17]:
# define the function to_row that returns a Row object for an entry of rdd_complete
def to_row(e):
    
    return Row(customer_index = e[0], 
               f_0 = e[1][1].get('f_0'), 
               f_1 = e[1][1].get('f_1'), 
               f_2 = e[1][1].get('f_2'), 
               f_3 = e[1][1].get('f_3'), 
               f_4 = e[1][1].get('f_4'), 
               f_5 = e[1][1].get('f_5'), 
               f_6 = e[1][1].get('f_6'), 
               f_7 = e[1][1].get('f_7'), 
               f_8 = e[1][1].get('f_8'), 
               f_9 = e[1][1].get('f_9'), 
               target = e[1][0])

complete_df = rdd_complete.map(to_row).toDF()

In [18]:
# test
rdd_complete.map(to_row).toDF().orderBy('customer_index').show(5)

+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
|customer_index|  f_0|  f_1|  f_2|  f_3|  f_4|  f_5|  f_6|  f_7|  f_8|  f_9| target|
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
|             1|  0.5|  0.8|-0.49| 0.25| 0.37| 0.73|-0.43| 0.89|-1.85|-0.44|  36.67|
|             3| 0.14|-0.87|-0.94| 0.09|-0.69|-0.29|-0.45| -0.6|-1.28|-0.38|-429.54|
|             4|-0.79|-0.28|-0.26| 0.28|-0.17|-0.51| 0.44|-0.62| 1.82|-0.35| -18.23|
|             5|-0.11| 0.86| -1.3|-0.09|-0.12|-0.47| 0.05| 0.98|-1.59|  0.3|  -5.52|
|             9| 0.57| 0.07|-0.31|-0.92|-0.26| -0.9|-0.06|-0.76| 1.39| 0.01|-111.88|
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
only showing top 5 rows



In [19]:
# 10 pts test that the creation of the row is successful
assert rdd_complete.map(to_row).toDF()
np.testing.assert_array_equal(rdd_complete.map(to_row).toDF().columns, 
              ['customer_index',
 'f_0',
 'f_1',
 'f_2',
 'f_3',
 'f_4',
 'f_5',
 'f_6',
 'f_7',
 'f_8',
 'f_9',
 'target'])

## Question 2.6 (20 pts):

We will now use the `to_row` function created above to create a dataframe `df` of the data.

In [20]:
df = rdd_complete.map(to_row).toDF()

Explore the dataframe a bit:

```python
df.orderBy('customer_index').show(5)
```

```console
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
|customer_index|  f_0|  f_1|  f_2|  f_3|  f_4|  f_5|  f_6|  f_7|  f_8|  f_9| target|
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
|             1|  0.5|  0.8|-0.49| 0.25| 0.37| 0.73|-0.43| 0.89|-1.85|-0.44|  36.67|
|             3| 0.14|-0.87|-0.94| 0.09|-0.69|-0.29|-0.45| -0.6|-1.28|-0.38|-429.54|
|             4|-0.79|-0.28|-0.26| 0.28|-0.17|-0.51| 0.44|-0.62| 1.82|-0.35| -18.23|
|             5|-0.11| 0.86| -1.3|-0.09|-0.12|-0.47| 0.05| 0.98|-1.59|  0.3|  -5.52|
|             9| 0.57| 0.07|-0.31|-0.92|-0.26| -0.9|-0.06|-0.76| 1.39| 0.01|-111.88|
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
only showing top 5 rows
```

In [21]:
# explore it yourself
df.orderBy('customer_index').show(5)

+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
|customer_index|  f_0|  f_1|  f_2|  f_3|  f_4|  f_5|  f_6|  f_7|  f_8|  f_9| target|
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
|             1|  0.5|  0.8|-0.49| 0.25| 0.37| 0.73|-0.43| 0.89|-1.85|-0.44|  36.67|
|             3| 0.14|-0.87|-0.94| 0.09|-0.69|-0.29|-0.45| -0.6|-1.28|-0.38|-429.54|
|             4|-0.79|-0.28|-0.26| 0.28|-0.17|-0.51| 0.44|-0.62| 1.82|-0.35| -18.23|
|             5|-0.11| 0.86| -1.3|-0.09|-0.12|-0.47| 0.05| 0.98|-1.59|  0.3|  -5.52|
|             9| 0.57| 0.07|-0.31|-0.92|-0.26| -0.9|-0.06|-0.76| 1.39| 0.01|-111.88|
+--------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-------+
only showing top 5 rows



The subpackage `pyspark.sql.functions` (aliased as `fn` below) contains many common functions for data analysis. Find the function for computing the __correlation__ (the strength of the linear relationship between two variables) between two columns and the function for computing __absolute__ values, then create a data frame `correlations_df` that contains the following columns in the following order:

1. `c0_target`: correlation between feature 0 and target
1. `c1_target`: correlation between feature 1 and target
1. `c2_target`: correlation between feature 2 and target
1. `c3_target`: correlation between feature 3 and target
1. `c4_target`: correlation between feature 4 and target
1. `c5_target`: correlation between feature 5 and target
1. `c6_target`: correlation between feature 6 and target
1. `c7_target`: correlation between feature 7 and target
1. `c8_target`: correlation between feature 8 and target
1. `c9_target`: correlation between feature 9 and target
1. `sig0`: boolean `true` if the absolute value of the correlation between feature 0 and target is greater than 0.5, `false` o.w.
1. `sig1`: boolean `true` if the absolute value of the correlation between feature 1 and target is greater than 0.5, `false` o.w.
1. `sig2`: boolean `true` if the absolute value of the correlation between feature 2 and target is greater than 0.5, `false` o.w.
1. `sig3`: boolean `true` if the absolute value of the correlation between feature 3 and target is greater than 0.5, `false` o.w.
1. `sig4`: boolean `true` if the absolute value of the correlation between feature 4 and target is greater than 0.5, `false` o.w.
1. `sig5`: boolean `true` if the absolute value of the correlation between feature 5 and target is greater than 0.5, `false` o.w.
1. `sig6`: boolean `true` if the absolute value of the correlation between feature 6 and target is greater than 0.5, `false` o.w.
1. `sig7`: boolean `true` if the absolute value of the correlation between feature 7 and target is greater than 0.5, `false` o.w.
1. `sig8`: boolean `true` if the absolute value of the correlation between feature 8 and target is greater than 0.5, `false` o.w.
1. `sig9`: boolean `true` if the absolute value of the correlation between feature 9 and target is greater than 0.5, `false` o.w.

**Hint: Remember that you can pass a list of columns to `df.select`. You can create such a list with a list comprehension, saving a lot of code.**
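To make the quantity in each `cK_target` column concrete, here is a small plain-Python sketch of the Pearson correlation coefficient and the 0.5 threshold used for the `sigK` flags. The function name `pearson` and the toy lists are hypothetical; in the assignment itself the computation should be done with the Spark functions.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of co-deviations and sums of squared deviations from the means.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical toy columns: the target falls almost linearly as the feature rises.
feature = [1.0, 2.0, 3.0, 4.0]
target = [-2.0, -4.1, -5.9, -8.0]

r = pearson(feature, target)
significant = abs(r) > 0.5  # same rule as the sigK columns
print(r, significant)
```

Taking the absolute value before comparing means a strong negative relationship is flagged just like a strong positive one.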

In [22]:
# import the package functions as fn
from pyspark.sql import functions as fn

In [23]:
# apply some function to the columns: df.select(...)

In [24]:
# Create the dataframe `correlations_df` here
correlations_df = df.select([fn.corr('f_' + str(i), 'target').alias('c' + str(i) +'_target')  for i in range(10)] + 
                            [(fn.abs(fn.corr('f_' + str(i), 'target')) > 0.5).alias('sig' + str(i)) for i in range(10)])

In [25]:
"""20 pts: Check that the dataframe has the correct columns and values. There could be hidden tests!"""
# check column names
column_names = ['c' + str(fi) + '_target' for fi in range(10)] + \
               ['sig' + str(fi) for fi in range(10)]

# column's names and positions in the right order
np.testing.assert_equal(correlations_df.columns, column_names)

from pyspark.sql.types import DoubleType, BooleanType


# the types are correct
np.testing.assert_equal([type(f.dataType) for f in correlations_df.schema.fields], 
                        10*[DoubleType] + 10*[BooleanType])

# the values are correct
np.testing.assert_approx_equal(correlations_df.toPandas().sum().sum(), 
                               4.963039476979365,
                               significant=1)