
# **Running Pyspark in Colab**

To run spark in Colab, we need to first install all the dependencies in Colab environment i.e. Apache Spark 2.3.2 with hadoop 2.7, Java 8 and Findspark to locate the spark in the system. The tools installation can be carried out inside the Jupyter Notebook of the Colab. One important note is that if you are new in Spark, it is better to avoid Spark 2.4.0 version since some people have already complained about its compatibility issue with python. 
Follow the steps to install the dependencies:

In [4]:
#spark sql imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

import matplotlib.pyplot as plt
%matplotlib inline

Now that you installed Spark and Java in Colab, it is time to set the environment path which enables you to run Pyspark in your Colab environment. Set the location of Java and Spark by running the following code:

In [5]:
#spark ML imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, CountVectorizer, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

Run a local spark session to test your installation:

In [6]:
spark = SparkSession.builder.appName('ChicagoFoodInspectionML').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.yarn.jars',
  'local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/hive/*'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.sql.queryExecutionListeners',
  'com.cloudera.spark.lineage.NavigatorQueryListener'),
 ('spark.lineage.log.dir', '/var/log/spark/lineage'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'md01.rcc.local,md02.rcc.local'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.executorEnv.PYTHONPATH',
  '/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.10.7-src.zip:/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/py4j-0.10.7-src.zip<CPS>/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/spark/python/lib/pyspark.zip'),
 ('spark.yarn.historyServer.addre

In [7]:
from pyspark import SparkContext, SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pyspark.sql.types as t
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import isnan, when, count, col, size
from functools import reduce
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline

In [14]:
# Load app_id_info_join (loaded into RCC)
app_id_info = spark.read.csv("./app_id_info_join_old.csv", inferSchema=True, header=False)

In [15]:
# Drop repeating cols
app_id_info = app_id_info.drop("_c8","_c11","_c13","_c15")

In [16]:
# Rename the columns
oldColumns = app_id_info.schema.names
newColumns = ["app_id","title","type","price", "releasedDate", "rating", "requiredAge", "isMultiplayer", 
              "achieveName", "achievePercent", "gamesDeveloper", "gamesGenre", "gamesPublisher"]
  
app_id_info = reduce(lambda app_id_info, idx: app_id_info.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), app_id_info)
app_id_info.printSchema()

root
 |-- app_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)
 |-- price: double (nullable = true)
 |-- releasedDate: timestamp (nullable = true)
 |-- rating: integer (nullable = true)
 |-- requiredAge: integer (nullable = true)
 |-- isMultiplayer: integer (nullable = true)
 |-- achieveName: string (nullable = true)
 |-- achievePercent: double (nullable = true)
 |-- gamesDeveloper: string (nullable = true)
 |-- gamesGenre: string (nullable = true)
 |-- gamesPublisher: string (nullable = true)



In [17]:
# Rename app_id to app_id_1 to makesure we dont 2 columns with same name to drop later
app_id_info = app_id_info.withColumnRenamed("app_id_1", "app_id")

In [18]:
#df = spark.read.csv("./game2_df.csv", inferSchema=True, header=True)
df = spark.read.csv('/user/tamng/jwht/CleanData/game2_df.csv', inferSchema=True, header=True).coalesce(199)

In [48]:
df.count()

100000000

In [19]:
# Rename the columns
oldColumns = df.schema.names
newColumns = ["steam_id", "app_id","playtime_2weeks", "playtime_forever","dateretrieved"]
 
df = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), df).cache() 
df.printSchema()

root
 |-- steam_id: long (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- playtime_2weeks: integer (nullable = true)
 |-- playtime_forever: integer (nullable = true)
 |-- dateretrieved: timestamp (nullable = true)



In [20]:
#grouping by acheievements name and grabbing the count of the achievmenets - can we do this because we do not know if the achievement names are unique?
app_id_info1 = app_id_info.groupby('app_id',
 'title',
 'type',
 'price',
 'releasedDate',
 'rating',
 'requiredAge',
 'isMultiplayer',
 'gamesDeveloper',
 'gamesGenre',
 'gamesPublisher').agg(F.count('achieveName').alias('achivements_cnt'))

In [21]:
#Select only the top 11 genres, label all others as 'Other' for gamesGenre
app_id_info1 = app_id_info1.withColumn('gamesGenre',F.when(app_id_info1.gamesGenre.isin('Action','Adventure','Sports','Simulation','Casual',
              'Strategy','RPG','Early Access','Free to Play','Violent','Indie'),app_id_info1.gamesGenre).otherwise('Other'))

genre_list = ['Action','Adventure','Sports','Simulation','Casual',
              'Strategy','RPG','Early Access','Free to Play','Violent','Indie','Other']

for column in genre_list:
    app_id_info1 = app_id_info1.withColumn(column,app_id_info1.gamesGenre.rlike(column).cast('integer'))

In [22]:
#in this expression can we potentially drop some of these grouping when making a new table
exprs = {x: "sum" for x in genre_list}
app_id_info1 = app_id_info1.groupby('app_id','title','type','price','releasedDate','rating','requiredAge','isMultiplayer','gamesDeveloper','gamesPublisher','achivements_cnt').agg(exprs)

In [23]:
app_id_info1 = app_id_info1.withColumnRenamed('sum(Action)','Action')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Adventure)','Adventure')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Sports)','Sports')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Simulation)','Simulation')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Casual)','Casual')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Strategy)','Strategy')
app_id_info1 = app_id_info1.withColumnRenamed('sum(RPG)','RPG')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Early Access)','Early Access')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Free to Play)','Free to Play')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Violent)','Violent')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Other)','Other')
app_id_info1 = app_id_info1.withColumnRenamed('sum(Indie)','Indie')

