In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

In [2]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session that every DataFrame in this notebook hangs off.
spark = (
    SparkSession.builder
    .appName('World Happiness Report Pyspark')
    .getOrCreate()
)

In [21]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation


In [41]:
# Load the 2017 report: the first row supplies the column names and the
# column types are inferred from the data.
report2017 = spark.read.csv(
    '2017.csv',
    header=True,
    inferSchema=True,
)

In [42]:
# Print the inferred schema to check the column names and types that were read.
report2017.printSchema()


root
 |-- Country: string (nullable = true)
 |-- Happiness.Rank: integer (nullable = true)
 |-- Happiness.Score: double (nullable = true)
 |-- Whisker.high: double (nullable = true)
 |-- Whisker.low: double (nullable = true)
 |-- Economy..GDP.per.Capita.: double (nullable = true)
 |-- Family: double (nullable = true)
 |-- Health..Life.Expectancy.: double (nullable = true)
 |-- Freedom: double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Trust..Government.Corruption.: double (nullable = true)
 |-- Dystopia.Residual: double (nullable = true)



In [43]:
from pyspark.ml.feature import VectorAssembler


In [44]:
# List the column names as loaded; several of them contain "." characters.
report2017.columns

['Country',
 'Happiness.Rank',
 'Happiness.Score',
 'Whisker.high',
 'Whisker.low',
 'Economy..GDP.per.Capita.',
 'Family',
 'Health..Life.Expectancy.',
 'Freedom',
 'Generosity',
 'Trust..Government.Corruption.',
 'Dystopia.Residual']

In [45]:
# Rename columns: strip every "." out of the names, then rebuild the
# DataFrame with the cleaned names (column order is unchanged).
new_column_name_list = [name.replace(".", "") for name in report2017.columns]

report2017 = report2017.toDF(*new_column_name_list)

In [47]:
# Confirm the rename: the names should no longer contain ".".
report2017.columns

['Country',
 'HappinessRank',
 'HappinessScore',
 'Whiskerhigh',
 'Whiskerlow',
 'EconomyGDPperCapita',
 'Family',
 'HealthLifeExpectancy',
 'Freedom',
 'Generosity',
 'TrustGovernmentCorruption',
 'DystopiaResidual']

In [48]:
# Pack every column except Country into a single vector column named
# "features", so a correlation matrix can be computed over all of them.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        'HappinessRank',
        'HappinessScore',
        'Whiskerhigh',
        'Whiskerlow',
        'EconomyGDPperCapita',
        'Family',
        'HealthLifeExpectancy',
        'Freedom',
        'Generosity',
        'TrustGovernmentCorruption',
        'DystopiaResidual',
    ],
)

In [49]:
# Append the assembled "features" vector column to the report rows.
output = assembler.transform(report2017)


In [72]:
from pyspark.ml.linalg import Vectors

# Pearson correlation between all entries of the "features" vector.
# corr() returns a one-row DataFrame; head() pulls that row out and its
# first field is the correlation matrix.
r1 = Correlation.corr(output, "features", method="pearson").head()
print("Pearson correlation matrix:\n" + str(r1[0]))


