In [1]:
import json
import os

import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import regex as re

from IPython.display import display
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, LinearSVC, LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
from typing import *

In [2]:
# Start a Spark session for this notebook (or reuse one that already exists).
spark = (
    SparkSession.builder
    .appName('group2nba')
    .getOrCreate()
)

In [3]:
# Root directory of this project's data: input CSVs are read from
# `stacked_data/` and cross-validation summaries are written to `models_cv/`.
path_main = '/project/ds5559/group2nba'

# Generic type variable used in type hints further down the notebook.
T = TypeVar('T')

In [4]:
class ML_CV:
    '''
    Record bundling what is needed to cross-validate one kind of model.

    Attributes:
        Model: the model to cross-validate. Presumably a pyspark.ml estimator
            class (the disabled `cross_validate` helper calls it with
            featuresCol/labelCol) -- TODO confirm.
        TestParameters: callable that, given the estimator stage, builds the
            hyperparameter grid to search.
        BestParameters: callable that, given the best fitted stage, returns a
            JSON-serializable description of its hyperparameters.
    '''
    __slots__: List[str] = ['Model', 'TestParameters', 'BestParameters']

    def __init__(self, model: T, test_params: Callable, best_params: Callable):
        # plain attribute storage; __slots__ keeps instances to exactly these three fields
        self.Model, self.TestParameters, self.BestParameters = model, test_params, best_params

In [5]:
# Column name -> Spark SQL type *class* for the stacked CSV files (the classes
# are instantiated when the StructType schema is built). With a user-supplied
# schema the CSV reader assigns types by column position, so this order must
# match the file's column order. Each key must appear exactly once.
FIELDS: Dict[str, Type[DataType]] = {
      'Date': StringType
    , 'HomeTeam': StringType
    , 'AwayTeam': StringType
    , 'Team': StringType
    , 'Year': IntegerType
    , 'Won': IntegerType
    , 'ScoreDiff': IntegerType
    , 'Quarter': IntegerType
    , 'SecLeftTotal': IntegerType
    , 'LogSecLeftTotal': DoubleType
    , 'SecLeftTotalInverse': DoubleType
    , 'HasPossession': IntegerType
    , 'assist_team_cnt': LongType
    , 'assist_opponent_cnt': LongType
    , 'turnover_team_cnt': LongType
    , 'turnover_opponent_cnt': LongType
    , 'block_team_cnt': LongType
    , 'block_opponent_cnt': LongType
    , 'foul_team_cnt': LongType
    , 'foul_opponent_cnt': LongType
    , 'rebound_team_cnt': LongType
    , 'rebound_opponent_cnt': LongType
    , 'shotOnGoal_team_cnt': LongType
    , 'shotOnGoal_opponent_cnt': LongType
    , 'freeThrow_team_cnt': LongType
    , 'freeThrow_opponent_cnt': LongType
    , 'SecLeftTotalInverseTimesScoreDiff': DoubleType
    , 'assist_diff': IntegerType
    , 'turnover_diff': IntegerType
    , 'block_diff': IntegerType
    , 'foul_diff': IntegerType
    , 'rebound_diff': IntegerType
    , 'shotOnGoal_diff': IntegerType
    , 'freeThrow_diff': IntegerType
}

In [9]:
# EDIT: feature columns assembled into the 'features' vector by every model
# pipeline below; change this list to try a different feature set.
MODEL_FEATURES = ['SecLeftTotal', 'ScoreDiff', 'HasPossession']

In [10]:
# def cross_validate(df: DataFrame, ml_method: str, features: List[str], k_folds: int = 10) -> DataFrame:
#     '''...'''
#     # retrieve the necessary information to cross validate the model type
#     method = DICT_ML[ml_method]
#     
#     # create pipeline
#     pipeline = Pipeline(stages = [
#           VectorAssembler(inputCols = features, outputCol = 'features')
#         , method.Model(featuresCol = 'features', labelCol = 'Won')
#     ])
#     
#     # define the hyperparmeters to test
#     param_grid = method.TestParameters(pipeline.getStages()[1])
#     
#     # classification method
#     classifier = BinaryClassificationEvaluator(
#           metricName = 'areaUnderROC'
#         , rawPredictionCol = 'rawPrediction'
#         , labelCol = 'Won'
#     )
#     
#     # do cross validation
#     cv_model = CrossValidator(
#           estimator = pipeline
#         , estimatorParamMaps = param_grid
#         , evaluator = classifier
#         , numFolds = k_folds
#     ).setParallelism(k_folds).fit(df)
# 
#     # return results as pandas dataframe
#     return pd.DataFrame({
#           'Method': [ml_method]
#         , 'ROC': [cv_model.avgMetrics[0]]
#         , 'HyperParameters': [json.dumps(method.BestParameters(cv_model.bestModel.stages[1]))]
#     })

