In [1]:
from __future__ import print_function

from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import numpy as np
import pandas as pd

In [2]:
# Create a SparkSession 
spark = SparkSession.builder.appName("LifeExpectation").getOrCreate()


In [3]:
df = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("data/Life_Expectancy_Data3.csv")

In [4]:
df.show(1)

+-----------+----+----------+----+---------------+-------------+-------+----------------------+----------+-------+----+-----------------+-----+-----------------+----------+--------+---------+-----------+-------------------+------------------+----------------------------+---------+------+
|    Country|Year|    Status|Life|Adult_Mortality|infant_deaths|Alcohol|percentage_expenditure|HepatitisB|Measles| BMI|under-five_deaths|Polio|Total_expenditure|Diphtheria|HIV_AIDS|      GDP| Population|thinness_1-19_years|thinness_5-9_years|Income_composition_resources|Schooling|Region|
+-----------+----+----------+----+---------------+-------------+-------+----------------------+----------+-------+----+-----------------+-----+-----------------+----------+--------+---------+-----------+-------------------+------------------+----------------------------+---------+------+
|Afghanistan|2015|Developing|65.0|            263|           62|   0.01|           71.27962362|        65|   1154|19.1|              

In [5]:
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Life: double (nullable = true)
 |-- Adult_Mortality: integer (nullable = true)
 |-- infant_deaths: integer (nullable = true)
 |-- Alcohol: double (nullable = true)
 |-- percentage_expenditure: double (nullable = true)
 |-- HepatitisB: integer (nullable = true)
 |-- Measles: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- under-five_deaths: integer (nullable = true)
 |-- Polio: integer (nullable = true)
 |-- Total_expenditure: double (nullable = true)
 |-- Diphtheria: integer (nullable = true)
 |-- HIV_AIDS: double (nullable = true)
 |-- GDP: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- thinness_1-19_years: double (nullable = true)
 |-- thinness_5-9_years: double (nullable = true)
 |-- Income_composition_resources: double (nullable = true)
 |-- Schooling: double (nullable = true)
 |-- Region: string (nullable = true)



In [6]:
# Cleaning the Dataset REMOVE Two Columns
df = df.drop(*['Year','Country'])             # dropping some irrelevant columns

#df = df.na.drop()
# df = df.dropna(how = 'any', subset = ['userId', 'sessionId'])                   # droppping some potential NA values
#df = df.filter(df.userId!='').orderBy(["userId", "ts"], ascending=[True, True]) # filtering out the invalid Ids
#df = df.withColumn("userId", df["userId"].cast(IntegerType()))                  # case userId column to integer

In [7]:
# IMPUTER
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=df.drop('Status','Region').columns,
                 outputCols=["{}".format(c) for c in df.drop('Status','Region').columns]).setStrategy('mean')

# add imputation to cols
df_imputed = imputer.fit(df).transform(df)

In [8]:
df_imputed.columns

['Status',
 'Life',
 'Adult_Mortality',
 'infant_deaths',
 'Alcohol',
 'percentage_expenditure',
 'HepatitisB',
 'Measles',
 'BMI',
 'under-five_deaths',
 'Polio',
 'Total_expenditure',
 'Diphtheria',
 'HIV_AIDS',
 'GDP',
 'Population',
 'thinness_1-19_years',
 'thinness_5-9_years',
 'Income_composition_resources',
 'Schooling',
 'Region']

In [9]:
# Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan, when, count
df_imputed.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_imputed.columns]
   ).show()

+------+----+---------------+-------------+-------+----------------------+----------+-------+---+-----------------+-----+-----------------+----------+--------+---+----------+-------------------+------------------+----------------------------+---------+------+
|Status|Life|Adult_Mortality|infant_deaths|Alcohol|percentage_expenditure|HepatitisB|Measles|BMI|under-five_deaths|Polio|Total_expenditure|Diphtheria|HIV_AIDS|GDP|Population|thinness_1-19_years|thinness_5-9_years|Income_composition_resources|Schooling|Region|
+------+----+---------------+-------------+-------+----------------------+----------+-------+---+-----------------+-----+-----------------+----------+--------+---+----------+-------------------+------------------+----------------------------+---------+------+
|     0|   0|              0|            0|      0|                     0|         0|      0|  0|                0|    0|                0|         0|       0|  0|         0|                  0|                 0|       

In [10]:
#One hot enconde Status and Region

# First we index it
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCols=['Region','Status'], outputCols=['Region_Numeric','Status_Numeric'])
indexer_fitted = indexer.fit(df_imputed)
df_indexed = indexer_fitted.transform(df_imputed)

#now we can one hot encode
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['Region_Numeric','Status_Numeric'], outputCols=['Region_onehot','Status_onehot'])
df_onehot = encoder.fit(df_indexed).transform(df_indexed)

#Remobe the categorical columns not needed anymore
df_onehot = df_onehot.drop(*['Status','Region','Region_Numeric','Status_Nhmeric'])
df_onehot.columns

