# Spark Assignment
---

> *Name:* Panagiota Gkourioti <br />
> *Student ID:* p2822109 <br />
> *Course:* Big Data Systems and Architectures <br />
> *Professor:* Thanasis Vergoulis <br />

> Department of Management Science and Technology <br />
> Athens University of Economics and Business <br />

## Task 3

For the final task, we investigate whether a linear regression model can predict the “average_rating” of a book from its “language_code”, “num_pages”, “ratings_count”, and “publication_year”, using Python and DataFrames with MLlib. Specifically, the notebook focuses on:
- preparing the feature vectors
- preparing the training and testing datasets (70%-30%)
- training the model
- evaluating the accuracy of the model 

## Import packages

In [1]:
# import packages
import findspark
# findspark.init(r'C:\spark\spark-3.2.1-bin-hadoop3.2')  # uncomment for a local installation of Spark
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
import pyspark.sql.types as T
import pyspark.sql.functions as F 
from pyspark.sql.functions import col,when,count
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType

# import packages for linear regression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

## Load & clean data 

The first step of our analysis is to import and inspect the data, ensuring that all columns have the appropriate data type and checking for missing or invalid values.

In [2]:
# load the data 
books = spark.read.json("books_5000.json")

In [3]:
# create new data frame with the desired features
df = books.select('average_rating', 'language_code', 'num_pages', 'ratings_count', 'publication_year')

In [4]:
# check data types
df.dtypes

[('average_rating', 'string'),
 ('language_code', 'string'),
 ('num_pages', 'string'),
 ('ratings_count', 'string'),
 ('publication_year', 'string')]

In [5]:
# convert to correct data types
df = df.withColumn("average_rating", df["average_rating"].cast(FloatType()))
df = df.withColumn("num_pages", df["num_pages"].cast(IntegerType()))
df = df.withColumn("ratings_count", df["ratings_count"].cast(IntegerType()))

In [6]:
# check if corrected
df.dtypes

[('average_rating', 'float'),
 ('language_code', 'string'),
 ('num_pages', 'int'),
 ('ratings_count', 'int'),
 ('publication_year', 'string')]

In [7]:
# check distinct values for publication year to find any missing or invalid data
df.select('publication_year').distinct().sort('publication_year').show(30)

+----------------+
|publication_year|
+----------------+
|                |
|             162|
|            1945|
|            1951|
|            1960|
|            1961|
|            1962|
|            1963|
|            1965|
|            1966|
|            1968|
|            1970|
|            1973|
|            1974|
|            1975|
|            1976|
|            1977|
|            1978|
|            1979|
|            1980|
|            1981|
|            1982|
|            1983|
|            1984|
|            1985|
|            1986|
|            1987|
|            1988|
|            1989|
|            1990|
+----------------+
only showing top 30 rows



In [8]:
# check for missing/null values in all columns
df.select([count(when((col(c) == '') | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------------+-------------+---------+-------------+----------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|
+--------------+-------------+---------+-------------+----------------+
|             0|         1685|     1382|            0|            1072|
+--------------+-------------+---------+-------------+----------------+
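
The `count(when(...))` idiom works because `when` without an `otherwise` yields null for rows that do not match the condition, and `count` ignores nulls. The same per-column tally can be sketched in plain Python, independently of Spark. The names `count_missing` and `sample_rows` and the rows themselves are hypothetical and serve only as an illustration.

```python
def count_missing(rows, columns):
    """Count, per column, the rows whose value is None or an empty string."""
    return {
        column: sum(1 for row in rows if row.get(column) in (None, ""))
        for column in columns
    }

# Hypothetical rows, shaped like the selected columns of the dataset.
sample_rows = [
    {"language_code": "eng", "num_pages": 146, "publication_year": "2012"},
    {"language_code": "", "num_pages": None, "publication_year": ""},
    {"language_code": "", "num_pages": 206, "publication_year": "2007"},
]

missing_counts = count_missing(
    sample_rows, ["language_code", "num_pages", "publication_year"]
)
print(missing_counts)
```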



The publication_year column contains an invalid value (162), which we assume to be a typo for 1962, as well as many missing values, so we make the following corrections. We replace missing values in the two categorical columns (language_code and publication_year) with the word 'Unknown', and we remove the rows in which language_code, num_pages and publication_year are all missing at the same time.

In [9]:
# replace invalid value 162 in publication_year with the possibly correct one, 1962
df = df.withColumn('publication_year', when(df.publication_year == '162', '1962').otherwise(df.publication_year))

In [10]:
# replace publication_year missing values with the word unknown
df = df.withColumn('publication_year', when(df.publication_year == '', 'Unknown').otherwise(df.publication_year))

In [11]:
# replace language_code missing values with the word unknown
df = df.withColumn('language_code', when(df.language_code == '', 'Unknown').otherwise(df.language_code))

In [12]:
# remove rows where language_code, num_pages and publication_year are missing at the same time, because the information
# of ratings_count exclusively would not be sufficient
df = df.filter(~(df.num_pages.isNull()&(df.language_code == 'Unknown')&(df.publication_year == 'Unknown')))

Finally, we fill the remaining null values of num_pages with the column average, rounded to the nearest integer.

In [13]:
# calculate the average number of pages
df.agg({'num_pages': 'avg'}).show()

+-----------------+
|   avg(num_pages)|
+-----------------+
|169.7041747304396|
+-----------------+



In [14]:
# use the average number of pages to replace missing values 
df = df.withColumn('num_pages', when(df.num_pages.isNull(), 170).otherwise(df.num_pages))
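
The fill value 170 above is typed in by hand after reading the average from the previous output. As a minimal plain-Python sketch of the same idea, the fill value can instead be derived from the observed values, so it stays correct if the data changes. The function `impute_with_mean` and the sample values are hypothetical.

```python
from statistics import mean

def impute_with_mean(values):
    """Replace None entries with the rounded mean of the observed entries."""
    observed = [value for value in values if value is not None]
    fill_value = round(mean(observed))
    return [fill_value if value is None else value for value in values]

# Hypothetical page counts; None marks a missing value.
pages = [146, None, 272, 206, None, 224]
filled = impute_with_mean(pages)
print(filled)
```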

In [15]:
# inspect the first lines after the data cleaning
df.show()

+--------------+-------------+---------+-------------+----------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|
+--------------+-------------+---------+-------------+----------------+
|          3.94|          fre|      170|           16|            2016|
|          4.28|          eng|      146|           51|            2012|
|          4.05|          eng|      170|            6|         Unknown|
|          4.06|        en-US|      272|           51|            1997|
|          3.44|      Unknown|      206|           46|            2007|
|          4.15|          eng|      224|           39|            2016|
|          3.16|      Unknown|      160|           38|            2016|
|          3.51|      Unknown|      160|           44|            2016|
|           4.0|      Unknown|      144|           32|            2016|
|          4.41|          kor|      212|          133|            2014|
|          3.16|          eng|      144|          114|          

## Prepare data for training & testing

The next step is to split the data into a training set (70%) and a testing set (30%). Because the split is random, the resulting sizes are only approximately 70% and 30%.  
We pass a `seed` so that each run produces the same split (for reproducibility). 

In [16]:
trainDF, testDF = df.randomSplit([0.7, 0.3], seed=42)

print(trainDF.cache().count()) # cache because of accessing training data multiple times

print(testDF.count())

3169
1284
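
The counts above correspond to roughly 71% and 29% of the rows rather than exactly 70% and 30%. The following plain-Python sketch shows why, under the assumption that each row is assigned to a side independently by a seeded random draw. It only illustrates the idea and does not reproduce Spark's internal sampling. The function `seeded_split` is a hypothetical helper.

```python
import random

def seeded_split(rows, train_fraction=0.7, seed=42):
    """Assign each row to train or test with an independent seeded random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        target = train if rng.random() < train_fraction else test
        target.append(row)
    return train, test

rows = list(range(1000))
train_a, test_a = seeded_split(rows)
train_b, test_b = seeded_split(rows)

# Same seed, same split; the sizes are close to 70/30 but not guaranteed to be exact.
print(len(train_a), len(test_a), train_a == train_b)
```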


We can then inspect summary statistics for a numerical attribute, num_pages, and the number of rows per value of a categorical attribute, language_code.

In [17]:
trainDF.select("num_pages").summary().show()

+-------+------------------+
|summary|         num_pages|
+-------+------------------+
|  count|              3169|
|   mean|170.75039444619753|
| stddev|100.93238554920669|
|    min|                 4|
|    25%|               120|
|    50%|               170|
|    75%|               192|
|    max|              1192|
+-------+------------------+



In [18]:
trainDF.groupby("language_code").count().show()

+-------------+-----+
|language_code|count|
+-------------+-----+
|          fre|  151|
|          zho|    4|
|        en-CA|    7|
|          fin|   31|
|          ind|  143|
|          nor|    7|
|          ben|    3|
|          pol|    6|
|          vie|    6|
|          por|   26|
|          swe|   20|
|      Unknown|  820|
|          cze|   22|
|          eng| 1145|
|          jpn|  150|
|           nl|   18|
|          dan|    4|
|        en-GB|   44|
|          gre|   12|
|          bos|    1|
+-------------+-----+
only showing top 20 rows



## Feature preprocessing
The next step is to transform the features into the format MLlib requires. Since we have two categorical variables, we use one-hot encoding, which converts a categorical variable into a set of numeric variables that only take the values 0 and 1. We first use the *StringIndexer*, which converts a column of string values to a column of label indices. 
Then, *OneHotEncoder* maps a column of category indices to a column of binary vectors, with at most one "1" in each row that indicates the category index for that row. 
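
The two steps can be sketched in plain Python on a handful of hypothetical labels. The sketch assumes that indices are assigned by descending frequency, with ties broken alphabetically, so the most frequent label gets index 0. The helpers `fit_string_index` and `one_hot` are illustrative names and not part of MLlib.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to an index, most frequent label first (ties: alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot(index, size):
    """Return a binary vector of the given size with a single 1.0 at `index`."""
    vector = [0.0] * size
    vector[index] = 1.0
    return vector

# Hypothetical language codes.
languages = ["eng", "Unknown", "eng", "fre", "eng", "Unknown"]
mapping = fit_string_index(languages)
encoded = [one_hot(mapping[label], len(mapping)) for label in languages]

print(mapping)
print(encoded[3])
```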

In [20]:
categoricalCols = ["language_code", "publication_year"]

# handleInvalid="skip" drops rows whose category was not seen when the indexer was fitted
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols],
                              handleInvalid="skip")
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(),
                        outputCols=[x + "OHE" for x in categoricalCols], dropLast=False)

Calling the .fit() method of the StringIndexer returns a StringIndexerModel, which we can then use to transform the dataset.

The .transform() method of StringIndexerModel returns a new DataFrame with the new columns appended.

In [21]:
stringIndexerModel = stringIndexer.fit(trainDF)
stringIndexerModel.transform(trainDF).show()

+--------------+-------------+---------+-------------+----------------+------------------+---------------------+
|average_rating|language_code|num_pages|ratings_count|publication_year|language_codeIndex|publication_yearIndex|
+--------------+-------------+---------+-------------+----------------+------------------+---------------------+
|           2.0|      Unknown|      144|            1|            2017|               1.0|                 10.0|
|           2.0|          ind|      128|            1|            2010|               5.0|                  8.0|
|          2.33|      Unknown|      368|            3|            2002|               1.0|                 16.0|
|          2.35|      Unknown|      104|           43|            2008|               1.0|                  9.0|
|          2.38|      Unknown|      170|            8|            1996|               1.0|                 22.0|
|          2.53|      Unknown|      170|           30|            2017|               1.0|      

Finally, we use VectorAssembler to combine the one-hot encoded vectors and the numeric columns into a single vector 
column. We will pass this vector column to our regression model, fit the model on the 
training data and generate predictions. 
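
As a rough plain-Python illustration of what the assembling step produces, the sketch below concatenates two small one-hot vectors and two numeric values into one feature list and then prints it in a sparse form (size, indices of the non-zero entries, their values). The block sizes and values are hypothetical, and `assemble` and `to_sparse` are illustrative helpers and not MLlib functions.

```python
def assemble(*parts):
    """Concatenate vectors (lists or tuples) and scalars into one flat feature list."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(value) for value in part)
        else:
            features.append(float(part))
    return features

def to_sparse(dense):
    """Describe a dense vector as (size, indices of non-zeros, non-zero values)."""
    indices = [i for i, value in enumerate(dense) if value != 0.0]
    return len(dense), indices, [dense[i] for i in indices]

# Hypothetical row: 3 language slots, 4 year slots, then num_pages and ratings_count.
language_ohe = [0.0, 1.0, 0.0]
year_ohe = [0.0, 0.0, 1.0, 0.0]
row = assemble(language_ohe, year_ohe, 144, 32)
sparse = to_sparse(row)

print(sparse)
```

Read this way, the last two indices of the sparse vector are the numeric columns, and the earlier indices mark the active language and publication year slots.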

In [22]:
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ['num_pages', 'ratings_count']
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features').setHandleInvalid("skip")

## Define the model
We are going to use a linear regression model with a regularization parameter (`regParam`) of 0.5.

In [23]:
lr = LinearRegression(featuresCol="features", labelCol='average_rating', regParam=0.5)
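
For reference, the objective that this estimator minimizes can be written as follows, assuming the squared-error loss with elastic-net regularization described in the Spark ML documentation. The symbols are our own notation: $x_i$ is the feature vector of book $i$, $y_i$ its average rating, $w$ the coefficients, $b$ the intercept, $\lambda$ the `regParam` and $\alpha$ the `elasticNetParam`, which is left at its default of 0 here.

```latex
\min_{w,\,b}\;
\frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2
+ \lambda\left[\frac{1-\alpha}{2}\,\lVert w\rVert_2^2 + \alpha\,\lVert w\rVert_1\right],
\qquad \lambda = 0.5,\quad \alpha = 0
```

With $\alpha = 0$ only the squared (L2) penalty remains, so larger values of $\lambda$ shrink the coefficients towards zero and pull the predictions towards the intercept.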

## Build the pipeline

In this step, we define the pipeline, fit it on the training dataset and then apply the fitted model to the test dataset. A `Pipeline` is an estimator. The `pipeline.fit()` method returns a `PipelineModel`, which is a transformer.

In [24]:
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAssembler, lr])

# Fit the pipeline on the training data to obtain the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset to predict the average rating of each sample.
predDF = pipelineModel.transform(testDF)

We can then display the predictions of the model.

In [25]:
predDF.select("features", "average_rating", "prediction").show(5)

+--------------------+--------------+------------------+
|            features|average_rating|        prediction|
+--------------------+--------------+------------------+
|(98,[1,43,96,97],...|          2.29|3.8426251582535698|
|(98,[0,43,96,97],...|           2.5|3.9162843462801167|
|(98,[1,53,96,97],...|          2.55|  3.87091053939996|
|(98,[1,42,96,97],...|          2.57|3.8306982078773233|
|(98,[1,54,96,97],...|          2.67| 3.868346771986987|
+--------------------+--------------+------------------+
only showing top 5 rows



## Evaluate the model
To evaluate the performance of the model on unseen data, we use the `RegressionEvaluator` and calculate the R Squared (R2) and Root Mean Squared Error (RMSE) metrics.
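
To make the two metrics concrete, the sketch below computes them by hand in plain Python on a few hypothetical values, using the standard definitions (RMSE as the square root of the mean squared error, R2 as one minus the ratio of the residual to the total sum of squares). The function names and numbers are illustrative and are not taken from the dataset.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error, in the same units as the label."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(labels))

def r2(labels, predictions):
    """Share of the label variance explained by the predictions."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical ratings and predictions.
labels = [3.0, 4.0, 5.0]
predictions = [3.5, 4.0, 4.5]
baseline = [4.0, 4.0, 4.0]  # always predicting the mean rating

print(r2(labels, predictions), rmse(labels, predictions))
print(r2(labels, baseline))
```

A model that always predicts the mean rating scores an R2 of 0, which is a useful reference point when reading the values below.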

In [26]:
lr_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='average_rating', metricName='r2')
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predDF))

R Squared (R2) on test data = 0.0466514


In [27]:
lr_evaluator2 = RegressionEvaluator(predictionCol='prediction', labelCol='average_rating', metricName='rmse')
print("Root Mean Squared Error (RMSE) on test data = %g" % lr_evaluator2.evaluate(predDF))

Root Mean Squared Error (RMSE) on test data = 0.446748


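The model performs very poorly in terms of prediction. The R2 on test data is only about 0.047, meaning that the model explains roughly 4.7% of the variance in the ratings, which is very low. The RMSE is about 0.447, measured in the units of average_rating and not as a percentage, which is rather high. The first five predictions shown above all lie between roughly 3.8 and 3.9, although the actual ratings of those books are below 2.7. To increase the predictive ability of the model, we could add more predictors that explain the book's average rating better.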