# Happiest Citizens in the World

## Linear Regression

A regression technique, Linear Regression, is applied to build a model that predicts each country's happiness ranking as closely as possible to the actual results.


### Linear Regression for 2015

In [None]:
# Importing SparkSession libraries.
# Creating an instance 'linearReg' for the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linearReg').getOrCreate()
# Reading the data from csv file into the dataframe 'data'
data = spark.read.csv('WH_2015.csv', inferSchema=True, header=True)
# Printing the schema of the file
data.printSchema()

In [None]:
# Describing the statistical summary of the data
data.describe().show()

In [None]:
# Displays only the features to know what all features are available
data.columns

In [None]:
# Importing Vectors, VectorAssembler, Pipeline and StringIndexer libraries
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# StringIndexer maps a string column to numeric label indices in
# [0, numLabels), ordered by frequency (the most frequent value gets index 0).
# fit() learns the mapping from the data; transform() appends the index column.

# Each 'Country' string is mapped to a numeric value in the new column 'country_in'.
indexer = StringIndexer(outputCol='country_in', inputCol='Country')
indexer_model = indexer.fit(data)
indexed = indexer_model.transform(data)
indexed.columns

In [None]:
# Columns that VectorAssembler merges into the single vector column 'features'.
feature_cols_2015 = [
    'country_in',
    'Happiness Score',
    'Standard Error',
    'Economy (GDP per Capita)',
    'Family',
    'Health (Life Expectancy)',
    'Freedom',
    'Trust (Government Corruption)',
    'Generosity',
    'Dystopia Residual',
]
assembler15 = VectorAssembler(inputCols=feature_cols_2015, outputCol='features')

# Append the 'features' vector, then keep only the features and the label.
output = assembler15.transform(indexed)
final_data = output.select('features', 'Happiness Rank')
# Seeded random split of the DataFrame into training and test sets
# using weights 0.8 and 0.2.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=1234)

In [None]:
# Importing Linear Regression libraries 
from pyspark.ml.regression import LinearRegression

In [None]:
# Linear regression minimizes the squared error plus a regularization term.
# Supported regularization:
#        none (ordinary least squares)
#        L2 (ridge regression)
#        L1 (lasso)
#        L2 + L1 (elastic net)
# regParam        - regularization strength (lambda)
# elasticNetParam - L1/L2 mixing weight (alpha)
lr = LinearRegression(
    labelCol='Happiness Rank',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    fitIntercept=True,
    standardization=True,
    tol=1e-02,
)

In [None]:
# fit() - Fits a model to the input dataset with optional parameters and returns fitted model.
linearmodel = lr.fit(train_data)

# Generate predictions
predicted = linearmodel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("Happiness Rank").rdd.map(lambda x: x[0])

In [None]:
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

In [None]:
# Fitted coefficients of the model (weights applied to the 'features' vector)
linearmodel.coefficients

In [None]:
# Fitted intercept of the model
linearmodel.intercept

In [None]:
# Number of iterations the solver ran during training
linearmodel.summary.totalIterations

In [None]:
# Objective function (scaled loss + regularization) at each iteration.
linearmodel.summary.objectiveHistory

In [None]:
# Residuals (label - predicted value) on the training data.
linearmodel.summary.residuals.show()

In [None]:
# Root mean squared error (RMSE) on the training data
linearmodel.summary.rootMeanSquaredError

In [None]:
# R^2 (coefficient of determination) on the training data
linearmodel.summary.r2

In [None]:
# Mean squared error on the training data, i.e. the average of the
# squared residuals (squared error / quadratic loss).
linearmodel.summary.meanSquaredError

In [None]:
training_summary = linearmodel.summary
# predictions - DataFrame produced by applying the fitted model to the training data;
# describe() prints summary statistics of its columns.
training_summary.predictions.describe().show()

In [None]:
# RegressionEvaluator scores a DataFrame that holds a prediction column and a
# label column.
from pyspark.ml.evaluation import RegressionEvaluator
# Compare the 'prediction' column against the label 'Happiness Rank'.
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Happiness Rank')
# R^2 of the test-set predictions. `predicted` is the DataFrame returned by
# linearmodel.transform(test_data); the previously referenced `pred_and_labels`
# was never defined and raised a NameError.
# R^2 is at most 1 and can be negative when the fit is poor.
evaluator.evaluate(predicted, {evaluator.metricName: "r2"})