['Life',
 'Adult_Mortality',
 'infant_deaths',
 'Alcohol',
 'percentage_expenditure',
 'HepatitisB',
 'Measles',
 'BMI',
 'under-five_deaths',
 'Polio',
 'Total_expenditure',
 'Diphtheria',
 'HIV_AIDS',
 'GDP',
 'Population',
 'thinness_1-19_years',
 'thinness_5-9_years',
 'Income_composition_resources',
 'Schooling',
 'Status_Numeric',
 'Region_onehot',
 'Status_onehot']

In [11]:
df_onehot.printSchema()

root
 |-- Life: double (nullable = true)
 |-- Adult_Mortality: integer (nullable = true)
 |-- infant_deaths: integer (nullable = true)
 |-- Alcohol: double (nullable = true)
 |-- percentage_expenditure: double (nullable = true)
 |-- HepatitisB: integer (nullable = true)
 |-- Measles: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- under-five_deaths: integer (nullable = true)
 |-- Polio: integer (nullable = true)
 |-- Total_expenditure: double (nullable = true)
 |-- Diphtheria: integer (nullable = true)
 |-- HIV_AIDS: double (nullable = true)
 |-- GDP: double (nullable = true)
 |-- Population: double (nullable = true)
 |-- thinness_1-19_years: double (nullable = true)
 |-- thinness_5-9_years: double (nullable = true)
 |-- Income_composition_resources: double (nullable = true)
 |-- Schooling: double (nullable = true)
 |-- Status_Numeric: double (nullable = false)
 |-- Region_onehot: vector (nullable = true)
 |-- Status_onehot: vector (nullable = true)



In [12]:
input_cols = df_onehot.drop('Life').columns

In [13]:
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')

df_transform = assembler.transform(df_onehot).select("Life", "features")

In [14]:
df_transform.show(5)

