In [None]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}

In [None]:
from matplotlib import pyplot as plt
%matplotlib inline

import pandas as pd
# Show every column and row, and never wrap wide frames across several lines.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.expand_frame_repr', False)

# Do not truncate long cell contents. `None` is the supported spelling of
# "unlimited"; the legacy value -1 is rejected by recent pandas releases, so
# it is only used as a fallback for old versions that do not accept None.
try:
    pd.set_option('display.max_colwidth', None)
except ValueError:
    pd.set_option('display.max_colwidth', -1)

def pp(df, count=5):
    """Print a DataFrame's total row count and return a small pandas preview.

    Args:
        df: Object exposing ``count()``, ``limit(n)`` and ``toPandas()``
            (used here with Spark DataFrames).
        count: Maximum number of rows to include in the preview.

    Returns:
        A pandas DataFrame holding at most ``count`` rows of ``df``.

    Note:
        ``df.count()`` is evaluated in addition to the preview itself.
    """
    total_rows = df.count()
    print(f'Total count: {total_rows:,}')

    preview = df.limit(count).toPandas()
    return preview.head(count)

import pyspark.sql.functions as f
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import numpy as np
import os
from datetime import datetime

# Register '/tmp' as the directory Spark uses for checkpoint files.
# NOTE(review): `spark` is not created in this notebook, so it is presumably
# injected by the kernel (e.g. a PySpark kernel) — confirm. No checkpoint()
# call is visible in the cells here, so this setting may be unused.
spark.sparkContext.setCheckpointDir('/tmp')

## Preprocess Raw Data

In [None]:
# Input CSV read by the preprocessing cells below. The processed output path
# is derived from this value, so switching to one of the commented-out
# alternatives also changes where the processed file is written.
# INPUT_FILE = '../data/shots_2007-2018.csv'
INPUT_FILE = '../data/shots_2007-2018_with_stats.csv'
# INPUT_FILE = '../data/raw.csv'

### Clean Data

In [None]:
# Load the raw shots, keep one row per distinct shot, and drop shots that are
# out of scope: empty-net shots and shots taken from 70 or more distance units.
df = (
    spark.read.csv(INPUT_FILE, header=True, inferSchema=True)
    .drop_duplicates(['season', 'game_id', 'time', 'shooterPlayerId', 'xCordAdjusted', 'yCordAdjusted'])
    .where('shotOnEmptyNet = 0')
    .where('shotDistance < 70')
)

# lastEventCategory: fold the listed event types and missing values into 'NONE'
last_event_remove = ['GOAL', 'PENL', 'ANTHEM', 'STOP', 'PSTR', 'CHL', 'PEND', 'GEND', 'EISTR']
last_event = f.col('lastEventCategory')
last_event_cond = last_event.isin(last_event_remove) | last_event.isNull()
df = df.withColumn('lastEventCategory', f.when(last_event_cond, 'NONE').otherwise(last_event))

# *SkatersOnIce: clamp counts of 7-10 down to 6 and counts of 1-2 up to 3;
# every other value is kept unchanged
for skater_field in ('homeSkatersOnIce', 'awaySkatersOnIce'):
    skaters = f.col(skater_field)
    clamped = (
        f.when(skaters.isin([7, 8, 9, 10]), f.lit(6))
        .when(skaters.isin([1, 2]), f.lit(3))
        .otherwise(skaters)
    )
    df = df.withColumn(skater_field, clamped)

# playerPositionThatDidEvent: missing and 'G' become 'C'; 'L' and 'R' become 'W'
position = f.col('playerPositionThatDidEvent')
df = df.withColumn(
    'playerPositionThatDidEvent',
    f.when(position.isNull(), f.lit('C'))
    .when(position.isin(['G']), f.lit('C'))
    .when(position.isin(['L', 'R']), f.lit('W'))
    .otherwise(position),
)

# shotType: missing becomes 'WRIST'; wrap-arounds, deflections and tips become 'QUICK'
shot_type = f.col('shotType')
df = df.withColumn(
    'shotType',
    f.when(shot_type.isNull(), f.lit('WRIST'))
    .when(shot_type.isin(['WRAP', 'DEFL', 'TIP']), f.lit('QUICK'))
    .otherwise(shot_type),
)

# location: missing becomes 'Neu. Zone'
shot_location = f.col('location')
df = df.withColumn(
    'location',
    f.when(shot_location.isNull(), f.lit('Neu. Zone')).otherwise(shot_location),
)

# goal_count = df.where('goal = 1').count()
# all_count = df.count()
# print(f'Goal to shot ratio: {goal_count:,} / {all_count:,} = {(goal_count / all_count):.2%}')

### Add Features

In [None]:
# Game identifier: season and game_id concatenated
df = df.withColumn('uid', f.concat(f.col('season'), f.col('game_id')))

# True when the shot was taken by the away team
shot_by_away = f.col('team') == 'AWAY'


def _shooter_side(away_field, home_field):
    """Select `away_field` for shots by the away team, `home_field` otherwise."""
    return f.when(shot_by_away, f.col(away_field)).otherwise(f.col(home_field))


# Score state from the shooting team's point of view
df = (
    df.withColumn('ShootingTeamGoals', _shooter_side('awayTeamGoals', 'homeTeamGoals'))
    .withColumn('DefendingTeamGoals', _shooter_side('homeTeamGoals', 'awayTeamGoals'))
    .withColumn('ScoreDiff', f.col('ShootingTeamGoals') - f.col('DefendingTeamGoals'))
)

# Bucket the score difference into a label (left null when ScoreDiff is null)
score_diff = f.col('ScoreDiff')
score_situation = (
    f.when(score_diff == 0, f.lit('even'))
    .when(score_diff == 1, f.lit('up1'))
    .when(score_diff == 2, f.lit('up2'))
    .when(score_diff > 2, f.lit('up3ormore'))
    .when(score_diff == -1, f.lit('down1'))
    .when(score_diff == -2, f.lit('down2'))
    .when(score_diff < -2, f.lit('down3ormore'))
)
df = df.withColumn('ScoreSituation', score_situation)

# Team strength from the shooting team's point of view
df = (
    df.withColumn('ShootingTeamSkaters', _shooter_side('awaySkatersOnIce', 'homeSkatersOnIce'))
    .withColumn('DefendingTeamSkaters', _shooter_side('homeSkatersOnIce', 'awaySkatersOnIce'))
)

# Skater advantage, an "<shooters>on<defenders>" label, and a coarse
# even / power / under classification of that advantage
skater_diff = f.col('SkaterDiff')
df = (
    df.withColumn('SkaterDiff', f.col('ShootingTeamSkaters') - f.col('DefendingTeamSkaters'))
    .withColumn('situation', f.concat(f.col('ShootingTeamSkaters'),
                                      f.lit('on'),
                                      f.col('DefendingTeamSkaters')))
    .withColumn('coarseSituation',
                f.when(skater_diff == 0, f.lit('even'))
                .when(skater_diff > 0, f.lit('power'))
                .otherwise('under'))
)

# One distance column per shot type: the shot distance when the shot is of
# that type, 0 otherwise
shot_type_columns = (
    ('Back_dist', 'BACK'),
    ('Quick_dist', 'QUICK'),
    ('Slap_dist', 'SLAP'),
    ('Snap_dist', 'SNAP'),
    ('Wrist_dist', 'WRIST'),
)
for dist_column, shot_type_value in shot_type_columns:
    df = df.withColumn(
        dist_column,
        f.when(f.col('shotType') == shot_type_value, f.col('shotDistance')).otherwise(f.lit(0.)),
    )

# Products and squares of distance / angle, plus the rest differential
shot_distance = f.col('shotDistance')
shot_angle = f.col('shotAngleAdjusted')
df = (
    df.withColumn('Dist_angle', shot_angle * shot_distance)
    .withColumn('Rebound_dist', f.col('shotRebound') * shot_distance)
    .withColumn('Square_dist', f.pow(shot_distance, f.lit(2)))
    .withColumn('Square_angle', f.pow(shot_angle, f.lit(2)))
    .withColumn('Square_all', f.col('Square_angle') * f.col('Square_dist'))
    .withColumn('OverallRestDiff',
                f.col('shootingTeamAverageTimeOnIce') - f.col('defendingTeamAverageTimeOnIce'))
)

# Per-game rates: each stat divided by its games-played column, or 0 when that
# games-played value is not a non-zero number
rateify = ['Skater_G', 'Skater_Ast', 'Skater_Pts', 'Skater_Plus_Minus', 'Skater_Pen_In_Min', 'Skater_PShares',
           'Skater_EV_G', 'Special_Teams_PP_G', 'Special_Teams_SH_G', 'Skater_GWinners', 'Assists_EV',
           'Assists_PP', 'Assists_SH', 'Skater_Shots', 'BLK', 'HIT', 'FOW', 'FOL']
rateify_adv = ['Corsi_EV_CF', 'Corsi_EV_CA', 'Fenwick_EV_FF', 'Fenwick_EV_FA', 'SAtt']

for games_played_field, stat_fields in (('Skater_GP', rateify), ('Adv_GP', rateify_adv)):
    games_played = f.col(games_played_field)
    for stat_field in stat_fields:
        rate = f.when(games_played != 0, f.col(stat_field) / games_played).otherwise(f.lit(0))
        df = df.withColumn(f'{stat_field}_rate', rate)

### Calculate Advanced Features
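 - *shTalent*: Ratio of the shooter's goals to expected goals over all of their shots before the current one (defaults to 1.0 until the shooter has at least 5 prior goals)
 - *shotsWithinX*: Number of other shots in the same game during the X seconds up to and including the current shot's time, computed for several values of X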

In [None]:
# --- Shooter talent -------------------------------------------------------
GOAL_THRESHOLD = 5   # prior goals required before the goal/xGoal ratio is used
DEFAULT_SCORE = 1.0  # value assigned while a shooter is below the threshold

SORT_COLUMNS = ['season', 'game_id', 'time']

# Every earlier shot by the same shooter: the frame ends one row before the
# current shot, so the current shot never contributes to its own feature.
shooter_window = (
    Window.partitionBy('shooterPlayerId')
    .orderBy(*SORT_COLUMNS)
    .rowsBetween(Window.unboundedPreceding, -1)
)

df = (
    df.withColumn('goal_sum', f.sum('goal').over(shooter_window))
    .withColumn('xGoal_sum', f.sum('xGoal').over(shooter_window))
)

prior_goals = f.col('goal_sum')
below_threshold = prior_goals.isNull() | (prior_goals < GOAL_THRESHOLD)
shTalent = f.when(below_threshold, f.lit(DEFAULT_SCORE)).otherwise(prior_goals / f.col('xGoal_sum'))

# The running sums are only scaffolding for shTalent, so drop them afterwards
df = df.withColumn('shTalent', shTalent).drop('goal_sum', 'xGoal_sum')

# --- Shots within the trailing N time units ---------------------------------
SHOTS_WITHIN_TIMES = [5, 10, 15, 20, 30, 40, 60]

game_timeline = Window.partitionBy('season', 'game_id').orderBy('time')
for tm in SHOTS_WITHIN_TIMES:
    trailing_range = game_timeline.rangeBetween(-tm, 0)
    # The range includes the current shot itself, hence the "- 1"
    df = df.withColumn(f'shotsWithin{tm}', f.count('time').over(trailing_range) - 1)

### Zone

In [None]:
def rslot(x1,y1,x2,y2,x,y,top,bot):
    """Label a point by which side of a line it falls on.

    The line passes through (x1, y1) and (x2, y2). Its y-value at ``x`` is
    compared with ``y``.

    Args:
        x1, y1: First point defining the line.
        x2, y2: Second point defining the line; ``x2`` must differ from ``x1``
            because the slope is computed as (y2 - y1) / (x2 - x1).
        x, y: Point to classify.
        top: Value returned when the point is on or above the line.
        bot: Value returned when the point is below the line.

    Returns:
        ``top`` or ``bot``.
    """
    line_y_at_x = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1)
    return top if float(y) >= float(line_y_at_x) else bot

def hockeyZones(row):
    """Map a shot's adjusted coordinates to a zone label.

    Args:
        row: Mapping-like object providing 'xCordAdjusted' and 'yCordAdjusted'
            (e.g. a pandas row passed by ``DataFrame.apply(..., axis=1)``).

    Returns:
        One of the zone label strings: the x coordinate selects a band
        (>= 50, >= 31, >= 11, or lower) and the y coordinate selects the zone
        inside that band. Some zones are separated by a slanted line, which is
        delegated to ``rslot``.

    Note:
        Only ``>=`` comparisons are used, so a NaN coordinate fails every test
        and falls through to the last branch of its band (both NaN gives
        "L-Back").
        NOTE(review): the thresholds presumably assume a specific coordinate
        convention for the adjusted x/y values — confirm against the data.
    """
    px = row['xCordAdjusted']
    py = row['yCordAdjusted']

    # Band 1: x >= 50 — the points
    if px >= 50:
        if py >= 51.5:
            return "R-Point"
        return "C-Point" if py >= 33.5 else "L-Point"

    # Band 2: 31 <= x < 50 — zones split purely by y thresholds
    if px >= 31:
        for y_floor, label in ((68.5, "R-1"), (51.5, "R-2"), (33.5, "HighSlot"), (15.5, "L-2")):
            if py >= y_floor:
                return label
        return "L-1"

    # Band 3: 11 <= x < 31 — slot area, with slanted borders on both sides
    if px >= 11:
        if py >= 68.5:
            return "R-Low"
        if py >= 51.5:
            return rslot(11, 51.5, 31, 68.5, px, py, "R-Low", "R-Slot")
        if py >= 33.5:
            return "Slot" if px >= 21 else "Lowslot"
        if py >= 15.5:
            return rslot(11, 33.5, 31, 15.5, px, py, "L-Slot", "L-Low")
        return "L-Low"

    # Band 4: x < 11 (or x is NaN)
    if py >= 56.8:
        return "R-Back"
    if py >= 51.5:
        return rslot(0, 56.8, 11, 51.5, px, py, "R-Back", "Downlow")
    if py >= 33.5:
        return "Downlow"
    if py >= 28.5:
        return rslot(0, 28.5, 11, 33.5, px, py, "Downlow", "L-Back")
    return "L-Back"
        
# Collect the Spark DataFrame into pandas, then label every row with its zone
df_pd = df.toPandas()
zone_labels = df_pd.apply(hockeyZones, axis=1)
df_pd['Zone'] = zone_labels

### Arrange columns and write data

In [None]:
# first_columns = ['goal', 'goalieIdForShot', 'goalieNameForShot', 'shooterPlayerId', 'shooterName', 'uid', 'xGoal']
# cols = list(filter(lambda x: x not in first_columns, df.columns))
# df = df.select(*(first_columns + cols))

# df.coalesce(1).write.mode('overwrite').csv(f'{os.path.splitext(INPUT_FILE)[0]}_processed', header=True)

# Write next to the input file, as "<input path without extension>_processed.csv"
processed_csv_path = f'{os.path.splitext(INPUT_FILE)[0]}_processed.csv'
df_pd.to_csv(processed_csv_path, index=False)

---

## Postprocessing

In [None]:
# Number of simulated goal outcomes generated for every shot
NUM_SIMULATIONS = 200

# File name of the processed shots CSV. It deliberately has its own name
# instead of reusing INPUT_FILE: the preprocessing cells read INPUT_FILE as a
# full path, so rebinding it here to a bare file name would make them read the
# wrong file if they were re-run after this cell.
PROCESSED_FILE = 'shots_2007-2018_with_stats_processed.csv'

# Processed shots and the matching per-shot predictions
INPUT_PATH = f'../data/{PROCESSED_FILE}'
PRED_PATH = f'../training-runs/xgboost/pred_{PROCESSED_FILE}'

# Columns of the processed shots file needed by the simulation
COLUMNS = [
    'goalieIdForShot',
    'season',
    'game_id',
    'goal',
]

# Timestamp printed before the postprocessing cells run
print(datetime.now())

In [None]:
# Read only the columns the simulation needs; the trailing [COLUMNS] restores
# the order given in COLUMNS, since usecols keeps the file's own column order.
shots_pd = pd.read_csv(INPUT_PATH, usecols=COLUMNS)[COLUMNS]

pred_df = pd.read_csv(PRED_PATH)

# The predictions are attached by row position (both frames carry the default
# integer index). A row-count mismatch would otherwise go unnoticed and leave
# NaN probabilities on the unmatched shots, so fail loudly instead.
if len(pred_df) != len(shots_pd):
    raise ValueError(
        f'Row count mismatch: {INPUT_PATH} has {len(shots_pd):,} rows '
        f'but {PRED_PATH} has {len(pred_df):,} rows'
    )

shots_pd['prob_goal'] = pred_df['prob_goal']

# Hand the combined frame to Spark for the simulation cells
data_df = spark.createDataFrame(shots_pd)

In [None]:
# One column per simulation: 1 when a uniform random draw falls below the
# shot's predicted goal probability, 0 otherwise.
# NOTE(review): f.rand() is called without a seed, so the simulated outcomes
# differ from run to run.
column_names = [f'g{i}' for i in range(NUM_SIMULATIONS)]
sim_draws = [
    f.when(f.rand() < f.col('prob_goal'), f.lit(1)).otherwise(f.lit(0)).alias(cn)
    for cn in column_names
]
# Add all simulation columns in one projection. Calling withColumn once per
# simulation stacks a projection per call and produces a very large plan.
data_df = data_df.select('*', *sim_draws)

# Aggregate by game for each goalie
sims = [f.sum(cn).alias(cn) for cn in column_names]
data_df = data_df.groupBy('goalieIdForShot', 'season', 'game_id').agg(f.sum('goal').alias('goal'), *sims)

# Running-total frame over each goalie's games, in season / game_id order
goalie_window = Window.partitionBy('goalieIdForShot').orderBy('season', 'game_id') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
# Same partitioning and ordering without an explicit frame, for the game number
goalie_game_window = Window.partitionBy('goalieIdForShot').orderBy('season', 'game_id')

# Build the cumulative sums over games (actual goals and every simulation)
# and the goalie's game number, again in a single projection
data_df = data_df.select(
    'goalieIdForShot',
    'season',
    'game_id',
    f.sum('goal').over(goalie_window).alias('goal'),
    *[f.sum(cn).over(goalie_window).alias(cn) for cn in column_names],
    f.rank().over(goalie_game_window).alias('game'),
)

# Collapse the per-simulation cumulative totals into one array column
data_df = data_df.select(
    'goalieIdForShot',
    'season',
    'game_id',
    'goal',
    'game',
    f.array(*column_names).alias('sim'),
)

# pp(data_df)

In [None]:
@f.udf(returnType=MapType(IntegerType(), FloatType()))
def pmf(arr):
    """Return the empirical probability mass function of the values in `arr`.

    Args:
        arr: Sequence of values (here, one cumulative goal total per simulation).

    Returns:
        Dict mapping each distinct value to the fraction of entries in `arr`
        equal to it.
    """
    distinct_values, occurrences = np.unique(arr, return_counts=True)
    total = len(arr)
    return {
        value: occurrence / total
        for value, occurrence in zip(distinct_values.tolist(), occurrences.tolist())
    }

# Probability mass function of the simulated cumulative goals, exploded to one
# row per distinct simulated goal total (map key -> sim_goals, map value -> value)
data_df = (
    data_df.withColumn('pmf', pmf('sim'))
    .select(
        f.col('goalieIdForShot').alias('goalie_id'),
        f.col('game'),
        f.col('goal').alias('actual_goals'),
        f.explode('pmf').alias('sim_goals', 'value'),
    )
)

# Running sum of the pmf in increasing sim_goals order gives the cdf
cdf_window = (
    Window.partitionBy('goalie_id', 'game')
    .orderBy('sim_goals')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
data_df = (
    data_df.withColumn('cdf_score', f.sum('value').over(cdf_window))
    .drop('value')
    .orderBy('goalie_id', 'game', 'sim_goals')
)

data_df.write.mode('overwrite').csv('sim_cdf_dbf.csv', header=True)
# pp(data_df.orderBy('goalie_id', 'game', 'sim_goals'), 100)

In [None]:
# Timestamp printed after the postprocessing cells; compare it with the one
# printed by the configuration cell to see how long the run took.
print(datetime.now())

---

### Testing

In [None]:
# Read the written cdf output back and preview its first 100 rows in
# goalie / game / simulated-goals order
df = (
    spark.read.csv('sim_cdf_dbf.csv', header=True, inferSchema=True)
    .orderBy('goalie_id', 'game', 'sim_goals')
)
pp(df, 100)