In [24]:
#JIM FEATURE ENGINEERING STARTS HERE
#Total playtime and total games in library
df2 = df.groupby('steam_id').agg(F.sum('playtime_forever').alias('total_playtime_forever'),
                                 F.countDistinct('app_id').alias('total_games_owned'),
                                 F.sum('playtime_2weeks').alias('total_playtime_2weeks'))
                                 #F.countDistinct(F.when(df['playtime_forever']>0)).alias('total_games_played'))

In [25]:
#Total games not played
df2_notplayed = df.filter(df.playtime_forever==F.lit(0)).groupby('steam_id').agg(\
                            F.count('app_id').alias('total_games_not_played'))

In [26]:
#Total games played
df2_played = df.filter(df.playtime_forever>F.lit(0)).groupby('steam_id').agg(
                            F.count('app_id').alias('total_games_played'))

In [27]:
#Percentage of games played
df2_join = df2.join(df2_played,df2.steam_id==df2_played.steam_id,'left').drop(df2_played.steam_id)
df2_join = df2_join.join(df2_notplayed,df2_join.steam_id==df2_notplayed.steam_id,'left').drop(df2_notplayed.steam_id)
df2_join = df2_join.withColumn('pct_games_played',F.lit(df2_join.total_games_not_played)/F.lit(df2.total_games_owned))
df2_join = df2_join.select('steam_id','total_playtime_forever','total_games_owned','total_playtime_2weeks','total_games_not_played','total_games_played','pct_games_played')
df2_join = df2_join.fillna(0)

In [28]:
#Retrieve playtime by genre for each game, after joining to app_id_info to get genre
df2_genre = df.join(app_id_info1,df.app_id==app_id_info1.app_id,'inner').drop(df.app_id)
df2_genre = df2_genre.groupby('steam_id',
 'app_id',
 'title',
 'type',
 'price',
 'releasedDate',
 'rating',
 'requiredAge',
 'isMultiplayer',
 'achivements_cnt',
 'Indie',
 'Other',
 'Sports',
 'Simulation',
 'Strategy',
 'RPG',
 'Violent',
 'Casual',
 'Adventure',
 'Early Access',
 'Action',
 'Free to Play').agg(F.max('gamesDeveloper').alias('gamesDeveloper'),F.max('gamesPublisher').alias('gamesPublisher'),
                      F.max('playtime_2weeks').alias('playtime_2weeks'),F.max('playtime_forever').alias('playtime_forever'),
                     F.max('dateretrieved').alias('date_retrieved'))

