In [58]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

In [59]:
data = spark.read.csv('data/vgsales.csv',inferSchema=True, header=True)

In [60]:
data.count(), len(data.columns)

(16719, 16)

In [61]:
data.show(5)

+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|          Wii Sports|     Wii|           2006|      Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|   Super Mario Bros.|     NES|           1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        null|        null|      null|      null|     null|  null|
|      Mario Kart Wii|     Wii|           2008|      Racing|

In [62]:
data.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: string (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)



In [63]:
data.select("Name","Platform","User_Score","User_Count") \
.show(15, truncate=False)

+---------------------------+--------+----------+----------+
|Name                       |Platform|User_Score|User_Count|
+---------------------------+--------+----------+----------+
|Wii Sports                 |Wii     |8         |322       |
|Super Mario Bros.          |NES     |null      |null      |
|Mario Kart Wii             |Wii     |8.3       |709       |
|Wii Sports Resort          |Wii     |8         |192       |
|Pokemon Red/Pokemon Blue   |GB      |null      |null      |
|Tetris                     |GB      |null      |null      |
|New Super Mario Bros.      |DS      |8.5       |431       |
|Wii Play                   |Wii     |6.6       |129       |
|New Super Mario Bros. Wii  |Wii     |8.4       |594       |
|Duck Hunt                  |NES     |null      |null      |
|Nintendogs                 |DS      |null      |null      |
|Mario Kart DS              |DS      |8.6       |464       |
|Pokemon Gold/Pokemon Silver|GB      |null      |null      |
|Wii Fit                

In [64]:
data.describe(["User_Score","User_Count","Year_of_Release"]).show()

+-------+------------------+------------------+------------------+
|summary|        User_Score|        User_Count|   Year_of_Release|
+-------+------------------+------------------+------------------+
|  count|             10015|              7590|             16719|
|   mean|7.1250461133070315|162.22990777338603|2006.4873556231003|
| stddev|1.5000060936257986| 561.2823262473789|5.8789947683491475|
|    min|                 0|                 4|              1980|
|    max|               tbd|             10665|               N/A|
+-------+------------------+------------------+------------------+



There are some strings in the user_score "tbd"

In [65]:
data.groupBy("Platform") \
.count() \
.orderBy("count", ascending=False) \
.show(10)

+--------+-----+
|Platform|count|
+--------+-----+
|     PS2| 2161|
|      DS| 2152|
|     PS3| 1331|
|     Wii| 1320|
|    X360| 1262|
|     PSP| 1209|
|      PS| 1197|
|      PC|  974|
|      XB|  824|
|     GBA|  822|
+--------+-----+
only showing top 10 rows



Create new dataframe to remove Null values in User_Score

In [66]:
condition1 = (data.User_Score.isNotNull()) | (data.User_Count.isNotNull())
condition2 = data.User_Score != "tbd"

data = data.filter(condition1).filter(condition2)

In [67]:
condition1 = (data.Critic_Score.isNotNull()) | (data.Critic_Count.isNotNull())
condition2 = data.User_Score != "tbd"

data = data.filter(condition1).filter(condition2)

In [123]:
condition1 = (data.Developer.isNotNull())
data = data.filter(condition1)

condition1 = (data.Rating.isNotNull())
data = data.filter(condition1)

condition1 = (data.Year_of_Release.isNotNull())
data = data.filter(condition1)


In [104]:
data.filter(data.Rating.isNull()).count()

0

In [124]:
data.show(5)

+--------------------+--------+---------------+--------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|   Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+--------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|          Wii Sports|     Wii|           2006|  Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|      Mario Kart Wii|     Wii|           2008|  Racing| Nintendo|   15.68|   12.76|    3.79|       3.29|       35.52|          82|          73|         8|       709| Nintendo|     E|
|   Wii Sports Resort|     Wii|           2009|  Sports| Nintendo|   15.61|   10

In [125]:
data.describe(["User_Score","User_Count","Critic_Count","Critic_Score"]).show() #seems clean now

+-------+------------------+-----------------+------------------+-----------------+
|summary|        User_Score|       User_Count|      Critic_Count|     Critic_Score|
+-------+------------------+-----------------+------------------+-----------------+
|  count|              6826|             6826|              6826|             6826|
|   mean| 6.736155874597129|174.7480222677996|28.931145619689424|70.26867858189276|
| stddev|1.4611860669901755|587.3893323682087|19.222756806940907|13.87041674151392|
|    min|                 0|                4|                 3|               13|
|    max|                 9|            10665|               113|               98|
+-------+------------------+-----------------+------------------+-----------------+



## Build a model

For an example of **linear regression**, let’s see if we can predict User_Score from Year_of_Release, Global_Sales, Critic_Score, and User_Count.

First let’s recode all of our predictors to be Doubles (I found that this got rid of some really gnarly errors later on).

In [168]:
from pyspark.sql.types import DoubleType, IntegerType

data = data.withColumn("Year_of_Release", data["Year_of_Release"].cast(DoubleType()))

data = data.withColumn("User_Score", data["User_Score"].cast(DoubleType()))

data = data.withColumn("User_Count", data["User_Count"].cast(DoubleType()))

data = data.withColumn("Critic_Score", data["Critic_Score"].cast(DoubleType()))

data = data.withColumn("Critic_Score", data["Critic_Score"].cast(DoubleType()))

In [169]:
data.dtypes

[('Name', 'string'),
 ('Platform', 'string'),
 ('Year_of_Release', 'double'),
 ('Genre', 'string'),
 ('Publisher', 'string'),
 ('NA_Sales', 'double'),
 ('EU_Sales', 'double'),
 ('JP_Sales', 'double'),
 ('Other_Sales', 'double'),
 ('Global_Sales', 'double'),
 ('Critic_Score', 'double'),
 ('Critic_Count', 'int'),
 ('User_Score', 'double'),
 ('User_Count', 'double'),
 ('Developer', 'string'),
 ('Rating', 'string')]

