# Happiness Dataset
This is a regression example, where we predict a numerical value. We first download the Happiness dataset, then predict a country's Happiness Rank from its Happiness Score.

In [3]:
%sh
curl https://raw.githubusercontent.com/tofighi/dataset/main/big-data/happiness/happiness_2020.csv --output happiness.csv --silent
mv happiness.csv  /var/tmp/

-- Import statements. In spark-shell you can enter them all at once using the :paste command

In [5]:
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.regression.{LinearRegression}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.ml.evaluation.{RegressionEvaluator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.types.{DoubleType}

-- Load the CSV file. Note that inferSchema is set to true, so Spark detects the column types instead of reading every column as a string

In [7]:
val data = spark.read
 .format("csv")
 .option("sep", ",")
 .option("inferSchema", "true")
 .option("header", "true")
 .load("file:///var/tmp/happiness.csv")
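-- Without inferSchema, Spark reads every CSV column as a string. The sketch below is a rough, simplified illustration of the idea behind inference: each column gets the narrowest type that all of its values parse as. It is not Spark's actual algorithm, which also recognizes types such as long, decimal, boolean and timestamp. The function inferType and the sample values are hypothetical.

```scala
import scala.util.Try

// Simplified, hypothetical type inference for one CSV column:
// "int" if every value parses as an Int, else "double" if every value
// parses as a Double, else "string".
def inferType(values: Seq[String]): String =
  if (values.forall(v => Try(v.trim.toInt).isSuccess)) "int"
  else if (values.forall(v => Try(v.trim.toDouble).isSuccess)) "double"
  else "string"

// Hypothetical column values, not read from happiness.csv
println(inferType(Seq("1", "2", "3")))            // int
println(inferType(Seq("7.5", "7.2", "6.9")))      // double
println(inferType(Seq("Country A", "Country B"))) // string
```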

-- Select only the two columns we need, Happiness Rank and Happiness Score, and cast both to Double

In [9]:
val rank_score = data.select(col("Happiness Rank").cast(DoubleType), col("Happiness Score").cast(DoubleType))

In [10]:
rank_score.show(5)


-- Next we split the dataset into training and test sets
-- randomSplit assigns roughly 80% of the rows to training and 20% to test (the sizes are approximate, not exact)
-- The second argument is the seed; fixing it gives the same random split every time, which makes the results reproducible

In [12]:
val Array(trainingData, testData) = rank_score.randomSplit(Array(0.8, 0.2), seed=1111) 
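-- The following small pure-Scala sketch of a seeded 80/20 split shows what the seed does. As a simplification of randomSplit, it assumes each row is independently sent to the training set with probability 0.8. The function splitRows is hypothetical. Because rows are assigned independently, the set sizes only vary around 80%/20%, and the same seed always reproduces the same split.

```scala
import scala.util.Random

// Hypothetical seeded split: each row independently goes to training with
// probability trainFraction (a simplification of Spark's randomSplit).
def splitRows[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
  val rng = new Random(seed)
  val draws = rows.map(_ => rng.nextDouble()) // one uniform draw in [0, 1) per row
  val (train, test) = rows.zip(draws).partition { case (_, u) => u < trainFraction }
  (train.map(_._1), test.map(_._1))
}

val rows = (1 to 20).toVector
val (train, test) = splitRows(rows, trainFraction = 0.8, seed = 1111L)
println(s"train = ${train.size} rows, test = ${test.size} rows") // sizes vary around 16 / 4
```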

## Machine Learning Steps

-- The following steps follow the same general pattern in most machine learning problems in Spark

-- Preparing features and labels

-- We need to prepare the features and the label for our algorithm, in this case linear regression
-- Features are the input data to the algorithm, and the label is the value the model is trained to predict
-- Features are also called independent variables
-- Labels are also called dependent variables

-- In our problem we have only one FEATURE, the happiness score
-- We will predict the happiness rank, which becomes the LABEL

-- Spark ML algorithms expect all the features in a single column of type Vector
-- VectorAssembler is a transformer that packages one or more input columns into that vector column
-- We pass an array with the names of all the feature columns to setInputCols
-- and give the name of the new vector column to setOutputCol

In [14]:
val assembler = new VectorAssembler()
.setInputCols(Array("Happiness Score"))
.setOutputCol("assembled-features")
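-- Conceptually, VectorAssembler copies the chosen columns of each row, in the order given, into one feature vector. The sketch below models a row as a Map from column name to value. The function assemble and the values are hypothetical. The real transformer also accepts vector-typed input columns and has a handleInvalid option for bad values.

```scala
// Hypothetical sketch: a row is a Map from column name to numeric value
def assemble(row: Map[String, Double], inputCols: Seq[String]): Vector[Double] =
  inputCols.map(row).toVector // copy the selected columns, in the given order

val row = Map("Happiness Rank" -> 1.0, "Happiness Score" -> 7.5) // hypothetical values

println(assemble(row, Seq("Happiness Score")))                   // Vector(7.5)
println(assemble(row, Seq("Happiness Score", "Happiness Rank"))) // Vector(7.5, 1.0)
```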


-- Next we instantiate our algorithm, linear regression, and set its features and label columns
-- The features column is the output of the VectorAssembler, "assembled-features"
-- The label is the value linear regression will try to predict, the happiness rank

In [16]:
val lr = new LinearRegression() 
 .setFeaturesCol("assembled-features")
 .setLabelCol("Happiness Rank")
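-- With its default settings (no regularization, intercept included), LinearRegression minimizes the squared error of its predictions. For a single feature that problem has a well-known closed-form solution, sketched below in plain Scala. The function fitLine and the data points are hypothetical. The slope comes out negative here, as we would expect for happiness: a higher score goes with a better, that is smaller, rank number.

```scala
// Closed-form ordinary least squares for one feature x and label y:
//   slope = sum((x - meanX) * (y - meanY)) / sum((x - meanX)^2)
//   intercept = meanY - slope * meanX
def fitLine(xs: Seq[Double], ys: Seq[Double]): (Double, Double) = {
  require(xs.size == ys.size && xs.size >= 2, "need at least two paired points")
  val n = xs.size.toDouble
  val meanX = xs.sum / n
  val meanY = ys.sum / n
  val sxy = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
  val sxx = xs.map(x => (x - meanX) * (x - meanX)).sum
  require(sxx > 0, "the feature must not be constant")
  val slope = sxy / sxx
  (slope, meanY - slope * meanX)
}

// Hypothetical points lying exactly on rank = 20 - 2 * score
val scores = Seq(2.0, 4.0, 6.0, 8.0)
val ranks  = scores.map(s => 20.0 - 2.0 * s)
val (slope, intercept) = fitLine(scores, ranks)
println(s"slope = $slope, intercept = $intercept") // slope = -2.0, intercept = 20.0
```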


-- Next we create a pipeline for everything that needs to be executed
-- Think of the pipeline as an assembly line in a factory, where the parts of a product are put together in a fixed order
-- A pipeline is made of STAGES that run in sequence
-- In this example, our pipeline has only two stages
-- The first stage assembles the features with VectorAssembler
-- The second stage is the algorithm used to train the model, LinearRegression


In [18]:
%spark
val pipeline = new Pipeline()
 .setStages(Array(assembler, lr))
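-- The sketch below is a simplified model of how fitting a pipeline works. It uses hypothetical Transformer and Estimator stand-ins that operate on a Seq[Double] rather than a DataFrame. Stages run in order. A transformer just transforms the data. An estimator is first fitted on the data that reaches it, and the resulting model is used from then on. The fitted pipeline is a list of transformers, which is conceptually what a PipelineModel holds.

```scala
// Hypothetical stand-ins for pipeline stages, working on Seq[Double] instead of a DataFrame
sealed trait Stage
final case class Transformer(name: String, f: Seq[Double] => Seq[Double]) extends Stage
final case class Estimator(name: String, fit: Seq[Double] => Transformer) extends Stage

// Fit the stages in order; the result (the "pipeline model") contains only transformers
def fitPipeline(stages: Seq[Stage], data: Seq[Double]): Seq[Transformer] = {
  val (_, fitted) = stages.foldLeft((data, Vector.empty[Transformer])) {
    case ((current, acc), t: Transformer) =>
      (t.f(current), acc :+ t)
    case ((current, acc), e: Estimator) =>
      val model = e.fit(current) // fit on the data that reaches this stage
      (model.f(current), acc :+ model)
  }
  fitted
}

// Applying the fitted pipeline runs every transformer in sequence
def transformAll(model: Seq[Transformer], data: Seq[Double]): Seq[Double] =
  model.foldLeft(data)((d, t) => t.f(d))

val doubler = Transformer("double", _.map(_ * 2))
val centerer = Estimator("center", xs => {
  val mean = xs.sum / xs.size
  Transformer("subtract-mean", _.map(_ - mean))
})

val pipelineModel = fitPipeline(Seq(doubler, centerer), Seq(1.0, 2.0, 3.0))
println(transformAll(pipelineModel, Seq(1.0, 2.0, 3.0))) // List(-2.0, 0.0, 2.0)
```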

-- Once we train the model, we need to evaluate how well it performs
-- Since this is a regression problem, we use RegressionEvaluator to evaluate the model
-- When the model makes predictions, it stores them in the "prediction" column
-- Each prediction is then compared with the actual value, so we know how good the prediction is
-- In our case, the model predicts the rank from the score
-- We want to know how far each predicted rank is from the actual rank
-- We will use a metric called r squared (r2), also called the coefficient of determination, to summarize this
-- r2 measures the proportion of the variance in the label that the model explains
-- The higher r2 is, the better the model fits: 1 means perfect predictions, and 0 means no better than always predicting the mean rank
-- On test data r2 can even be negative, if the model does worse than predicting the mean

In [20]:
%spark
val evaluator = new RegressionEvaluator()
 .setLabelCol("Happiness Rank")
 .setPredictionCol("prediction")
 .setMetricName("r2")
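-- The r2 metric is 1 - SS_res / SS_tot. SS_res is the sum of squared prediction errors, and SS_tot is the sum of squared deviations of the labels from their mean. This pure-Scala sketch shows three reference points: perfect predictions, predicting the mean, and predictions worse than the mean. The function rSquared and the numbers are hypothetical.

```scala
// r2 = 1 - SS_res / SS_tot
def rSquared(labels: Seq[Double], predictions: Seq[Double]): Double = {
  require(labels.size == predictions.size && labels.nonEmpty)
  val mean  = labels.sum / labels.size
  val ssRes = labels.zip(predictions).map { case (y, p) => (y - p) * (y - p) }.sum
  val ssTot = labels.map(y => (y - mean) * (y - mean)).sum
  1.0 - ssRes / ssTot
}

val labels = Seq(1.0, 2.0, 3.0, 4.0)
println(rSquared(labels, labels))                  // 1.0: perfect predictions
println(rSquared(labels, Seq(2.5, 2.5, 2.5, 2.5))) // 0.0: always predicting the mean
println(rSquared(labels, Seq(4.0, 3.0, 2.0, 1.0))) // -3.0: worse than predicting the mean
```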

-- Next we set up our cross validator
-- We need to provide two things to the CrossValidator: an estimator and an evaluator
-- For the estimator we provide the pipeline
-- For the evaluator we provide the regression evaluator
-- We can also provide candidate values for "hyperparameters", settings that are chosen before training and can be tuned to improve the model
-- We are not tuning any hyperparameters, so we create a ParamGridBuilder and call build, which returns a grid with a single empty parameter map
-- Finally we set the number of folds to 3
-- This means the cross validator splits our training dataset into three parts (folds)
-- In each round it trains on 2 folds (about 2/3 of the data) and evaluates on the remaining fold (about 1/3), so every fold is used once for evaluation
-- It then picks the parameter map with the best average metric across the folds, in our case the highest r2

In [22]:
%spark
val cross_validator = new CrossValidator()
 .setEstimator(pipeline)
 .setEvaluator(evaluator)
 .setEstimatorParamMaps(new ParamGridBuilder().build)
 .setNumFolds(3)
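-- Here is a minimal sketch of k-fold splitting. It assumes a simpler scheme than Spark's: rows are shuffled with a seed and dealt into k folds round-robin, so the fold sizes are as equal as possible. Spark assigns rows by random sampling, so its folds are only approximately equal. Each fold serves once as the validation set while the others form the training set. The function kFold is hypothetical.

```scala
import scala.util.Random

// Hypothetical k-fold split: shuffle with a seed, then deal rows into k folds
// round-robin. Returns one (training, validation) pair per fold.
def kFold[A](rows: Seq[A], k: Int, seed: Long): Seq[(Seq[A], Seq[A])] = {
  require(k >= 2 && rows.size >= k, "need k >= 2 and at least k rows")
  val shuffled = new Random(seed).shuffle(rows.toVector).zipWithIndex
  (0 until k).map { fold =>
    val validation = shuffled.collect { case (row, i) if i % k == fold => row }
    val training   = shuffled.collect { case (row, i) if i % k != fold => row }
    (training, validation)
  }
}

val rows = (1 to 9).toVector
kFold(rows, k = 3, seed = 42L).zipWithIndex.foreach { case ((train, valid), i) =>
  println(s"fold $i: train = ${train.size} rows, validation = ${valid.size} rows")
}
```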

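-- Next we call fit on the cross validator, passing our training dataset
-- CrossValidator splits the training data into 3 folds
-- In each of the 3 rounds, it trains a model on 2 folds and computes r2 on the held-out fold
-- The r2 values are averaged, the best parameter map is selected, and the pipeline is then fitted again on the full training dataset with those parameters
-- The three per-fold models are discarded; cvModel holds the model refitted on all of the training data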

In [24]:
%spark
val cvModel = cross_validator.fit(trainingData)
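-- The sketch below shows the selection and refit logic in plain Scala. It uses hypothetical names and a toy "model" that predicts a constant: a shrinkage factor times the training mean. For each candidate setting, the score is averaged over the folds, and the best setting is then fitted again on all of the training data. With the notebook's empty parameter grid there is only one candidate, so the selection is trivial, but the final refit on the full training set still happens. Scores here are negative mean squared errors, so larger is better, as with r2.

```scala
// Hypothetical sketch of cross-validated model selection followed by a refit
def crossValidate[P, M](
    candidates: Seq[P],
    folds: Seq[(Seq[Double], Seq[Double])], // (training, validation) pairs
    allTraining: Seq[Double],
    fit: (P, Seq[Double]) => M,
    score: (M, Seq[Double]) => Double       // larger is better
): (P, Double, M) = {
  // Average the validation score over the folds for every candidate setting
  val averages = candidates.map { p =>
    val scores = folds.map { case (train, valid) => score(fit(p, train), valid) }
    (p, scores.sum / scores.size)
  }
  val (bestParam, bestAvg) = averages.maxBy(_._2)
  // The per-fold models are discarded; the returned model is refitted on all training rows
  (bestParam, bestAvg, fit(bestParam, allTraining))
}

// Toy data split into 3 folds of 2 rows each
val data = Vector(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
val folds = (0 until 3).map { f =>
  val valid = data.slice(2 * f, 2 * f + 2)
  (data.diff(valid), valid)
}

// Toy "model": a constant prediction equal to factor * mean of the training rows
val fit: (Double, Seq[Double]) => Double = (factor, train) => factor * train.sum / train.size
// Score: negative mean squared error on the validation rows
val score: (Double, Seq[Double]) => Double =
  (c, valid) => -valid.map(y => (y - c) * (y - c)).sum / valid.size

val (bestFactor, bestAvg, finalModel) = crossValidate(Seq(0.5, 1.0), folds, data, fit, score)
println(s"best factor = $bestFactor, average score = $bestAvg, final prediction = $finalModel")
```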


-- Now cvModel is a trained model that we can use to make predictions
-- We can also save this model and reuse it; we'll see how to do that later

-- Before we use the model on new data, let's check how good it is on the test data we held out
-- We call cvModel.transform on the testData set
-- When we call transform, the model goes over the dataset and adds its predictions in a new "prediction" column

In [26]:
%spark
val predictions = cvModel.transform(testData)

In [27]:
%spark
predictions.show()
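-- In the simplest terms, transform keeps every input row and appends the model's prediction to it. For our single-feature linear model, prediction = intercept + slope * score. The sketch below uses hypothetical case classes and coefficients, not Spark's API or the coefficients actually learned from the data.

```scala
// Hypothetical row types and a fitted single-feature linear model
final case class Row(rank: Double, score: Double)
final case class Scored(rank: Double, score: Double, prediction: Double)
final case class LineModel(slope: Double, intercept: Double) {
  // Keep every input row and append the predicted rank
  def transform(rows: Seq[Row]): Seq[Scored] =
    rows.map(r => Scored(r.rank, r.score, intercept + slope * r.score))
}

val model = LineModel(slope = -2.0, intercept = 20.0) // hypothetical coefficients
model.transform(Seq(Row(5.0, 7.0), Row(12.0, 4.0))).foreach(println)
// Scored(5.0,7.0,6.0)
// Scored(12.0,4.0,12.0)
```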

-- Next we write the predictions to disk
-- We select the "Happiness Rank", "Happiness Score" and "prediction" columns and write them as CSV
-- Spark writes a directory containing one or more part files, not a single CSV file
-- Remember, the output directory SHOULD NOT EXIST, so remove it first if it does (or call .mode("overwrite") on the writer)
-- On HDFS: hadoop fs -rm -r /BigData/happiness/output/
-- On the local disk: rm -r /var/tmp/output
-- Here we write to the local disk; to write to HDFS, use an hdfs:// path instead

In [29]:
%sh
rm -rf /var/tmp/output
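-- If you run the job from a Scala program instead of a notebook shell cell, you can do the same cleanup in code. This is a minimal sketch for the local filesystem only, using java.io.File. The helper deleteRecursively is hypothetical, and the demo runs on a temporary directory shaped like Spark's CSV output rather than on /var/tmp/output. On HDFS you would use hadoop fs -rm -r (or the Hadoop FileSystem API) instead.

```scala
import java.io.File
import java.nio.file.Files

// Delete a file, or a directory and everything under it (local disk only)
def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
  if (f.exists() && !f.delete()) throw new RuntimeException(s"Could not delete $f")
}

// Demo on a temporary directory that mimics Spark output (hypothetical file names)
val outDir = Files.createTempDirectory("happiness-output").toFile
new File(outDir, "part-00000.csv").createNewFile()
new File(outDir, "_SUCCESS").createNewFile()
deleteRecursively(outDir)
println(outDir.exists()) // false

// In the notebook's setup this would be: deleteRecursively(new File("/var/tmp/output"))
```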

In [30]:
%sh
ls /var/tmp

In [31]:
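predictions
 .select(col("Happiness Rank"), col("Happiness Score"), col("prediction"))
 .write
 .format("csv")
 .option("header", "true") // write the column names as the first line of each part file
 .save("file:///var/tmp/output") // same directory that was removed above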

-- Finally, we call the evaluate method on the evaluator, passing the predictions DataFrame
-- It returns the r2 value computed on the test data
-- r2 is not an accuracy percentage: a value of 0.9 means the model explains about 90% of the variance in the happiness rank on the test data

In [33]:
%spark
val r2 = evaluator.evaluate(predictions)
println("r-squared on test data = " + r2)
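-- r2 says how much of the variance is explained, but not how many rank positions the predictions are off by. RegressionEvaluator can also report error metrics in the label's own units through setMetricName("rmse") or setMetricName("mae"). The formulas are sketched below in plain Scala with hypothetical numbers. The rmse and mae functions here are our own helpers, not Spark's.

```scala
// Root mean squared error: penalizes large misses more heavily
def rmse(labels: Seq[Double], preds: Seq[Double]): Double =
  math.sqrt(labels.zip(preds).map { case (y, p) => (y - p) * (y - p) }.sum / labels.size)

// Mean absolute error: the average number of rank positions a prediction is off by
def mae(labels: Seq[Double], preds: Seq[Double]): Double =
  labels.zip(preds).map { case (y, p) => math.abs(y - p) }.sum / labels.size

val labels = Seq(1.0, 2.0, 3.0, 4.0)
val preds  = Seq(2.0, 2.0, 3.0, 7.0)
println(rmse(labels, preds)) // sqrt((1 + 0 + 0 + 9) / 4) = sqrt(2.5)
println(mae(labels, preds))  // (1 + 0 + 0 + 3) / 4 = 1.0
```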

