Name: Stamatios Sideris

ID: f2822113

### We first create a SparkSession. It is responsible for creating a SparkContext object that lets us communicate with Spark. Because Spark needs our application's configuration, the builder also creates a SparkConf object that holds our application name and the master URL, which determines where the application will run.

In [1]:
from pyspark.sql import SparkSession
appName = "task3" #determine the name of the App
master = "local" #determine it will run locally
spark = SparkSession.builder.appName(appName).master(master).getOrCreate()

### We read the JSON file, print its schema, and show the first three rows.

In [2]:
json_path = "books_5000.json"
df = spark.read.json(json_path)
df.printSchema()
df.show(3)

root
 |-- asin: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edition_information: string (nullable = true)
 |-- format: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- is_ebook: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- kindle_asin: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- link: string (nullable = true)
 |-- num_pages: string (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- pub

### The variables language_code, publication_year and num_pages contain 1685, 1072 and 1382 missing values (empty strings), respectively, which we need to handle.

In [3]:
# Count the empty strings in each of the three columns.
[df.filter(df.language_code == "").count(),
 df.filter(df.publication_year == "").count(),
 df.filter(df.num_pages == "").count()]

[1685, 1072, 1382]

### To handle them, we replace the missing values of language_code and publication_year with "NA", because we will treat both as categorical variables from now on, and we replace the missing values of num_pages with 0, because we will treat it as numeric.

In [4]:
from pyspark.sql.functions import when
# Replace empty strings with the category "NA" in the two categorical columns.
df = df.withColumn("language_code", when(df.language_code == "", "NA").otherwise(df.language_code))
df = df.withColumn("publication_year", when(df.publication_year == "", "NA").otherwise(df.publication_year))
# Replace empty strings with 0 in the numeric column (it is cast to integer in a later step).
df = df.withColumn("num_pages", when(df.num_pages == "", 0).otherwise(df.num_pages))
df.select(df["language_code"], df["publication_year"], df["num_pages"]).show()

+-------------+----------------+---------+
|language_code|publication_year|num_pages|
+-------------+----------------+---------+
|           NA|              NA|        0|
|          fre|            2016|        0|
|          eng|            2012|      146|
|          eng|              NA|        0|
|        en-US|            1997|      272|
|           NA|            2007|      206|
|          eng|            2016|      224|
|           NA|            2016|      160|
|           NA|            2016|      160|
|           NA|            2016|      144|
|          kor|            2014|      212|
|          eng|            2011|      144|
|          eng|            2012|      200|
|           NA|            2012|      230|
|           NA|              NA|        0|
|          jpn|            2013|      157|
|          spa|            2006|      224|
|          zho|            2011|      176|
|           NA|            2006|      192|
|          eng|              NA|      192|
+----------
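The `when(...).otherwise(...)` pattern above can be mimicked in plain Python to show which values change. This is a hypothetical sketch (the function `fill_empty` and the sample rows are made up for illustration), not Spark's implementation. It also highlights a detail of Spark's semantics: comparing a null to `""` yields null, which `when` treats as false, so genuine nulls pass through `otherwise` unchanged and are not counted by the `== ""` filters used earlier. For simplicity the sketch stores `"0"` as a string, since the column is still a string at this stage.

```python
from typing import Optional


def fill_empty(value: Optional[str], replacement: str) -> Optional[str]:
    """Mimic when(col == "", replacement).otherwise(col) for a single value."""
    if value is None:
        # null == "" evaluates to null (not true), so the otherwise branch keeps the null.
        return None
    return replacement if value == "" else value


# Hypothetical sample rows, with every field still read as a string.
rows = [
    {"language_code": "", "publication_year": "2016", "num_pages": ""},
    {"language_code": "eng", "publication_year": "", "num_pages": "146"},
    {"language_code": None, "publication_year": "1997", "num_pages": "272"},
]

cleaned = [
    {
        "language_code": fill_empty(r["language_code"], "NA"),
        "publication_year": fill_empty(r["publication_year"], "NA"),
        "num_pages": fill_empty(r["num_pages"], "0"),
    }
    for r in rows
]

for row in cleaned:
    print(row)
```

The third row shows why a separate check for nulls may be worth running: its `language_code` stays `None` rather than becoming `"NA"`.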

### We cast ratings_count and num_pages to integer and average_rating to double, since they were read as strings but represent numeric quantities.

In [5]:
from pyspark.sql.types import DoubleType, IntegerType
df = df.withColumn("average_rating", df.average_rating.cast(DoubleType()))
df = df.withColumn("ratings_count", df.ratings_count.cast(IntegerType()))
df = df.withColumn("num_pages", df.num_pages.cast(IntegerType()))
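Casting does not validate the data. Assuming ANSI mode is off (`spark.sql.ansi.enabled=false`, the default before Spark 4.0), Spark turns strings it cannot parse into null instead of raising an error. The hypothetical helper `lenient_cast` below sketches that idea in plain Python; it does not reproduce Spark's exact string-parsing rules, and the sample values are made up.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def lenient_cast(value: Optional[str], convert: Callable[[str], T]) -> Optional[T]:
    """Convert a string, returning None (a null) instead of raising on bad input."""
    if value is None:
        return None
    try:
        return convert(value)
    except ValueError:
        return None


ratings = ["4.12", "3.9", "", None]
pages = ["146", "0", "n/a"]

print([lenient_cast(r, float) for r in ratings])  # [4.12, 3.9, None, None]
print([lenient_cast(p, int) for p in pages])      # [146, 0, None]
```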

### We split the dataset 80/20 into training and test sets. We cache the training set because it will be used multiple times.

In [6]:
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=42)
# cache() is lazy: the count() action computes the training set and stores it for reuse.
print(trainDF.cache().count())
print(testDF.count())

4041
958
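The two counts add up to 4,999 rows, but the training share is about 80.8% rather than exactly 80%. That is expected: `randomSplit` assigns rows randomly according to the weights, so split sizes are only approximately proportional to them. The plain-Python sketch below illustrates the idea of drawing one uniform number per row and comparing it with cumulative weight bounds; the function `random_split` is hypothetical, and Spark's per-partition sampling will not produce the same assignment for the same seed.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a single uniform number per row."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.8, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding leaving the last bound below 1.0.
            splits[-1].append(row)
    return splits


train, test = random_split(range(4999), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 3999 and 1000
```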


### Because language_code is a categorical variable, we must convert it to numbers before the algorithm can use it. To do so, we use StringIndexer, which maps each distinct category to a numeric index, giving 0 to the most frequent category. We treat publication_year the same way, since we ignore its time-series nature. We then pass the new index columns to OneHotEncoder, which creates dummy (0/1) vectors for the categories.

In [7]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# We determine which of the columns are categorical.
categoricalCols = ["language_code", "publication_year"]

# Both stages are estimators: fitting them on data produces models (transformers) that we later apply to transform the dataset.
# handleInvalid="keep" puts categories unseen during fitting (e.g. in the test set) into an extra index instead of raising an error.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]).setHandleInvalid("keep")
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])
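To make the two stages concrete, here is a plain-Python sketch of what they compute on a handful of made-up language codes. `fit_string_indexer` and `one_hot` are hypothetical helpers: the indexer orders labels by descending frequency and breaks ties alphabetically, as StringIndexer's default `frequencyDesc` ordering is documented to do, and the encoder drops the last category by default, as OneHotEncoder does with `dropLast=True`. The sketch returns dense lists rather than Spark's sparse vectors and ignores the extra index that `handleInvalid="keep"` reserves for unseen labels.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a dense 0/1 list; with drop_last, the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


codes = ["eng", "NA", "eng", "fre", "NA", "eng", "kor"]
mapping = fit_string_indexer(codes)
print(mapping)                                # {'eng': 0, 'NA': 1, 'fre': 2, 'kor': 3}
print(one_hot(mapping["fre"], len(mapping)))  # [0.0, 0.0, 1.0]
print(one_hot(mapping["kor"], len(mapping)))  # [0.0, 0.0, 0.0]
```

Dropping the last category avoids a column that is fully determined by the others, which would otherwise make the encoded columns linearly dependent in a model with an intercept.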

### The algorithm we will use requires all predictors in a single features column. We use VectorAssembler to combine the list of features we will use for the prediction into a single vector column. The model then needs only two columns: features (the predictors) and average_rating (the variable we want to predict).

In [8]:
from pyspark.ml.feature import VectorAssembler
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["num_pages", "ratings_count"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
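VectorAssembler concatenates its input columns in the order given, expanding vector columns element by element. The plain-Python sketch below shows that flattening for one hypothetical row; the helper `assemble` and all column values are made up for illustration. Note that VectorAssembler's default `handleInvalid="error"` raises on null or NaN inputs, so any nulls introduced by the earlier casts would need handling before this stage.

```python
def assemble(row, input_cols):
    """Concatenate vector-valued and scalar columns, in order, into one flat list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # e.g. a one-hot vector
        else:
            features.append(float(value))             # e.g. num_pages
    return features


row = {
    "language_codeOHE": [0.0, 1.0, 0.0],
    "publication_yearOHE": [1.0, 0.0],
    "num_pages": 146,
    "ratings_count": 12,
    "average_rating": 3.9,  # the label, deliberately not part of the features
}
assembler_inputs = ["language_codeOHE", "publication_yearOHE", "num_pages", "ratings_count"]
print(assemble(row, assembler_inputs))  # [0.0, 1.0, 0.0, 1.0, 0.0, 146.0, 12.0]
```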

### Define the linear regression model

In [9]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="average_rating")
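With its default parameters (`regParam=0.0`, `fitIntercept=True`), `LinearRegression` fits an ordinary least-squares model. For intuition, the sketch below computes the closed-form least-squares solution for a single hypothetical feature; the Spark model solves the multivariate version over the whole `features` vector, so this is a simplified illustration rather than its actual solver. The helper `fit_simple_ols` and the data points are made up.

```python
def fit_simple_ols(xs, ys):
    """Least-squares fit of y = intercept + slope * x for a single feature."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return intercept, slope


intercept, slope = fit_simple_ols([1.0, 2.0, 3.0, 4.0], [3.0, 5.0, 7.0, 9.0])
print(intercept, slope)  # 1.0 2.0
```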

### We create a pipeline that chains the stages prepared above: it prepares the training dataset and then trains our model on it. Finally, we use the fitted pipeline to make predictions on the test dataset.

In [10]:
from pyspark.ml import Pipeline

# Define the pipeline from the stages created in the previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAssembler, lr])

# Fit the pipeline on the training data; this returns a fitted PipelineModel.
pipelineModel = pipeline.fit(trainDF)

# Apply the fitted pipeline to the test dataset to predict the average rating of each book.
predDF = pipelineModel.transform(testDF)
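The key property of the pipeline is that every estimator is fitted on the training data only, and the resulting models are then reused unchanged on the test data. The toy sketch below illustrates that flow with hypothetical `Estimator`/`Transformer` classes and a mean-centering stage; it is a simplification of `pyspark.ml.Pipeline`, which, for example, does not transform the data after fitting the final estimator.

```python
class Estimator:
    """Learns parameters from data and returns a Transformer."""
    def fit(self, data):
        raise NotImplementedError


class Transformer:
    """Maps a dataset to a new dataset."""
    def transform(self, data):
        raise NotImplementedError


class CenterModel(Transformer):
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class Centerer(Estimator):
    def fit(self, data):
        return CenterModel(sum(data) / len(data))  # mean learned from this data only


class Doubler(Transformer):
    def transform(self, data):
        return [2 * x for x in data]


def fit_pipeline(stages, data):
    """Fit or apply stages in order, feeding each stage the previous stage's output."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if isinstance(stage, Estimator) else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, data):
    """Apply already-fitted stages, in order, to new data."""
    for model in fitted:
        data = model.transform(data)
    return data


fitted = fit_pipeline([Centerer(), Doubler()], [1.0, 2.0, 3.0])
print(transform_pipeline(fitted, [10.0]))  # [16.0]
```

The test value is centered with the training mean (2.0), not its own, which is the same reason the StringIndexer above only learns categories from `trainDF`.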

### The R2 of our model is very low (about 0.022): the model explains only about 2% of the variance of average_rating in the test set, so it is very poor at predicting the average rating of each book.

In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="average_rating", metricName="r2")
print(f"R2: {evaluator.evaluate(predDF)}")

R2: 0.02223652980479096
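For reference, R2 (the coefficient of determination) is 1 - SS_res / SS_tot, which compares the model's squared errors with those of always predicting the mean label. A value of about 0.022 therefore means the model's squared error on the test set is only about 2% lower than that baseline. The plain-Python sketch below computes the metric on made-up numbers; `r2_score` is a hypothetical helper, not part of PySpark.

```python
def r2_score(labels, predictions):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


labels = [3.0, 4.0, 5.0]
print(r2_score(labels, [4.0, 4.0, 4.0]))  # 0.0 (always predicting the mean)
print(r2_score(labels, labels))           # 1.0 (perfect predictions)
print(r2_score(labels, [5.0, 5.0, 5.0]))  # -1.5 (worse than the mean baseline)
```

As the last line shows, R2 on held-out data can even be negative when a model does worse than the mean baseline.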
