In [1]:
%run ../config/initialize.ipynb
from pyspark.sql.window import Window

## Global functions

In [2]:
def get_outcomes_df():
    '''form a Spark DF, keyed on (game_id, team_id), 
    and including season, date, winner and
    whether each of the 3 spreads were covered/pushed
    '''
    games = spark.table('game').select('game_id','season',col('date').cast(TimestampType()),
                                       'v_team_id','h_team_id')
    scores = spark.table('game_outcome').select('game_id','h_final','v_final')

    spread_cols = ['open_fav','open_spread',
                   'close_fav','close_spread',
                   'pfr_fav','pfr_spread']

    lines = spark.table('game_line').select(
        *(['game_id'] + spread_cols)
    )

    ## winner of game
    teams_score = games.join(
            scores, on='game_id'
        ).join(
            lines, on='game_id'
        ).withColumn(
            'winner', 
            F.when(col('h_final') > col('v_final'), col('h_team_id'))
             .when(col('h_final') < col('v_final'), col('v_team_id'))
             .otherwise('')
        )

    ## whether home team covered spread
    ## since spread is the same for both teams
    ## inverse is true for visiting team
    for spread in ['open','close','pfr']:
        teams_score = teams_score.withColumn(
            'h_margin', col('h_final') - col('v_final')
        ).withColumn(
            'h_line', 
            ## minimum margin
            col('{}_spread'.format(spread))
            * F.when(col('{}_fav'.format(spread)) == 'H', 1).otherwise(-1)
        ).withColumn(
            '{}_h_cover'.format(spread),
            F.when(
                ## push
                col('h_margin') == col('h_line'), 0.5)
             .when(
                col('h_margin') > col('h_line'), 1.)
             .otherwise(0.)
        ).drop('h_margin','h_line')

    teams_score = teams_score.drop(*(['h_final','v_final'] + spread_cols))

    team_game = teams_score.withColumn(
            'tmp', F.array('h_team_id','v_team_id')
        ).select(
            '*', 
            F.explode('tmp').alias('team_id')
        ).drop('tmp').cache()

    for spread in ['open','close','pfr']:
        team_game = team_game.withColumn(
                '{}_cover'.format(spread),
                F.when(col('h_team_id') == col('team_id'), 
                       col('{}_h_cover'.format(spread)))
                 .otherwise(F.lit(1.) - col('{}_h_cover'.format(spread)))          
            ).drop('{}_h_cover'.format(spread))

    return team_game.withColumn(
            'won_game', 
            # win
            F.when(col('team_id') == col('winner'), 1.)
            # tie
             .when(col('winner') == '', 0.5)
            # loss
             .otherwise(0.)
        ).drop(
            'v_team_id','h_team_id','winner'
        ).cache()
    
def get_window(window_type, n, partition_cols, date_orderby_col):
    '''accepts window type (days, games, seasons),
     size of window n,
     and window fields (partition and sort), 
     and returns a spark Window'''
    assert type(partition_cols) is list
    assert type(date_orderby_col) is str
    assert type(n) is int
    assert window_type in ['games','days','seasons']
    
    from pyspark.sql.window import Window
    if window_type == 'games':
        ## fixed number of games
        return Window.partitionBy(*partition_cols)\
                  .orderBy(col(date_orderby_col).desc())\
                  .rowsBetween(1, n)

    elif window_type == 'days':
        ## fixed number of days
        seconds = 24*60*60*n
        return Window.partitionBy(*partition_cols)\
                  .orderBy(col(date_orderby_col).cast('timestamp').cast('long'))\
                  .rangeBetween(-seconds, -1)

    elif window_type == 'seasons':
        ## fixed number of seasons, including current
        ## n = 1 is YTD
        seconds = 24*60*60*(180 + 365*(n-1))
        return Window.partitionBy(*partition_cols)\
                  .orderBy(col(date_orderby_col).cast('timestamp').cast('long'))\
                  .rangeBetween(-seconds, -1)   

## Home/Away Features
* key: (game_id)
* table name: features.home_field
* __h_is_home__
* __v_is_home__
* __h_consecutive_home__
* __v_consecutive_visitor__
* __v_visitor____ovr_wr_ytd__
* __v_visitor____open_wr_ytd__
* __v_visitor____close_wr_ytd__
* __v_visitor____pfr_wr_ytd__
* __h_home____ovr_wr_ytd__
* __h_home____open_wr_ytd__
* __h_home____close_wr_ytd__
* __h_home____pfr_wr_ytd__
* __v_visitor____ovr_wr_last_4_games__
* __v_visitor____ovr_wr_last_8_games__
* __v_visitor____open_wr_last_4_games__
* __v_visitor____open_wr_last_8_games__
* __v_visitor____close_wr_last_4_games__
* __v_visitor____close_wr_last_8_games__
* __v_visitor____pfr_wr_last_4_games__
* __v_visitor____pfr_wr_last_8_games__
* __h_home____ovr_wr_last_4_games__
* __h_home____ovr_wr_last_8_games__
* __h_home____open_wr_last_4_games__
* __h_home____open_wr_last_8_games__
* __h_home____close_wr_last_4_games__
* __h_home____close_wr_last_8_games__
* __h_home____pfr_wr_last_4_games__
* __h_home____pfr_wr_last_8_games__

