In [1]:
#Zih-Cin Jain
#zxj161530@utdallas.edu
#Project: World Happiness Indicator
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
# Locations of the yearly World Happiness Report CSV files, one per report year.
filePath_h2015, filePath_h2016, filePath_h2017 = (
    "/WHR/%d.csv" % report_year for report_year in (2015, 2016, 2017)
)

In [3]:
# Loading CSV files: the first row is the header and Spark infers the column types.
def _load_whr_csv(path):
    """Read one yearly World Happiness Report CSV into a Spark DataFrame."""
    return spark.read.option("header","true").option("InferSchema", "True").csv(path)

raw_df_h2015 = _load_whr_csv(filePath_h2015)
raw_df_h2016 = _load_whr_csv(filePath_h2016)
raw_df_h2017 = _load_whr_csv(filePath_h2017)

# Drop the columns that are specific to one year and rename the rest so that
# all three frames expose the same column names. (No data types are changed
# here; they come from schema inference above.)
df_h2015 = raw_df_h2015.drop('Standard Error')\
    .withColumnRenamed('Economy (GDP per Capita)','GDP')\
    .withColumnRenamed('Health (Life Expectancy)','Health')\
    .drop("Region")

# DataFrame.union() pairs columns by POSITION, not by name. Selecting the
# 2015 column list puts the 2016 and 2017 frames in the same column order, so
# any later union lines the columns up correctly.
# NOTE(review): the 2017 CSV appears to list Generosity and Trust in the
# opposite order from 2015/2016 -- confirm against the raw file.
df_h2016 = raw_df_h2016.drop('Lower Confidence Interval')\
    .drop('Upper Confidence Interval')\
    .withColumnRenamed('Economy (GDP per Capita)','GDP')\
    .withColumnRenamed('Health (Life Expectancy)','Health')\
    .drop("Region")\
    .select(*df_h2015.columns)

df_h2017 = raw_df_h2017.withColumnRenamed('Happiness.Rank','Happiness Rank')\
    .withColumnRenamed('Happiness.Score','Happiness Score')\
    .drop('Whisker.high').drop('Whisker.low')\
    .withColumnRenamed('Economy..GDP.per.Capita.','GDP')\
    .withColumnRenamed('Health..Life.Expectancy.','Health')\
    .withColumnRenamed('Trust..Government.Corruption.','Trust (Government Corruption)')\
    .withColumnRenamed('Dystopia.Residual','Dystopia Residual')\
    .select(*df_h2015.columns)

# Training frame: the 2015 and 2016 reports stacked and sorted by country.
# union() pairs columns by position, so the 2016 frame is first put into the
# 2015 column order by name.
df_h2015_h2016 = df_h2015.union(df_h2016.select(*df_h2015.columns)).sort(asc("Country"))

# Add a numeric CountryID column derived from the Country string.
indexer = StringIndexer(inputCol="Country", outputCol="CountryID")
df = indexer.fit(df_h2015_h2016).transform(df_h2015_h2016)
df.cache()

# Pack the seven happiness factors into a single "features" vector column
# (this is the input of the PCA step).
assembler = VectorAssembler(
    inputCols=[ "GDP", "Family","Health","Freedom","Trust (Government Corruption)","Generosity","Dystopia Residual"],
    outputCol="features")
output = assembler.transform(df)

In [4]:
#find correlation matrix
# union() pairs columns by position, so every year is first put into the
# 2015 column order by name before the three reports are stacked.
shared_columns = df_h2015.columns
datasetUnionAllYearsPD = df_h2015\
    .union(df_h2016.select(*shared_columns))\
    .union(df_h2017.select(*shared_columns))\
    .toPandas()

# Correlate numeric columns only: recent pandas versions raise on string
# columns such as Country, while older versions silently dropped them.
corrmat_allYears = datasetUnionAllYearsPD.select_dtypes(include=[np.number]).corr()

# The style has to be set before the axes are created. A bare
# sns.axes_style("white") call only returns a dict and changes nothing.
sns.set(style="white", font_scale=0.7)
fig, axes = plt.subplots(figsize=(10, 7))
plt.subplots_adjust(bottom=0.35)

# Boolean mask that hides the upper triangle (diagonal included), since the
# correlation matrix is symmetric.
mask = np.zeros(corrmat_allYears.shape, dtype=bool)
mask[np.triu_indices_from(mask)] = True

sns.heatmap(corrmat_allYears, ax=axes, linewidths=1, annot=True, mask=mask, vmax= 0.9, square=True, annot_kws={"size":8}, cmap="Set2")
axes.set_title(" Correlation Matrix: World Happiness Report")
display(fig)

In [5]:
# Principal component analysis of the seven assembled happiness factors.
# First keep all 7 components, then reduce to the leading 4.
pca_all_col = "PCA_Features (GDP,Family,Health,Freedom,Trust(Government Corruption),Generosity,Dystopia Residual)"
pca_all = PCA(k=7, inputCol="features", outputCol=pca_all_col)
pcaModel_all = pca_all.fit(output)
result_all = pcaModel_all.transform(output).select(pca_all_col)
result_all.show(5,truncate=False)
# Proportion of variance explained by each principal component.
print("Explained variance per principal component (k=7): %s" % str(pcaModel_all.explainedVariance))

pca = PCA(k=4, inputCol="features", outputCol="PCA_Features")
pcaModel = pca.fit(output)
result = pcaModel.transform(output).select("features", "PCA_Features")
result.show(4,truncate=False)
# NOTE(review): each principal component is a linear combination of all seven
# input factors, not one original column, so the PCA output alone does not
# establish the ranking stated below -- confirm with the component loadings
# (pcaModel.pc) or another feature-importance method.
print ("The top 4 important factors(components) for the evaluation of happiness are : GDP, Family, Health, and Freedom (all > the value of Dystopia which contains unexplainable data)")

