# Apache Spark for Machine Learning
### By Khyatee Desai<br>Nov. 23, 2020


## Import PySpark using Docker
Run the followingDocker command in the terminal to import PySpark for Jupyter Notebooks<br>Source: https://hub.docker.com/r/jupyter/pyspark-notebook

In [1]:
# !docker pull jupyter/pyspark-notebook

In [2]:
# !docker run -it jupyter/pyspark-notebook:latest /bin/bash

### Import PySpark
Once PySpark has been installed, import it and instantiate a SparkContext object

In [3]:
import pyspark
sc = pyspark.SparkContext('local[*]')

Verify successful installation by running the following code:

In [4]:
sc.parallelize(range(1000)).count()
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[729, 375, 601, 610, 695]

Inspect available PySpark attributes:

In [5]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcas

Inspect the version of Spark and the number of cores being used

In [6]:
print('PySpark Version:',sc.version)
print('Number of Cores Used:',sc.defaultParallelism)

PySpark Version: 2.4.4
Number of Cores Used: 4


Terminate your SparkContext instance with the following command

In [91]:
sc.stop()

# Machine Learning with Spark

In [92]:
# Import Libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml import feature
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoderEstimator

# Instantiate Spark Context, and create a Spark Session
sc = SparkContext('local[*]')
spark = SparkSession(sc)

Dataset: https://www.kaggle.com/datasnaek/chess

In [93]:
# Read data into a pyspark dataframe
df = spark.read.csv('games.csv', header='true', inferSchema='true')

### Inspect  attributes of the spark dataframe

In [94]:
df.head(1)

[Row(id='TZJHLljE', rated=False, created_at=Decimal('1504210000000'), last_move_at=Decimal('1504210000000'), turns=13, victory_status='outoftime', winner='white', increment_code='15+2', white_id='bourgris', white_rating=1500, black_id='a-00', black_rating=1191, moves='d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5 Bf4', opening_eco='D10', opening_name='Slav Defense: Exchange Variation', opening_ply=5)]

In [12]:
df.columns

['id',
 'rated',
 'created_at',
 'last_move_at',
 'turns',
 'victory_status',
 'winner',
 'increment_code',
 'white_id',
 'white_rating',
 'black_id',
 'black_rating',
 'moves',
 'opening_eco',
 'opening_name',
 'opening_ply']

In [13]:
df.select('winner')

DataFrame[winner: string]

In [14]:
df['winner']

Column<b'winner'>

In [15]:
df.dtypes

[('id', 'string'),
 ('rated', 'boolean'),
 ('created_at', 'decimal(20,0)'),
 ('last_move_at', 'decimal(20,0)'),
 ('turns', 'int'),
 ('victory_status', 'string'),
 ('winner', 'string'),
 ('increment_code', 'string'),
 ('white_id', 'string'),
 ('white_rating', 'int'),
 ('black_id', 'string'),
 ('black_rating', 'int'),
 ('moves', 'string'),
 ('opening_eco', 'string'),
 ('opening_name', 'string'),
 ('opening_ply', 'int')]

In [16]:
# View distinct values
df.select('victory_status').distinct().collect()

[Row(victory_status='resign'),
 Row(victory_status='outoftime'),
 Row(victory_status='mate'),
 Row(victory_status='draw')]

In [17]:
df.select('winner').distinct().collect()

[Row(winner='white'), Row(winner='black'), Row(winner='draw')]

## Data Cleaning & Feature Engineering

In [95]:
# Create a new column for game duration
df_new = df.withColumn("duration", df['last_move_at'] - df['created_at'])
df_new.head(1)

[Row(id='TZJHLljE', rated=False, created_at=Decimal('1504210000000'), last_move_at=Decimal('1504210000000'), turns=13, victory_status='outoftime', winner='white', increment_code='15+2', white_id='bourgris', white_rating=1500, black_id='a-00', black_rating=1191, moves='d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5 Bf4', opening_eco='D10', opening_name='Slav Defense: Exchange Variation', opening_ply=5, duration=Decimal('0'))]

In [96]:
# Create dummy variables for victory_status, increment_code, and opening_name
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df_new) 
            for column in list(set(['victory_status', 'increment_code','opening_name'])) ]

# put all the string indexers in a pipeline, and then just fit&transform the pipeline
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df_new).transform(df_new)

df_r.head()

Row(id='TZJHLljE', rated=False, created_at=Decimal('1504210000000'), last_move_at=Decimal('1504210000000'), turns=13, victory_status='outoftime', winner='white', increment_code='15+2', white_id='bourgris', white_rating=1500, black_id='a-00', black_rating=1191, moves='d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5 Bf4', opening_eco='D10', opening_name='Slav Defense: Exchange Variation', opening_ply=5, duration=Decimal('0'), increment_code_index=21.0, victory_status_index=2.0, opening_name_index=234.0)

In [101]:
# fit and transform the OneHotEncoderEstimator
encoder = feature.OneHotEncoderEstimator(inputCols=['victory_status_index', 'increment_code_index', 
                        'opening_name_index'], outputCols=['victory_status_dummy', 
                        'increment_code_dummy', 'opening_name_dummy'], dropLast=True)
dummy_df = encoder.fit(df_r).transform(df_r)
dummy_df.head()