In [3]:
key = 'game_id'
out_tbl = 'features.home_field'
features_list = []

In [4]:
game = spark.table('game').select('game_id','h_team_id','v_team_id',
                                  'is_neutral','season','date')

### remove neutral games and add back at the end

In [5]:
neutral = game.filter(col('is_neutral') == 1).select('game_id')
game = game.filter(col('is_neutral') == 0).drop('is_neutral')

### binary H/V

In [6]:
hv_feats = game.withColumn(
        'h_is_home', F.lit(1.)
    ).withColumn(
        'v_is_home', F.lit(0)
    )

stage1_features = ['h_is_home','v_is_home']

### Consecutive home/away games

#### explode into individual team rows

In [7]:
stage2_features = []

In [8]:
hv_team = hv_feats.withColumn(
        'tmp', F.array('h_team_id','v_team_id')
    ).select(
        '*', 
        F.explode('tmp').alias('team_id')
    ).select(
        'game_id','team_id','season','date',
        F.when(col('h_team_id') == col('team_id'), 1)
         .otherwise(0)
         .alias('is_home')
    )

#### Consecutive home

In [9]:
w = Window.partitionBy(*['team_id','season'])\
        .orderBy(col('date').desc())\
        .rowsBetween(0, 99)

In [10]:
def count_consecutive(arr, val):
    '''given a list or Spark Array, count
    the number of consecutive values matching
    "val"'''
    i = 0
    while arr:
        if arr[0] == val:
            i += 1
            arr = arr[1:]
        else:
            break
    return i

count_consecutive_udf = udf(count_consecutive, IntegerType())

hv_team = hv_team.withColumn(
        'h_consecutive_home', 
        count_consecutive_udf(
            F.collect_list('is_home').over(w),
            F.lit(1)
        )
    ).withColumn(
        'v_consecutive_visitor', 
        count_consecutive_udf(
            F.collect_list('is_home').over(w),
            F.lit(0)
        )
    )

stage2_features += ['h_consecutive_home','v_consecutive_visitor']

#### test that it's working properly

In [11]:
checks = {
    ('nwe', '2008-09-07', 'h_consecutive_home'): 1,
    ('nwe', '2018-01-21', 'h_consecutive_home'): 4,
    ('nwe', '2017-12-17', 'v_consecutive_visitor'): 3,
    ('nwe', '2017-09-24', 'h_consecutive_home'): 1
}

for (team_id, date, feat), val in checks.iteritems():
    assert hv_team.filter(
         (col('team_id') == team_id) & (col('date') == date)
        ).select(feat).toPandas().iloc[0, 0] == val

### home/away record
### home/away record ATS

In [12]:
team_game = get_outcomes_df().drop('date','season')

hv_outcome = hv_team.join(team_game, on=['game_id','team_id'])

assert hv_outcome.count() == hv_team.count()

hv_outcome.limit(5).toPandas()

Unnamed: 0,game_id,team_id,season,date,is_home,h_consecutive_home,v_consecutive_visitor,open_cover,close_cover,pfr_cover,won_game
0,201511150ram,ram,2015,2015-11-15,1,1,0,0.0,0.0,0.0,0.0
1,201511150ram,chi,2015,2015-11-15,0,0,2,1.0,1.0,1.0,1.0
2,201511090sdg,sdg,2015,2015-11-09,1,1,0,0.0,0.0,0.0,0.0
3,201511090sdg,chi,2015,2015-11-09,0,0,1,1.0,1.0,1.0,1.0
4,201510180det,det,2015,2015-10-18,1,2,0,0.5,0.0,0.5,1.0


#### YTD winrate home/visitor

In [13]:
colname_map = {
    'won_game': 'ovr_wr',
    'open_cover': 'open_wr',
    'close_cover': 'close_wr',
    'pfr_cover': 'pfr_wr'
}
hv_map = {1: 'home', 0: 'visitor'}
line_cols = ['won_game','open_cover','close_cover','pfr_cover']

n = 99
w = get_window('games', n, ['season','team_id'], 'date')

for h_v in [0, 1]:
    for colname in line_cols:
        feat_name = '{}_{}__{}_ytd'\
                        .format(hv_map[h_v][0], hv_map[h_v], 
                                colname_map[colname])

        hv_outcome = hv_outcome.withColumn(
                feat_name,
                F.sum(colname).over(w)
                / F.count(F.lit(1)).over(w)
            ).fillna(0.5, feat_name)
        
        stage2_features.append(feat_name)