In [11]:
# Build the explicit schema from FIELDS (values are DataType classes, so
# instantiate each one) and load every file under stacked_data/.
schema_data = StructType([StructField(name, dtype()) for name, dtype in FIELDS.items()])

df_train = spark.read \
    .format('csv') \
    .option('header', True) \
    .schema(schema_data) \
    .load(f'{path_main}/stacked_data/*')

display(df_train.count())
# printSchema() prints the tree itself and returns None; wrapping it in
# display() would just render an extra "None"
df_train.printSchema()
display(df_train.head(2))

6078776

root
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Won: integer (nullable = true)
 |-- ScoreDiff: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- SecLeftTotal: integer (nullable = true)
 |-- LogSecLeftTotal: double (nullable = true)
 |-- SecLeftTotalInverse: double (nullable = true)
 |-- HasPossession: integer (nullable = true)
 |-- assist_team_cnt: long (nullable = true)
 |-- assist_opponent_cnt: long (nullable = true)
 |-- turnover_team_cnt: long (nullable = true)
 |-- turnover_opponent_cnt: long (nullable = true)
 |-- block_team_cnt: long (nullable = true)
 |-- block_opponent_cnt: long (nullable = true)
 |-- foul_team_cnt: long (nullable = true)
 |-- foul_opponent_cnt: long (nullable = true)
 |-- rebound_team_cnt: long (nullable = true)
 |-- rebound_opponent_cnt: long (nullable = true)
 |-- shotOnGoal_team_cnt: lon

None

