In [45]:
# Must run before `import pyspark` below; presumably makes a locally installed Spark
# importable in this kernel (findspark) — confirm against the environment setup.
import findspark
findspark.init()

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [46]:
from pyspark.ml.feature import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics


In [47]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = SparkSession.builder.appName("Analysis").getOrCreate()

In [48]:
import os

# Location of vgsales.csv. Set the VGSALES_CSV environment variable to load it from
# elsewhere; the fallback is the original machine-specific absolute path.
VGSALES_CSV = os.environ.get(
    "VGSALES_CSV",
    r"C:\Users\Kartikeya Mandhar\Desktop\AWSMachineLearning\vgsales.csv",
)
df = spark.read.csv(VGSALES_CSV, header=True, inferSchema=True)

In [49]:
# Replace nulls with 0. Per the PySpark docs, columns whose type does not match the
# (integer) fill value are left untouched.
df = df.fillna(0)

Due to limited computational resources, we keep only video games released after 2005 and drop the columns we do not need (the game title and the regional sales breakdowns).

In [50]:
# Keep only the modelling columns, restrict to post-2005 releases, then make Year an int.
# NOTE(review): the filter runs before the cast, so if Year was inferred as a string the
# comparison relies on Spark's implicit string/number coercion — confirm the schema.
df = (
    df.drop('Name', 'NA_Sales', 'EU_Sales', 'JP_Sales', 'Other_Sales')
      .where(col("Year") > 2005)
      .withColumn("Year", col("Year").cast(IntegerType()))
)

In [51]:
# Preview the filtered frame; the row count is the cell's displayed result.
df.show(n=20)
df.count()

+----+--------+----+------------+--------------------+------------+
|Rank|Platform|Year|       Genre|           Publisher|Global_Sales|
+----+--------+----+------------+--------------------+------------+
|   1|     Wii|2006|      Sports|            Nintendo|       82.74|
|   3|     Wii|2008|      Racing|            Nintendo|       35.82|
|   4|     Wii|2009|      Sports|            Nintendo|        33.0|
|   7|      DS|2006|    Platform|            Nintendo|       30.01|
|   8|     Wii|2006|        Misc|            Nintendo|       29.02|
|   9|     Wii|2009|    Platform|            Nintendo|       28.62|
|  14|     Wii|2007|      Sports|            Nintendo|       22.72|
|  15|     Wii|2009|      Sports|            Nintendo|        22.0|
|  16|    X360|2010|        Misc|Microsoft Game St...|       21.82|
|  17|     PS3|2013|      Action|Take-Two Interactive|        21.4|
|  21|      DS|2006|Role-Playing|            Nintendo|       18.36|
|  24|    X360|2013|      Action|Take-Two Intera

10214

In [52]:
# Count titles per publisher and attach that count to every row, so the next cell can
# bucket low-volume publishers. No orderBy here: sorting before a join does not affect
# the join result and only adds a shuffle.
df2 = df.groupBy("Publisher").count()
# Rename the key so the join condition is unambiguous, then drop the duplicate key.
df2 = df2.withColumnRenamed("Publisher", "Publisher1")
df3 = df2.join(df, df.Publisher == df2.Publisher1)
df3 = df3.drop("Publisher1")



In [53]:
# Publishers with fewer than this many titles are pooled into one "Small Publisher"
# category. This presumably keeps the Publisher feature's cardinality low enough for the
# forest's maxBins — confirm if the threshold or maxBins changes.
MIN_PUBLISHER_TITLES = 50

df3 = df3.withColumn(
    "Publisher",
    when(col("count") < MIN_PUBLISHER_TITLES, "Small Publisher").otherwise(col("Publisher")),
)
# The per-publisher count was only needed for bucketing.
df3 = df3.drop('count')

In [54]:
# Preview the frame after small publishers have been pooled.
df3.show(n=20)

+----+--------+----+------------+--------------------+------------+
|Rank|Platform|Year|       Genre|           Publisher|Global_Sales|
+----+--------+----+------------+--------------------+------------+
|   1|     Wii|2006|      Sports|            Nintendo|       82.74|
|   3|     Wii|2008|      Racing|            Nintendo|       35.82|
|   4|     Wii|2009|      Sports|            Nintendo|        33.0|
|   7|      DS|2006|    Platform|            Nintendo|       30.01|
|   8|     Wii|2006|        Misc|            Nintendo|       29.02|
|   9|     Wii|2009|    Platform|            Nintendo|       28.62|
|  14|     Wii|2007|      Sports|            Nintendo|       22.72|
|  15|     Wii|2009|      Sports|            Nintendo|        22.0|
|  16|    X360|2010|        Misc|Microsoft Game St...|       21.82|
|  17|     PS3|2013|      Action|Take-Two Interactive|        21.4|
|  21|      DS|2006|Role-Playing|            Nintendo|       18.36|
|  24|    X360|2013|      Action|Take-Two Intera

In [55]:

# One StringIndexer per categorical column. handleInvalid="keep" maps labels not seen at
# fit time to an extra index instead of raising an error.
platformIndexer = StringIndexer(inputCol="Platform", outputCol="PlatformIndex", handleInvalid="keep")
genreIndexer = StringIndexer(inputCol="Genre", outputCol="GenreIndex", handleInvalid="keep")
publisherIndexer = StringIndexer(inputCol="Publisher", outputCol="PublisherIndex", handleInvalid="keep")



