In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [27]:
# Create the SparkSession used by every cell below (getOrCreate reuses an active session if there is one).
# NOTE(review): the application name is an empty string; a descriptive name would make the job easier to identify.
spark = (
    SparkSession.builder
    .appName('')
    .getOrCreate()
)

In [28]:
# Load the video game sales CSV: the first row is the header and column types are inferred by Spark.
df = spark.read.csv(
    path = './vgsales.csv',
    header = True,
    inferSchema = True,
)

In [29]:
# List the files in the current working directory (shell escape).
!ls

 bitcoin.csv
 Cause_of_death_in_indonesia.ipynb
 LinearRegression_bitcoin.ipynb
 LinearRegression_student.ipynb
'Penyebab Kematian di Indonesia yang Dilaporkan - Clean.csv'
 student_academic_placement_performance_dataset.csv
 vgsales.csv
 video_games.ipynb
 work


In [30]:
# Preview the raw data; these are show()'s default arguments, spelled out.
df.show(n = 20, truncate = True)

+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+--------------------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports|            Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform|            Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing|            Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports|            Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing|            Nintendo|   11.27|    8.89|   10.22|  

In [31]:
# Number of rows per Year value (the result is printed in whatever order Spark returns it, not sorted).
(
    df
    .groupBy('Year')
    .count()
    .show()
)

+----+-----+
|Year|count|
+----+-----+
|1987|   16|
|2016|  344|
|2012|  657|
|2020|    1|
|1988|   15|
|2017|    3|
|2014|  582|
|1984|   14|
|2013|  546|
|1982|   36|
|2005|  941|
|2000|  349|
|1981|   46|
|2002|  829|
|2009| 1431|
|1995|  219|
|2006| 1008|
|2004|  763|
|1989|   17|
|2011| 1139|
+----+-----+
only showing top 20 rows



In [32]:
# Per-column null count: when() yields a value only for null cells and count() ignores the rest.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()

+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Rank|Name|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|   0|   0|       0|   0|    0|        0|       0|       0|       0|          0|           0|
+----+----+--------+----+-----+---------+--------+--------+--------+-----------+------------+



In [33]:
# Remove the Name and Rank columns, then preview two rows of what remains.
unused_cols = ['Name', 'Rank']
df = df.drop(*unused_cols)
df.show(n = 2)

+--------+----+--------+---------+--------+--------+--------+-----------+------------+
|Platform|Year|   Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+--------+----+--------+---------+--------+--------+--------+-----------+------------+
|     Wii|2006|  Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|     NES|1985|Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
+--------+----+--------+---------+--------+--------+--------+-----------+------------+
only showing top 2 rows



In [34]:
# Display the column names that remain after dropping Name and Rank.
df.columns

['Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [35]:
# Print each column's inferred type; Year comes out as a string here even though it holds years.
df.printSchema()

root
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [36]:
# Year was inferred as a string column, so convert it to an integer column in place.
# The null check in the next cell reports 271 null Year values after this cast (there were 0 before it).
df = df.withColumn('Year', df['Year'].cast('int'))

In [37]:
# Re-check per-column null counts and the schema now that Year has been cast to int.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).show()
df.printSchema()

+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|       0| 271|    0|        0|       0|       0|       0|          0|           0|
+--------+----+-----+---------+--------+--------+--------+-----------+------------+

root
 |-- Platform: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [38]:
# Disabled alternative: drop the rows whose Year is null (and re-check the null counts)
# instead of imputing Year inside the pipeline defined below.
# df = df.dropna(subset = 'Year')
# df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [39]:
# Display the column names available for building the feature pipeline.
df.columns

['Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [40]:
# ---- Numeric branch ---------------------------------------------------------
# Replace null Year values with the median Year of the data the pipeline is fitted on.
imputer = Imputer(inputCol = 'Year', outputCol = 'Year_imputer', strategy = 'median')

# NOTE(review): in the rows printed by df.show(), Global_Sales equals
# NA_Sales + EU_Sales + JP_Sales + Other_Sales (e.g. 41.49 + 29.02 + 3.77 + 8.46 = 82.74).
# Feeding those four columns in as features lets the model reconstruct the label,
# which would explain R2 = 1.0000 -- confirm this leakage is intended.
num_cols = ['Year_imputer', 'NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales']
num_assembler = VectorAssembler(inputCols = num_cols, outputCol = 'num_features')
# Standardise the numeric vector to zero mean and unit standard deviation.
scaler = StandardScaler(
    inputCol = 'num_features',
    outputCol = 'scaled_features',
    withMean = True,
    withStd = True,
)

# ---- Categorical branch -----------------------------------------------------
cat_cols = ['Platform', 'Genre', 'Publisher']
# handleInvalid = 'keep' gives labels unseen at fit time their own index instead of raising.
indexer = [
    StringIndexer(inputCol = name, outputCol = f'{name}_idx', handleInvalid = 'keep')
    for name in cat_cols
]
ohe = [
    OneHotEncoder(inputCol = f'{name}_idx', outputCol = f'{name}_enc')
    for name in cat_cols
]

# ---- Final feature vector and estimator -------------------------------------
# One-hot encoded categoricals first, scaled numerics last.
features_cols = [*(f'{name}_enc' for name in cat_cols), 'scaled_features']
assembler_all = VectorAssembler(inputCols = features_cols, outputCol = 'features')
lm = LinearRegression(featuresCol = 'features', labelCol = 'Global_Sales')

In [41]:
# Chain the stages in execution order: impute -> index -> one-hot -> assemble/scale numerics -> assemble all -> regress.
pipeline = Pipeline(
    stages = [imputer, *indexer, *ohe, num_assembler, scaler, assembler_all, lm]
)

In [42]:
# Total number of rows before the train/test split.
df.count()

16598

In [43]:
# Random 70/30 split with a fixed seed, followed by summary statistics for each part.
train, test = df.randomSplit([0.7, 0.3], seed = 42)
for part in (train, test):
    part.describe().show()

+-------+--------+------------------+--------+--------------------+------------------+-------------------+-------------------+-------------------+------------------+
|summary|Platform|              Year|   Genre|           Publisher|          NA_Sales|           EU_Sales|           JP_Sales|        Other_Sales|      Global_Sales|
+-------+--------+------------------+--------+--------------------+------------------+-------------------+-------------------+-------------------+------------------+
|  count|   11723|             11541|   11723|               11723|             11723|              11723|              11723|              11723|             11723|
|   mean|  2600.0|2006.4126158911706|    NULL|                NULL|0.2717461400665366|0.15199607608973312|0.07789985498592249|0.04914356393414434|0.5510185106201623|
| stddev|     0.0|5.8355505467984115|    NULL|                NULL|0.8854130581898536| 0.5377644845384209| 0.2939859993668561|0.18123714679038358| 1.645114855197849|
|   

In [44]:
# Per-column null counts for the training part, then for the test part.
for part in (train, test):
    null_count_exprs = [
        count(when(col(name).isNull(), name)).alias(name)
        for name in part.columns
    ]
    part.select(null_count_exprs).show()

+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|       0| 182|    0|        0|       0|       0|       0|          0|           0|
+--------+----+-----+---------+--------+--------+--------+-----------+------------+

+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|Platform|Year|Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+--------+----+-----+---------+--------+--------+--------+-----------+------------+
|       0|  89|    0|        0|       0|       0|       0|          0|           0|
+--------+----+-----+---------+--------+--------+--------+-----------+------------+



In [45]:
# Fit every pipeline stage (imputer, indexers, encoders, scaler, regression) on the training part only.
model = pipeline.fit(train)

In [46]:
# Apply the fitted pipeline to the held-out test part; the regression stage adds a 'prediction' column.
pred = model.transform(test)

In [47]:
# Show the assembled feature vectors next to the predictions, without truncating the cell contents.
(
    pred
    .select('features', 'prediction')
    .show(n = 20, truncate = False)
)

+--------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|features                                                                                                                                                |prediction         |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|(559,[19,29,52,554,555,556,557,558],[1.0,1.0,1.0,0.09986397102324425,0.02061620818060091,-0.2454533162468594,-0.2649781117253696,-0.27115613330078103]) |0.3103454413345508 |
|(559,[19,33,52,554,555,556,557,558],[1.0,1.0,1.0,0.09986397102324425,1.0596792663659635,-0.13388031035841777,-0.2649781117253696,-0.21597980671931105]) |1.3000606972395015 |
|(559,[19,40,52,554,555,556,557,558],[1.0,1.0,1.0,0.09986397102324425,0.08838119023616804,-0.2454533162468594,-0.264978111725

In [48]:
# Score the baseline pipeline's test-set predictions on four regression metrics.
# One evaluator is reconfigured per metric instead of constructing four copies, and it is
# not named `eval`, so this cell no longer rebinds Python's builtin eval().
evaluator = RegressionEvaluator(labelCol = 'Global_Sales', predictionCol = 'prediction')
metric_names = ('rmse', 'mse', 'mae', 'r2')
scores = {}
for metric in metric_names:
    # setMetricName returns the evaluator itself, so the call can be chained.
    scores[metric] = evaluator.setMetricName(metric).evaluate(pred)
    print('{} = {:.4f}'.format(metric.upper(), scores[metric]))
# Keep the individual result names bound, exactly as the original cell did.
rmse, mse, mae, r2 = (scores[metric] for metric in metric_names)

RMSE = 0.0053
MSE = 0.0000
MAE = 0.0032
R2 = 1.0000


In [49]:
# Hyper-parameter grid for the linear regression stage: 3 regParam values x 4 elasticNetParam values = 12 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lm.regParam, [0.01, 0.5, 1.0])
    .addGrid(lm.elasticNetParam, [0.1, 0.2, 0.5, 1.0])
    .build()
)

In [50]:
# RMSE evaluator that the CrossValidator uses to rank parameter combinations.
# NOTE(review): the name `eval` shadows Python's builtin; it is kept because the CrossValidator cell refers to it.
eval = RegressionEvaluator(
    labelCol = 'Global_Sales',
    predictionCol = 'prediction',
    metricName = 'rmse',
)

In [51]:
# 3-fold cross-validation over the whole pipeline, ranking parameter combinations with `eval` (RMSE).
# A fixed seed makes the fold assignment reproducible across runs, matching the seeded train/test split.
cv = CrossValidator(
    estimator = pipeline,
    estimatorParamMaps = paramGrid,
    evaluator = eval,
    numFolds = 3,
    seed = 42,
)

In [55]:
# Run the grid search on the training part: 12 parameter combinations x 3 folds of pipeline fits.
cv_model = cv.fit(train)

In [56]:
# Predict on the held-out test part with the cross-validated model.
cv_pred = cv_model.transform(test)

In [60]:
# Compare the true label with the cross-validated model's prediction, side by side.
(
    cv_pred
    .select('Global_Sales', 'prediction')
    .show(n = 20, truncate = True)
)

+------------+-------------------+
|Global_Sales|         prediction|
+------------+-------------------+
|        0.31| 0.3092698677981408|
|         1.3| 1.2916324871913456|
|        0.38|0.36877026599154017|
|        0.44| 0.4382358226008485|
|        0.98| 0.9742001793925308|
|        2.53| 2.5123121962582085|
|        0.51| 0.5178851837587752|
|        0.27| 0.2696029356692079|
|        0.24|0.22988757815659938|
|        2.76| 2.7405423321506004|
|        4.31|  4.278702774399954|
|        0.39| 0.3886037320560067|
|        0.43|0.41835393115270636|
|        0.22| 0.2199708451243661|
|        1.97| 1.9565143734020598|
|        0.59| 0.5873023149844079|
|        0.39|0.37868699902377345|
|        0.36|0.34893679992707377|
|        0.82| 0.8154840254931235|
|        0.22| 0.2199708451243661|
+------------+-------------------+
only showing top 20 rows



In [58]:
# Score the cross-validated model's test-set predictions on four regression metrics.
# One evaluator is reconfigured per metric instead of constructing four copies, and it is
# not named `eval`, so this cell no longer rebinds Python's builtin eval().
evaluator = RegressionEvaluator(labelCol = 'Global_Sales', predictionCol = 'prediction')
metric_names = ('rmse', 'mse', 'mae', 'r2')
scores = {}
for metric in metric_names:
    # setMetricName returns the evaluator itself, so the call can be chained.
    scores[metric] = evaluator.setMetricName(metric).evaluate(cv_pred)
    print('{} = {:.4f}'.format(metric.upper(), scores[metric]))
# Keep the individual result names bound, exactly as the original cell did.
rmse, mse, mae, r2 = (scores[metric] for metric in metric_names)

RMSE = 0.0071
MSE = 0.0001
MAE = 0.0043
R2 = 1.0000


In [59]:
# Re-score the baseline (non-cross-validated) predictions on the same four metrics for comparison.
# One evaluator is reconfigured per metric instead of constructing four copies, and it is
# not named `eval`, so this cell no longer rebinds Python's builtin eval().
evaluator = RegressionEvaluator(labelCol = 'Global_Sales', predictionCol = 'prediction')
metric_names = ('rmse', 'mse', 'mae', 'r2')
scores = {}
for metric in metric_names:
    # setMetricName returns the evaluator itself, so the call can be chained.
    scores[metric] = evaluator.setMetricName(metric).evaluate(pred)
    print('{} = {:.4f}'.format(metric.upper(), scores[metric]))
# Keep the individual result names bound, exactly as the original cell did.
rmse, mse, mae, r2 = (scores[metric] for metric in metric_names)

RMSE = 0.0053
MSE = 0.0000
MAE = 0.0032
R2 = 1.0000
