# Course: Big Data Systems and Architectures

# Assignment: Spark & Airflow

# Task 3



Student Name: Athanasios Alexandris

Student ID: p2822202

MSc: Business Analytics PT, AUEB

As a first step in Task 3, we created a SparkSession using the SparkSession.builder API with the application name 'q3'. The getOrCreate() method returns the SparkSession that is already active in the current process, if there is one, and otherwise creates a new one using the options set on the builder. Since no session existed yet, a new one was created.

In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName('q3').getOrCreate()


Next, we used the read.json() method to read the JSON file that contains our dataset and load it into a Spark DataFrame; multiLine=True tells Spark that a JSON record may span several lines instead of one object per line. The DataFrame is cached in memory, so that Spark can access it quickly without reading the JSON file again. Note that cache() does not cache the data immediately: it only marks the DataFrame for caching, and the data is stored in memory the first time an action (e.g., show() or count()) is performed on it. We then displayed the first 20 rows of the dataset. Rows containing null values are dropped later, after the columns of interest have been selected.
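The lazy behaviour of cache() described above can be illustrated with a toy model. The sketch below is plain Python, not Spark's implementation: `ToyFrame` is a hypothetical class that only marks itself on `cache()` and stores its rows the first time an "action" (`count()`) runs, while counting how often the source is read.

```python
class ToyFrame:
    """Toy stand-in for a Spark DataFrame (hypothetical, NOT Spark's implementation)."""

    def __init__(self, load_fn):
        self._load_fn = load_fn          # simulates reading the JSON file
        self._cache_requested = False
        self._cached_rows = None
        self.loads = 0                   # how many times the source was read

    def cache(self):
        # Like DataFrame.cache(): only marks the frame, nothing is read yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows     # served from the in-memory copy
        self.loads += 1
        rows = self._load_fn()
        if self._cache_requested:
            self._cached_rows = rows     # materialised on the first action
        return rows

    def count(self):
        """An 'action': forces the data to be read (or taken from the cache)."""
        return len(self._rows())
```

With `f = ToyFrame(lambda: [1, 2, 3]).cache()`, nothing is read until the first `f.count()`; a second `count()` reuses the stored rows, whereas an uncached frame reads the source on every action.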

In [3]:
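# cache() only marks df for caching; the data is materialised by the first action (show below)
df = spark.read.json('/Users/thanosalexandris/Downloads/movie.json', multiLine=True).cache()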

                                                                                

In [4]:
df.show(20)


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------+-------+--------------------+--------------------+------------+---------+----+
|              actors|           countries|         description|           directors|               genre|            imdb_url|             img_url|           languages|metascore|rating|runtime|             tagline|               title|users_rating|    votes|year|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+------+-------+--------------------+--------------------+------------+---------+----+
|[Timothée Chalame...|               [USA]|A young couple ar...|       [Woody Allen]|   [Comedy, Romance]|https://www.imdb....|https://m.media-a...|           [English]|       44| PG-13| 92 min|           

                                                                                

Next, we created a new DataFrame that contains only the columns we are interested in. We cast the 'users_rating' column to double and the 'metascore' column to integer. For the 'runtime' column (e.g., "92 min"), we split the string on the space character, kept only the first field and cast it to integer. For the array columns 'genre' and 'languages', we used the expr() function to keep only the first element of each movie's list.
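To make the per-row effect of this select() concrete, here is a plain-Python sketch that applies the same transformation to one record. The helper names (`parse_movie`, `to_int`, `to_float`, `first`) are hypothetical, and the raw values are assumed to be strings since the notebook casts them. Returning `None` for missing or unparsable values roughly mirrors how Spark's cast yields null in the default (non-ANSI) mode, at least for simple inputs like these.

```python
def to_int(value):
    """Cast to int; missing or unparsable input becomes None (like a null)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def to_float(value):
    """Cast to float; missing or unparsable input becomes None."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def first(items):
    """Like expr("genre[0]"): first element, or None for a missing/empty list."""
    return items[0] if items else None


def parse_movie(raw):
    """Plain-Python mirror of the df.select(...) that builds df2 (hypothetical helper)."""
    runtime = raw.get("runtime")
    return {
        "users_rating": to_float(raw.get("users_rating")),
        "metascore": to_int(raw.get("metascore")),
        # split("runtime", " ").getItem(0): "92 min" -> "92", then cast to int
        "runtime": to_int(runtime.split(" ")[0]) if runtime is not None else None,
        "genre": first(raw.get("genre")),
        "languages": first(raw.get("languages")),
    }
```

For the first movie shown in the output above, `parse_movie({"users_rating": "6.6", "metascore": "44", "runtime": "92 min", "genre": ["Comedy", "Romance"], "languages": ["English"]})` gives `{"users_rating": 6.6, "metascore": 44, "runtime": 92, "genre": "Comedy", "languages": "English"}`.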

In [5]:
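# Import only what we need; a wildcard import would shadow Python built-ins such as sum() and max()
from pyspark.sql.functions import split, expr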

In [6]:
df2 = df.select(
    df["users_rating"].cast("double"),
    df["metascore"].cast("integer"),
    # "92 min" -> "92" -> 92
    split("runtime", " ").getItem(0).cast("integer").alias("runtime"),
    # keep only the first genre / language of each movie
    expr("genre[0]").alias("genre"),
    expr("languages[0]").alias("languages"),
)

In [7]:
df2.show(200)

+------------+---------+-------+---------+---------+
|users_rating|metascore|runtime|    genre|languages|
+------------+---------+-------+---------+---------+
|         6.6|       44|     92|   Comedy|  English|
|         2.4|     null|     91|   Horror|  English|
|         7.8|       61|    103|   Comedy|  English|
|         7.3|       73|    128|    Crime|  English|
|         7.0|       66|     97|   Comedy|  English|
|         5.3|       64|     87|  Fantasy|  English|
|         6.2|       51|    128|   Action|  English|
|         8.0|       69|    143|   Action|  English|
|         8.3|       70|    126|    Drama|  English|
|         8.3|       84|    149|Adventure|  English|
|         8.3|       65|    170|    Crime|  English|
|         7.6|       78|    102|   Comedy|  English|
|         5.3|       45|     90|    Drama|  English|
|         8.3|       76|    116|    Drama|  English|
|         7.4|       63|    161|Adventure|  English|
|         8.1|       72|    116|    Drama|  En

In [8]:
# Drop every row that has a null in any column (dropna() uses how='any' by default)
df2 = df2.dropna()

As our goal is to build a regression model that makes predictions, we need to train and evaluate the model on separate data, so we split our dataset into a training and a test DataFrame: the training DataFrame contains a random ~80% of the rows and the test DataFrame the remaining ~20%. We used the randomSplit() method with a fixed seed, so that every time we rerun the command (on the same input data) we get exactly the same split. We also cached the training DataFrame in memory for performance reasons, because model fitting (and, later, cross-validation) reads it many times.
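Conceptually, a seeded random split draws one random number per row and assigns the row to a split according to the normalised weights, so the split sizes are only approximately 80/20; in the output below, 7232 / (7232 + 1797) = 7232 / 9029 ≈ 0.801. The plain-Python sketch here (`random_split` is a hypothetical helper, not Spark's code) shows why a fixed seed makes the split reproducible. In Spark the result also depends on the data's partitioning and row order, so reproducibility assumes those do not change between runs.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by comparing one uniform draw to cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)              # e.g. [0.8, 1.0] for weights [0.8, 0.2]
    rng = random.Random(seed)           # fixed seed -> same sequence of draws
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # the last split also catches u when rounding leaves the bound just below 1.0
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits
```

Calling `random_split(list(range(10_000)), [0.8, 0.2], seed=999)` twice returns identical splits, with roughly 8000 rows in the first one.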

In [9]:
trainDF, testDF = df2.randomSplit([0.8, 0.2], seed=999)

print(trainDF.cache().count()) # Cache because accessing training data multiple times

print(testDF.count())

                                                                                

7232
1797




Next, we had to transform the features into the format that MLlib expects. Linear regression requires numeric features, so we converted the categorical columns 'genre' and 'languages' using one-hot encoding, which turns a categorical variable into a set of binary variables that take only the values 0 and 1. First, StringIndexer converts a column of string values into a column of label indices; then OneHotEncoder maps each category index to a binary vector with at most one "1", marking the category of that row. Calling .fit() on the StringIndexer (using the training data only) returns a StringIndexerModel, which we then used to transform the dataset. With setHandleInvalid('keep'), labels that do not appear in the training data (for example, a genre that occurs only in the test set) are mapped to an extra index instead of raising an error.
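The two encoding steps can be sketched in plain Python. As we understand Spark's defaults, StringIndexer orders labels by descending frequency (most frequent label gets index 0, ties broken alphabetically), handleInvalid='keep' sends unseen labels to index numLabels, and OneHotEncoder with its default dropLast=True drops the last category, so that category becomes the all-zeros vector. The helpers `fit_string_indexer`, `index_label` and `one_hot` are hypothetical, and dense lists are used instead of Spark's sparse vectors for readability.

```python
from collections import Counter


def fit_string_indexer(labels):
    """frequencyDesc ordering: most frequent label -> 0, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: i for i, lab in enumerate(ordered)}


def index_label(mapping, label):
    """handleInvalid='keep': a label unseen during fit goes to the extra index len(mapping)."""
    return mapping.get(label, len(mapping))


def one_hot(index, num_categories, drop_last=True):
    """Binary vector with at most one 1.0; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]
```

Fitted on `["Comedy", "Comedy", "Action", "Drama", "Comedy", "Action"]`, the indexer gives Comedy → 0, Action → 1, Drama → 2, and an unseen "Horror" → 3. With 4 categories (3 labels plus the 'keep' bucket), Comedy encodes to `[1.0, 0.0, 0.0]` and the unseen label to `[0.0, 0.0, 0.0]`.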

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# We determine which of the columns are categorical.
categoricalCols = ["genre", "languages"]

# The following two objects are estimators: calling .fit() on them returns models (transformers) that we later apply to transform the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]).setHandleInvalid('keep') 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 
 


In [11]:
stringIndexerModel = stringIndexer.fit(trainDF)
stringIndexerModel.transform(trainDF).show(5)


+------------+---------+-------+------+---------+----------+--------------+
|users_rating|metascore|runtime| genre|languages|genreIndex|languagesIndex|
+------------+---------+-------+------+---------+----------+--------------+
|         1.4|       18|     79|Comedy|  English|       0.0|           0.0|
|         1.9|        7|     91|Comedy|  English|       0.0|           0.0|
|         1.9|        9|     88|Comedy|  English|       0.0|           0.0|
|         1.9|       15|     87|Comedy|  English|       0.0|           0.0|
|         2.0|       15|     90|Action|  English|       2.0|           0.0|
+------------+---------+-------+------+---------+----------+--------------+
only showing top 5 rows



Then we used the 'VectorAssembler' transformer to create a single vector column from a list of columns, as the MLlib algorithm we are using requires a single features column as input. Each row in this column contains a vector of data points corresponding to the set of features used for prediction.
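VectorAssembler essentially concatenates its input columns, in the given order, into one vector: vector-valued columns are flattened and scalar columns appended. The plain-Python sketch below (`assemble` is a hypothetical helper, not the Spark API) shows this. Because `assemblerInputs` lists the one-hot columns first and the numeric columns last, the last two positions of each feature vector presumably hold metascore and runtime, which matches indices 55 and 56 in the 57-element vectors printed in the predictions further down. Note also that VectorAssembler's default handleInvalid='error' rejects null values, one more reason for the earlier dropna().

```python
def assemble(row, input_cols):
    """Concatenate the listed columns of one row into a single flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)   # e.g. a one-hot vector
        else:
            features.append(float(value))              # e.g. metascore, runtime
    return features
```

For example, a row with `genreOHE = [0.0, 1.0, 0.0]`, `languagesOHE = [1.0, 0.0]`, `metascore = 44` and `runtime = 92` assembles to `[0.0, 1.0, 0.0, 1.0, 0.0, 44.0, 92.0]`.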

In [12]:
from pyspark.ml.feature import VectorAssembler

# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["metascore", "runtime"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

Next, we defined the linear regression model, specifying the label (response) column, the features column and a small regularization parameter (regParam=0.05).
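As we read the Spark MLlib documentation, LinearRegression minimises the average squared error plus an elastic-net penalty on the coefficients $w$, where $\lambda$ is regParam and $\alpha$ is elasticNetParam:

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 \;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

With the default elasticNetParam $\alpha = 0$ the penalty is a pure L2 (ridge) term, so regParam=0.05 applies a mild ridge penalty; $\alpha = 1$ would give the lasso. By default Spark also standardizes the features internally before applying the penalty (standardization=True), and the intercept $b$ is not penalized.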

In [13]:
from pyspark.ml.regression import LinearRegression



In [14]:
lr = LinearRegression(featuresCol="features", labelCol="users_rating", regParam=0.05)

Next, we defined a pipeline (an ordered list of transformers and estimators) containing the steps described above, so that exactly the same transformations, fitted on the training data, are applied automatically to the test data. Calling pipeline.fit() on the training data returns a PipelineModel, which is a transformer; we then used it to generate predictions for the test dataset.
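The fit/transform contract behind Pipeline can be illustrated with a minimal plain-Python sketch. All class names here (`LabelIndexer`, `LabelIndexerModel`, `ToyPipeline`, `ToyPipelineModel`) are hypothetical and only mimic the idea: during fit, each estimator is fitted on the output of the previous stages and replaced by its model; the resulting pipeline model then applies the fitted stages in order, so the test data is transformed with mappings learned from the training data only. For simplicity the toy indexer orders labels alphabetically rather than by frequency.

```python
class LabelIndexer:
    """Hypothetical estimator: learns a label -> index mapping (alphabetical order)."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        labels = sorted({r[self.col] for r in rows})
        return LabelIndexerModel(self.col, {lab: i for i, lab in enumerate(labels)})


class LabelIndexerModel:
    """Transformer returned by LabelIndexer.fit(); unseen labels get len(mapping)."""

    def __init__(self, col, mapping):
        self.col = col
        self.mapping = mapping

    def transform(self, rows):
        unknown = len(self.mapping)
        return [{**r, self.col + "Index": self.mapping.get(r[self.col], unknown)} for r in rows]


class ToyPipeline:
    """Fits estimators in order, feeding each stage the output of the previous ones."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < len(self.stages) - 1:   # no need to transform after the last stage
                rows = model.transform(rows)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    """Applies every fitted stage, in order, to new data."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows
```

Fitting on `[{"genre": "Drama"}, {"genre": "Comedy"}]` and transforming `[{"genre": "Horror"}, {"genre": "Drama"}]` yields indices `[2, 1]`: Drama keeps its training index, and the unseen Horror falls into the extra bucket.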

In [15]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAssembler, lr])

# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset to predict users_rating for each movie.
predDF = pipelineModel.transform(testDF)



In [16]:
predDF.select("features", "users_rating", "prediction").show(5)

+--------------------+------------+-----------------+
|            features|users_rating|       prediction|
+--------------------+------------+-----------------+
|(57,[1,18,55,56],...|         1.8|4.137592268247825|
|(57,[2,18,55,56],...|         2.1|5.370812849284812|
|(57,[0,18,55,56],...|         2.2|5.024887038983677|
|(57,[2,18,55,56],...|         2.5|4.789723958889092|
|(57,[0,18,55,56],...|         2.5|5.326513549594811|
+--------------------+------------+-----------------+
only showing top 5 rows



To evaluate our model, we used RegressionEvaluator(), specifying the prediction and true-label columns and choosing R-squared as the metric to measure the goodness of fit. The R-squared of our model on the test dataset is 0.53, i.e., the model explains roughly 53% of the variance in users_rating.
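R-squared compares the model's squared errors with those of a baseline that always predicts the mean label: $R^2 = 1 - SS_{res}/SS_{tot}$. A plain-Python sketch of the formula follows (`r_squared` is a hypothetical helper; we assume RegressionEvaluator's "r2" follows the same standard definition). A perfect model scores 1.0, predicting the mean scores 0.0, and a model worse than the mean goes negative.

```python
def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))   # model's squared errors
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)              # errors of predicting the mean
    return 1.0 - ss_res / ss_tot
```

For instance, `r_squared([1, 2, 3, 4], [2.5, 2.5, 2.5, 2.5])` is 0.0, since every prediction equals the mean label.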

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="users_rating", metricName="r2")
print("Rsquared = %g" % evaluator.evaluate(predDF))

Rsquared = 0.530499



Finally, we used ParamGridBuilder and CrossValidator to tune the model, trying three values each for regParam and elasticNetParam, for a total of 3 × 3 = 9 hyperparameter combinations. We created a 3-fold CrossValidator with our pipeline as the estimator and the R-squared evaluator defined above, so the combination with the highest average R-squared across the folds is selected; parallelism=4 lets up to four models be fitted in parallel. Fitting the cross-validator on the training dataset returns the best model found, refit on the whole training set, and we used that model to make predictions on the test dataset. The R-squared of the best cross-validated model is 0.5321, a slight improvement over the 0.5305 of the initial model.
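The grid search with k-fold cross-validation can be sketched in plain Python to show the mechanics: expand the grid into all parameter combinations, randomly assign rows to k folds, score each combination by its average metric over the held-out folds, keep the best (largest, as for R-squared), and refit it on all training rows. The helpers `build_grid`, `k_fold_indices` and `cross_validate`, and the `fit_fn`/`score_fn` callbacks, are hypothetical; Spark's actual fold assignment differs in detail.

```python
import itertools
import random
from statistics import mean


def build_grid(grid):
    """Expand {param: [values]} into a list of param dicts, like ParamGridBuilder.build()."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in itertools.product(*(grid[n] for n in names))]


def k_fold_indices(n, k, seed):
    """Randomly assign each row index to one of k folds (reproducible via seed)."""
    rng = random.Random(seed)
    folds = [[] for _ in range(k)]
    for i in range(n):
        folds[rng.randrange(k)].append(i)
    return folds


def cross_validate(rows, param_maps, k, fit_fn, score_fn, seed=0):
    """Return (best model refit on all rows, best params, average score per param map)."""
    folds = k_fold_indices(len(rows), k, seed)
    avg_scores = []
    for params in param_maps:
        fold_scores = []
        for f in range(k):
            held_out = set(folds[f])
            train = [r for i, r in enumerate(rows) if i not in held_out]
            valid = [rows[i] for i in folds[f]]
            fold_scores.append(score_fn(fit_fn(train, params), valid))
        avg_scores.append(mean(fold_scores))
    # Larger is better (as for R-squared); refit the winner on all training rows
    best = max(range(len(param_maps)), key=avg_scores.__getitem__)
    return fit_fn(rows, param_maps[best]), param_maps[best], avg_scores
```

With the grid used in this notebook, `build_grid({"regParam": [0.01, 0.5, 2.0], "elasticNetParam": [0.0, 0.5, 1.0]})` produces the 9 combinations, so 3-fold cross-validation fits 9 × 3 = 27 models before the final refit.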

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

# Create a 3-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3, parallelism=4)

# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)

# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

# Evaluate the model's performance on the test dataset using R-squared
print(f"Rsquared: {evaluator.evaluate(cvPredDF)}")

Rsquared: 0.5321272552009226