In [56]:
# Build the indexing pipeline from the indexers configured in the previous cell, so their
# handleInvalid="keep" setting actually applies. Previously, equivalent indexers were rebuilt
# here with the default setting, and the configured ones were never used.
indexers = [platformIndexer, genreIndexer, publisherIndexer]
pipeline = Pipeline(stages=indexers)
# Fit the label->index mappings on df3 and add the *Index columns (rebinds `df`).
df = pipeline.fit(df3).transform(df3)

In [57]:
# Preview the frame with the added PlatformIndex / GenreIndex / PublisherIndex columns.
df.show(n=20)


+----+--------+----+------------+--------------------+------------+-------------+----------+--------------+
|Rank|Platform|Year|       Genre|           Publisher|Global_Sales|PlatformIndex|GenreIndex|PublisherIndex|
+----+--------+----+------------+--------------------+------------+-------------+----------+--------------+
|   1|     Wii|2006|      Sports|            Nintendo|       82.74|          2.0|       1.0|           8.0|
|   3|     Wii|2008|      Racing|            Nintendo|       35.82|          2.0|       7.0|           8.0|
|   4|     Wii|2009|      Sports|            Nintendo|        33.0|          2.0|       1.0|           8.0|
|   7|      DS|2006|    Platform|            Nintendo|       30.01|          0.0|      11.0|           8.0|
|   8|     Wii|2006|        Misc|            Nintendo|       29.02|          2.0|       2.0|           8.0|
|   9|     Wii|2009|    Platform|            Nintendo|       28.62|          2.0|      11.0|           8.0|
|  14|     Wii|2007|      Sp

In [58]:
from pyspark.ml.regression import RandomForestRegressor

In [59]:
# Pack Year and the three indexed categorical columns into a single "features" vector.
vectorAssembler = VectorAssembler(
    inputCols=["Year", "PlatformIndex", "GenreIndex", "PublisherIndex"],
    outputCol="features",
)

In [60]:
# Append the "features" column. This rebinds `df`, and the next cell narrows it to two
# columns, so re-running this cell on its own afterwards will fail (inputs are gone).
df = vectorAssembler.transform(dataset=df)

In [61]:
# Keep only the label and the assembled feature vector for modelling.
df = df.select(col("Global_Sales"), col("features"))

In [62]:
# 80/20 train/test split with a fixed seed so the partition is repeatable.
train, test = df.randomSplit(weights=[0.8, 0.2], seed=2018)
for split_name, split_df in (("Training", train), ("Test", test)):
    print("%s Dataset Count: %d" % (split_name, split_df.count()))

Training Dataset Count: 8207
Test Dataset Count: 2007


In [63]:
# Random forest regressor predicting global sales from the assembled features.
# seed is set explicitly (same value as the train/test split) so bootstrapping and
# feature subsampling, and therefore the reported RMSE, are reproducible across kernel
# restarts. Without it, PySpark derives a default seed from Python's per-process hash.
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='Global_Sales',
    numTrees=25,
    maxBins=100,
    seed=2018,
)

In [64]:
# Train the forest on the training split.
model = rf.fit(dataset=train)

In [65]:
# Score the held-out split; adds a "prediction" column next to the true label.
result = model.transform(dataset=test)

In [66]:
# Preview true vs. predicted global sales on the test split.
result.show(n=20)

+------------+--------------------+-------------------+
|Global_Sales|            features|         prediction|
+------------+--------------------+-------------------+
|        0.01|    (4,[0],[2007.0])|0.13813981214879312|
|        0.01|[2006.0,0.0,2.0,2.0]|0.13814761984447183|
|        0.01|[2006.0,0.0,10.0,...|0.24853632212781795|
|        0.01|[2006.0,0.0,10.0,...|0.24853632212781795|
|        0.01|[2006.0,3.0,0.0,1.0]| 0.9480358247948972|
|        0.01|[2006.0,3.0,9.0,0.0]| 0.1942444432586518|
|        0.01|[2006.0,4.0,6.0,2...| 0.2197903583506686|
|        0.01|[2006.0,5.0,2.0,1...| 0.1704898524445626|
|        0.01|[2006.0,5.0,3.0,0.0]|0.08404503812005322|
|        0.01|[2006.0,5.0,3.0,0.0]|0.08404503812005322|
|        0.01|[2006.0,5.0,3.0,6.0]| 0.2661911625571377|
|        0.01|[2006.0,5.0,5.0,1...|0.17820845075716474|
|        0.01|[2006.0,6.0,2.0,4.0]|0.36978858920926094|
|        0.01|[2007.0,0.0,2.0,1...| 0.1296529167255284|
|        0.01|[2007.0,0.0,4.0,1...|0.28117256281

In [67]:
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.evaluation import RegressionEvaluator

In [68]:
# Root-mean-squared error of the test-set predictions against the true global sales.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="Global_Sales",
    metricName="rmse",
)
rmse = evaluator.evaluate(result)
print("Root Mean Squared Error (RMSE) on test data = {:g}".format(rmse))


Root Mean Squared Error (RMSE) on test data = 1.45559