Pearson correlation matrix:
DenseMatrix([[ 1.        , -0.99277447, -0.99305849, -0.99153348, -0.81324364,
              -0.73675268, -0.78071584, -0.55160784, -0.13261979, -0.40584233,
              -0.48450596],
             [-0.99277447,  1.        ,  0.99949742,  0.99952041,  0.81246875,
               0.75273667,  0.78195062,  0.57013719,  0.15525584,  0.42907974,
               0.47535512],
             [-0.99305849,  0.99949742,  1.        ,  0.99803641,  0.81186758,
               0.75093422,  0.7766345 ,  0.56990738,  0.15546203,  0.42645891,
               0.47882363],
             [-0.99153348,  0.99952041,  0.99803641,  1.        ,  0.81226748,
               0.75376686,  0.78638485,  0.56980835,  0.15490374,  0.43122349,
               0.4715055 ],
             [-0.81324364,  0.81246875,  0.81186758,  0.81226748,  1.        ,
               0.68829631,  0.84307664,  0.36987339, -0.01901125,  0.3509441 ,
               0.02422642],
             [-0.73675268,  0.75273667,  0

In [73]:
# Remove redundant features: rank, score and the two whisker columns are
# dropped (their pairwise |correlation| is above 0.99 in the Pearson matrix
# printed earlier), leaving the seven component columns as model inputs.
#
# NOTE(review): DystopiaResidual stays in the feature set, and the model fitted
# further down reports r2 ~ 1.0 with a near-zero intercept. That suggests
# HappinessScore may be (almost exactly) the plain sum of these seven columns,
# in which case the regression only recovers an identity -- confirm against the
# dataset documentation before interpreting the fit.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=[
        'EconomyGDPperCapita',
        'Family',
        'HealthLifeExpectancy',
        'Freedom',
        'Generosity',
        'TrustGovernmentCorruption',
        'DystopiaResidual',
    ],
)

output2 = assembler.transform(report2017)


In [79]:
# Scale variables
from pyspark.ml.feature import StandardScaler

# Rescale each feature to unit standard deviation; withMean=False means the
# values are not centered first.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=False,
    withStd=True,
)

# NOTE(review): the scaler is fitted on the full dataset, before the
# train/test split further down, so its statistics include the test rows.
scalerModel = scaler.fit(output2)

# Adds the "scaledFeatures" column next to the unscaled "features" column.
scaledData = scalerModel.transform(output2)

# Peek at a single row to sanity-check the new columns.
scaledData.show(1)

