# Apache Spark Tutorial

This exercise follows Evan Heitman's guide: https://towardsdatascience.com/a-neanderthals-guide-to-apache-spark-in-python-9ef1f156d427

## Setting up Spark in Google Colab

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases such as 2.4.3 are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
import findspark

In [0]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
findspark.init()

In [0]:
from pyspark.sql import SparkSession
from google.colab import files
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

## Preprocessing

In [0]:
# Manually upload files to Google Colab
# files.upload()
# Display the result of every expression in a cell, not only the last one
# from IPython.core.interactiveshell import InteractiveShell
# InteractiveShell.ast_node_interactivity = "all"

In [0]:
df = spark.read.csv('Video_Games_Sales_as_at_22_Dec_2016.csv',inferSchema=True, header=True)

In [9]:
df.count(), len(df.columns)

(16719, 16)

In [10]:
df.show()

+--------------------+--------+---------------+------------+--------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+-------------------+------+
|                Name|Platform|Year_of_Release|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|          Developer|Rating|
+--------------------+--------+---------------+------------+--------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+-------------------+------+
|          Wii Sports|     Wii|           2006|      Sports|            Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322|           Nintendo|     E|
|   Super Mario Bros.|     NES|           1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        null|        null|     

In [11]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: string (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)



In [12]:
df.select("Name","Platform","User_Score","User_Count").show(20, truncate=False)

+--------------------------------------------+--------+----------+----------+
|Name                                        |Platform|User_Score|User_Count|
+--------------------------------------------+--------+----------+----------+
|Wii Sports                                  |Wii     |8         |322       |
|Super Mario Bros.                           |NES     |null      |null      |
|Mario Kart Wii                              |Wii     |8.3       |709       |
|Wii Sports Resort                           |Wii     |8         |192       |
|Pokemon Red/Pokemon Blue                    |GB      |null      |null      |
|Tetris                                      |GB      |null      |null      |
|New Super Mario Bros.                       |DS      |8.5       |431       |
|Wii Play                                    |Wii     |6.6       |129       |
|New Super Mario Bros. Wii                   |Wii     |8.4       |594       |
|Duck Hunt                                   |NES     |null     

In [13]:
df.describe(["User_Score","User_Count"]).show()

+-------+------------------+------------------+
|summary|        User_Score|        User_Count|
+-------+------------------+------------------+
|  count|             10015|              7590|
|   mean|7.1250461133070315|162.22990777338603|
| stddev|1.5000060936257986| 561.2823262473789|
|    min|                 0|                 4|
|    max|               tbd|             10665|
+-------+------------------+------------------+
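
The `max` of `tbd` in the summary above follows from `User_Score` being read as a string column: minimum and maximum are then taken in lexicographic order, and a lowercase word sorts after any string of digits. A small plain-Python sketch of the same effect, using made-up sample values rather than the real column (`to_float_or_none` is a hypothetical helper):

```python
# Hypothetical sample of User_Score values as they appear in the CSV (strings).
raw_scores = ["8", "8.3", "6.6", "tbd", "9.1", "0"]

# String comparison is lexicographic, so "tbd" beats every numeric string.
string_max = max(raw_scores)
string_min = min(raw_scores)

def to_float_or_none(value):
    """Parse a score, returning None for placeholders such as 'tbd'."""
    try:
        return float(value)
    except ValueError:
        return None

# Keep only values that parse as numbers before taking a numeric maximum.
numeric_scores = [s for s in map(to_float_or_none, raw_scores) if s is not None]
numeric_max = max(numeric_scores)

print(string_min, string_max, numeric_max)
```

This is why the `tbd` rows have to be removed, and the column cast to a numeric type, before the score can be used as a regression label.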



In [15]:
df.groupBy("Platform").count().orderBy("count", ascending=False).show(15)

+--------+-----+
|Platform|count|
+--------+-----+
|     PS2| 2161|
|      DS| 2152|
|     PS3| 1331|
|     Wii| 1320|
|    X360| 1262|
|     PSP| 1209|
|      PS| 1197|
|      PC|  974|
|      XB|  824|
|     GBA|  822|
|      GC|  556|
|     3DS|  520|
|     PSV|  432|
|     PS4|  393|
|     N64|  319|
+--------+-----+
only showing top 15 rows



In [0]:
# Drop rows with missing (null) user data and rows whose score is the "tbd" (to be determined) placeholder
condition1 = (df.User_Score.isNotNull()) & (df.User_Count.isNotNull())
condition2 = df.User_Score != "tbd"
df = df.filter(condition1).filter(condition2)
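
To make the rule described in the comment concrete, here is a plain-Python sketch over a few made-up records (not the real dataset). `None` stands in for Spark's null, and `is_usable` is a hypothetical helper: a row survives only when both user columns are present and the score is not the `tbd` placeholder.

```python
# Made-up records for illustration; None stands in for Spark's null.
rows = [
    {"Name": "A", "User_Score": "8.3", "User_Count": 709},
    {"Name": "B", "User_Score": None, "User_Count": None},
    {"Name": "C", "User_Score": "tbd", "User_Count": None},
    {"Name": "D", "User_Score": "7.1", "User_Count": None},
]

def is_usable(row):
    """True when both user columns are present and the score is not 'tbd'."""
    score, count = row["User_Score"], row["User_Count"]
    return score is not None and count is not None and score != "tbd"

kept = [row["Name"] for row in rows if is_usable(row)]
print(kept)
```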

## Machine Learning

This section fits a linear regression model that predicts the user score from the year of release, global sales, critic score and user count.

Further documentation: https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html, https://spark.apache.org/docs/2.2.0/ml-classification-regression.html

In [0]:
df = df.withColumn("Year_of_Release", df["Year_of_Release"].cast(DoubleType()))
df = df.withColumn("User_Score", df["User_Score"].cast(DoubleType()))
df = df.withColumn("User_Count", df["User_Count"].cast(DoubleType()))
df = df.withColumn("Critic_Score", df["Critic_Score"].cast(DoubleType()))

In [0]:
# Creating a dataframe suitable for Machine Learning using Spark
inputcols = ["Year_of_Release", "Global_Sales", "Critic_Score", "User_Count"]
assembler = VectorAssembler(inputCols=inputcols,
                            outputCol="predictors")

In [19]:
predictors = assembler.setHandleInvalid("skip").transform(df)
predictors.columns
# The new "predictors" column holds, as a single vector, all the columns we want to use to make predictions with our machine learning model

['Name',
 'Platform',
 'Year_of_Release',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales',
 'Critic_Score',
 'Critic_Count',
 'User_Score',
 'User_Count',
 'Developer',
 'Rating',
 'predictors']

In [20]:
mldf = predictors.select("predictors", "User_Score")
mldf.show(15,truncate=False)

+--------------------------+----------+
|predictors                |User_Score|
+--------------------------+----------+
|[2006.0,82.53,76.0,322.0] |8.0       |
|[2008.0,35.52,82.0,709.0] |8.3       |
|[2009.0,32.77,80.0,192.0] |8.0       |
|[2006.0,29.8,89.0,431.0]  |8.5       |
|[2006.0,28.92,58.0,129.0] |6.6       |
|[2009.0,28.32,87.0,594.0] |8.4       |
|[2005.0,23.21,91.0,464.0] |8.6       |
|[2007.0,22.7,80.0,146.0]  |7.7       |
|[2010.0,21.81,61.0,106.0] |6.3       |
|[2009.0,21.79,80.0,52.0]  |7.4       |
|[2013.0,21.04,97.0,3994.0]|8.2       |
|[2004.0,20.81,95.0,1588.0]|9.0       |
|[2005.0,20.15,77.0,50.0]  |7.9       |
|[2013.0,16.27,97.0,3711.0]|8.1       |
|[2002.0,16.15,95.0,730.0] |8.7       |
+--------------------------+----------+
only showing top 15 rows
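
Conceptually, the assembler concatenates the chosen columns into one vector per row, and with `handleInvalid` set to `"skip"` it drops rows in which any input is null. A rough plain-Python sketch of that behaviour follows; `assemble_or_skip` is a hypothetical helper, not part of PySpark, and the second row is made up to show a gap.

```python
input_cols = ["Year_of_Release", "Global_Sales", "Critic_Score", "User_Count"]

# First row matches the first line of the table above; the second is invented.
rows = [
    {"Year_of_Release": 2006.0, "Global_Sales": 82.53, "Critic_Score": 76.0,
     "User_Count": 322.0, "User_Score": 8.0},
    {"Year_of_Release": 1999.0, "Global_Sales": 1.5, "Critic_Score": None,
     "User_Count": 40.0, "User_Score": 7.0},
]

def assemble_or_skip(row, cols):
    """Return the feature vector for a row, or None if any input is missing."""
    values = [row[c] for c in cols]
    if any(v is None for v in values):
        return None
    return values

# Pair each surviving feature vector with its label, as in mldf.
assembled = []
for row in rows:
    vector = assemble_or_skip(row, input_cols)
    if vector is not None:
        assembled.append((vector, row["User_Score"]))

print(assembled)
```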



In [0]:
train_data,test_data = mldf.randomSplit([0.8,0.2])
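
`randomSplit` assigns rows to the splits at random according to the weights, so the 80/20 proportions hold only approximately, and without a fixed seed the split changes between runs. The plain-Python sketch below imitates the idea with the standard `random` module; `random_split` is a hypothetical helper and not Spark's implementation.

```python
import random

def random_split(items, train_fraction, seed):
    """Send each item to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for item in items:
        (train if rng.random() < train_fraction else test).append(item)
    return train, test

data = list(range(1000))
train_a, test_a = random_split(data, 0.8, seed=42)
train_b, test_b = random_split(data, 0.8, seed=42)

# Same seed, same split; the sizes are close to 800/200 but not exact.
print(len(train_a), len(test_a), train_a == train_b)
```

In the notebook itself, passing the optional `seed` argument, for example `mldf.randomSplit([0.8, 0.2], seed=42)`, should make the split repeatable.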

In [0]:
lr = LinearRegression(featuresCol = 'predictors', labelCol = 'User_Score')
lrModel = lr.fit(train_data)
pred = lrModel.evaluate(test_data)

In [23]:
lrModel.coefficients

DenseVector([-0.0764, -0.0342, 0.0625, -0.0002])

In [24]:
lrModel.intercept

156.1480615005749
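
A fitted linear regression predicts by adding the intercept to the dot product of the coefficients and the feature vector. As a worked sketch, the snippet below applies the printed coefficients and intercept to the last row shown in the `mldf` table (`[2002.0, 16.15, 95.0, 730.0]`, actual score 8.7). The printed coefficients are rounded to four decimals and the year coefficient is multiplied by a value near 2000, so the result only approximates what `lrModel` itself would return; `predict` is a hypothetical helper.

```python
# Rounded values, as printed by lrModel.coefficients and lrModel.intercept.
coefficients = [-0.0764, -0.0342, 0.0625, -0.0002]
intercept = 156.1480615005749

# Feature order: Year_of_Release, Global_Sales, Critic_Score, User_Count
features = [2002.0, 16.15, 95.0, 730.0]

def predict(features, coefficients, intercept):
    """Intercept plus the dot product of coefficients and features."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

approx_prediction = predict(features, coefficients, intercept)
print(approx_prediction)
```

Note how the large intercept is offset almost entirely by the year term, which is why small rounding differences in that coefficient matter.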

In [25]:
pred.predictions.show(5)

+--------------------+----------+-----------------+
|          predictors|User_Score|       prediction|
+--------------------+----------+-----------------+
|[1997.0,0.42,91.0...|       7.8|9.323316373812503|
|[1997.0,0.5,66.0,...|       8.2|7.757854836381256|
|[1997.0,0.89,83.0...|       8.2|8.805071218707724|
|[1997.0,1.01,86.0...|       8.3|8.990844731977234|
|[1997.0,1.27,93.0...|       9.4|9.353394380544671|
+--------------------+----------+-----------------+
only showing top 5 rows



In [0]:
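# Named "evaluator" to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="User_Score", predictionCol="prediction", metricName="rmse")

In [0]:
# The second argument overrides metricName for that single call
rmse = evaluator.evaluate(pred.predictions)
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})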

In [28]:
print("Model evaluation metrics: RMSE, MSE, MAE and R2")
rmse, mse, mae, r2

Model evaluation metrics: RMSE, MSE, MAE and R2


(1.121893877539846, 1.258645872461391, 0.8471111698518611, 0.44572418741667963)
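
For reference, the four metrics can be computed by hand. The sketch below does so in plain Python for the five rows shown by `pred.predictions.show(5)`, with predictions rounded to four decimals. It illustrates the definitions only: on such a tiny sample the values will differ from the full test-set figures above, and R2 can even come out negative. `regression_metrics` is a hypothetical helper.

```python
import math

# Labels and (rounded) predictions from the five rows displayed earlier.
labels = [7.8, 8.2, 8.2, 8.3, 9.4]
predictions = [9.3233, 7.7579, 8.8051, 8.9908, 9.3534]

def regression_metrics(y_true, y_pred):
    """Return (rmse, mse, mae, r2) for paired true and predicted values."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)          # residual sum of squares
    mse = ss_res / n
    rmse = math.sqrt(mse)
    mae = sum(abs(e) for e in errors) / n
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return rmse, mse, mae, r2

sample_rmse, sample_mse, sample_mae, sample_r2 = regression_metrics(labels, predictions)
print(sample_rmse, sample_mse, sample_mae, sample_r2)
```

The same relation between the first two metrics holds for the notebook's figures: squaring the reported RMSE of about 1.1219 gives the reported MSE of about 1.2586.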