In [6]:
def _with_feature_vector(frame, keep_cols, keep_names):
    """Return `frame` reduced to `keep_cols` plus a dense "features" vector.

    The vector holds GDP, Family, Health and Freedom, in that order.

    frame      -- Spark DataFrame containing `keep_cols` and the four factor columns
    keep_cols  -- list of column names carried over unchanged, in order
    keep_names -- names given to those carried-over columns in the result
    """
    feature_cols = ["GDP", "Family", "Health", "Freedom"]
    return frame.select(*(keep_cols + feature_cols)).rdd\
        .map(lambda r: tuple(r[c] for c in keep_cols) + (Vectors.dense([r[c] for c in feature_cols]),))\
        .toDF(keep_names + ["features"])

# Training set (2015 + 2016) and test set (2017) as (label, features) pairs,
# where the label is the Happiness Score.
trainingData = _with_feature_vector(df, ["Happiness Score"], ["label"])
testData = _with_feature_vector(df_h2017, ["Happiness Score"], ["label"])

# The 2017 rows again, but keeping the country, rank and score next to the features.
df2017WithFeatures = _with_feature_vector(
    df_h2017,
    ["Country", "Happiness Rank", "Happiness Score"],
    ["Country", "Happiness Rank", "Happiness Score"])

In [7]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Candidate regularisation strengths. The one with the lowest RMSE on the
# validation split is kept in bestRegularParam.
regularParam = [0.4, 0.5, 0.6]
bestRegularParam = -1
minrmse = float('inf') # infinity

# Hold out part of the training data for validation. The seed is fixed so the
# split, and therefore the selected parameter, is the same on every run.
trainingData1, validationData = trainingData.randomSplit([0.66, 0.34], seed=42)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

for rp in regularParam:
    lr = LinearRegression(maxIter= 10, regParam=rp)
    model = lr.fit(trainingData1)
    print("when regular parameter: %s" %rp)
    print("Coefficients: %s" % str(model.coefficients))
    print("Intercept: %s" % str(model.intercept))
    trainingSummary = model.summary
    print("Root Mean Squared Error (RMSE) on training data : %f" % trainingSummary.rootMeanSquaredError)
    validation = model.transform(validationData)
    rmse = evaluator.evaluate(validation)
    print("Root Mean Squared Error (RMSE) on validation data = %g" % rmse)

    # Model selection uses the validation RMSE, not the training RMSE.
    if rmse < minrmse:
        minrmse = rmse
        bestRegularParam = rp

# print() call form so the cell runs on both Python 2 and Python 3.
print('The best model was trained with regular parameter %s' % bestRegularParam)
    

In [8]:
# Refit on the full training set (2015 + 2016) with the selected
# regularisation strength, then score the 2017 data.
mylr = LinearRegression(maxIter= 10, regParam=bestRegularParam)
mymodel = mylr.fit(trainingData)
predictions = mymodel.transform(testData)

# Score the 2017 frame that still carries Country / Rank / Score directly,
# instead of joining the predictions back on feature-vector equality. That
# join is fragile and would duplicate rows whenever two countries share
# identical feature values.
dfPredicted2017 = mymodel.transform(df2017WithFeatures)\
    .select("Country", "Happiness Rank", "Happiness Score", "prediction")\
    .withColumnRenamed("prediction", "Predicted Score")

# RMSE of the predicted score against the label (Happiness Score) on the 2017 test data.
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [9]:
from pyspark.sql import Window

# Rank the countries by predicted score, highest first. dense_rank() gives
# tied scores the same rank and leaves no gaps in the numbering.
window = Window.orderBy(desc("Predicted Score"))
predicted_rank = dense_rank().over(window)
dfPredictedRank = dfPredicted2017.withColumn("Predicted Rank", predicted_rank)

# Print the first 50 rows without truncating cell contents.
dfPredictedRank.show(n=50, truncate=False)

In [10]:
from plotly.offline import plot

# World map coloured by the actual 2017 happiness rank of each country.
pd2017 = df_h2017.toPandas()
data = dict(type = 'choropleth',
           colorscale = 'Portland',
           autocolorscale = False,
           locations = pd2017['Country'],
           locationmode = 'country names',   # locations are matched by country name
           z = pd2017['Happiness Rank'],
           text = pd2017['Country'],
           colorbar = {'title':'Happiness Rank'})
# plotly projection type names are lowercase; 'Mercator' is not a valid value.
layout = dict(title = '2017 Actual Global Happiness Rank', geo = dict( showframe = False, showcoastlines = False, projection = dict( type = 'mercator')))
fig = dict(data = [data], layout=layout)
# Render to an HTML <div> string and show it in the notebook.
p1 = plot(fig, output_type = 'div')
displayHTML(p1)

In [11]:
from plotly.offline import plot

# World map coloured by the rank predicted by the regression model for 2017.
pdPredictedRank = dfPredictedRank.toPandas()
data = dict(type = 'choropleth',
           colorscale = 'Portland',
           autocolorscale = False,
           locations = pdPredictedRank['Country'],
           locationmode = 'country names',   # locations are matched by country name
           z = pdPredictedRank['Predicted Rank'],
           text = pdPredictedRank['Country'],
           colorbar = {'title':'Happiness Rank'})
# plotly projection type names are lowercase; 'Mercator' is not a valid value.
layout = dict(title = '2017 Predicted Global Happiness Rank', geo = dict( showframe = False, showcoastlines = False, projection = dict( type = 'mercator')))
fig = dict(data = [data], layout=layout)
# Render to an HTML <div> string and show it in the notebook.
p2 = plot(fig, output_type = 'div')
displayHTML(p2)