### Linear Regression for 2016

In [None]:
# Reading the data from csv file into the dataframe 'data'
data = spark.read.csv('WH_2016.csv', inferSchema=True, header=True)
# Printing the schema of the file
data.printSchema()

In [None]:
# Describing the statistical summary of the data
data.describe().show()

In [None]:
# Displays only the features to know what all features are available
data.columns

In [None]:
# StringIndexer maps a string column to numeric label indices in
# [0, numLabels), ordered by frequency (the most frequent value gets index 0).
# fit() learns the mapping from the data; transform() appends the index column.

# Each 'Country' string is mapped to a numeric value in the new column 'country_in'.
indexer = StringIndexer(outputCol='country_in', inputCol='Country')
indexer_model = indexer.fit(data)
indexed = indexer_model.transform(data)
indexed.columns

In [None]:
# Columns that VectorAssembler merges into the single vector column 'features'.
feature_cols_2016 = [
    'country_in',
    'Happiness Score',
    'Lower Confidence Interval',
    'Upper Confidence Interval',
    'Economy (GDP per Capita)',
    'Family',
    'Health (Life Expectancy)',
    'Freedom',
    'Trust (Government Corruption)',
    'Generosity',
    'Dystopia Residual',
]
assembler16 = VectorAssembler(inputCols=feature_cols_2016, outputCol='features')

# Append the 'features' vector, then keep only the features and the label.
output = assembler16.transform(indexed)
final_data = output.select('features', 'Happiness Rank')
# Seeded random split of the DataFrame into training and test sets
# using weights 0.8 and 0.2.
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=1234)

In [None]:
# Linear regression - The learning objective is to minimize the squared error, with regularization. 
# The specific squared error loss function used is: L = 1/2n ||A coefficients - y||^2^
# This support multiple types of regularization:
#        none (a.k.a. ordinary least squares)
#        L2 (ridge regression)
#        L1 (Lasso)
#        L2 + L1 (elastic net)
# regParam - lambda
# elasticNetParam - alpha
lr=LinearRegression(labelCol='Happiness Rank', maxIter=10, regParam=0.3, elasticNetParam=0.8, 
                          fitIntercept=True, standardization=True, tol=1e-02)

# fit() - Fits a model to the input dataset with optional parameters and returns fitted model.
linearmodel = lr.fit(train_data)
# Generate predictions
predicted = linearmodel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("Happiness Rank").rdd.map(lambda x: x[0])

In [None]:
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

In [None]:
# Fitted coefficients of the model (weights applied to the 'features' vector)
linearmodel.coefficients

In [None]:
# Fitted intercept of the model
linearmodel.intercept

In [None]:
# Number of iterations the solver ran during training
linearmodel.summary.totalIterations

In [None]:
# Objective function (scaled loss + regularization) at each iteration.
linearmodel.summary.objectiveHistory

In [None]:
# Residuals (label - predicted value) on the training data.
linearmodel.summary.residuals.show()

In [None]:
# Root mean squared error (RMSE) on the training data
linearmodel.summary.rootMeanSquaredError

In [None]:
# R^2 (coefficient of determination) on the training data
linearmodel.summary.r2

In [None]:
# Mean squared error on the training data, i.e. the average of the
# squared residuals (squared error / quadratic loss).
linearmodel.summary.meanSquaredError

In [None]:
training_summary = linearmodel.summary
# predictions - DataFrame produced by applying the fitted model to the training data;
# describe() prints summary statistics of its columns.
training_summary.predictions.describe().show()

In [None]:
# Importing RegressionEvaluator Libraries for evaluating the Linear Regression model 
from pyspark.ml.evaluation import RegressionEvaluator 

In [None]:
# Applied the Regression Evaluator to evaluate the label 'Happiness Rank'
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Happiness Rank')

In [None]:
# Evaluation of the predicted values on a scale of 0 to 1
evaluator.evaluate(pred_and_labels.predictions,{evaluator.metricName: "r2"})