#### Combine these features into 1 row per game

In [14]:
h_hv_outcome = hv_outcome.select(
        *([key, 'is_home'] 
          + filter(lambda x: x.startswith('h_'), stage2_features))
    ).filter(
        col('is_home') == 1
    ).drop('is_home')

v_hv_outcome = hv_outcome.select(
        *([key, 'is_home'] 
          + filter(lambda x: x.startswith('v_'), stage2_features))
    ).filter(
        col('is_home') == 0
    ).drop('is_home')

ytd_and_streak_features = h_hv_outcome.join(
    v_hv_outcome, on='game_id'
)

assert v_hv_outcome.count() == h_hv_outcome.count()
assert ytd_and_streak_features.count() == h_hv_outcome.count()

In [15]:
ytd_and_streak_features.limit(5).toPandas()

Unnamed: 0,game_id,h_consecutive_home,h_home__ovr_wr_ytd,h_home__open_wr_ytd,h_home__close_wr_ytd,h_home__pfr_wr_ytd,v_consecutive_visitor,v_visitor__ovr_wr_ytd,v_visitor__open_wr_ytd,v_visitor__close_wr_ytd,v_visitor__pfr_wr_ytd
0,200709160oti,1,1.0,1.0,1.0,1.0,1,1.0,1.0,1.0,1.0
1,200712020ram,2,0.181818,0.227273,0.272727,0.272727,1,0.272727,0.545455,0.5,0.545455
2,200810190tam,2,0.666667,0.75,0.833333,0.833333,1,0.2,0.2,0.2,0.2
3,200812140dal,1,0.615385,0.461538,0.461538,0.461538,1,0.846154,0.692308,0.769231,0.769231
4,200910250car,1,0.4,0.2,0.2,0.2,2,0.333333,0.5,0.5,0.5


#### last {4, 8} games winrate home/visitor

In [16]:
stage3_features = []

In [17]:
feat_dict = {}
for h_v in [0, 1]:
    feat_dict[h_v] = hv_outcome.filter(
                        col('is_home') == h_v
                    ).select(*(
                        ['game_id','team_id','date','season'] + line_cols
                    ))
    for colname in line_cols:
        for n in [4, 8]:
            feat_name = '{}_{}__{}_last_{}_games'\
                            .format(hv_map[h_v][0], hv_map[h_v], 
                                    colname_map[colname], n)

            w = get_window('games', n, ['season','team_id'], 'date')

            feat_dict[h_v] = feat_dict[h_v].withColumn(
                    feat_name,
                    F.sum(colname).over(w)
                    / F.count(F.lit(1)).over(w)
                )
            
            stage3_features.append(feat_name)
            
    feat_dict[h_v] = feat_dict[h_v]\
                        .drop(*(line_cols + ['date','season']))\
                        .withColumnRenamed(
                            'team_id',
                            '{}_team_id'.format(hv_map[h_v][0])
                        )
    
hv_n_games_feats = feat_dict[0].join(feat_dict[1], on=['game_id']).fillna(0.5)

### Join 3 features tables

In [18]:
sdf_stage1 = hv_feats.select(
    *([key] + stage1_features)
)

sdf_stage2 = ytd_and_streak_features.select(
    *([key] + stage2_features)
)

sdf_stage3 = hv_n_games_feats.select(
    *([key] + stage3_features)
)

features_list = stage1_features + stage2_features + stage3_features

assert sdf_stage1.count() == sdf_stage2.count()
assert sdf_stage1.count() == sdf_stage3.count()

homeaway_feats = sdf_stage1\
                    .join(sdf_stage2, on='game_id')\
                    .join(sdf_stage3, on='game_id')

assert homeaway_feats.count() == sdf_stage1.count()

### Left todo
* join 3 together
 * original with (game_id, h_team_id, v_team_id): __hv_feats__
 * YTD winrates (game_id, team_id): __hv_outcome__
 * last N games winrates (game_id, h_team_id, v_team_id): __hv_n_games_feats__
* better maintain features list
* join in neutral
* write table

### add neutral games back in, and impute

#### mapping of fillna values
* default is 0.5

In [19]:
fillna_dict = {
    'h_consecutive_home': 0,
    'v_consecutive_visitor': 0
}

for k,v in fillna_dict.iteritems():
    homeaway_feats = homeaway_feats.fillna(v, k)
homeaway_feats = homeaway_feats.fillna(0.5, features_list)

In [20]:
for f in features_list:
    neutral = neutral.withColumn(f, F.lit(fillna_dict.get(f, 0.5)))

In [21]:
neutral.limit(5).toPandas()