In [29]:
df2_genre1 = df2_genre.withColumn('action_playtime_forever',df2_genre.Action*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('adventure_playtime_forever',df2_genre.Adventure*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('simulation_playtime_forever',df2_genre.Simulation*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('sports_playtime_forever',df2_genre.Sports*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('casual_playtime_forever',df2_genre.Casual*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('strategy_playtime_forever',df2_genre.Strategy*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('rpg_playtime_forever',df2_genre.RPG*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('earlyaccess_playtime_forever',df2_genre['Early Access']*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('freetoplay_playtime_forever',df2_genre['Free To Play']*df2_genre.playtime_forever)
df2_genre1 = df2_genre1.withColumn('violent_playtime_forever',df2_genre['Violent']*df2_genre.playtime_forever) 
df2_genre1 = df2_genre1.withColumn('other_playtime_forever',df2_genre['Other']*df2_genre.playtime_forever)

#Total playtime and total games in library by genre
df2_total = df2_genre1.groupby('steam_id').agg(F.sum('action_playtime_forever').alias('tt_action_playtime_forever'),
                                F.sum('adventure_playtime_forever').alias('tt_adventure_playtime_forever'), 
                                F.sum('simulation_playtime_forever').alias('tt_simulation_playtime_forever'), 
                                F.sum('sports_playtime_forever').alias('tt_sports_playtime_forever'), 
                                F.sum('casual_playtime_forever').alias('tt_casual_playtime_forever'), 
                                F.sum('strategy_playtime_forever').alias('tt_strategy_playtime_forever'), 
                                F.sum('rpg_playtime_forever').alias('tt_rpg_playtime_forever'), 
                                F.sum('earlyaccess_playtime_forever').alias('tt_earlyaccess_playtime_forever'), 
                                F.sum('freetoplay_playtime_forever').alias('tt_freetoplay_playtime_forever'), 
                                F.sum('violent_playtime_forever').alias('tt_violent_playtime_forever'), 
                                F.sum('other_playtime_forever').alias('tt_other_playtime_forever'))

#Average playtime and total games in library by genre
df2_avgs = df2_genre1.groupby('steam_id').agg(F.avg('action_playtime_forever').alias('avg_action_playtime_forever'),
                                F.avg('adventure_playtime_forever').alias('avg_adventure_playtime_forever'),
                                F.avg('simulation_playtime_forever').alias('avg_simulation_playtime_forever'),
                                F.avg('sports_playtime_forever').alias('avg_sports_playtime_forever'),
                                F.avg('casual_playtime_forever').alias('avg_casual_playtime_forever'),
                                F.avg('strategy_playtime_forever').alias('avg_strategy_playtime_forever'),
                                F.avg('rpg_playtime_forever').alias('avg_rpg_playtime_forever'),
                                F.avg('earlyaccess_playtime_forever').alias('avg_earlyaccess_playtime_forever'),
                                F.avg('freetoplay_playtime_forever').alias('avg_freetoplay_playtime_forever'),
                                F.avg('violent_playtime_forever').alias('avg_violent_playtime_forever'),       
                                F.avg('other_playtime_forever').alias('avg_other_playtime_forever'))

In [30]:
from pyspark.sql.functions import col, lit, coalesce, greatest
from pyspark.sql import Window
#Favorite or Most played genre, least played genre(s)
df2_most = df2_genre.withColumn('rank',F.dense_rank().over(Window.partitionBy("steam_id").orderBy(["playtime_forever"])))
df2_most = df2_most.filter(df2_most.rank==F.lit(1))

In [31]:
df2_most = df2_most.withColumnRenamed('playtime_forever','playtime_mostplayedgenre')
df2_most = df2_most.withColumnRenamed('Action','Mostplayed_Action')
df2_most = df2_most.withColumnRenamed('Adventure','Mostplayed_Adventure')
df2_most = df2_most.withColumnRenamed('Sports','Mostplayed_Sports')
df2_most = df2_most.withColumnRenamed('Simulation','Mostplayed_Simulation')
df2_most = df2_most.withColumnRenamed('Casual','Mostplayed_Casual')
df2_most = df2_most.withColumnRenamed('Strategy','Mostplayed_Strategy')
df2_most = df2_most.withColumnRenamed('RPG','Mostplayed_RPG')
df2_most = df2_most.withColumnRenamed('Early Access','Mostplayed_Early_Access')
df2_most = df2_most.withColumnRenamed('Free to Play','Mostplayed_FreetoPlay')
df2_most = df2_most.withColumnRenamed('Violent','Mostplayed_Violent')
df2_most = df2_most.withColumnRenamed('Indie','Mostplayed_Indie')
df2_most = df2_most.withColumnRenamed('Other','Mostplayed_Other')

In [32]:
df2_most=df2_most.select('steam_id','playtime_mostplayedgenre','Mostplayed_Action','Mostplayed_Adventure','Mostplayed_Simulation',
                         'Mostplayed_Casual','Mostplayed_Strategy','Mostplayed_RPG','Mostplayed_Early_Access','Mostplayed_FreetoPlay',
                         'Mostplayed_Violent','Mostplayed_Indie','Mostplayed_Other')

In [33]:
#Count games by genre in user's library, everything will appear as Sum(NameofGenre) -- you need to clean this naming
exprs = {x: "sum" for x in genre_list}
df4 = df2_genre.groupBy("steam_id").agg(exprs)

df4 = df4.withColumnRenamed('sum(Action)','Action_Cnt')
df4 = df4.withColumnRenamed('sum(Adventure)','Adventure_Cnt')
df4 = df4.withColumnRenamed('sum(Sports)','Sports_Cnt')
df4 = df4.withColumnRenamed('sum(Simulation)','Simulation_Cnt')
df4 = df4.withColumnRenamed('sum(Casual)','Casual_Cnt')
df4 = df4.withColumnRenamed('sum(Strategy)','Strategy_Cnt')
df4 = df4.withColumnRenamed('sum(RPG)','RPG_Cnt')
df4 = df4.withColumnRenamed('sum(Early Access)','EarlyAccess_Cnt')
df4 = df4.withColumnRenamed('sum(Free to Play)','FreetoPlay_Cnt')
df4 = df4.withColumnRenamed('sum(Violent)','Violent_Cnt')
df4 = df4.withColumnRenamed('sum(Indie)','Indie_Cnt')
df4 = df4.withColumnRenamed('sum(Other)','Other_Cnt')
#df4 = df2_genre.groupby('steam_id')['Action','Adventure','Sports','Simulation','Casual','Strategy','RPG','Early Access','Free To Play','Violent','Other'].sum()

In [34]:
#Join all temp tables
df_masterjoin = df2_genre.join(df2_join,df2_join.steam_id==df2_genre.steam_id,'inner').drop(df2_join.steam_id)
df_masterjoin = df_masterjoin.join(df2_total,df2_total.steam_id==df_masterjoin.steam_id,'inner').drop(df2_total.steam_id)
df_masterjoin = df_masterjoin.join(df2_avgs,df2_avgs.steam_id==df_masterjoin.steam_id,'inner').drop(df2_avgs.steam_id)
#df_masterjoin = df_masterjoin.join(df2_most,df2_most.steam_id==df_masterjoin.steam_id,'inner').drop(df2_most.steam_id)
df_masterjoin = df_masterjoin.join(df4,df4.steam_id==df_masterjoin.steam_id,'inner').drop(df4.steam_id)

In [35]:
#Reorder features to bring steam_id to left most column
df_masterjoin = df_masterjoin.select('steam_id',
 #'playtime_2weeks',
 'playtime_forever',
 #'dateretrieved',
 'app_id',
 'title',
 'type',
 'price',
 'releasedDate',
 #'rating',
 'requiredAge',
 'isMultiplayer',
 'gamesDeveloper',
 'gamesPublisher',
 'achivements_cnt',
 'Indie',
 'Other',
 'Sports',
 'Simulation',
 'Strategy',
 'RPG',
 'Violent',
 'Casual',
 'Adventure',
 'Early Access',
 'Action',
 'Free to Play',
 'total_playtime_forever',
 'total_games_owned',
 'total_playtime_2weeks',
 'total_games_not_played',
 'total_games_played',
 'pct_games_played',
 #'tt_action_playtime_forever',
 #'tt_adventure_playtime_forever',
 #'tt_simulation_playtime_forever',
 #'tt_sports_playtime_forever',
 #'tt_casual_playtime_forever',
 #'tt_strategy_playtime_forever',
 #'tt_rpg_playtime_forever',
 #'tt_earlyaccess_playtime_forever',
 #'tt_freetoplay_playtime_forever',
 #'tt_violent_playtime_forever',
 #'tt_other_playtime_forever',
 'Indie_Cnt',
 'Other_Cnt',
 'Sports_Cnt',
 'Simulation_Cnt',
 'Strategy_Cnt',
 'RPG_Cnt',
 'Violent_Cnt',
 'Casual_Cnt',
 'Adventure_Cnt',
 'EarlyAccess_Cnt',
 'Action_Cnt',
 'FreetoPlay_Cnt')

In [36]:
### Han's Edit's below for LR

In [37]:
# Load app_id_info_join FINAL from tam/woo (loaded into RCC)
app_id_info_new = spark.read.csv("./app_id_info_join_new.csv", inferSchema=True, header=True) #ded_2.GetContentFile('app_id_info_join_new.csv')

In [38]:
app_id_cleaned = app_id_info_new.filter((app_id_info_new['positiveReviewPercent'] != '999'))
app_id_cleaned = app_id_info_new[['title', 'rating', 'positiveReviewPercent']]

In [39]:
app_id_info_new.columns

['app_id',
 'title',
 'type',
 'price',
 'releaseDate',
 'rating',
 'ageRequirement',
 'isMultiplayer',
 'positiveReviewPercent']

In [40]:
df_masterjoin1 = df_masterjoin.join(app_id_cleaned, df_masterjoin['title'] == app_id_cleaned['title'],'left').drop(df_masterjoin.title)
df_masterjoin1 = df_masterjoin1.fillna(0)

In [41]:
#df_masterjoin1 = df_masterjoin.drop('dateretrieved')

In [42]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler #vectorizes the features
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [12]:
#df_masterjoin1.write.csv('/user/jfang5/df_masterjoin1.csv')
df_masterjoin1 = spark.read.csv("/user/jfang5/df_masterjoin1.csv", inferSchema=True, header=True)

In [43]:
df_masterjoin1.printSchema()

root
 |-- steam_id: long (nullable = true)
 |-- playtime_forever: integer (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- price: double (nullable = false)
 |-- releasedDate: timestamp (nullable = true)
 |-- requiredAge: integer (nullable = true)
 |-- isMultiplayer: integer (nullable = true)
 |-- gamesDeveloper: string (nullable = true)
 |-- gamesPublisher: string (nullable = true)
 |-- achivements_cnt: long (nullable = false)
 |-- Indie: long (nullable = true)
 |-- Other: long (nullable = true)
 |-- Sports: long (nullable = true)
 |-- Simulation: long (nullable = true)
 |-- Strategy: long (nullable = true)
 |-- RPG: long (nullable = true)
 |-- Violent: long (nullable = true)
 |-- Casual: long (nullable = true)
 |-- Adventure: long (nullable = true)
 |-- Early Access: long (nullable = true)
 |-- Action: long (nullable = true)
 |-- Free to Play: long (nullable = true)
 |-- total_playtime_forever: long (nullable = true)
 |-- total_games_ow

Data type string of column title is not supported.
Data type string of column type is not supported.
Data type string of column releasedDate is not supported.
Data type string of column gamesDeveloper is not supported.
Data type string of column gamesPublisher is not supported.

In [44]:
df_masterjoin2 = df_masterjoin1.withColumn('year_released',F.year('releasedDate'))
df_masterjoin2 = df_masterjoin1.drop('title', 'type', 'releasedDate', 'gamesDeveloper', 'gamesPublisher','steam_id','rating')

In [0]:
df_masterjoin2.count()

1282418

In [45]:
df_masterjoin2.columns

['playtime_forever',
 'app_id',
 'price',
 'requiredAge',
 'isMultiplayer',
 'achivements_cnt',
 'Indie',
 'Other',
 'Sports',
 'Simulation',
 'Strategy',
 'RPG',
 'Violent',
 'Casual',
 'Adventure',
 'Early Access',
 'Action',
 'Free to Play',
 'total_playtime_forever',
 'total_games_owned',
 'total_playtime_2weeks',
 'total_games_not_played',
 'total_games_played',
 'pct_games_played',
 'Indie_Cnt',
 'Other_Cnt',
 'Sports_Cnt',
 'Simulation_Cnt',
 'Strategy_Cnt',
 'RPG_Cnt',
 'Violent_Cnt',
 'Casual_Cnt',
 'Adventure_Cnt',
 'EarlyAccess_Cnt',
 'Action_Cnt',
 'FreetoPlay_Cnt',
 'positiveReviewPercent']

In [46]:
vectorAssembler = VectorAssembler(inputCols = [i for i in df_masterjoin2.columns if i != 'playtime_forever'], outputCol = 'features') #[i for i in df_masterjoin2.columns if i != 'price']
vectorAssembler.setHandleInvalid("skip").transform(df_masterjoin2)
v_df_masterjoin = vectorAssembler.transform(df_masterjoin2)
v_df_masterjoin = v_df_masterjoin.select(['features', 'playtime_forever'])

In [47]:
v_df_masterjoin = v_df_masterjoin.fillna(0)
splits = v_df_masterjoin.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

LR with CV

In [91]:
lr = LinearRegression(featuresCol = 'features', labelCol='playtime_forever') #label col has to be label?
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="playtime_forever", metricName="rmse")

In [92]:
grid = (ParamGridBuilder().addGrid(lr.maxIter, [5, 10]) \
                                .addGrid(lr.regParam, [0.01, 0.1]) \
                                .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
                                .build())

In [93]:
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, \
                        evaluator=evaluator, numFolds=2)

In [94]:
#Fit Linear Regression Model
lr_model = lr_cv.fit(train_df)

In [95]:
bestModel = lr_model.bestModel

print("The coefficient of the model is " + str(bestModel.coefficients))
print("The Intercept of the model is : " + str(bestModel.intercept))

The coefficient of the model is [-0.0036384708984676865,24.592333111453748,-5.1519881721697915,259.53645035098697,8.524469136769724,-357.8450684819402,-127.84180417064037,-342.5475995368185,162.40671603489199,-97.56596665781385,691.2125008526195,0.0,-158.56992660005372,-152.5191193372339,-100.24573275360235,-391.3493004349811,855.8432068494642,0.009528296728404918,-0.3404730165886879,0.00893004193454783,0.49482557527404414,-4.233960490440863,-962.9204188579632,10.212541621308622,23.342305313534677,-4.952501163254995,-0.38163875669377567,-10.768730182295775,-12.910776629212771,0.0,3.1309729702095432,9.462976915001532,26.846128149712165,-11.011879936954521,-23.003279220210363,5.527216564652625]
The Intercept of the model is : -10076.885085883614


In [96]:
#RMSE measures the differences between predicted values by the model and the actual values. However, RMSE alone is meaningless until we compare with the actual “MV” value, such as mean, min and max. After such comparison, our RMSE looks pretty good.
trainingSummary = bestModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
print('MAE: %f' % trainingSummary.meanAbsoluteError)

RMSE: 6221.471612
r2: 0.051977
MAE: 1616.112751


In [97]:
#test aspect to check with actuals
lr_predictions1 = bestModel.transform(test_df)
lr_predictions1.select("prediction","playtime_forever","features").show(5)

+------------------+----------------+--------------------+
|        prediction|playtime_forever|            features|
+------------------+----------------+--------------------+
|142.61850466545184|               7|(36,[0,1,2,3,4,5,...|
|  570.622298840708|             141|(36,[0,1,2,3,4,5,...|
|1215.8711300663108|            2757|(36,[0,1,2,3,4,5,...|
| 894.7216097409437|             122|(36,[0,1,2,3,4,5,...|
| 682.9565667657589|               0|(36,[0,1,2,3,4,5,...|
+------------------+----------------+--------------------+
only showing top 5 rows



In [120]:
# how to evaluate the lr predictions
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="playtime_forever",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions1))

print("MSE %g" % lr_evaluator.evaluate(lr_predictions1, {lr_evaluator.metricName: "mse"}))

print("MAE %g" % lr_evaluator.evaluate(lr_predictions1, {lr_evaluator.metricName: "mae"}))

R Squared (R2) on test data = 0.052101
MSE 3.85563e+07
MAE 1617.63


In [0]:
# eval = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# # Root Mean Square Error
# rmse = eval.evaluate(lr_predictions)
# print("RMSE: %.3f" % rmse)

# # Mean Square Error
# mse = eval.evaluate(lr_predictions, {eval.metricName: "mse"})
# print("MSE: %.3f" % mse)

# # Mean Absolute Error
# mae = eval.evaluate(lr_predictions, {eval.metricName: "mae"})
# print("MAE: %.3f" % mae)

# # r2 - coefficient of determination
# r2 = eval.evaluate(lr_predictions, {eval.metricName: "r2"})
# print("r2: %.3f" %r2)

In [0]:
#Decision Tree Regression Below

In [102]:
df_masterjoin2.printSchema()

root
 |-- playtime_forever: integer (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- requiredAge: integer (nullable = true)
 |-- isMultiplayer: integer (nullable = true)
 |-- achivements_cnt: long (nullable = false)
 |-- Indie: long (nullable = true)
 |-- Other: long (nullable = true)
 |-- Sports: long (nullable = true)
 |-- Simulation: long (nullable = true)
 |-- Strategy: long (nullable = true)
 |-- RPG: long (nullable = true)
 |-- Violent: long (nullable = true)
 |-- Casual: long (nullable = true)
 |-- Adventure: long (nullable = true)
 |-- Early Access: long (nullable = true)
 |-- Action: long (nullable = true)
 |-- Free to Play: long (nullable = true)
 |-- total_playtime_forever: long (nullable = true)
 |-- total_games_owned: long (nullable = false)
 |-- total_playtime_2weeks: long (nullable = true)
 |-- total_games_not_played: long (nullable = true)
 |-- total_games_played: long (nullable = true)
 |-- pct_games_played: double (nul

In [98]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'playtime_forever', maxBins = 500, maxDepth = 4)
dt_model = dt.fit(train_df)

dt_predictions_train = dt_model.transform(train_df)
dt_evaluator = RegressionEvaluator(
    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions_train)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)
print("R2 on train data %g" % dt_evaluator.evaluate(dt_predictions_train, {dt_evaluator.metricName: "r2"}))

dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("\nRoot Mean Squared Error (RMSE) on test data = %g" % rmse)

print("MSE %g" % dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "mse"}))
print("R2 on test data %g" % dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "r2"}))
print("MAE %g" % dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "mae"}))

Root Mean Squared Error (RMSE) on train data = 5980.68
R2 on train data 0.123942

Root Mean Squared Error (RMSE) on test data = 6035.23
MSE 3.6424e+07
R2 on test data 0.12338
MAE 1379.28


In [99]:
dt_model.featureImportances

SparseVector(36, {0: 0.0783, 1: 0.0099, 4: 0.3071, 10: 0.0237, 17: 0.3012, 21: 0.2799})

In [100]:
print([i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][0],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][2],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][3],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][4],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][6],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][10],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][11],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][17],
[i for i in [i for i in [i for i in [i for i in [i for i in df_masterjoin2.columns if i != 'price'] if i !='title_index']if i !='releasedDate_index']if i !='gamesDeveloper_index']if i !='gamesPublisher_index'][21])

playtime_forever requiredAge isMultiplayer achivements_cnt Other RPG Violent total_playtime_forever total_games_played


In [103]:
#Gradient Boosting
from pyspark.ml.regression import GBTRegressor
gbt_evaluator = RegressionEvaluator(
    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")

gbt = GBTRegressor(featuresCol = 'features', labelCol = 'playtime_forever', maxIter=10)
'''gbt_model = gbt.fit(train_df)

###
gbt_predictions1 = gbt_model.transform(train_df)
gbt_predictions1.select('prediction', 'playtime_forever', 'features').show(5)
rmse = gbt_evaluator.evaluate(gbt_predictions1)
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)
print("MSE TRAIN %g" % gbt_evaluator.evaluate(gbt_predictions1, {gbt_evaluator.metricName: "mse"}))
print("R2 on train data %g" % gbt_evaluator.evaluate(gbt_predictions1, {gbt_evaluator.metricName: "r2"}))
print("MAE train %g" % gbt_evaluator.evaluate(gbt_predictions1, {gbt_evaluator.metricName: "mae"}))
###
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'playtime_forever', 'features').show(5)
gbt_evaluator = RegressionEvaluator(
    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("\nRoot Mean Squared Error (RMSE) on test data = %g" % rmse)
print("MSE %g" % gbt_evaluator.evaluate(gbt_predictions, {gbt_evaluator.metricName: "mse"}))
print("R2 on test data %g" % gbt_evaluator.evaluate(gbt_predictions, {gbt_evaluator.metricName: "r2"}))
print("MAE %g" % gbt_evaluator.evaluate(gbt_predictions, {gbt_evaluator.metricName: "mae"}))'''

'gbt_model = gbt.fit(train_df)\n\n###\ngbt_predictions1 = gbt_model.transform(train_df)\ngbt_predictions1.select(\'prediction\', \'playtime_forever\', \'features\').show(5)\nrmse = gbt_evaluator.evaluate(gbt_predictions1)\nprint("Root Mean Squared Error (RMSE) on train data = %g" % rmse)\nprint("MSE TRAIN %g" % gbt_evaluator.evaluate(gbt_predictions1, {gbt_evaluator.metricName: "mse"}))\nprint("R2 on train data %g" % gbt_evaluator.evaluate(gbt_predictions1, {gbt_evaluator.metricName: "r2"}))\nprint("MAE train %g" % gbt_evaluator.evaluate(gbt_predictions1, {gbt_evaluator.metricName: "mae"}))\n###\ngbt_predictions = gbt_model.transform(test_df)\ngbt_predictions.select(\'prediction\', \'playtime_forever\', \'features\').show(5)\ngbt_evaluator = RegressionEvaluator(\n    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")\nrmse = gbt_evaluator.evaluate(gbt_predictions)\nprint("\nRoot Mean Squared Error (RMSE) on test data = %g" % rmse)\nprint("MSE %g" % gbt_evaluato

In [104]:
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [4,5])
             .addGrid(gbt.maxBins, [250,500])
             .addGrid(gbt.maxIter, [10])
             .build())

In [105]:
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=gbt_evaluator, numFolds=2)
# Run cross validations
cvModel = cv.fit(train_df)