In [170]:
data.filter(data.Year_of_Release.isNull()).count()

0

#### VectorAssembler
The next step is to get our data into a form that PySpark can create a model with. To do this we use something called a VectorAssembler.

In [171]:
from pyspark.ml.feature import VectorAssembler

inputcols = ["Year_of_Release", "Global_Sales", "Critic_Score", "User_Count"]

assembler = VectorAssembler(inputCols = inputcols,
                            outputCol = "predictors")

Here we’ve delineated what features we want our model to use as predictors so that VectorAssembler can take those columns and transform them into a single column (named “predictors”) that contains all the data we want to predict with.

In [172]:
predictors = assembler.transform(data)

predictors.columns

['Name',
 'Platform',
 'Year_of_Release',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales',
 'Critic_Score',
 'Critic_Count',
 'User_Score',
 'User_Count',
 'Developer',
 'Rating',
 'predictors']

What VectorAssembler.transform() does is create a new DataFrame with a new column at the end where each row contains a list of all the features we included in the inputCols parameter when we created the assembler.

The final step to getting our data ready to be used in a model is to collect the new predictions column we just made and User_Score (our target variable) by themselves in a DataFrame.

In [173]:
model_data = predictors.select("predictors", "User_Score")

model_data.show(5,truncate=False)

+-------------------------+----------+
|predictors               |User_Score|
+-------------------------+----------+
|[2006.0,82.53,76.0,322.0]|8.0       |
|[2008.0,35.52,82.0,709.0]|8.0       |
|[2009.0,32.77,80.0,192.0]|8.0       |
|[2006.0,29.8,89.0,431.0] |8.0       |
|[2006.0,28.92,58.0,129.0]|6.0       |
+-------------------------+----------+
only showing top 5 rows



In [174]:
train_data, test_data = model_data.randomSplit([0.7,0.3])

In [175]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'predictors', labelCol='User_Score', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

pred = lr_model.evaluate(test_data)

Coefficients: [-0.02745162209083305,0.0,0.04131079521461806,0.0]
Intercept: 58.9406467144236


### Model evaluation

In [176]:
from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(
    labelCol="User_Score", 
    predictionCol="prediction", 
    metricName="rmse")

In [177]:
rmse = eval.evaluate(pred.predictions)
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})

print("RMSE: " + str(rmse))
print("MSE: " + str(mse))
print("MAE: " + str(mae))
print("R squared: " + str(r2))


RMSE: 1.1804929078473727
MSE: 1.3935635054779456
MAE: 0.896775202343803
R squared: 0.31927745469019386


In [178]:
train_data.describe().show()

+-------+-----------------+
|summary|       User_Score|
+-------+-----------------+
|  count|             4783|
|   mean|6.738657746184403|
| stddev|1.473970600587042|
|    min|              0.0|
|    max|              9.0|
+-------+-----------------+



This model is accounting for about 40% of the variation in the data. Can we do better?

In [179]:
lr_predictions = lr_model.transform(test_data)
lr_predictions.select("prediction","User_Score","predictors").show(5)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="User_Score",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-----------------+----------+--------------------+
|       prediction|User_Score|          predictors|
+-----------------+----------+--------------------+
|7.010712891583047|       2.0|[1988.0,0.03,64.0...|
|7.052557135111144|       6.0|[1994.0,1.27,69.0...|
| 7.57600502393413|       7.0|[1996.0,0.14,83.0...|
|7.906491385651073|       8.0|[1996.0,4.63,91.0...|
|7.823869795221839|       8.0|[1996.0,5.74,89.0...|
+-----------------+----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.319277


---

### Gradient-boosted tree regression

In [180]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol = 'predictors', labelCol = 'User_Score', maxIter=10)
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(test_data)
gbt_predictions.select('prediction', 'User_Score', 'predictors').show(5)

+-----------------+----------+--------------------+
|       prediction|User_Score|          predictors|
+-----------------+----------+--------------------+
|5.658324803912353|       2.0|[1988.0,0.03,64.0...|
|6.679882136924183|       6.0|[1994.0,1.27,69.0...|
|6.953429385784522|       7.0|[1996.0,0.14,83.0...|
|8.606887120844258|       8.0|[1996.0,4.63,91.0...|
|8.606887120844258|       8.0|[1996.0,5.74,89.0...|
+-----------------+----------+--------------------+
only showing top 5 rows



In [181]:
gbt_evaluator = RegressionEvaluator(
    labelCol="User_Score", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbt_evaluator = RegressionEvaluator(
    labelCol="User_Score", predictionCol="prediction", metricName="r2")
rsquared = gbt_evaluator.evaluate(gbt_predictions)
print("R squared on test data = %g" % rsquared)

Root Mean Squared Error (RMSE) on test data = 1.12591
R squared on test data = 0.380768


In [182]:
gbt_predictions = gbt_model.transform(test_data)
lr_predictions.select("prediction","User_Score","predictors").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
gbt_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="User_Score",metricName="r2")
print("R Squared (R2) on test data = %g" % gbt_evaluator.evaluate(gbt_predictions))

+-----------------+----------+--------------------+
|       prediction|User_Score|          predictors|
+-----------------+----------+--------------------+
|7.010712891583047|       2.0|[1988.0,0.03,64.0...|
|7.052557135111144|       6.0|[1994.0,1.27,69.0...|
| 7.57600502393413|       7.0|[1996.0,0.14,83.0...|
|7.906491385651073|       8.0|[1996.0,4.63,91.0...|
|7.823869795221839|       8.0|[1996.0,5.74,89.0...|
+-----------------+----------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.380768


---