Unnamed: 0,game_id,h_is_home,v_is_home,h_consecutive_home,v_consecutive_visitor,v_visitor__ovr_wr_ytd,v_visitor__open_wr_ytd,v_visitor__close_wr_ytd,v_visitor__pfr_wr_ytd,h_home__ovr_wr_ytd,...,v_visitor__pfr_wr_last_4_games,v_visitor__pfr_wr_last_8_games,h_home__ovr_wr_last_4_games,h_home__ovr_wr_last_8_games,h_home__open_wr_last_4_games,h_home__open_wr_last_8_games,h_home__close_wr_last_4_games,h_home__close_wr_last_8_games,h_home__pfr_wr_last_4_games,h_home__pfr_wr_last_8_games
0,201610230ram,0.5,0.5,0,0,0.5,0.5,0.5,0.5,0.5,...,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5
1,201610020jax,0.5,0.5,0,0,0.5,0.5,0.5,0.5,0.5,...,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5
2,201611210rai,0.5,0.5,0,0,0.5,0.5,0.5,0.5,0.5,...,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5
3,201710010mia,0.5,0.5,0,0,0.5,0.5,0.5,0.5,0.5,...,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5
4,201709240jax,0.5,0.5,0,0,0.5,0.5,0.5,0.5,0.5,...,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5,0.5


In [22]:
homeaway_feats_w_neutral = homeaway_feats.union(neutral)

## Write table

In [23]:
print '* __' + '__\n* __'.join(features_list) + '__'

* __h_is_home__
* __v_is_home__
* __h_consecutive_home__
* __v_consecutive_visitor__
* __v_visitor__ovr_wr_ytd__
* __v_visitor__open_wr_ytd__
* __v_visitor__close_wr_ytd__
* __v_visitor__pfr_wr_ytd__
* __h_home__ovr_wr_ytd__
* __h_home__open_wr_ytd__
* __h_home__close_wr_ytd__
* __h_home__pfr_wr_ytd__
* __v_visitor__ovr_wr_last_4_games__
* __v_visitor__ovr_wr_last_8_games__
* __v_visitor__open_wr_last_4_games__
* __v_visitor__open_wr_last_8_games__
* __v_visitor__close_wr_last_4_games__
* __v_visitor__close_wr_last_8_games__
* __v_visitor__pfr_wr_last_4_games__
* __v_visitor__pfr_wr_last_8_games__
* __h_home__ovr_wr_last_4_games__
* __h_home__ovr_wr_last_8_games__
* __h_home__open_wr_last_4_games__
* __h_home__open_wr_last_8_games__
* __h_home__close_wr_last_4_games__
* __h_home__close_wr_last_8_games__
* __h_home__pfr_wr_last_4_games__
* __h_home__pfr_wr_last_8_games__


In [24]:
homeaway_feats_w_neutral.select(*([key] + features_list))\
              .write.mode('overwrite').saveAsTable(out_tbl)

In [25]:
assert homeaway_feats_w_neutral.join(
            spark.table('game'), on='game_id'
        ).count() == homeaway_feats_w_neutral.count()

In [26]:
spark.table(out_tbl).limit(5).toPandas().T

Unnamed: 0,0,1,2,3,4
game_id,200709300dal,200710070htx,200811090nyj,200910180nor,200910250cle
h_is_home,1,1,1,1,1
v_is_home,0,0,0,0,0
h_consecutive_home,1,1,1,2,1
v_consecutive_visitor,2,1,1,1,1
v_visitor__ovr_wr_ytd,0,0,0.25,1,0.6
v_visitor__open_wr_ytd,0,0.25,0.3125,0.9,0.6
v_visitor__close_wr_ytd,0,0.25,0.3125,0.9,0.6
v_visitor__pfr_wr_ytd,0,0.25,0.375,0.9,0.6
h_home__ovr_wr_ytd,1,0.5,0.625,1,0.166667


### Days since last home/visitor game
* __a bye week is like a home game <-- tabling for now due to complexity this causes__
  * this is only needed to assess length of road trip -- byes are irrelevant for home streak
* impute 7 days for visitor since last home, since it's likely the beginning of the season. 
* if this turns out to be a good feature, look at a more precise method.

In [None]:
w = Window.partitionBy(*['team_id','season'])\
        .orderBy(col('date').desc())\
        .rowsBetween(1, 2)
         
## is_home == 1 means last time team was at home
## excludes home teams for this week
is_home = 1
hv_name = 'home'

df = hv_team.withColumn(
    'days_since_last_{}'.format(hv_name), 
    F.when(col('is_home') != is_home,
        F.datediff(
            'date',
            F.max(
                F.when(col('is_home') == 1, col('date')).otherwise(None)
            ).over(w)
        ))
    .otherwise(0)
).filter(col('team_id') == 'nwe').toPandas()

In [None]:
df.sort_values(by='date')