# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>
- TAs: Tong Zeng <tozeng@syr.edu>, Priya Matnani <psmatnan@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. That placeholder is there mainly so that the `assert` statements fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ tests for your answer. Think about cases your code should handle even if it passes all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to `Kernel`$\rightarrow$`Restart and Run All`).
- Good luck!

In [2]:
# load these packages (each name is imported once)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as fn
from pyspark.ml import feature, classification, regression, evaluation, Pipeline
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Part 2: Random Forest

In these questions, we will examine the famous Titanic dataset.

In [3]:
# read-only
drop_cols = ['boat', 'body']
titanic_df = spark.read.csv('/datasets/titanic_original.csv', header=True, inferSchema=True).\
    drop(*drop_cols).\
    fillna('O').\
    dropna(subset=['pclass', 'age', 'sibsp', 'parch', 'fare', 'survived']).\
    select((fn.col('sex') == 'male').alias('is_male').cast('float'),           
           'pclass',
           'age',
           'sibsp',
           'parch',
           'fare',
           'survived')
training_df, validation_df, testing_df = titanic_df.randomSplit([0.6, 0.3, 0.1], seed=0)
titanic_df.printSchema()

root
 |-- is_male: float (nullable = false)
 |-- pclass: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- survived: integer (nullable = true)
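`randomSplit([0.6, 0.3, 0.1], seed=0)` sends each row to exactly one of the three DataFrames at random, with probability proportional to its weight, so the resulting sizes are only approximately 60/30/10. The plain-Python sketch below mimics that idea; `random_split` is a hypothetical helper built on Python's `random` module, not Spark's implementation (which works partition by partition).

```python
import random

def random_split(rows, weights, seed=0):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of this split in [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding of the last bound
    return splits

train, val, test = random_split(range(1000), [0.6, 0.3, 0.1], seed=0)
print(len(train), len(val), len(test))  # close to, but usually not exactly, 600 300 100
```

Because the assignment is random per row, rerunning with a different seed changes both which rows land in each split and the exact split sizes.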



# Question 1: (10 pts)

Create three pipelines that contain three different random forest classifiers that take in all features from the `titanic_df` (`is_male`, `pclass`, `age`, `sibsp`, `parch`, and `fare`) to predict whether someone survived (`survived`). Fit these pipelines to the training data.

- `pipe_rf1`: Random forest with `maxDepth=1` and `numTrees=60`
- `pipe_rf2`: Random forest with `maxDepth=3` and `numTrees=40`
- `pipe_rf3`: Random forest with `maxDepth=6` and `numTrees=20`
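One rough way to compare these three configurations is by capacity. Spark's `maxDepth` counts a single leaf as depth 0, so a tree of depth d has at most 2^d leaves. The sketch below (a hypothetical `forest_capacity` helper) computes the maximum number of leaves per tree and across each whole forest.

```python
def forest_capacity(configs):
    """Return {name: (max leaves per tree, max leaves across the forest)}."""
    capacity = {}
    for name, (max_depth, num_trees) in configs.items():
        max_leaves = 2 ** max_depth  # a binary tree of depth d has at most 2**d leaves
        capacity[name] = (max_leaves, num_trees * max_leaves)
    return capacity

# (maxDepth, numTrees) for each pipeline in this question
configs = {'pipe_rf1': (1, 60), 'pipe_rf2': (3, 40), 'pipe_rf3': (6, 20)}
for name, (per_tree, total) in forest_capacity(configs).items():
    print(f"{name}: up to {per_tree} leaves per tree, {total} in total")
```

`pipe_rf1` is a forest of stumps that each split on a single feature, while `pipe_rf3` can capture feature interactions but is more prone to overfitting; the validation AUC in Question 2 is what decides between them.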

In [4]:
# create the fitted pipelines `pipe_rf1`, `pipe_rf2`, and `pipe_rf3` here
# YOUR CODE HERE
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
# assemble every column except the label into a single `features` vector
feature_cols = [c for c in training_df.columns if c != 'survived']
va = VectorAssembler(inputCols=feature_cols, outputCol='features')
# three forests trading tree depth against number of trees
rf1 = RandomForestClassifier(maxDepth=1, numTrees=60, featuresCol='features', labelCol='survived')
rf2 = RandomForestClassifier(maxDepth=3, numTrees=40, featuresCol='features', labelCol='survived')
rf3 = RandomForestClassifier(maxDepth=6, numTrees=20, featuresCol='features', labelCol='survived')
# fit each pipeline (assembler + forest) on the training split
pipe_rf1 = Pipeline(stages=[va, rf1]).fit(training_df)
pipe_rf2 = Pipeline(stages=[va, rf2]).fit(training_df)
pipe_rf3 = Pipeline(stages=[va, rf3]).fit(training_df)
# raise NotImplementedError()
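`VectorAssembler` packs the listed input columns, in the order given, into one vector column. That order matters later: position i in the vector is what `featureImportances` and the `feature i` labels in `toDebugString` refer to. The plain-Python analogue below is only an illustration; `feature_columns`, `assemble`, and the passenger row are hypothetical, with values made up.

```python
def feature_columns(columns, label='survived'):
    """Every column except the label, keeping the original order."""
    return [c for c in columns if c != label]

def assemble(row, input_cols):
    """Concatenate the chosen fields of one row into a flat feature vector, in input_cols order."""
    return [float(row[c]) for c in input_cols]

columns = ['is_male', 'pclass', 'age', 'sibsp', 'parch', 'fare', 'survived']
input_cols = feature_columns(columns)
# hypothetical passenger; values are made up for illustration
row = {'is_male': 1.0, 'pclass': 3, 'age': 22.0, 'sibsp': 1, 'parch': 0, 'fare': 7.25, 'survived': 0}
vector = assemble(row, input_cols)
for i, name in enumerate(input_cols):
    print(f"feature {i} = {name} -> {vector[i]}")
```

Selecting features by excluding the label name, rather than by position, keeps the mapping correct even if the label is not the last column.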

In [5]:
# tests for 10 pts
np.testing.assert_equal(type(pipe_rf1.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf2.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf3.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(pipe_rf1.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(pipe_rf2.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(pipe_rf3.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(pipe_rf1.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf2.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf3.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 2 (10 pts)

Use the following evaluator to compute the area under the ROC curve (AUC) of the models on the validation data. Print the AUC of each of the three models and assign the best one (i.e., the best pipeline) to a variable `best_model`.

In [6]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='survived')
# use it as follows:
# evaluator.evaluate(fitted_pipeline.transform(df)) -> AUC
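By default, `BinaryClassificationEvaluator` reports `areaUnderROC`, scoring each row from its `rawPrediction` column. The AUC equals the probability that a randomly chosen survivor gets a higher score than a randomly chosen non-survivor, with ties counted as one half. The sketch below (a hypothetical `roc_auc` helper) computes exactly that pairwise definition; Spark integrates the ROC curve instead, so results can differ slightly on large data if it bins the scores.

```python
def roc_auc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative (ties count 1/2)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```

An AUC of 0.5 means the scores rank survivors no better than chance, and 1.0 means every survivor outranks every non-survivor, which is why the tests below check `0.5 < AUC_best < 1`.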

In [7]:
# YOUR CODE HERE
AUC1 = evaluator.evaluate(pipe_rf1.transform(validation_df))
print("Model 1 AUC: ", AUC1)
AUC2 = evaluator.evaluate(pipe_rf2.transform(validation_df))
print("Model 2 AUC: ", AUC2)
AUC3 = evaluator.evaluate(pipe_rf3.transform(validation_df))
print("Model 3 AUC: ", AUC3)

# raise NotImplementedError()
#Model 1 AUC:  0.8171184392265194
#Model 2 AUC:  0.8422177140883977
#Model 3 AUC:  0.8411818024861878

Model 1 AUC:  0.8219311118784529
Model 2 AUC:  0.8363907113259668
Model 3 AUC:  0.8342541436464084


In [8]:
best_model = pipe_rf2  # model 2 has the highest validation AUC, so it is the best model
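Rather than hard-coding the winner, the choice can be made programmatically by taking the model with the highest validation AUC. The sketch below applies `max` with a key function to the AUC values printed above for this run.

```python
# validation AUCs printed in the cell above (this particular run)
val_auc = {
    'pipe_rf1': 0.8219311118784529,
    'pipe_rf2': 0.8363907113259668,
    'pipe_rf3': 0.8342541436464084,
}
best_name = max(val_auc, key=val_auc.get)  # key with the largest AUC
print(best_name)  # pipe_rf2
```

In the notebook, the same idea would map names to fitted pipelines, e.g. `{'pipe_rf1': pipe_rf1, ...}[best_name]`, so the selection stays correct if a rerun changes the ranking.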

In [9]:
# tests for 10 pts
np.testing.assert_equal(type(best_model.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(best_model.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(best_model.transform(validation_df)), pyspark.sql.dataframe.DataFrame)

# Question 3: 5 pts

Compute the AUC of the best model on the testing data, print it, and assign it to variable `AUC_best`.

In [10]:
# create AUC_best below
# YOUR CODE HERE
# evaluate whichever pipeline was selected as best_model on the held-out test split
AUC_best = evaluator.evaluate(best_model.transform(testing_df))
print("best_model AUC: ", AUC_best)
# raise NotImplementedError()

best_model AUC:  0.876068376068376


In [11]:
# tests for 5 pts
np.testing.assert_array_less(AUC_best, 1.)
np.testing.assert_array_less(0.5, AUC_best)

# Question 4: 5 pts

Using the parameters of the best model, create a new pipeline called `final_model` and fit it to the entire dataset (`titanic_df`).

In [12]:
# create the fitted pipeline `final_model` here
# YOUR CODE HERE
# rf2 holds the best model's hyperparameters (maxDepth=3, numTrees=40); refit it on all the data
final_model = Pipeline(stages=[va, rf2]).fit(titanic_df)
# raise NotImplementedError()

In [13]:
# tests for 5 pts
np.testing.assert_equal(type(final_model.stages[0]), feature.VectorAssembler)
np.testing.assert_equal(type(final_model.stages[1]), classification.RandomForestClassificationModel)
np.testing.assert_equal(type(final_model.transform(titanic_df)), pyspark.sql.dataframe.DataFrame)

# Question 5: 10 + 5 pts

Create a pandas dataframe `feature_importance` with the columns `feature` and `importance` which contains the names of the features (`is_male`, `pclass`, etc.) and their feature importance as determined by the random forest of the final model. Sort the dataframe by `importance` in descending order.

In [14]:
# create feature_importance below
# YOUR CODE HERE
# pair each assembler input column with its importance from the fitted random forest
feature_importance = pd.DataFrame(
    list(zip(final_model.stages[0].getInputCols(), final_model.stages[-1].featureImportances.toArray())),
    columns=['feature', 'importance']).sort_values('importance', ascending=False)
# raise NotImplementedError()

In [15]:
# display it here
feature_importance

| | feature | importance |
|---|---|---|
| 0 | is_male | 0.648356 |
| 1 | pclass | 0.142416 |
| 5 | fare | 0.105381 |
| 2 | age | 0.052654 |
| 4 | parch | 0.030571 |
| 3 | sibsp | 0.020621 |
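According to the Spark documentation, `featureImportances` is normalized to sum to 1, so the column above can be read as shares of the forest's total importance. The sketch below (a hypothetical `cumulative_importance` helper) ranks the values copied from the table and adds a running total.

```python
def cumulative_importance(importances):
    """Sort (feature, importance) pairs by importance, descending, with a running total."""
    ranked = sorted(importances.items(), key=lambda kv: kv[1], reverse=True)
    rows, running = [], 0.0
    for name, imp in ranked:
        running += imp
        rows.append((name, imp, running))
    return rows

# importances copied from the feature_importance table above
importances = {'is_male': 0.648356, 'pclass': 0.142416, 'fare': 0.105381,
               'age': 0.052654, 'parch': 0.030571, 'sibsp': 0.020621}
for name, imp, running in cumulative_importance(importances):
    print(f"{name:8s} {imp:.3f}  cumulative {running:.3f}")
```

For this run, the top three features (`is_male`, `pclass`, `fare`) account for roughly 90% of the total importance.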


In [16]:
# tests for 10 pts
assert type(feature_importance) == pd.core.frame.DataFrame
np.testing.assert_array_equal(list(feature_importance.columns), ['feature', 'importance'])

**(5 pts)** Comment below on the importance that random forest has given to each feature. Are they reasonable? Do they tell you anything valuable about the titanic dataset? Answer in the cell below.

The importances look reasonable and match what is known about the Titanic. `is_male` dominates (about 0.65), so sex is by far the strongest predictor of survival. Feature importance does not show whether a feature raises or lowers the chance of survival, but this result agrees with the "women and children first" evacuation, under which women survived at a much higher rate than men. Passenger class (`pclass`, about 0.14) and `fare` (about 0.11), which largely reflects class, come next, consistent with the stories of first-class travellers being given preference for the lifeboats. `age` (about 0.05) matters less than the stories about children would suggest, possibly because children were a small share of the passengers, and the family-size features `parch` and `sibsp` contribute the least. The importances are also useful for feature selection, since the lowest-ranked features add little to this model.

# Question 6:  5 pts.

Pick any of the trees from the final model and assign its `toDebugString` property to a variable `example_tree`. Print this variable and add comments to the cell describing how you think this particular tree fits the data.

In [17]:
# create a variable example_tree with the toDebugString property of a tree from final_model.
# print this string and comment in this same cell about the branches that this tree fit
# YOUR CODE HERE
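print('Number of trees:', len(final_model.stages[-1].trees))
# pick one tree (index 7) from the forest and keep its text representation
example_tree = final_model.stages[-1].trees[7].toDebugString
print(example_tree)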
# raise NotImplementedError()
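To read the printed tree, note that each `If (feature i <= t)` line sends a passenger down the first branch when the condition holds and down the matching `Else` branch otherwise, until a `Predict` leaf is reached; `feature i` is the i-th assembler input column (so `feature 0` is `is_male`). The sketch below renders and evaluates a nested tree in roughly that layout. The toy tree, its thresholds, and its predictions are invented for illustration and are not read from `final_model`.

```python
# Hypothetical toy tree: thresholds and predictions are invented, not taken from final_model.
toy_tree = {
    'feature': 0, 'threshold': 0.5,                # feature 0 = is_male
    'left': {'feature': 1, 'threshold': 2.5,       # feature 1 = pclass
             'left': {'predict': 1.0}, 'right': {'predict': 0.0}},
    'right': {'predict': 0.0},
}

def predict(node, x):
    """Walk from the root to a leaf: go left when x[feature] <= threshold, else right."""
    while 'predict' not in node:
        node = node['left'] if x[node['feature']] <= node['threshold'] else node['right']
    return node['predict']

def render(node, indent=0):
    """Return If/Else/Predict lines, indenting one space per level."""
    pad = ' ' * indent
    if 'predict' in node:
        return [f"{pad}Predict: {node['predict']}"]
    cond = f"feature {node['feature']}"
    lines = [f"{pad}If ({cond} <= {node['threshold']})"]
    lines += render(node['left'], indent + 1)
    lines.append(f"{pad}Else ({cond} > {node['threshold']})")
    lines += render(node['right'], indent + 1)
    return lines

print('\n'.join(render(toy_tree)))
# features: is_male, pclass, age, sibsp, parch, fare
print(predict(toy_tree, [0.0, 1, 30.0, 0, 0, 80.0]))
```

Tracing a few rows by hand through the printed `example_tree` in the same way is a practical way to write the comments this question asks for.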

In [18]:
# tests for 5 points
assert type(example_tree) == str
assert 'DecisionTreeClassificationModel' in example_tree
assert 'feature 0' in example_tree
assert 'If' in example_tree
assert 'Else' in example_tree
assert 'Predict' in example_tree