### Linear Regression for 2017

In [None]:
# Reading the data from csv file into the dataframe 'data'
data1 = spark.read.csv('WH_2017.csv', inferSchema=True, header=True)

In [None]:
# Printing the schema of the file
data1.printSchema()

In [None]:
# Importing Regular Expression libraries
# Substituting '.' to ' ' in all the column names
import re
data = data1.toDF(*(re.sub(r'[\.\s]+', ' ', c) for c in data1.columns))

In [None]:
# Displays only the features to know what all features are available
data.columns

In [None]:
# StringIndexer maps a string column to numeric label indices in
# [0, numLabels), ordered by frequency (the most frequent value gets index 0).
# fit() learns the mapping from the data; transform() appends the index column.

# Each 'Country' string is mapped to a numeric value in the new column 'country_in'.
indexer = StringIndexer(outputCol='country_in', inputCol='Country')
indexer_model = indexer.fit(data)
indexed = indexer_model.transform(data)
indexed.columns

In [None]:
# VectorAssembler - A feature transformer that merges multiple columns into a vector column.
assembler17 = VectorAssembler(inputCols=['country_in',
 'Happiness Rank',
 'Happiness Score',
 'Whisker high',
 'Whisker low',
 'Economy GDP per Capita ',
 'Family',
 'Health Life Expectancy ',
 'Freedom',
 'Generosity',
 'Trust Government Corruption ',
 'Dystopia Residual'
 ], outputCol='features')

In [None]:
#transform() - Transforms the input dataset with optional parameters and returns transformed dataset.
output = assembler17.transform(indexed)
final_data = output.select('features', 'Happiness Rank')
# randomSplit - Randomly splits this RDD with the provided weights and returns split RDDs in a list. 
train_data, test_data = final_data.randomSplit([0.8,0.2], seed=1234)

In [None]:
# Linear regression - The learning objective is to minimize the squared error, with regularization. 
# The specific squared error loss function used is: L = 1/2n ||A coefficients - y||^2^
# This support multiple types of regularization:
#        none (a.k.a. ordinary least squares)
#        L2 (ridge regression)
#        L1 (Lasso)
#        L2 + L1 (elastic net)
# regParam - lambda
# elasticNetParam - alpha
lr=LinearRegression(labelCol='Happiness Rank', maxIter=10, regParam=0.3, elasticNetParam=0.8, 
                          fitIntercept=True, standardization=True, tol=1e-02)

In [None]:
# fit() - Fits a model to the input dataset with optional parameters and returns fitted model.
linearmodel = lr.fit(train_data)
# Generate predictions
predicted = linearmodel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("Happiness Rank").rdd.map(lambda x: x[0])

In [None]:
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

In [None]:
# Fitted coefficients of the model (weights applied to the 'features' vector)
linearmodel.coefficients

In [None]:
# Fitted intercept of the model
linearmodel.intercept

In [None]:
# Number of iterations the solver ran during training
linearmodel.summary.totalIterations

In [None]:
# Objective function (scaled loss + regularization) at each iteration.
linearmodel.summary.objectiveHistory

In [None]:
# Residuals (label - predicted value) on the training data.
linearmodel.summary.residuals.show()

In [None]:
# Root mean squared error (RMSE) on the training data
linearmodel.summary.rootMeanSquaredError

In [None]:
# R^2 (coefficient of determination) on the training data
linearmodel.summary.r2

In [None]:
# Mean squared error on the training data, i.e. the average of the
# squared residuals (squared error / quadratic loss).
linearmodel.summary.meanSquaredError

In [None]:
training_summary = linearmodel.summary
# predictions - DataFrame produced by applying the fitted model to the training data;
# describe() prints summary statistics of its columns.
training_summary.predictions.describe().show()

In [None]:
# Importing RegressionEvaluator Libraries for evaluating the Linear Regression model 
from pyspark.ml.evaluation import RegressionEvaluator 

In [None]:
# Applied the Regression Evaluator to evaluate the label 'Happiness Rank'
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Happiness Rank')

In [None]:
# Evaluation of the predicted values on a scale of 0 to 1
evaluator.evaluate(pred_and_labels.predictions,{evaluator.metricName: "r2"})