[Row(Date='December 22 2018', HomeTeam='WAS', AwayTeam='PHO', Team='PHO', Year=2018, Won=0, ScoreDiff=0, Quarter=1, SecLeftTotal=3761, LogSecLeftTotal=8.232706009860976, SecLeftTotalInverse=0.0002658160552897395, HasPossession=1, assist_team_cnt=0, assist_opponent_cnt=0, turnover_team_cnt=0, turnover_opponent_cnt=0, block_team_cnt=0, block_opponent_cnt=0, foul_team_cnt=0, foul_opponent_cnt=0, rebound_team_cnt=0, rebound_opponent_cnt=0, shotOnGoal_team_cnt=1, shotOnGoal_opponent_cnt=0, freeThrow_team_cnt=0, freeThrow_opponent_cnt=0, SecLeftTotalInverseTimesScoreDiff=0.0, assist_diff=0, turnover_diff=0, block_diff=0, foul_diff=0, rebound_diff=0, shotOnGoal_diff=1, freeThrow_diff=0),
 Row(Date='December 22 2018', HomeTeam='WAS', AwayTeam='PHO', Team='PHO', Year=2018, Won=0, ScoreDiff=0, Quarter=1, SecLeftTotal=3760, LogSecLeftTotal=8.232440158470336, SecLeftTotalInverse=0.00026588673225206064, HasPossession=0, assist_team_cnt=0, assist_opponent_cnt=0, turnover_team_cnt=0, turnover_opponen

In [14]:
# Keep only rows with at most SEC_LEFT_CUTOFF seconds remaining according to
# SecLeftTotal (presumably the last five minutes of play -- confirm against
# how SecLeftTotal is built). The result is cached because every
# cross-validation cell below fits on it repeatedly (5 folds x grid size).
SEC_LEFT_CUTOFF = 300

df_train = df_train \
    .where(F.col('SecLeftTotal') <= SEC_LEFT_CUTOFF) \
    .cache()

display(df_train.count())
# printSchema() prints directly and returns None, so don't wrap it in display()
df_train.printSchema()
display(df_train.head(2))

730846

root
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Won: integer (nullable = true)
 |-- ScoreDiff: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- SecLeftTotal: integer (nullable = true)
 |-- LogSecLeftTotal: double (nullable = true)
 |-- SecLeftTotalInverse: double (nullable = true)
 |-- HasPossession: integer (nullable = true)
 |-- assist_team_cnt: long (nullable = true)
 |-- assist_opponent_cnt: long (nullable = true)
 |-- turnover_team_cnt: long (nullable = true)
 |-- turnover_opponent_cnt: long (nullable = true)
 |-- block_team_cnt: long (nullable = true)
 |-- block_opponent_cnt: long (nullable = true)
 |-- foul_team_cnt: long (nullable = true)
 |-- foul_opponent_cnt: long (nullable = true)
 |-- rebound_team_cnt: long (nullable = true)
 |-- rebound_opponent_cnt: long (nullable = true)
 |-- shotOnGoal_team_cnt: lon

None

[Row(Date='December 22 2018', HomeTeam='WAS', AwayTeam='PHO', Team='PHO', Year=2018, Won=0, ScoreDiff=0, Quarter=6, SecLeftTotal=300, LogSecLeftTotal=5.707110264748875, SecLeftTotalInverse=0.0033222591362126247, HasPossession=1, assist_team_cnt=27, assist_opponent_cnt=38, turnover_team_cnt=15, turnover_opponent_cnt=22, block_team_cnt=5, block_opponent_cnt=6, foul_team_cnt=26, foul_opponent_cnt=23, rebound_team_cnt=59, rebound_opponent_cnt=57, shotOnGoal_team_cnt=105, shotOnGoal_opponent_cnt=101, freeThrow_team_cnt=32, freeThrow_opponent_cnt=22, SecLeftTotalInverseTimesScoreDiff=0.0, assist_diff=-11, turnover_diff=-7, block_diff=-1, foul_diff=3, rebound_diff=2, shotOnGoal_diff=4, freeThrow_diff=10),
 Row(Date='December 22 2018', HomeTeam='WAS', AwayTeam='PHO', Team='PHO', Year=2018, Won=0, ScoreDiff=0, Quarter=6, SecLeftTotal=300, LogSecLeftTotal=5.707110264748875, SecLeftTotalInverse=0.0033222591362126247, HasPossession=0, assist_team_cnt=27, assist_opponent_cnt=38, turnover_team_cnt=1

In [15]:
# One row per cross-validated model: its name, its cross-validated area under
# the ROC curve, and the selected hyperparameters serialized as JSON.
schema_cv = StructType([
      StructField('Method', StringType())
    , StructField('ROC', FloatType())
    , StructField('HyperParameters', StringType())
])

# Start with no rows (an empty list of rows); each model cell below unions its
# own result row onto this frame.
cv_results = spark.createDataFrame([], schema = schema_cv)

# Gradient Boost

In [16]:
# # cv_results = pd.concat([cv_results, cross_validate(df_train, 'GradientBoost', MODEL_FEATURES)])
# '''...'''
# # create pipeline
# pipeline = Pipeline(stages = [
#       VectorAssembler(inputCols = MODEL_FEATURES, outputCol = 'features')
#     , GBTClassifier(featuresCol = 'features', labelCol = 'Won')
# ])
# 
# # define the hyperparmeters to test
# param_grid = ParamGridBuilder() \
#     .addGrid(pipeline.getStages()[1].maxBins, [2, 3]) \
#     .addGrid(pipeline.getStages()[1].maxDepth, [3, 5, 10]) \
#     .build()
# 
# # classification method
# classifier = BinaryClassificationEvaluator(
#       metricName = 'areaUnderROC'
#     , rawPredictionCol = 'rawPrediction'
#     , labelCol = 'Won'
# )
# 
# # do cross validation
# cv_model = CrossValidator(
#       estimator = pipeline
#     , estimatorParamMaps = param_grid
#     , evaluator = classifier
#     , numFolds = 5
# ).setParallelism(2).fit(df_train)
# 
# # return results as a dataframe
# cv_results = cv_results.union(spark.createDataFrame([(
#       'GradientBoost'
#     , cv_model.avgMetrics[0]
#     , json.dumps({
#           'maxBins': cv_model.bestModel.stages[1]._java_obj.getMaxBins()
#         , 'maxDepth': cv_model.bestModel.stages[1]._java_obj.getMaxDepth()
#       })
# )], schema_cv))
# cv_results

# Linear Support Vector Machine

In [None]:
# Cross-validate a linear SVM over a small hyperparameter grid and append the
# best (Method, ROC, HyperParameters) row to `cv_results`.
# NOTE: re-running this cell appends another LinearSVC row.

# create pipeline: assemble the feature columns into one vector, then classify
svc = LinearSVC(featuresCol = 'features', labelCol = 'Won')
pipeline = Pipeline(stages = [
      VectorAssembler(inputCols = MODEL_FEATURES, outputCol = 'features')
    , svc
])

# define the hyperparameters to test. These are the same two reported below.
# (aggregationDepth only sets the depth of Spark's treeAggregate when summing
# gradients; it does not change the objective, so tuning it cannot move the ROC.)
param_grid = ParamGridBuilder() \
    .addGrid(svc.maxIter, [10, 20, 50]) \
    .addGrid(svc.regParam, [0.0, 0.1, 0.5]) \
    .build()

# scoring metric: area under the ROC curve on each held-out fold
evaluator = BinaryClassificationEvaluator(
      metricName = 'areaUnderROC'
    , rawPredictionCol = 'rawPrediction'
    , labelCol = 'Won'
)

# do cross validation
cv_model = CrossValidator(
      estimator = pipeline
    , estimatorParamMaps = param_grid
    , evaluator = evaluator
    , numFolds = 5
).setParallelism(2).fit(df_train)

# avgMetrics[i] is the fold-averaged metric for param_grid[i]; report the best
# grid point (the same rule CrossValidator uses for bestModel), not the first one
pick_best = np.argmax if evaluator.isLargerBetter() else np.argmin
best_idx = int(pick_best(cv_model.avgMetrics))

# return results as a dataframe
cv_results = cv_results.union(spark.createDataFrame([(
      'LinearSVC'
    , float(cv_model.avgMetrics[best_idx])
    , json.dumps({param.name: value for param, value in param_grid[best_idx].items()})
)], schema_cv))
cv_results

# Logistic Regression

In [14]:
# Cross-validate a logistic regression over a small hyperparameter grid and
# append the best (Method, ROC, HyperParameters) row to `cv_results`.
# NOTE: re-running this cell appends another LogisticRegression row.

# create pipeline: assemble the feature columns into one vector, then classify
lr = LogisticRegression(featuresCol = 'features', labelCol = 'Won')
pipeline = Pipeline(stages = [
      VectorAssembler(inputCols = MODEL_FEATURES, outputCol = 'features')
    , lr
])

# define the hyperparameters to test
param_grid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 20]) \
    .addGrid(lr.regParam, [0.1, 0.5]) \
    .build()

# scoring metric: area under the ROC curve on each held-out fold
evaluator = BinaryClassificationEvaluator(
      metricName = 'areaUnderROC'
    , rawPredictionCol = 'rawPrediction'
    , labelCol = 'Won'
)

# do cross validation
cv_model = CrossValidator(
      estimator = pipeline
    , estimatorParamMaps = param_grid
    , evaluator = evaluator
    , numFolds = 5
).setParallelism(2).fit(df_train)

# avgMetrics[i] is the fold-averaged metric for param_grid[i]; report the best
# grid point (the same rule CrossValidator uses for bestModel), not the first one
pick_best = np.argmax if evaluator.isLargerBetter() else np.argmin
best_idx = int(pick_best(cv_model.avgMetrics))

# return results as a dataframe
cv_results = cv_results.union(spark.createDataFrame([(
      'LogisticRegression'
    , float(cv_model.avgMetrics[best_idx])
    , json.dumps({param.name: value for param, value in param_grid[best_idx].items()})
)], schema_cv))
cv_results

DataFrame[Method: string, ROC: float, HyperParameters: string]

# Random Forest

In [15]:
# Cross-validate a random forest over a small hyperparameter grid and append
# the best (Method, ROC, HyperParameters) row to `cv_results`.
# NOTE: re-running this cell appends another RandomForest row.

# create pipeline: assemble the feature columns into one vector, then classify
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Won')
pipeline = Pipeline(stages = [
      VectorAssembler(inputCols = MODEL_FEATURES, outputCol = 'features')
    , rf
])

# define the hyperparameters to test
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxBins, [2, 3]) \
    .addGrid(rf.maxDepth, [3, 5]) \
    .addGrid(rf.numTrees, [100, 500]) \
    .build()

# scoring metric: area under the ROC curve on each held-out fold
evaluator = BinaryClassificationEvaluator(
      metricName = 'areaUnderROC'
    , rawPredictionCol = 'rawPrediction'
    , labelCol = 'Won'
)

# do cross validation
cv_model = CrossValidator(
      estimator = pipeline
    , estimatorParamMaps = param_grid
    , evaluator = evaluator
    , numFolds = 5
).setParallelism(2).fit(df_train)

# avgMetrics[i] is the fold-averaged metric for param_grid[i]; report the best
# grid point (the same rule CrossValidator uses for bestModel), not the first one.
# Reading the winning param map also avoids the private `_java_obj` getters.
pick_best = np.argmax if evaluator.isLargerBetter() else np.argmin
best_idx = int(pick_best(cv_model.avgMetrics))

# return results as a dataframe
cv_results = cv_results.union(spark.createDataFrame([(
      'RandomForest'
    , float(cv_model.avgMetrics[best_idx])
    , json.dumps({param.name: value for param, value in param_grid[best_idx].items()})
)], schema_cv))
cv_results

DataFrame[Method: string, ROC: float, HyperParameters: string]

In [16]:
# Persist the cross-validation summary. Each run is written to a new numbered
# output (cv_results_<n>.csv, n = number of entries already in the folder) so
# earlier runs are kept; Spark's default save mode errors rather than overwrite.
path_write = f'{path_main}/models_cv'
# os.listdir below raises if the folder is missing on a first run
os.makedirs(path_write, exist_ok = True)

run_id = len(os.listdir(path_write))
# header=True keeps the Method/ROC/HyperParameters column names in the output
cv_results.write.csv(f'{path_write}/cv_results_{run_id}.csv', header = True)