+-------+-------------+----------------+----------------+----------------+-------------------+----------------+--------------------+-----------------+----------------+-------------------------+----------------+--------------------+--------------------+
|Country|HappinessRank|  HappinessScore|     Whiskerhigh|      Whiskerlow|EconomyGDPperCapita|          Family|HealthLifeExpectancy|          Freedom|      Generosity|TrustGovernmentCorruption|DystopiaResidual|            features|      scaledFeatures|
+-------+-------------+----------------+----------------+----------------+-------------------+----------------+--------------------+-----------------+----------------+-------------------------+----------------+--------------------+--------------------+
| Norway|            1|7.53700017929077|7.59444482058287|7.47955553799868|   1.61646318435669|1.53352355957031|   0.796666502952576|0.635422587394714|0.36201223731041|        0.315963834524155|2.27702665328979|[1.61646318435669...|[3.8414714

In [81]:
# Check that "features" and "scaledFeatures" were added alongside the original columns.
scaledData.columns

['Country',
 'HappinessRank',
 'HappinessScore',
 'Whiskerhigh',
 'Whiskerlow',
 'EconomyGDPperCapita',
 'Family',
 'HealthLifeExpectancy',
 'Freedom',
 'Generosity',
 'TrustGovernmentCorruption',
 'DystopiaResidual',
 'features',
 'scaledFeatures']

In [82]:
# Keep only what the model needs: the scaled feature vector and the label.
final_data = scaledData.select(
    "scaledFeatures",
    "HappinessScore",
)


In [83]:
# Randomly hold out roughly 30% of the rows for testing (the weights are
# sampling probabilities, not exact proportions). The fixed seed keeps the
# split -- and every metric computed from it -- repeatable across kernel
# restarts; without it each run trains and evaluates on different rows.
train, test = final_data.randomSplit([0.7, 0.3], seed=42)


In [84]:
# Linear regression estimator: scaled feature vector in, HappinessScore as
# the label, predictions written to a "prediction" column.
lr = LinearRegression(
    featuresCol='scaledFeatures',
    labelCol='HappinessScore',
    predictionCol='prediction',
)


In [85]:
# Fit the linear regression on the training split only; the test split is
# kept aside for evaluation.
lrModel = lr.fit(train)

In [86]:
# Report the fitted parameters: one coefficient per entry of the
# scaledFeatures vector (same order as the assembler's inputCols), then the
# intercept. str.format already stringifies its argument, so no explicit
# str() call is needed.
print("Coefficients: {}".format(lrModel.coefficients))
print('\n')
print("Intercept:{}".format(lrModel.intercept))

Coefficients: [0.4208607647467514,0.28722967051271053,0.23701843570281034,0.1500160990794907,0.13478428755879313,0.10162394719435851,0.4999816428689854]


Intercept:0.0002821469314831961


In [87]:
# Summary object of the fitted model; the next cell reads its residuals,
# RMSE and r2.
Summary = lrModel.summary

In [88]:
# Fit diagnostics taken from the model's own summary (Summary = lrModel.summary):
# per-row residuals, then root-mean-squared error and r2.
Summary.residuals.show()
print("RMSE: {}".format(Summary.rootMeanSquaredError))
print("r2: {}".format(Summary.r2))

+--------------------+
|           residuals|
+--------------------+
|-1.67518984090175...|
|-3.03916198370224...|
|4.497230400510865...|
| -6.7970652792404E-5|
|2.298365759227927...|
|-3.03472493346390...|
|2.820236007536891...|
|-1.37305955437483...|
|4.200553357867065E-4|
|-4.15374973223414...|
|-2.23626906370455...|
|2.178133167749507...|
|3.839437495845033E-4|
|-2.00241139603818...|
|1.853253925951925...|
| 9.86530050846035E-5|
|-3.26259385607041...|
|-1.70805778044957...|
|1.649976054629576E-4|
|-2.12853158926051...|
+--------------------+
only showing top 20 rows

RMSE: 0.00027110357125753354
r2: 0.9999999397692348


In [90]:
# Score the fitted model on the held-out test split.
test_results = lrModel.evaluate(test)

In [91]:
# Residuals and RMSE on the test split, for comparison with the training figures.
test_results.residuals.show()
print("RMSE: {}".format(test_results.rootMeanSquaredError))

+--------------------+
|           residuals|
+--------------------+
|2.528941710808752E-4|
|3.720344258706376E-4|
| 2.78248546647486E-4|
|1.762437286978446...|
|2.595077600009432...|
|-3.40717166289472...|
|1.753933555210807E-4|
|-3.10578546397266...|
|-1.27355973703835...|
|3.911131027063419E-4|
|-8.33403515629882...|
|5.216971777377566E-4|
|-3.41186228018486...|
|-3.12695691278008...|
|-8.31132354051078...|
|3.286486491269613...|
|3.565681952055755E-4|
|3.861736630259571E-4|
|-1.40175087866012...|
|-2.70359668100184...|
+--------------------+
only showing top 20 rows

RMSE: 0.0003087989388196993


In [92]:
# Drop the label column so only the feature vector remains, as it would for unlabeled data.
unlabeled_data = test.select('scaledFeatures')

In [93]:
# Apply the fitted model; this adds a "prediction" column next to scaledFeatures.
predictions = lrModel.transform(unlabeled_data)

In [94]:
# Display the first rows of the predicted happiness scores.
predictions.show()

+--------------------+------------------+
|      scaledFeatures|        prediction|
+--------------------+------------------+
|[0.55681965189602...| 4.549747296563779|
|[0.58116484578800...|3.5066279350565495|
|[0.72587930006475...|3.4947216370124323|
|[0.72674427646559...| 3.643823809677062|
|[0.80617804884618...|  4.45997408737097|
|[0.90645751876715...|   4.0813405683931|
|[1.14027686574806...| 4.961824499832959|
|[1.23818981462933...|3.8083106662844375|
|[1.39423274718889...| 4.608127157609444|
|[1.51239972793849...| 4.513608825862134|
|[1.72741474434877...| 5.269083393757323|
|[1.73618292648071...| 5.180478535518792|
|[1.75216724842509...| 6.071341285410148|
|[1.84687880016634...| 3.462031162757608|
|[1.88268767580868...| 4.315000140333695|
|[2.07228387372530...| 6.453671347536173|
|[2.10416285413059...| 5.010643588207384|
|[2.12611086008845...| 4.095614020886534|
|[2.16207285773018...| 6.003139957650126|
|[2.35510828057311...|  5.33627032533582|
+--------------------+------------