Row(id='TZJHLljE', rated=False, created_at=Decimal('1504210000000'), last_move_at=Decimal('1504210000000'), turns=13, victory_status='outoftime', winner='white', increment_code='15+2', white_id='bourgris', white_rating=1500, black_id='a-00', black_rating=1191, moves='d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5 Bf4', opening_eco='D10', opening_name='Slav Defense: Exchange Variation', opening_ply=5, duration=Decimal('0'), increment_code_index=21.0, victory_status_index=2.0, opening_name_index=234.0, victory_status_dummy=SparseVector(3, {2: 1.0}), increment_code_dummy=SparseVector(399, {21: 1.0}), opening_name_dummy=SparseVector(1476, {234: 1.0}))

In [102]:
# Change boolean values to 0 or 1, and set winner outcome to 0, 1, or 2
import pyspark.sql.functions as F
dummy_df = dummy_df.withColumn('rated', F.when(df.rated == 'False', 0).otherwise(1))
dummy_df = dummy_df.withColumn('winner', F.when(df.winner == 'white', 0).when(df.winner == 'black', 1).otherwise(2))
dummy_df.head()

Row(id='TZJHLljE', rated=0, created_at=Decimal('1504210000000'), last_move_at=Decimal('1504210000000'), turns=13, victory_status='outoftime', winner=0, increment_code='15+2', white_id='bourgris', white_rating=1500, black_id='a-00', black_rating=1191, moves='d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5 Bf4', opening_eco='D10', opening_name='Slav Defense: Exchange Variation', opening_ply=5, duration=Decimal('0'), increment_code_index=21.0, victory_status_index=2.0, opening_name_index=234.0, victory_status_dummy=SparseVector(3, {2: 1.0}), increment_code_dummy=SparseVector(399, {21: 1.0}), opening_name_dummy=SparseVector(1476, {234: 1.0}))

In [103]:
# Drop un-needed columns
trimmed_df = dummy_df.drop('id', 'white_id', 'black_id', 'moves', 'last_move_at', 'created_at','opening_eco', 'opening_name', 'opening_name_index',
                    'victory_status', 'victory_status_index', 'increment_code', 'increment_code_index')
trimmed_df.dtypes

[('rated', 'int'),
 ('turns', 'int'),
 ('winner', 'int'),
 ('white_rating', 'int'),
 ('black_rating', 'int'),
 ('opening_ply', 'int'),
 ('duration', 'decimal(21,0)'),
 ('victory_status_dummy', 'vector'),
 ('increment_code_dummy', 'vector'),
 ('opening_name_dummy', 'vector')]

## Modeling

In [104]:
target = 'winner'
features = trimmed_df.drop('winner').columns

# create a vectorized dataframe using our feature colummns
vector = VectorAssembler(inputCols=features, outputCol='features')
vector_df = vector.transform(trimmed_df)
vector_df.head()

Row(rated=0, turns=13, winner=0, white_rating=1500, black_rating=1191, opening_ply=5, duration=Decimal('0'), victory_status_dummy=SparseVector(3, {2: 1.0}), increment_code_dummy=SparseVector(399, {21: 1.0}), opening_name_dummy=SparseVector(1476, {234: 1.0}), features=SparseVector(1884, {1: 13.0, 2: 1500.0, 3: 1191.0, 4: 5.0, 8: 1.0, 30: 1.0, 642: 1.0}))

In [105]:
# Train/Test Split
splits = [.75, .25]
train_data, test_data = vector_df.randomSplit(splits)

In [106]:
# instantiate and fit the model to training data
forest_model = RandomForestClassifier(featuresCol='features', labelCol='winner', 
                            predictionCol='prediction', maxDepth=30, 
                            impurity='gini', subsamplingRate= .5).fit(train_data)

In [107]:
# Predict on test data
predictions = forest_model.transform(test_data).select('winner', 'prediction')
predictions.head(10)

[Row(winner=0, prediction=1.0),
 Row(winner=0, prediction=0.0),
 Row(winner=0, prediction=0.0),
 Row(winner=0, prediction=1.0),
 Row(winner=1, prediction=1.0),
 Row(winner=1, prediction=1.0),
 Row(winner=1, prediction=1.0),
 Row(winner=1, prediction=0.0),
 Row(winner=0, prediction=1.0),
 Row(winner=0, prediction=0.0)]

## Model Evaluation

In [108]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='winner')

In [109]:
# inspect F1 and accuracy scores
print('F1 Score:',evaluator.evaluate(predictions,{evaluator.metricName: 'f1'}))
print('Accuracy:',evaluator.evaluate(predictions,{evaluator.metricName: 'accuracy'}))

F1 Score: 0.5598714481554765
Accuracy: 0.5901926444833625


In [110]:
# Inspect feature importance
sorted(list(zip(forest_model.featureImportances,features)),reverse=True)

[(0.11574097660852146, 'black_rating'),
 (0.1079818686928678, 'white_rating'),
 (0.07061671535143461, 'victory_status_dummy'),
 (0.05389688481245503, 'turns'),
 (0.04311558176022092, 'increment_code_dummy'),
 (0.01631043028686818, 'opening_ply'),
 (0.014358309278006357, 'opening_name_dummy'),
 (0.014072792937364286, 'duration'),
 (0.004944454015592373, 'rated')]