+----+--------------------+
|Life|            features|
+----+--------------------+
|65.0|[263.0,62.0,0.01,...|
|59.9|[271.0,64.0,0.01,...|
|59.9|[268.0,66.0,0.01,...|
|59.5|[272.0,69.0,0.01,...|
|59.2|[275.0,71.0,0.01,...|
+----+--------------------+
only showing top 5 rows



In [15]:
from pyspark.ml.feature import Normalizer, StandardScaler

scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

df_scaled = scaler.fit(df_transform).transform(df_transform)

In [16]:
df_scaled.show(5)

+----+--------------------+--------------------+
|Life|            features|     scaled_features|
+----+--------------------+--------------------+
|65.0|[263.0,62.0,0.01,...|[2.11959495798077...|
|59.9|[271.0,64.0,0.01,...|[2.18406932932619...|
|59.9|[268.0,66.0,0.01,...|[2.15989144007166...|
|59.5|[272.0,69.0,0.01,...|[2.19212862574437...|
|59.2|[275.0,71.0,0.01,...|[2.21630651499890...|
+----+--------------------+--------------------+
only showing top 5 rows



In [17]:
# Let's split our data into training data and testing data
train,test = df_scaled.randomSplit([0.8,0.2])

In [18]:
##############################################################

lr_model = LinearRegression(featuresCol='scaled_features', labelCol='Life')
lr_fitted = lr_model.fit(train)

In [19]:
trainingSummary = lr_fitted.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 3.748937
r2: 0.843096


In [20]:
#######################################################################

In [21]:
# Now create our decision tree
dtr = DecisionTreeRegressor().setFeaturesCol("scaled_features").setLabelCol("Life")

# Train the model using our training data
tree_model = dtr.fit(train)

In [22]:
# Now see if we can predict values in our test data.
# Generate predictions using our decision tree model for all features in our
# test dataframe:
fullPredictions = tree_model.transform(test).cache()

In [23]:
fullPredictions.show()

+----+--------------------+--------------------+------------------+
|Life|            features|     scaled_features|        prediction|
+----+--------------------+--------------------+------------------+
|43.3|[48.0,30.0,3.83,4...|[0.38684622807253...|56.705729166666664|
|43.5|[599.0,48.0,1.15,...|[4.82751855448852...|          46.34375|
|44.0|[67.0,46.0,1.1,3....|[0.53997286001791...|56.705729166666664|
|44.5|[675.0,5.0,2.67,5...|[5.44002508227004...|          46.34375|
|44.5|[715.0,26.0,4.06,...|[5.76239693899715...|          46.34375|
|44.8|[73.0,25.0,4.43,0...|[0.58832863852698...|56.705729166666664|
|45.3|[593.0,7.0,0.83,0...|[4.77916277597945...|          46.34375|
|45.5|[69.0,41.0,2.44,5...|[0.55609145285427...|56.705729166666664|
|45.6|[54.0,17.0,1.52,3...|[0.43520200658160...|51.816326530612244|
|45.6|[69.0,3.0,5.78,37...|[0.55609145285427...|58.745918367346924|
|45.9|[511.0,17.0,1.5,4...|[4.11830046968887...|          46.34375|
|46.0|[63.0,3.0,5.08,37...|[0.50773567434520...|

In [24]:
dt_evaluator = RegressionEvaluator(labelCol="Life", predictionCol="prediction", metricName="r2")

rmse = dt_evaluator.evaluate(fullPredictions)

print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.884091


In [25]:
#################################################################################################
#################################################################################################

In [18]:
lr=LinearRegression(featuresCol='scaled_features', labelCol='Life')

In [19]:
#pipeline1=Pipeline(stages=[assembler, scaler, lr])
pipeline1=Pipeline(stages=[lr])

In [20]:
paramgrid =ParamGridBuilder()\
.addGrid(lr.regParam, [0, 0.001, 0.01, 0.1, 0.5, 1.0, 2.0])\
.addGrid(lr.maxIter, [10])\
.build()


evaluator=RegressionEvaluator(metricName="r2").setLabelCol("Life")


crossval= CrossValidator(estimator=pipeline1,  
                         estimatorParamMaps=paramgrid,
                         evaluator = evaluator, 
                         numFolds=5
                        )

cvModel1=crossval.fit(train) 
evaluator.evaluate(cvModel1.transform(test))


0.8420121042520536

In [21]:
cvModel1.getEstimatorParamMaps()[ np.argmax(cvModel1.avgMetrics) ]

{Param(parent='LinearRegression_845e365a736a', name='regParam', doc='regularization parameter (>= 0).'): 0.001,
 Param(parent='LinearRegression_845e365a736a', name='maxIter', doc='max number of iterations (>= 0).'): 10}

In [22]:
cvModel1.avgMetrics

[0.8348072739048387,
 0.8358324270001516,
 0.835761267076788,
 0.8351171981645227,
 0.8350327892182114,
 0.834507277236124,
 0.8322460374614746]

In [23]:
###########################################################################################################################

In [24]:
rf = RandomForestRegressor(featuresCol='scaled_features', labelCol='Life')
pipeline2 = Pipeline(stages=[rf])

paramgrid =ParamGridBuilder()\
.addGrid(rf.numTrees, [200, 500])\
.addGrid(rf.maxDepth, [5,10])\
.build()


evaluator=RegressionEvaluator(metricName="r2").setLabelCol("Life")


crossval= CrossValidator(estimator=pipeline2,  
                         estimatorParamMaps=paramgrid,
                         evaluator = evaluator, 
                         numFolds=5
                        )

cvModel2=crossval.fit(train) 

evaluator.evaluate(cvModel2.transform(test))


0.9601563822827419

In [25]:
cvModel2.getEstimatorParamMaps()[ np.argmax(cvModel2.avgMetrics) ]

{Param(parent='RandomForestRegressor_1d0f01b4213a', name='numTrees', doc='Number of trees to train (>= 1).'): 500,
 Param(parent='RandomForestRegressor_1d0f01b4213a', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10}

In [26]:
##########################################################

In [27]:
gbt = GBTRegressor(featuresCol='scaled_features', labelCol='Life')
pipeline3 = Pipeline(stages=[gbt])

paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5, 10])\
  .addGrid(gbt.maxIter, [10,100,500])\
  .build()

evaluator=RegressionEvaluator(metricName="r2").setLabelCol("Life")


crossval= CrossValidator(estimator=pipeline3,  
                         estimatorParamMaps=paramgrid,
                         evaluator = evaluator, 
                         numFolds=5
                        )

cvModel3=crossval.fit(train) 

evaluator.evaluate(cvModel3.transform(test))

0.9406648503167245

In [34]:
##################################################################################################################
#
#   From Site
#
##################################################################################################################

# Train Test Split
training, test = features.randomSplit([0.8, 0.2])


# Make VectorAssembler - this is a Pypark specific step
# All input features must be in one column before feeding into the model

assembler = VectorAssembler(inputCols=["userId","song_count","error","friends","playlist_count", \
                                       "thumbs_up","thumbs_down","downgrade", "count_session_dist",\
                                       "count_diff_time","pages_per_session", "duration","level_shift",\
                                       "usage_time"], \
                            outputCol="inputFeatures")



# Normalize Data
scaler = Normalizer(inputCol="inputFeatures", outputCol="features")


# Spark supports most common classification methods (https://spark.apache.org/docs/latest/ml-classification-regression.html)
# I decided for the following three:

lr=LogisticRegression()
gbt = GBTClassifier()
rf= RandomForestClassifier()


# Building pipelines
pipeline1=Pipeline(stages=[assembler, scaler, lr])
pipeline2=Pipeline(stages=[assembler, scaler, gbt])
pipeline3=Pipeline(stages=[assembler, scaler, rf])

NameError: name 'features' is not defined