# **Setting up Colab**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=dbaaa15194ca192befd2ec418679f95fea569048b2649a4778d36512e041a3df
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


# **Model Creation**

In [None]:

# Setting up Spark context using SparkSession
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('predictor-models').master("local[*]").getOrCreate()

## **Loading the Data**

In [None]:
mid_df = spark.read.csv('drive/MyDrive/dataset/mid_model_features_wh2/part-*.csv', header=True, inferSchema=True)
mid_df.printSchema()

root
 |-- winner: string (nullable = true)
 |-- mid_golddiff: integer (nullable = true)
 |-- bKillCount: integer (nullable = true)
 |-- rKillCount: integer (nullable = true)
 |-- first_blood: string (nullable = true)
 |-- bTowerCount: integer (nullable = true)
 |-- rTowerCount: integer (nullable = true)
 |-- first_tower: string (nullable = true)
 |-- bInhibCount: integer (nullable = true)
 |-- rInhibCount: integer (nullable = true)
 |-- first_inhib: string (nullable = true)
 |-- bDragonCount: integer (nullable = true)
 |-- rDragonCount: integer (nullable = true)
 |-- first_dragon: string (nullable = true)
 |-- bBaronCount: integer (nullable = true)
 |-- rBaronCount: integer (nullable = true)
 |-- first_baron: string (nullable = true)



**Features:**
*   mid_golddiff: The difference in gold between the two teams, computed as Blue (1) gold minus Red (0) gold.
*   bKillCount: The number of kills that the Blue side has.
*   rKillCount: The number of kills that the Red side has.
*   first_blood: Which team got the first kill of the game.
*   bTowerCount: The number of towers that the Blue side knocked down.
*   rTowerCount: The number of towers that the Red side knocked down.
*   first_tower: Which team knocked down the first tower of the game.
*   bInhibCount: The number of inhibitors that the Blue side knocked down.
*   rInhibCount: The number of inhibitors that the Red side knocked down.
*   first_inhib: Which team knocked down the first inhibitor of the game.
*   bDragonCount: The number of dragons that the Blue side has.
*   rDragonCount: The number of dragons that the Red side has.
*   first_dragon: Which team got the first dragon of the game.
*   bBaronCount: The number of barons that the Blue side has.
*   rBaronCount: The number of barons that the Red side has.
*   first_baron: Which team got the first baron of the game.


**Label:**


*   winner: Indicates which team won the match.





Looking at the first 5 matches in our dataset

In [None]:
import pandas as pd
pd.DataFrame(mid_df.take(5), columns=mid_df.columns).transpose()

Unnamed: 0,0,1,2,3,4
winner,Blue,Red,Blue,Red,Blue
mid_golddiff,1422,1368,5293,2712,-730
bKillCount,4,5,11,5,1
rKillCount,3,3,6,5,1
first_blood,Blue,Blue,Blue,Red,Blue
bTowerCount,4,1,3,2,1
rTowerCount,2,1,0,1,3
first_tower,Blue,Read,Blue,Blue,Read
bInhibCount,0,0,0,0,0
rInhibCount,0,0,0,0,0


Below we can see that the two classes are roughly balanced: Blue is the winner in 4,146 records (about 54%) and Red is the winner in 3,474 records (about 46%).

In [None]:
mid_df.groupby('winner').count().toPandas()

Unnamed: 0,winner,count
0,Blue,4146
1,Red,3474
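
To put a number on the class balance, the counts in the output above can be converted to shares. This is a minimal plain-Python sketch; the variable names are illustrative and are not part of the notebook.

```python
# Win counts copied from the groupby output above.
win_counts = {"Blue": 4146, "Red": 3474}

total = sum(win_counts.values())

# Fraction of all matches won by each side.
win_share = {team: count / total for team, count in win_counts.items()}

for team, share in win_share.items():
    print(f"{team}: {share:.1%}")
```

A split close to 54/46 is mild enough that area under ROC remains a reasonable metric without resampling.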


# **Summary Statistics**

Here we can see the count, mean, standard deviation, min, and max for our numerical variables.

In [None]:
numeric_data = [t[0] for t in mid_df.dtypes if t[1] == 'int']
mid_df.select(numeric_data).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
mid_golddiff,7620,209.72086614173227,2817.02348085286,-13591,12673
bKillCount,7620,3.862992125984252,2.802155522367613,0,31
rKillCount,7620,3.744225721784777,2.825049109605663,0,20
bTowerCount,7620,1.7263779527559056,1.323631865981659,0,8
rTowerCount,7620,1.5360892388451444,1.2876729076787756,0,9
bInhibCount,7620,0.0032808398950131233,0.06573059977889233,0,3
rInhibCount,7620,0.0034120734908136482,0.07053997021908105,0,4
bDragonCount,7620,0.6570866141732283,0.7255637472953317,0,4
rDragonCount,7620,0.75748031496063,0.7647421170462376,0,5



# **Model Building**
### **Preparing Data**

Before we can create models, we must prepare the data so that a model can use it to estimate the parameters needed to predict game results. The data was already cleaned during the first phase of preprocessing. However, we still need to encode the categorical features as numbers, since machine learning models can only work with numeric input. We also need to normalize the numeric features, because they span very different ranges. A feature with large values, such as gold difference, can influence the result more simply because of its magnitude, even though it might not be more important than the other features.



### **Encoding Categorical Features**



In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Convert categorical features to numerical representations
categorical_features = [t[0] for t in mid_df.dtypes if t[1] == 'string' and t[0] != 'winner' ]
stages_v1 = [] # Stages in our Pipeline

for col in categorical_features:
  strIndexer = StringIndexer(inputCol=col, outputCol=col + "Index")
  encoder = OneHotEncoder(inputCols=[strIndexer.getOutputCol()], outputCols=[col + "classVec"])
  stages_v1 += [strIndexer, encoder]
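
As an illustration of what each StringIndexer and OneHotEncoder pair produces, here is a plain-Python sketch. It assumes Spark's default settings as I understand them (labels indexed by descending frequency, and the last category dropped from the one-hot vector). The function names and the sample column values are hypothetical, and Spark stores the result as a sparse vector rather than a list.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list; with drop_last the final category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Hypothetical column values, not taken from the dataset.
first_blood = ["Blue", "Red", "Blue", "None", "Blue", "Red"]

mapping = fit_string_index(first_blood)
encoded = [one_hot(mapping[v], len(mapping)) for v in first_blood]

print(mapping)
print(encoded)
```

Dropping the last category keeps the encoded columns from being linearly dependent, which is why a three-category column becomes a two-element vector.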


Since we are doing binary classification, our label is also categorical and has to be converted to a number. The label has only two categories, so we can use integer encoding rather than one-hot encoding.

In [None]:
label_strIndexer = StringIndexer(inputCol="winner", outputCol="label")
stages_v1 += [label_strIndexer]

### **Normalizing Numeric Features**

In [None]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

num_features = [t[0] for t in mid_df.dtypes if t[1] == 'int']

for col in num_features:
  assembler = VectorAssembler(inputCols=[col], outputCol=col + "_Vect")
  scaler = MinMaxScaler(inputCol=assembler.getOutputCol(), outputCol=col+"_Scaled")
  stages_v1 +=[assembler,scaler]
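
The rescaling that MinMaxScaler applies to each column can be sketched in plain Python. This is an illustration of the formula (x - min) / (max - min) with the default output range of 0 to 1, not the Spark implementation. The helper name is hypothetical, and the five input values are the mid_golddiff values from the sample rows shown earlier. In the notebook the min and max come from the whole column, not from five rows.

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Linearly rescale values so the smallest maps to new_min and the largest to new_max."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column has no spread; map every value to the middle of the range.
        return [0.5 * (new_min + new_max) for _ in values]
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in values]


# mid_golddiff for the first five matches displayed earlier in the notebook.
gold_diff = [1422, 1368, 5293, 2712, -730]
scaled = min_max_scale(gold_diff)

for raw, s in zip(gold_diff, scaled):
    print(raw, round(s, 3))
```

After scaling, gold difference and the small count features all lie in the same 0 to 1 range.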


### **Combining All Features**
Now we combine all the feature columns into a single vector column called features, which will be the input to our models.

In [None]:
from pyspark.ml.feature import VectorAssembler

assemblerInputs = [col + "classVec" for col in categorical_features] + [col + "_Scaled" for col in num_features]
assembler2 = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages_v1 += [assembler2]


We have now defined all our data transformations, but they have not yet been applied to the data. They are applied when the pipeline is fitted and then used to transform the data. Below is the list of transformation stages that the data will go through.

In [None]:
stages_v1

[StringIndexer_7046474865f2,
 OneHotEncoder_5b2205b1b3f0,
 StringIndexer_453b40b54bfa,
 OneHotEncoder_c9a444c73bb2,
 StringIndexer_a73781000fc6,
 OneHotEncoder_4a15ab493e2e,
 StringIndexer_f24fc67df6ce,
 OneHotEncoder_0a6ee7288645,
 StringIndexer_6841a6612552,
 OneHotEncoder_c7eff41dcc1e,
 StringIndexer_3a221ad1acb9,
 VectorAssembler_d1c5407d1d57,
 MinMaxScaler_e41f52baf1ae,
 VectorAssembler_eb35aac320a5,
 MinMaxScaler_66b1fc9d7c20,
 VectorAssembler_fb05c92d41de,
 MinMaxScaler_fa88df29e180,
 VectorAssembler_6e1eb553039e,
 MinMaxScaler_66dfba3e444f,
 VectorAssembler_e5b9cd1a92df,
 MinMaxScaler_d8b3c4de8411,
 VectorAssembler_6a3e88c6e8b1,
 MinMaxScaler_b87de969cb20,
 VectorAssembler_56f9b90d0f79,
 MinMaxScaler_8403daf5ee5f,
 VectorAssembler_35d10b2c7731,
 MinMaxScaler_6c822238c5bb,
 VectorAssembler_679b29239edc,
 MinMaxScaler_686cbdaee021,
 VectorAssembler_2cc7619997e9,
 MinMaxScaler_4172e5612fd3,
 VectorAssembler_eaa6f6c5f877,
 MinMaxScaler_d6fb5a6af0f6,
 VectorAssembler_71c18b8f9e9a]
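
As a sanity check on the list above, the expected number of stages can be derived from the column lists. This is a plain-Python sketch; the column names are copied from the schema printed earlier, and the grouping into categorical and numeric features follows the dtype filters used in the notebook.

```python
# String-typed columns other than the label.
categorical_features = ["first_blood", "first_tower", "first_inhib",
                        "first_dragon", "first_baron"]

# Integer-typed columns.
num_features = ["mid_golddiff", "bKillCount", "rKillCount", "bTowerCount",
                "rTowerCount", "bInhibCount", "rInhibCount", "bDragonCount",
                "rDragonCount", "bBaronCount", "rBaronCount"]

expected_stages = (
    2 * len(categorical_features)  # StringIndexer + OneHotEncoder per categorical column
    + 1                            # StringIndexer for the label
    + 2 * len(num_features)        # VectorAssembler + MinMaxScaler per numeric column
    + 1                            # final VectorAssembler that builds the features column
)

print(expected_stages)
```

The result, 34, matches the number of entries in the stage list printed above.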

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages_v1)
pipelineModel = pipeline.fit(mid_df)
transformed_data = pipelineModel.transform(mid_df)

In [None]:
selectedCols = ['label', 'features'] + mid_df.columns
model_df = transformed_data.select(selectedCols)
model_df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- winner: string (nullable = true)
 |-- mid_golddiff: integer (nullable = true)
 |-- bKillCount: integer (nullable = true)
 |-- rKillCount: integer (nullable = true)
 |-- first_blood: string (nullable = true)
 |-- bTowerCount: integer (nullable = true)
 |-- rTowerCount: integer (nullable = true)
 |-- first_tower: string (nullable = true)
 |-- bInhibCount: integer (nullable = true)
 |-- rInhibCount: integer (nullable = true)
 |-- first_inhib: string (nullable = true)
 |-- bDragonCount: integer (nullable = true)
 |-- rDragonCount: integer (nullable = true)
 |-- first_dragon: string (nullable = true)
 |-- bBaronCount: integer (nullable = true)
 |-- rBaronCount: integer (nullable = true)
 |-- first_baron: string (nullable = true)



In [None]:
pd.DataFrame(model_df.take(5), columns=model_df.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,0,1,0,1,0
features,"(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, ...","(0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, ..."
winner,Blue,Red,Blue,Red,Blue
mid_golddiff,1422,1368,5293,2712,-730
bKillCount,4,5,11,5,1
rKillCount,3,3,6,5,1
first_blood,Blue,Blue,Blue,Red,Blue
bTowerCount,4,1,3,2,1
rTowerCount,2,1,0,1,3
first_tower,Blue,Read,Blue,Blue,Read



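From the dataframe above, we can see that label 0 stands for Blue and label 1 stands for Red. By default StringIndexer assigns indices in order of descending frequency, and Blue is the more frequent winner, so it receives index 0.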
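
The label mapping can be reproduced from the win counts reported earlier. This is a plain-Python sketch that assumes StringIndexer's default ordering (most frequent label first); it is an illustration, not the Spark code path.

```python
# Win counts from the groupby('winner') output earlier in the notebook.
win_counts = {"Blue": 4146, "Red": 3474}

# Most frequent label gets index 0; ties would be broken alphabetically.
ordered = sorted(win_counts, key=lambda team: (-win_counts[team], team))
label_index = {team: float(i) for i, team in enumerate(ordered)}

print(label_index)
```

Because the mapping depends on frequency, it could flip on a dataset where Red wins more often, so it is worth checking before interpreting predictions.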


**Splitting the data into train and test sets**

In [None]:
weights = [.8, .2] # train/test split
seed = 42
trainData, testData = model_df.randomSplit(weights,seed)
print("Training Dataset Count: " + str(trainData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 6166
Test Dataset Count: 1454
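
The split is not exactly 80/20: 6,166 of 7,620 rows (about 80.9%) ended up in the training set. That is expected when each row is assigned to a split at random rather than by exact counts. The sketch below illustrates the idea in plain Python. It is a conceptual illustration with a hypothetical helper, not Spark's sampling algorithm, so its counts will differ from the ones above.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split independently, with probability proportional to weights."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative probability boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(7620))  # stand-in row ids, same size as the dataset
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which matters when comparing models on the same test set.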


# **RandomForestModel:**

**Initial model**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Create initial RandomForestClassifier model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxDepth=30)

# Train model with Training Data
rfModel = rf.fit(trainData)


**Evaluating the Model**

The first thing we will do is evaluate the initial model, which was trained with a max depth of 30.

In [None]:
predictions = rfModel.transform(testData)
predCols = ['mid_golddiff', 'bKillCount', 'rKillCount', 'first_blood',\
            'bTowerCount', 'rTowerCount', 'first_tower', 'bInhibCount',\
            'rInhibCount', 'first_inhib', 'bDragonCount', 'rDragonCount',\
            'first_dragon', 'bBaronCount', 'rBaronCount', 'first_baron',\
            'rawPrediction', 'prediction', 'probability']

pd.DataFrame(predictions.take(5), columns=predictions.columns).transpose()

Unnamed: 0,0,1,2,3,4
label,0,0,0,0,0
features,"(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, ...","(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, ...","(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, ...","(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, ...","(1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, ..."
winner,Blue,Blue,Blue,Blue,Blue
mid_golddiff,659,1166,2128,1625,2106
bKillCount,1,4,8,4,4
rKillCount,0,1,2,1,3
first_blood,Blue,Blue,Blue,Blue,Blue
bTowerCount,0,0,0,1,1
rTowerCount,0,0,0,0,0
first_tower,Blue,Blue,Blue,Blue,Blue


In [None]:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 0.98327633378933
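
Area under ROC can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it that way on a tiny made-up example. Spark computes the metric from the ROC curve instead, so this is an illustration of what the number means, and the function name is hypothetical.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores, for illustration only.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = roc_auc(labels, scores)
print(auc)
```

A model that gives every example the same score gets 0.5 under this definition, and a perfect ranking gets 1.0.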



## **Cross-validate Model**

Here we use cross-validation to find the best random forest model by searching over max depth, max bins, bootstrap, and minimum instances per node. The CrossValidator below is configured with numFolds=3, so this is three-fold cross-validation.
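
The CrossValidator cell in this section sets numFolds=3. The sketch below shows, in plain Python, how k-fold cross-validation partitions row indices so that every row is used for validation exactly once. It uses contiguous folds for readability, which is an assumption made for the illustration (Spark assigns rows to folds randomly), and the function name is hypothetical.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_indices, validation_indices) for k contiguous folds."""
    # Spread any remainder over the first folds so sizes differ by at most one.
    fold_sizes = [n_rows // k + (1 if i < n_rows % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        validation = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_rows))
        yield train, validation
        start += size


folds = list(k_fold_indices(10, 3))
for train_idx, val_idx in folds:
    print("train:", train_idx, "validation:", val_idx)
```

Each parameter combination is trained once per fold and its metric is averaged over the folds.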

In [72]:
rf.explainParams()

"bootstrap: Whether bootstrap samples are used when building trees. (default: True)\ncacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the fe

In [None]:

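from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Candidate values for each hyperparameter
testDept = [x for x in range(0, 31) if x%6==0]        # maxDepth: 0, 6, ..., 30
testBin = [x for x in range(2, 32) if x%6==0]         # maxBins: 6, 12, ..., 30
testInstance = [x for x in range(1, 10) if x %3 ==0]  # minInstancesPerNode: 3, 6, 9
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
grid = (ParamGridBuilder()
        .addGrid(rf.maxDepth, testDept)
        .addGrid(rf.maxBins, testBin)
        .addGrid(rf.bootstrap, [True, False])
        .addGrid(rf.minInstancesPerNode, testInstance)
        .build())
# evaluator is the BinaryClassificationEvaluator defined earlier (area under ROC)
cvRF = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)
# Note: model_df includes the rows in testData, so the test score reported later
# is optimistic; fitting on trainData would keep the test set unseen.
cv_RF_model = cvRF.fit(model_df)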
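
To see how much work this search involves, the grid can be enumerated in plain Python. The three list comprehensions are copied from the cell above. The snake_case names are illustrative, and itertools.product stands in for the parameter grid builder.

```python
from itertools import product

# Same candidate values as the cell above.
test_depth = [x for x in range(0, 31) if x % 6 == 0]
test_bin = [x for x in range(2, 32) if x % 6 == 0]
test_instance = [x for x in range(1, 10) if x % 3 == 0]
bootstrap = [True, False]

# Every combination of (maxDepth, maxBins, bootstrap, minInstancesPerNode).
combos = list(product(test_depth, test_bin, bootstrap, test_instance))

num_folds = 3
depth_zero = [c for c in combos if c[0] == 0]

print("combinations:", len(combos))
print("fits during cross-validation:", len(combos) * num_folds)
print("combinations with maxDepth 0:", len(depth_zero))
```

The grid has 180 combinations, so three-fold cross-validation trains 540 forests before the final refit with the best parameters. Thirty of the combinations use maxDepth 0, which yields single-leaf trees; dropping 0 from the depth list would save that work.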


In [None]:
best_RF_model = cv_RF_model.bestModel

In [None]:
# Saving the model
cv_RF_model.save("RF_Model")
from google.colab import files
files.download('RF_Model')


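Below we can see the average cross-validated area under ROC for each parameter combination in the grid. The first 30 combinations score 0.5, which is no better than chance; these are the combinations with a max depth of 0, where each tree is a single leaf. All the other combinations shown score between about 0.986 and 0.987, so the model performs almost the same once the trees are allowed to split.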

In [None]:
cv_RF_model.avgMetrics

[0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.5,
 0.9867249221778315,
 0.9868474568947445,
 0.9863921989325777,
 0.986934606156977,
 0.9865213460852587,
 0.9861846676223143,
 0.9867593899752192,
 0.9870620571400688,
 0.986647084705405,
 0.9867170536993686,
 0.9863123814360909,
 0.9859703901495378,
 0.9868305389766188,
 0.9864579893858163,
 0.9861984137035296,
 0.9868423116340965,
 0.9862063444001501,
 0.9860127395997562,
 0.9864245550894035,
 0.9863872783746492,
 0.9864203206039224,
 0.9868149208882855,
 0.9862074582118419,
 0.9861828252688583,
 0.9863705499721814,
 0.9864945039416861,
 0.9864245467884489,
 0.9866840866257013,
 0.9865952527222267,
 0.9862237501569258,
 0.9869556046172616,
 0.9873607885730575,
 0.9868193355968715,
 0.9870090812461996,
 0.9870313057534801,
 0.986805855237092,
 0.98681620883732,
 0.9871880754132871,
 0.9870309518381186,
 0.987
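
Each entry in the list above corresponds to one parameter combination, in the same order as the grid. Picking the winner means taking the entry with the highest average metric, since larger is better for area under ROC. The sketch below shows that selection in plain Python with made-up values. The helper name is hypothetical. In PySpark, I believe the matching parameter maps can be obtained from the fitted model with getEstimatorParamMaps(), but treat that as an assumption to verify.

```python
def best_combination(param_maps, avg_metrics):
    """Return the (params, metric) pair with the highest average metric."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("param_maps and avg_metrics must have the same length")
    best_index = max(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return param_maps[best_index], avg_metrics[best_index]


# Made-up parameter maps and scores, for illustration only.
param_maps = [{"maxDepth": 0}, {"maxDepth": 6}, {"maxDepth": 12}]
avg_metrics = [0.5, 0.91, 0.93]

best_params, best_score = best_combination(param_maps, avg_metrics)
print(best_params, best_score)
```

When the top scores differ only in the third or fourth decimal place, as they do here, the simplest of the near-tied combinations is often a reasonable choice.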

Best parameters


In [None]:
best_RF_model = cv_RF_model.bestModel

In [None]:
# Saving the model
best_RF_model.save("drive/MyDrive/dataset/RF_Model")


In [None]:
print('maxDepth', best_RF_model.getMaxDepth())
print('maxBins', best_RF_model.getMaxBins())
print('minInstancePerNode', best_RF_model.getMinInstancesPerNode())
print('bootstrap', best_RF_model.getBootstrap())


maxDepth 18
maxBins 18
minInstancePerNode 3
bootstrap False


In [70]:
testPredictions = best_RF_model.transform(testData)
print('Test Area Under ROC', evaluator.evaluate(testPredictions))

Test Area Under ROC 0.9957088843289255
