# IST 718: Big Data Analytics

- Professor: Daniel Acuna <deacuna@syr.edu>

## General instructions:

- You are welcome to discuss the problems with your classmates but __you are not allowed to copy any part of your answers either from your classmates or from the internet__
- You can put the homework files anywhere you want in your http://notebook.acuna.io workspace but _do not change_ the file names. The TAs and the professor use these names to grade your homework.
- Remove or comment out code that contains `raise NotImplementedError`. This is mainly to make the `assert` statement fail if nothing is submitted.
- The tests shown in some cells (i.e., `assert` and `np.testing.` statements) are used to grade your answers. **However, the professor and TAs will use __additional__ test for your answer. Think about cases where your code should run even if it passess all the tests you see.**
- Before downloading and submitting your work through Blackboard, remember to save and press `Validate` (or go to 
`Kernel`$\rightarrow$`Restart and Run All`). 
- Good luck!

In [1]:
# load these packages
import pyspark
from pyspark.ml import feature, classification
from pyspark.ml import Pipeline, pipeline
from pyspark.sql import functions as fn
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import pandas as pd

We will analyze the Mid-atlantic wage dataset (https://rdrr.io/cran/ISLR/man/Wage.html). 

In [2]:
# read-only
drop_cols = ['_c0', 'logwage', 'sex', 'region']
wage_df = spark.read.csv('/datasets/ISLR/Wage.csv', header=True, inferSchema=True).drop(*drop_cols)
training_df, validation_df, testing_df = wage_df.randomSplit([0.6, 0.3, 0.1], seed=0)
wage_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- maritl: string (nullable = true)
 |-- race: string (nullable = true)
 |-- education: string (nullable = true)
 |-- jobclass: string (nullable = true)
 |-- health: string (nullable = true)
 |-- health_ins: string (nullable = true)
 |-- wage: double (nullable = true)



In [3]:
# explore the data
wage_df.limit(10).toPandas()

Unnamed: 0,year,age,maritl,race,education,jobclass,health,health_ins,wage
0,2006,18,1. Never Married,1. White,1. < HS Grad,1. Industrial,1. <=Good,2. No,75.043154
1,2004,24,1. Never Married,1. White,4. College Grad,2. Information,2. >=Very Good,2. No,70.47602
2,2003,45,2. Married,1. White,3. Some College,1. Industrial,1. <=Good,1. Yes,130.982177
3,2003,43,2. Married,3. Asian,4. College Grad,2. Information,2. >=Very Good,1. Yes,154.685293
4,2005,50,4. Divorced,1. White,2. HS Grad,2. Information,1. <=Good,1. Yes,75.043154
5,2008,54,2. Married,1. White,4. College Grad,2. Information,2. >=Very Good,1. Yes,127.115744
6,2009,44,2. Married,4. Other,3. Some College,1. Industrial,2. >=Very Good,1. Yes,169.528538
7,2008,30,1. Never Married,3. Asian,3. Some College,2. Information,1. <=Good,1. Yes,111.720849
8,2006,41,1. Never Married,2. Black,3. Some College,2. Information,2. >=Very Good,1. Yes,118.884359
9,2004,52,2. Married,1. White,2. HS Grad,2. Information,2. >=Very Good,1. Yes,128.680488


# Question 1: Codify the data using transformers (20 pts)

Create a fitted pipeline to the entire data `wage_df` and call it `pipe_feat`. This pipeline should codify the columns `maritl`, `race`, `education`, `jobclass`, `health`, and `health_ins`. The codification should be a combination of a `StringIndexer` and a `OneHotEncoder`. For example, for `maritl`, `StringIndexer` should create a column `maritl_index` and `OneHotEncoder` should create a column `maritl_feat`. Investigate the parameters of `StringIndexer` so that the labels are indexed alphabetically in ascending order so that, for example, the 1st index for `maritl_index` corresponds to `1. Never Married`, the 2nd index corresponds to `2. Married`, and so forth. Also, investigate the parameters of  `OneHotEncoder` so that there are no columns dropped as it is usually done for dummy variables. This is, marital status should have one column for each of the classes.

The pipeline should create a column `features` that combines `year`, `age`, and all codified columns.

In [4]:
# create `pipe_feat` below
# YOUR CODE HERE
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
pipe_feat=Pipeline(stages=[StringIndexer(inputCol='maritl',outputCol='maritl_index',stringOrderType='alphabetAsc'),OneHotEncoder(inputCol='maritl_index',outputCol='maritl_feat',dropLast=False),
                            StringIndexer(inputCol='race',outputCol='race_index',stringOrderType='alphabetAsc'),OneHotEncoder(inputCol='race_index',outputCol='race_feat',dropLast=False),
                            StringIndexer(inputCol='education',outputCol='education_index',stringOrderType='alphabetAsc'),OneHotEncoder(inputCol='education_index',outputCol='education_feat',dropLast=False),
                            StringIndexer(inputCol='jobclass',outputCol='jobclass_index',stringOrderType='alphabetAsc'),OneHotEncoder(inputCol='jobclass_index',outputCol='jobclass_feat',dropLast=False),
                            StringIndexer(inputCol='health',outputCol='health_index',stringOrderType='alphabetAsc'),OneHotEncoder(inputCol='health_index',outputCol='health_feat',dropLast=False),
                            StringIndexer(inputCol='health_ins',outputCol='health_ins_index',stringOrderType='alphabetAsc'),OneHotEncoder(inputCol='health_ins_index',outputCol='health_ins_feat',dropLast=False),
                            feature.VectorAssembler(inputCols=['maritl_feat','race_feat', 'education_feat','jobclass_feat','health_feat','health_ins_feat','year','age'],outputCol='features')]).fit(wage_df)

In [5]:
# investigate the results
pipe_feat.transform(wage_df).limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
year,2006,2004,2003,2003,2005
age,18,24,45,43,50
maritl,1. Never Married,1. Never Married,2. Married,2. Married,4. Divorced
race,1. White,1. White,1. White,3. Asian,1. White
education,1. < HS Grad,4. College Grad,3. Some College,4. College Grad,2. HS Grad
jobclass,1. Industrial,2. Information,1. Industrial,2. Information,2. Information
health,1. <=Good,2. >=Very Good,1. <=Good,2. >=Very Good,1. <=Good
health_ins,2. No,2. No,1. Yes,1. Yes,1. Yes
wage,75.0432,70.476,130.982,154.685,75.0432
maritl_index,0,0,1,1,3


In [6]:
# (20 pts)
assert set(type(pm) for pm in pipe_feat.stages) == {feature.OneHotEncoder, feature.StringIndexerModel, feature.VectorAssembler}
assert len(pipe_feat.transform(wage_df).first().features) == 22


# Question 2: (15 pts)

Create three pipelines that contain three different random forest regressions that take in all features from the `wage_df` to predict `wage`. These pipelines should have as first stage the pipeline created in question 1 and should be fitted to the training data.

- `pipe_rf1`: Random forest with `maxDepth=1` and `numTrees=60`
- `pipe_rf2`: Random forest with `maxDepth=3` and `numTrees=40`
- `pipe_rf3`: Random forest with `maxDepth=6`, `numTrees=20`

In [7]:
# create the fitted pipelines `pipe_rf1`, `pipe_rf2`, and `pipe_rf3` here
# YOUR CODE HERE

from pyspark.ml.regression import RandomForestRegressor
pipe_rf1=Pipeline(stages=[pipe_feat,RandomForestRegressor(featuresCol='features',labelCol='wage',maxDepth=1,numTrees=60)]).fit(training_df)
pipe_rf2=Pipeline(stages=[pipe_feat,RandomForestRegressor(featuresCol='features',labelCol='wage',maxDepth=3,numTrees=40)]).fit(training_df)
pipe_rf3=Pipeline(stages=[pipe_feat,RandomForestRegressor(featuresCol='features',labelCol='wage',maxDepth=6,numTrees=20)]).fit(training_df)


In [8]:
# tests for 15 pts
np.testing.assert_equal(type(pipe_rf1.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(pipe_rf2.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(pipe_rf3.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(pipe_rf1.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf2.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf3.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(pipe_rf1.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf2.transform(training_df)), pyspark.sql.dataframe.DataFrame)
np.testing.assert_equal(type(pipe_rf3.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 3 (10 pts)

Use the following evaluator to compute the RMSE of the models on validation data. Print the RMSE of the three models and assign the best one (i.e., the best pipeline) to a variable `best_model`

In [9]:
evaluator = evaluation.RegressionEvaluator(labelCol='wage', metricName='rmse')
# use it as follows:
#   evaluator.evaluate(fitted_pipeline.transform(df)) -> RMSE

In [10]:
# print MSE of each model and define `best_model`
# YOUR CODE HERE
rf1_rmse=evaluator.evaluate(pipe_rf1.transform(validation_df))
rf2_rmse=evaluator.evaluate(pipe_rf2.transform(validation_df))
rf3_rmse=evaluator.evaluate(pipe_rf3.transform(validation_df))
print(rf1_rmse)
print(rf2_rmse)
print(rf3_rmse)
best_model=pipe_rf3

36.7778208464142
33.88579891132714
32.83650074931659


In [11]:
# tests for 10 pts
np.testing.assert_equal(type(best_model.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(best_model.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(best_model.transform(training_df)), pyspark.sql.dataframe.DataFrame)

# Question 4: 5 pts

Compute the RMSE of the model on testing data, print it, and assign it to variable `RMSE_best`

In [12]:
# create RMSE_best below
# YOUR CODE HERE
RMSE_best=evaluator.evaluate(pipe_rf3.transform(testing_df))

In [13]:
# tests for 5 pts
np.testing.assert_array_less(RMSE_best, 40)
np.testing.assert_array_less(30, RMSE_best)

# Question 5: 5 pts

Using the parameters of the best model, create a new pipeline called `final_model` and fit it to the entire data (`wage_df`)

In [14]:
# create final_model pipeline below
# YOUR CODE HERE
final_model=Pipeline(stages=[pipe_feat,RandomForestRegressor(featuresCol='features',labelCol='wage',maxDepth=6,numTrees=20)]).fit(training_df)

In [15]:
# tests for 5 pts
np.testing.assert_equal(type(final_model.stages[0]), pipeline.PipelineModel)
np.testing.assert_equal(type(final_model.stages[1]), regression.RandomForestRegressionModel)
np.testing.assert_equal(type(final_model.transform(wage_df)), pyspark.sql.dataframe.DataFrame)

# Question 6: 30 pts

Create a pandas dataframe `feature_importance` with the columns `feature` and `importance` which contains the names of the features. Give appropriate column names such as `maritl_1._Never_Married`. You can build these feature names by using the labels from the fitted `StringIndexer` used in Question 1. Use as feature importance as determined by the random forest of the final model (`final_model`). Sort the pandas dataframe by `importance` in descending order and display.

In [16]:
col_labels=[]
col_labels.extend('maritl_%s'%(i) for i in final_model.stages[0].stages[0].labels)
col_labels.extend('race_%s'%(j) for j in final_model.stages[0].stages[2].labels)
col_labels.extend('education_%s'%(k) for k in final_model.stages[0].stages[4].labels)
col_labels.extend('jobclass_%s'%(l) for l in final_model.stages[0].stages[6].labels)
col_labels.extend('health_%s'%(m) for m in final_model.stages[0].stages[8].labels)
col_labels.extend('health_ins_%s'%(n) for n in final_model.stages[0].stages[10].labels)
col_labels.extend(['year'])
col_labels.extend(['age'])
col_labels
                  

['maritl_1. Never Married',
 'maritl_2. Married',
 'maritl_3. Widowed',
 'maritl_4. Divorced',
 'maritl_5. Separated',
 'race_1. White',
 'race_2. Black',
 'race_3. Asian',
 'race_4. Other',
 'education_1. < HS Grad',
 'education_2. HS Grad',
 'education_3. Some College',
 'education_4. College Grad',
 'education_5. Advanced Degree',
 'jobclass_1. Industrial',
 'jobclass_2. Information',
 'health_1. <=Good',
 'health_2. >=Very Good',
 'health_ins_1. Yes',
 'health_ins_2. No',
 'year',
 'age']

In [17]:
# create feature_importance below
# YOUR CODE HERE
randomforest=final_model.stages[-1]
feature_importance=pd.DataFrame(list(zip(col_labels,randomforest.featureImportances.toArray())),columns=['feature','importance']).sort_values('importance',ascending=False)


In [18]:
# display your feature importances here
feature_importance

Unnamed: 0,feature,importance
13,education_5. Advanced Degree,0.307504
21,age,0.121546
18,health_ins_1. Yes,0.092268
12,education_4. College Grad,0.083105
19,health_ins_2. No,0.071878
10,education_2. HS Grad,0.053304
0,maritl_1. Never Married,0.052271
1,maritl_2. Married,0.052142
20,year,0.034955
9,education_1. < HS Grad,0.024706


In [19]:
# tests for 25 pts
assert type(feature_importance) == pd.core.frame.DataFrame
np.testing.assert_array_equal(list(feature_importance.columns), ['feature', 'importance'])
np.testing.assert_array_equal(list(feature_importance.columns), ['feature', 'importance'])

**(5 pts)** Comment below on the importance that random forest has given to each feature. Are they reasonable? Do they tell you anything valuable about the titanic dataset? Answer in the cell below

The random forest has given the highest importance to education(Advanced degree) followed by age and health_ins and the least importance to maritl widowed followed by separated.The top features help in determining the wage.

# Question 7:  15 pts.

Pick any of the trees from the final model and assign its `toDebugString` property to a variable `example_tree`. Print this variable and add comments to the cell describing how you think this particular tree is fitting the data

In [20]:
# create a variable example_tree with the toDebugString property of a tree from final_model.
# print this string and comment in this same cell about the branches that this tree fit
# YOUR CODE HERE
example_tree=randomforest.trees[4].toDebugString

In [21]:
# display the tree here
print(example_tree)

DecisionTreeRegressionModel (uid=dtr_fec404542bc6) of depth 6 with 115 nodes
  If (feature 18 in {0.0})
   If (feature 13 in {0.0})
    If (feature 21 <= 28.5)
     If (feature 1 in {0.0})
      If (feature 20 <= 2004.5)
       If (feature 11 in {1.0})
        Predict: 42.23444589269706
       Else (feature 11 not in {1.0})
        Predict: 64.93361853449139
      Else (feature 20 > 2004.5)
       If (feature 10 in {0.0})
        Predict: 70.52803827720967
       Else (feature 10 not in {0.0})
        Predict: 77.03043169589179
     Else (feature 1 not in {0.0})
      If (feature 21 <= 22.5)
       Predict: 54.5981500331442
      Else (feature 21 > 22.5)
       If (feature 9 in {1.0})
        Predict: 40.4056652596845
       Else (feature 9 not in {1.0})
        Predict: 81.3814330667517
    Else (feature 21 > 28.5)
     If (feature 17 in {0.0})
      If (feature 20 <= 2003.5)
       If (feature 21 <= 36.5)
        Predict: 95.97032932658047
       Else (feature 21 > 36.5)
        Pred

In [22]:
# tests for 10 points
assert type(example_tree) == str
assert 'DecisionTreeRegressionModel' in example_tree
assert 'feature 0' in example_tree
assert 'If' in example_tree
assert 'Else' in example_tree
assert 'Predict' in example_tree

**(5 pts)** Comment on the feature that is at the top of the tree. Does it make sense that that is the feature there?

The feature at the top of the tree checks if feature 0 is never married, if yes it checks if feature 13 i.e. the person has advanced degree. This continues further if condition is false it goes into the else loop and checks the condition. So, yes it makes sense that the feature is at the top of the tree.