In [1]:
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import (IntegerType,
 StringType,DecimalType,StructType,StructField,
 ArrayType,DoubleType,FloatType)
import pyspark.sql.functions as func
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml import Pipeline
import pyspark.ml.evaluation as evals
from pyspark.ml.evaluation import Evaluator
import pyspark.ml.tuning as tune
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

sc = SparkContext('local[*]')
spark = SparkSession(sc)

In [2]:
class BinaryLogLossEvaluator(Evaluator):
    def __init__(self,predictionCol='probability',labelCol='label'):
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _get_probabilities(self,row):
        return row[1].item()
        
    def _evaluate(self,dataset):
        _udf_get_probabilities = func.udf(self._get_probabilities,FloatType())
        dataset = dataset.withColumn('prob_1',_udf_get_probabilities(dataset.select(self.predictionCol)[0]))
        
        self.log_loss = (-1) * dataset.select(func.sum(dataset.select(self.labelCol)[0] 
                                                           * func.log(dataset.prob_1)
                                                       + (1 - dataset.select(self.labelCol)[0])
                                                           * func.log(1-dataset.prob_1)
                                                      )
                                             ).collect()[0][0]
        
        return self.log_loss
    
    
    def isLargerBetter(self):
        return False

In [36]:
data_path = '../data/raw/shots_2007-2018.csv'

sample = pd.read_csv(data_path,nrows=10)
column_names = sample.columns

schema = StructType([StructField(x,StringType(),True) for x in column_names])

shots = spark.read.csv(data_path,schema=schema,
                       enforceSchema=True,header=True,ignoreLeadingWhiteSpace=True,
                       ignoreTrailingWhiteSpace=True)

for ix,x in enumerate(column_names):
    print(ix,x)

0 shotID
1 homeTeamCode
2 awayTeamCode
3 season
4 isPlayoffGame
5 game_id
6 homeTeamWon
7 id
8 time
9 timeUntilNextEvent
10 timeSinceLastEvent
11 period
12 team
13 location
14 event
15 goal
16 shotPlayContinuedOutsideZone
17 shotPlayContinuedInZone
18 shotGoalieFroze
19 shotPlayStopped
20 shotGeneratedRebound
21 homeTeamGoals
22 awayTeamGoals
23 xCord
24 yCord
25 xCordAdjusted
26 yCordAdjusted
27 shotAngle
28 shotAngleAdjusted
29 shotAnglePlusRebound
30 shotAngleReboundRoyalRoad
31 shotDistance
32 shotType
33 shotOnEmptyNet
34 shotRebound
35 shotAnglePlusReboundSpeed
36 shotRush
37 speedFromLastEvent
38 lastEventxCord
39 lastEventyCord
40 distanceFromLastEvent
41 lastEventShotAngle
42 lastEventShotDistance
43 lastEventCategory
44 lastEventTeam
45 homeEmptyNet
46 awayEmptyNet
47 homeSkatersOnIce
48 awaySkatersOnIce
49 awayPenalty1TimeLeft
50 awayPenalty1Length
51 homePenalty1TimeLeft
52 homePenalty1Length
53 playerPositionThatDidEvent
54 playerNumThatDidEvent
55 playerNumThatDidLastEven

In [42]:
shots = shots.withColumn('skaterDifference',shots.homeSkatersOnIce - shots.awaySkatersOnIce)
features_list = ['arenaAdjustedShotDistance','shotAngleAdjusted','shotRush','skaterDifference',
                 'shotOnEmptyNet','shotRebound','offWing','shooterTimeOnIce',
                 'averageRestDifference','timeDifferenceSinceChange','speedFromLastEvent',
                 'playerPositionEncoder']

assert False
# make sure to implement data checks to make sure you're doing regression on "proper" data
# and won't get very wrong results

for col in features_list[:-1]:
    shots = shots.withColumn(col,shots[str(col)].cast(DecimalType()))

shots = shots.withColumn('goal',shots.goal.cast(IntegerType()))

AssertionError: 

In [64]:
player_positions = shots.select('playerPositionThatDidEvent').distinct().collect()[:]

player_indexer = StringIndexer(inputCol='playerPositionThatDidEvent',outputCol='playerPositionIndexer',handleInvalid='skip')
player_encoder = OneHotEncoder(inputCol='playerPositionIndexer',outputCol='playerPositionEncoder')