##                                                   

## Combining DataFrames

In [None]:
# Obtain the Spark session via getOrCreate(), requesting the app name 'linearReg1'.
# Note: getOrCreate() returns the already-active session when one exists.
spark = (
    SparkSession.builder
    .appName('linearReg1')
    .getOrCreate()
)
# Load the three yearly reports into their own DataFrames. The first CSV row
# supplies the column names and the column types are inferred from the values.
df15_sample = spark.read.csv('WH_2015.csv', header=True, inferSchema=True)
df16_sample = spark.read.csv('WH_2016.csv', header=True, inferSchema=True)
df17_sample = spark.read.csv('WH_2017.csv', header=True, inferSchema=True)

In [None]:
# Importing all the pyspark sql functions
from pyspark.sql.functions import *

In [None]:
# Renaming the Happiness Rank column to Happiness_Rank_2015 by using alias method
df15 = df15_sample.select("Country", "Region", col("Happiness Rank").alias("Happiness_Rank_2015"))
df15.show()

In [None]:
# Renaming the Happiness Rank column to Happiness_Rank_2016 by using alias method
df16 = df16_sample.select("Country", "Region", col("Happiness Rank").alias("Happiness_Rank_2016"))
df16.show()

In [None]:
# Importing Regular Expression libraries
# Substituting '.' to ' ' in all the column names
import re
df17_space = df17_sample.toDF(*(re.sub(r'[\.\s]+', ' ', c) for c in df17_sample.columns))

In [None]:
# Renaming the Happiness Rank column to Happiness_Rank_2017 by using alias method
df17 = df17_space.select("Country", col("Happiness Rank").alias("Happiness_Rank_2017"))
df17.show()

In [None]:
# Joining all the 3 dataframes into one single dataframe by fullouter join
df = df15.join((df16.join(df17,['Country'],"fullouter")), ['Country','Region'], "fullouter").sort("Country")

In [None]:
# Count of total Number of rows
df.count()

In [None]:
# Checking for the column 'Country' for NULL values
df.where(col("Country").isNull()).count()

In [None]:
# Checking for the column 'Region' for NULL values
df.where(col("Region").isNull()).show()

In [None]:
# StringIndexer maps a string column to numeric label indices in
# [0, numLabels), ordered by frequency (the most frequent value gets index 0).
# fit() learns the mapping from the data; transform() appends the index column.

# Each 'Region' string is mapped to a numeric value in the new column 'Region_in'.
from pyspark.ml.feature import StringIndexer
indexer1 = StringIndexer(outputCol='Region_in', inputCol='Region')
region_indexer_model = indexer1.fit(df)
indexed1 = region_indexer_model.transform(df)
indexed1.columns

In [None]:
# Importing pyspark sql functions and naming them as 'sf'
import pyspark.sql.functions as sf

# Combining rows which have NULL values for a few columns
df_new = indexed1.groupBy("Country", "Region", "Happiness_Rank_2016")\
.agg(sf.max('Happiness_Rank_2015').alias('Happiness_Rank_2015'), sf.max('Happiness_Rank_2017').alias('Happiness_Rank_2017'))

In [None]:
# Total number of rows of the new dataframe
df_new.count()

In [None]:
# Filling NULL values with '0'
fill_df = df_new.na.fill(0)
fill_df.show()

### Q1. From 2015 to 2017, which country’s happiness ranking increased the most?

In [None]:
# Countries whose ranking improved the most.
# Keep only rows with a non-zero rank in all three years and a region other
# than '0', then add 'Difference' = 2015 rank - 2017 rank and list the largest
# differences first.
ranked_all_years = (
    (fill_df['Happiness_Rank_2015'] != 0)
    & (fill_df['Happiness_Rank_2016'] != 0)
    & (fill_df['Happiness_Rank_2017'] != 0)
    & (fill_df['Region'] != '0')
)
rank_change = fill_df.Happiness_Rank_2015 - fill_df.Happiness_Rank_2017
new_df = (
    fill_df
    .filter(ranked_all_years)
    .withColumn('Difference', rank_change)
    .sort(desc("Difference"))
)
new_df.show()