Exception ignored in: <bound method JavaModelWrapper.__del__ of <pyspark.mllib.evaluation.MulticlassMetrics object at 0x7feb1e8ccba8>>
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 142, in __del__
AttributeError: 'MulticlassMetrics' object has no attribute '_sc'


In [106]:
gbestModel = cvModel.bestModel

gb_predictions = gbestModel.transform(train_df)
gb_predictions.select("prediction","playtime_forever","features").show(5)

gbt_evaluator = RegressionEvaluator(
    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gb_predictions)

print("Root Mean Squared Error (RMSE) on train data = %g" % rmse)
print("MSE TRAIN %g" % gbt_evaluator.evaluate(gb_predictions, {gbt_evaluator.metricName: "mse"}))
print("R2 on train data %g" % gbt_evaluator.evaluate(gb_predictions, {gbt_evaluator.metricName: "r2"}))
print("MAE train %g" % gbt_evaluator.evaluate(gb_predictions, {gbt_evaluator.metricName: "mae"}))

+-----------------+----------------+--------------------+
|       prediction|playtime_forever|            features|
+-----------------+----------------+--------------------+
|545.9614189462898|              21|(36,[0,1,2,3,4,5,...|
|580.2965581062791|              25|(36,[0,1,2,3,4,5,...|
|542.3691439662712|             898|(36,[0,1,2,3,4,5,...|
| 603.030644179706|             282|(36,[0,1,2,3,4,5,...|
|759.5331650638544|             522|(36,[0,1,2,3,4,5,...|
+-----------------+----------------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on train data = 5689.27
MSE TRAIN 3.23678e+07
R2 on train data 0.207232
MAE train 1292.05


In [107]:
gbestModel

GBTRegressionModel (uid=GBTRegressor_7d3d22944f79) with 10 trees

In [108]:
###
gb_predictions_test = gbestModel.transform(test_df)
gb_predictions_test.select('prediction', 'playtime_forever', 'features').show(5)

gbt_evaluator = RegressionEvaluator(
    labelCol="playtime_forever", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gb_predictions_test)

print("\nRoot Mean Squared Error (RMSE) on test data = %g" % rmse)
print("MSE %g" % gbt_evaluator.evaluate(gb_predictions_test, {gbt_evaluator.metricName: "mse"}))
print("R2 on test data %g" % gbt_evaluator.evaluate(gb_predictions_test, {gbt_evaluator.metricName: "r2"}))
print("MAE %g" % gbt_evaluator.evaluate(gb_predictions_test, {gbt_evaluator.metricName: "mae"}))

+-----------------+----------------+--------------------+
|       prediction|playtime_forever|            features|
+-----------------+----------------+--------------------+
|848.8344824067185|               7|(36,[0,1,2,3,4,5,...|
|509.7372365694996|             141|(36,[0,1,2,3,4,5,...|
|1977.183526591758|            2757|(36,[0,1,2,3,4,5,...|
| 850.752186698694|             122|(36,[0,1,2,3,4,5,...|
|474.8478295548297|               0|(36,[0,1,2,3,4,5,...|
+-----------------+----------------+--------------------+
only showing top 5 rows


Root Mean Squared Error (RMSE) on test data = 5762.36
MSE 3.32048e+07
R2 on test data 0.200856
MAE 1294.91


# Classification Analysis

In [48]:
df_masterjoin2 = df_masterjoin2.withColumn('played_or_not',F.when(df_masterjoin2.playtime_forever>0,F.lit(1)).otherwise(F.lit(0)))
#df_masterjoin2.groupby('played_or_not').count().show()

In [49]:
df_masterjoin2 = df_masterjoin2.drop('playtime_forever')

In [50]:
vectorAssembler = VectorAssembler(inputCols = [i for i in df_masterjoin2.columns if i != 'played_or_not'], outputCol = 'features') #
v_df_masterjoin = vectorAssembler.transform(df_masterjoin2)
v_df_masterjoin = v_df_masterjoin.select(['features', 'played_or_not'])

In [51]:
splits = v_df_masterjoin.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [52]:
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol = 'features', labelCol = 'played_or_not', maxIter=10)

In [134]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.maxIter,[10])\
    .addGrid(lr.regParam,[0.01, 2.0]) \
    .build()

gbt_evaluator = MulticlassClassificationEvaluator(
    labelCol="played_or_not", predictionCol="prediction", metricName="f1")

cv_logr = CrossValidator(estimator=logr, estimatorParamMaps=paramGrid, evaluator=gbt_evaluator, numFolds=2)
# Run cross validations
lrModel = cv_logr.fit(train_df)

KeyboardInterrupt: 

In [135]:
###
#lrbestModel = lrModel.bestModel

results = lrModel.transform(train_df)
results.select('prediction', 'played_or_not', 'features').show(5)

f1 = gbt_evaluator.evaluate(results)
print("\nF1 on train data = %g" % f1)

TP = results.filter((results.prediction==1) & (results.played_or_not==1)).count()
FP = results.filter((results.prediction==1) & (results.played_or_not!=1)).count()
TN = results.filter((results.prediction==0) & (results.played_or_not==0)).count()
FN = results.filter((results.prediction==0) & (results.played_or_not!=0)).count()

pr = TP/(TP+FP)
rc = TP/(TP+FN)
f1 = (2*pr*rc)/(pr+rc)
print(TP,FP,TN,FN,pr,rc,f1)

+----------+-------------+--------------------+
|prediction|played_or_not|            features|
+----------+-------------+--------------------+
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
+----------+-------------+--------------------+
only showing top 5 rows


F1 on train data = 0.716352
15383915 4851615 7347023 4064980 0.7602427512400218 0.7909917247226642 0.7753124809040323


In [None]:
predictions_test = lrbestModel.transform(test_df)
predictions_test.select('prediction', 'played_or_not', 'features').show(5)

f1 = gbt_evaluator.evaluate(predictions_test)
print("\nF1 on test data = %g" % f1)

In [None]:
lrModel.coefficients

In [136]:
lrbestModel.explainParams

<bound method Params.explainParams of LogisticRegressionModel: uid = LogisticRegression_7b14ffab857c, numClasses = 2, numFeatures = 37>

In [82]:
trainingSummary = lrbestModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
print(maxFMeasure)

bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
    
lrModel.setThreshold(bestThreshold)

objectiveHistory:
0.6666527819054636
0.6666181533548772
0.6666181533548772
0.6665835009808695
0.6665835009808695
0.6665606634534205
0.666483037894898
0.6661413701545481
0.6653948042538465
0.6633384561881391
0.6583850775403209
0.6470669791708894
0.6255294361023934
+--------------------+--------------------+
|                 FPR|                 TPR|
+--------------------+--------------------+
|                 0.0|                 0.0|
|6.530816501272877E-4|0.006604622433660078|
|0.001156789114147...|0.012711463777171344|
|0.001686567374644...|0.018969253817302825|
|0.002511404743704...|0.028100930180212487|
|0.003109967275411566| 0.03440828239457486|
|0.003947348140889864|0.042787528861354805|
|0.004679298049543696| 0.04977800618879374|
| 0.00548175388577843| 0.05720410623641764|
|0.006301426285724...| 0.06447247106563868|
|0.007105275843783788| 0.07126027817438657|
|0.007976679918501706| 0.07839358093752405|
| 0.00888013959517792| 0.08550307946137845|
|0.009669314082263692|  0.091749

AttributeError: 'LogisticRegressionModel' object has no attribute 'setThreshold'

In [None]:
'''import matplotlib.pyplot as plt
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(lrbestModel.summary.roc.select('FPR').collect(),
         lrbestModel.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()'''

In [2]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics

transformed = lrbestModel.transform(test_df)

results = transformed.select(['prediction', 'played_or_not'])
predictionAndLabels=results
metrics = MulticlassMetrics(predictionAndLabels.rdd)

In [158]:
predictions.limit(4).show()

+--------------------+-------------+--------------------+--------------------+----------+
|            features|played_or_not|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|(37,[0,1,2,3,4,5,...|            1|[-0.5275915085578...|[0.37107880539092...|       1.0|
|(37,[0,1,2,3,4,5,...|            1|[-0.2163931622497...|[0.44611182614903...|       1.0|
|(37,[0,1,2,3,4,5,...|            1|[-0.0680174750095...|[0.48300218393434...|       1.0|
|(37,[0,1,2,3,4,5,...|            1|[-0.5111459481584...|[0.37492492697711...|       1.0|
+--------------------+-------------+--------------------+--------------------+----------+



In [57]:
results.printSchema()

root
 |-- prediction: double (nullable = false)
 |-- played_or_not: integer (nullable = false)



In [None]:
TP = results.filter((results.prediction==1) & (results.played_or_not==1)).count()
FP = results.filter((results.prediction==1) & (results.played_or_not!=1)).count()
TN = results.filter((results.prediction==0) & (results.played_or_not==0)).count()
FN = results.filter((results.prediction==0) & (results.played_or_not!=0)).count()

In [64]:
pr = TP/(TP+FP)
rc = TP/(TP+FN)
f1 = (2*pr*rc)/(pr+rc)
print(TP,FP,TN,FN,pr,rc,f1)

6434587 1909805 3316757 1904835 0.7711271234620809 0.7715866879023511 0.7713568372315827


In [53]:
#Implement RandomForest Classifier
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="played_or_not", featuresCol="features")

In [59]:
paramGrid = {"max_depth": [3, None],
              "max_features": [10],
              "min_samples_split": [10],
              "min_samples_leaf": [10],
              "bootstrap": [True],
              "criterion": ["gini"],
              "n_estimators": [10]}

evaluator = MulticlassClassificationEvaluator(
    labelCol="played_or_not", predictionCol="prediction", metricName="f1")

cv_rf = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)

# Train model.  This also runs the indexers.
rf_fit = rf.fit(train_df)

In [60]:
###rfbestModel = rf_fit.bestModel

results = rf_fit.transform(train_df)
results.select('prediction', 'played_or_not', 'features').show(5)

f1 = evaluator.evaluate(results)
print("\nF1 on train data = %g" % f1)

TP = results.filter((results.prediction==1) & (results.played_or_not==1)).count()
FP = results.filter((results.prediction==1) & (results.played_or_not!=1)).count()
TN = results.filter((results.prediction==0) & (results.played_or_not==0)).count()
FN = results.filter((results.prediction==0) & (results.played_or_not!=0)).count()

pr = TP/(TP+FP)
rc = TP/(TP+FN)
f1 = (2*pr*rc)/(pr+rc)
print(TP,FP,TN,FN,pr,rc,f1)

+----------+-------------+--------------------+
|prediction|played_or_not|            features|
+----------+-------------+--------------------+
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
+----------+-------------+--------------------+
only showing top 5 rows


F1 on train data = 0.73465
19133488 6576139 6148984 1825453 0.744214919959749 0.9129033761772601 0.8199732205196439


In [121]:
###
predictions_test = rf_fit.transform(test_df)
predictions_test.select('prediction', 'played_or_not', 'features').show(5)

f1 = evaluator.evaluate(predictions_test)
print("\nF1 on train data = %g" % f1)

+----------+-------------+--------------------+
|prediction|played_or_not|            features|
+----------+-------------+--------------------+
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
|       1.0|            1|(36,[0,1,2,3,4,5,...|
+----------+-------------+--------------------+
only showing top 5 rows


F1 on train data = 0.730671


In [None]:
'''import matplotlib.pyplot as plt
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(lrbestModel.summary.roc.select('FPR').collect(),
         lrbestModel.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()'''

In [122]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics

transformed = rf_fit.transform(test_df)

results_rf = transformed.select(['prediction', 'played_or_not'])
predictionAndLabels=results_rf.rdd
metrics = MulticlassMetrics(predictionAndLabels)

# Overall statistics
TP = results_rf.filter((results_rf.prediction==1) & (results_rf.played_or_not==1)).count()
FP = results_rf.filter((results_rf.prediction==1) & (results_rf.played_or_not!=1)).count()
TN = results_rf.filter((results_rf.prediction==0) & (results_rf.played_or_not==0)).count()
FN = results_rf.filter((results_rf.prediction==0) & (results_rf.played_or_not!=0)).count()

pr = TP/(TP+FP)
rc = TP/(TP+FN)
f1 = (2*pr*rc)/(pr+rc)
print(TP,FP,TN,FN,pr,rc,f1)

7588084 2693208 2533714 745087 0.7380477084008508 0.9105878182506996 0.8152890577611613


In [68]:
list(zip(vectorAssembler.getInputCols(), rf_fit.featureImportances))

[('app_id', 0.18030579349020562),
 ('price', 0.052442408753893936),
 ('requiredAge', 0.007699913259432337),
 ('isMultiplayer', 0.003478697786304344),
 ('achivements_cnt', 0.1226161830490817),
 ('Indie', 0.003211549608079511),
 ('Other', 0.0004116671842941346),
 ('Sports', 1.8418254412799813e-05),
 ('Simulation', 0.0),
 ('Strategy', 0.00012612944552430237),
 ('RPG', 0.001221428097685736),
 ('Violent', 0.0),
 ('Casual', 0.0),
 ('Adventure', 0.0),
 ('Early Access', 0.0),
 ('Action', 0.003370402702154244),
 ('Free to Play', 0.002913056304380445),
 ('total_playtime_forever', 0.019610596359290612),
 ('total_games_owned', 0.0010781985387321707),
 ('total_playtime_2weeks', 1.1492016921713716e-06),
 ('total_games_not_played', 0.0355406699651781),
 ('total_games_played', 0.0514592016036887),
 ('pct_games_played', 0.42822918022304285),
 ('Indie_Cnt', 0.007828543967610822),
 ('Other_Cnt', 0.00026190509261707644),
 ('Sports_Cnt', 0.00038701626270018686),
 ('Simulation_Cnt', 0.0),
 ('Strategy_Cnt', 

In [63]:
rf_fit.featureImportances

SparseVector(36, {0: 0.1803, 1: 0.0524, 2: 0.0077, 3: 0.0035, 4: 0.1226, 5: 0.0032, 6: 0.0004, 7: 0.0, 9: 0.0001, 10: 0.0012, 15: 0.0034, 16: 0.0029, 17: 0.0196, 18: 0.0011, 19: 0.0, 20: 0.0355, 21: 0.0515, 22: 0.4282, 23: 0.0078, 24: 0.0003, 25: 0.0004, 27: 0.0173, 28: 0.0023, 30: 0.0066, 31: 0.0108, 32: 0.0001, 33: 0.0282, 34: 0.0046, 35: 0.0079})