In [71]:
[x[0] for x in player_positions]

[None, 'L', 'D', 'C', 'R', 'G']

In [32]:

vector_assembler = VectorAssembler(inputCols=features_list,outputCol='features')
pipeline = Pipeline(stages=[player_indexer,player_encoder,vector_assembler])

piped_data = pipeline.fit(shots).transform(shots)
training, test = piped_data.randomSplit([0.7,0.3])
lr = LogisticRegression(labelCol='goal')

best_lr = lr.fit(training)
test_results = best_lr.transform(test)
# best_lr.write().overwrite().save('models/result_lr')

# best_lr = LogisticRegressionModel.load('../models/result_lr')

In [None]:
best_lr.write().overwrite().save('models/result_lr')

In [8]:
# lr = LogisticRegression(labelCol='goal')
# evaluator = BinaryLogLossEvaluator(labelCol='goal')
# grid = (ParamGridBuilder()
#            .addGrid(lr.regParam,[0,0.001,0.01,0.1])
#            .addGrid(lr.elasticNetParam,[0,0.5,1])
#            .build())
# cv = CrossValidator(estimator=lr,
#                    estimatorParamMaps=grid,
#                     evaluator=evaluator)

# models = cv.fit(training)
# cv_best_lr = models.bestModel
# cv_best_lr.write().overwrite().save('models/cv_best_lr')

cv_best_lr = LogisticRegressionModel.load('../models/cv_best_lr')

In [9]:
cv_best_lr.extractParamMap()

{Param(parent='LogisticRegression_4fa4ec7f1bb1', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_4fa4ec7f1bb1', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_4fa4ec7f1bb1', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_4fa4ec7f1bb1', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_4fa4ec7f1bb1', name='maxIter', doc='maximum number of iterations (>= 0)'): 100,
 Param(parent='LogisticRegression_4fa4ec7f1bb1', name='predictionCol', doc='prediction column name'): 'prediction',
 Param(parent='LogisticRegression_4fa4ec7f1bb1', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability esti

In [10]:
print(features_list)
print(best_lr.coefficients)
print(cv_best_lr.coefficients)

['arenaAdjustedShotDistance', 'shotAngleAdjusted', 'shotRush', 'skaterDifference', 'shotOnEmptyNet', 'shotRebound', 'offWing', 'shooterTimeOnIce', 'averageRestDifference', 'timeDifferenceSinceChange', 'speedFromLastEvent', 'playerPositionEncoder']
[-0.043571159252612054,-0.01517419364720867,0.23853285279188827,0.00760283923304499,4.232534437738093,0.8447489468464554,0.04985917942448185,0.0018669528647201152,0.0011726283756016117,0.002595781833321914,0.010084269863482676,4.5217493084009694,4.558117642433642,4.530295873782485,4.517759732078638]
[-0.043531216292313865,-0.015189184260977073,0.19020529731797944,0.007439194617935236,4.237498106492696,0.8613774410298074,0.051011760328185156,0.0018805678149342267,0.001872806470126273,0.0018397470146465855,0.01048536243130829,3.718653023623734,3.756539395801524,3.7325618709825643,3.7079601190479052]


In [22]:
pd.DataFrame.from_dict({'no cv':best_lr.coefficients.toArray(),'with cv':cv_best_lr.coefficients.toArray()},columns=features_list,orient='index')

ValueError: 12 columns passed, passed data had 15 columns

In [58]:
player_indexer.isSet('stringOrderType')

False

In [23]:
best_lr.coefficients.toArray()

array([-4.35711593e-02, -1.51741936e-02,  2.38532853e-01,  7.60283923e-03,
        4.23253444e+00,  8.44748947e-01,  4.98591794e-02,  1.86695286e-03,
        1.17262838e-03,  2.59578183e-03,  1.00842699e-02,  4.52174931e+00,
        4.55811764e+00,  4.53029587e+00,  4.51775973e+00])

In [33]:
best_lr.coefficients.toArray()

array([-4.37052959e-02, -1.51133582e-02,  1.92735591e-01,  1.34854879e-02,
        4.25616824e+00,  8.53215617e-01,  5.02196473e-02,  1.40036715e-03,
        1.57791200e-03,  2.34539510e-03,  9.86072491e-03, -4.16310305e-02,
       -1.41039507e-02, -3.60775491e-02, -5.29493568e-02])