## Initialization

Import `findspark` and initialize it, then import `pyspark` and `SparkSession` and create a Spark session.

In [None]:
#Findspark searches for a local installation of spark to use
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

#Create the Spark Session
#The builder chain is wrapped in parentheses instead of using backslash continuations:
#a comment placed after a trailing backslash is a SyntaxError, while inside parentheses
#every line can carry its own comment.
spark = (
    SparkSession.builder
    .master("<spark url>")                         #The master node
    .appName("GBDT")                               #The app name to show on the spark url
    .config("spark.executor.instances", "1")       #How many workers to use
    .config("spark.executor.cores", "2")           #How many cores each worker uses
    .config("spark.executor.memory", "1536m")      #How much memory each worker uses
    .getOrCreate()
)

Create the schema and the DataFrame

In [None]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, BooleanType,
    FloatType, LongType, DecimalType,
)

#Explicit schema for the reviews CSV: one nullable field per column, in file order.
#NOTE(review): DecimalType() is used with its default precision/scale for the playtime
#columns - confirm those defaults fit the values stored in the file.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('index', StringType()),
        ('app_id', LongType()),
        ('app_name', StringType()),
        ('review_id', LongType()),
        ('language', StringType()),
        ('review', StringType()),
        ('timestamp_created', LongType()),
        ('timestamp_updated', LongType()),
        ('recommended', BooleanType()),
        ('votes_helpful', IntegerType()),
        ('votes_funny', IntegerType()),
        ('weighted_vote_score', FloatType()),
        ('comment_count', IntegerType()),
        ('steam_purchase', BooleanType()),
        ('received_for_free', BooleanType()),
        ('written_during_early_access', BooleanType()),
        ('author_steamid', LongType()),
        ('author_num_games_owned', IntegerType()),
        ('author_num_reviews', IntegerType()),
        ('author_playtime_forever', DecimalType()),
        ('author_playtime_last_two_weeks', DecimalType()),
        ('author_playtime_at_review', DecimalType()),
        ('author_last_played', FloatType()),
    )
])

In [None]:
from pyspark.sql.functions import col

#Load and clean the reviews in a single pipeline
df = (
    #Read csv using the explicit schema
    spark.read.csv("steam_reviews.csv", header = True, schema = schema,
                   multiLine = True, lineSep = "\r")
    #Drop the string-typed columns: string and numeric features need different handling
    #for the GBDT algorithm
    .drop("review", "index", "app_name", "language")
    #The label column "recommended" is Boolean in the schema; cast it to IntegerType
    .withColumn("recommended", col("recommended").cast(IntegerType()))
    #Discard every row that still contains a missing value
    .na.drop()
)

Prepare data for Classification

In [None]:
from pyspark.ml.feature import VectorAssembler

#Every column except the label "recommended" is used as an input feature
cols = [column for column in df.columns if column != "recommended"]
assembler = VectorAssembler(inputCols = cols, outputCol = "features")

#Assemble the feature vector and keep only two columns: Features and Label("recommended")
data = assembler.transform(df).select("features", "recommended")
data.show(truncate = False)

In [None]:
#Split the data randomly in train set (90%) and test set (10%)
#A fixed seed is passed so that re-running the notebook is intended to give the same
#split, keeping the reported accuracy comparable between runs
train, test = data.randomSplit([0.9, 0.1], seed = 42)

## Execution

In [None]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

#Start of the timed section: covers training, prediction and evaluation
t = time.time()

gbt = GBTClassifier(labelCol = "recommended", featuresCol = "features")
#Cache the train dataset for use
train.cache()
try:
    model = gbt.fit(train)
    prediction_test = model.transform(test)
    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='recommended',
                                                  metricName='accuracy')
    #Keep the metric in a variable: a bare call in the middle of a cell is not displayed,
    #so the accuracy would otherwise be computed and thrown away
    accuracy = evaluator.evaluate(prediction_test)
    duration = (time.time() - t)
finally:
    #Remove the dataset from cache, even if training or evaluation failed
    train.unpersist()

#Show the accuracy on the test set
print("Accuracy:", accuracy)
#Display duration of the execution (seconds) as the cell output
duration