### Q2. From 2015 to 2017, which country’s happiness ranking decreased the most?

In [None]:
# Least Happiest
# Adding a new column 'Difference', which shows the ranking difference
new_df = fill_df.filter(~(fill_df['Happiness_Rank_2015'] == 0) & ~(fill_df['Happiness_Rank_2016'] == 0) & ~(fill_df['Happiness_Rank_2017'] == 0) & ~(fill_df['Region'] == '0')).withColumn('Difference', fill_df.Happiness_Rank_2015 - fill_df.Happiness_Rank_2017).sort("Difference")
new_df.show()

In [None]:
new_df.count()

### Q3. For each year, provide the ranking of the happiest continents.

In [None]:
# Average 'Difference' (2015 rank - 2017 rank) per 'Region', listed with the
# largest average first. The same row filter used to build new_df is applied again.
kept_rows = (
    (new_df['Happiness_Rank_2015'] != 0)
    & (new_df['Happiness_Rank_2016'] != 0)
    & (new_df['Happiness_Rank_2017'] != 0)
    & (new_df['Region'] != '0')
)
final = (
    new_df
    .filter(kept_rows)
    .groupBy('Region')
    .agg(avg('Difference').alias('Happiest Continent Rank'))
    .sort(desc('Happiest Continent Rank'))
)
final.show(truncate=False)

### Happiest Continent in 2015

In [None]:
# Average 'Happiness_Rank_2015' per 'Region' ('Region' stands in for the continent).
# Rank 1 is the happiest country, so a LOWER average rank means a happier
# region. The result is therefore sorted ascending so the happiest region
# comes first (the earlier descending sort listed the least happy region first).
# The row filter mirrors the one used to build new_df and is kept as a safeguard.
kept_rows = (
    ~(new_df['Happiness_Rank_2015'] == 0)
    & ~(new_df['Happiness_Rank_2016'] == 0)
    & ~(new_df['Happiness_Rank_2017'] == 0)
    & ~(new_df['Region'] == '0')
)
final = (
    new_df
    .filter(kept_rows)
    .groupBy('Region')
    .agg(avg('Happiness_Rank_2015').alias('Happiest Continent in 2015'))
    .sort('Happiest Continent in 2015')
)
final.show(truncate=False)

### Happiest Continent in 2016

In [None]:
# Average 'Happiness_Rank_2016' per 'Region' ('Region' stands in for the continent).
# Rank 1 is the happiest country, so a LOWER average rank means a happier
# region. The result is therefore sorted ascending so the happiest region
# comes first (the earlier descending sort listed the least happy region first).
# The row filter mirrors the one used to build new_df and is kept as a safeguard.
kept_rows = (
    ~(new_df['Happiness_Rank_2015'] == 0)
    & ~(new_df['Happiness_Rank_2016'] == 0)
    & ~(new_df['Happiness_Rank_2017'] == 0)
    & ~(new_df['Region'] == '0')
)
final = (
    new_df
    .filter(kept_rows)
    .groupBy('Region')
    .agg(avg('Happiness_Rank_2016').alias('Happiest Continent in 2016'))
    .sort('Happiest Continent in 2016')
)
final.show(truncate=False)

### Happiest Continent in 2017

In [None]:
# Average 'Happiness_Rank_2017' per 'Region' ('Region' stands in for the continent).
# Rank 1 is the happiest country, so a LOWER average rank means a happier
# region. The result is therefore sorted ascending so the happiest region
# comes first (the earlier descending sort listed the least happy region first).
# The row filter mirrors the one used to build new_df and is kept as a safeguard.
kept_rows = (
    ~(new_df['Happiness_Rank_2015'] == 0)
    & ~(new_df['Happiness_Rank_2016'] == 0)
    & ~(new_df['Happiness_Rank_2017'] == 0)
    & ~(new_df['Region'] == '0')
)
final = (
    new_df
    .filter(kept_rows)
    .groupBy('Region')
    .agg(avg('Happiness_Rank_2017').alias('Happiest Continent in 2017'))
    .sort('Happiest Continent in 2017')
)
final.show(truncate=False)