# 03 - Feature Engineering

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
cur_path = "/content/drive/MyDrive/BDB 2025/"
os.chdir(cur_path)
!pwd

Mounted at /content/drive
/content/drive/MyDrive/BDB 2025


In [None]:
!pip install pyspark

# The entry point to programming Spark with the DataFrame API.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("DataFrame").getOrCreate()



In [None]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

pd.set_option('display.max_columns', 500)

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import sqrt
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.types import IntegerType

import time

In [None]:
# CSV tables: the first row is the header. No schema inference is requested,
# so every CSV column is loaded as a string.
games = spark.read.csv('./data/games.csv', header=True)
players = spark.read.csv('./data/players.csv', header=True)
plays = spark.read.csv('./data/plays.csv', header=True)
player_play = spark.read.csv('./data/player_play.csv', header=True)

# Tracking data: all weekly parquet files matched by the glob are read as one DataFrame.
tracking = spark.read.parquet('./data/tracking_week*.parquet')

## One-hot encoding

One-hot encode the offensive team (`club`) to give the model team context, since teams do not all run each route at the same rate.

In [None]:
# Keep only offensive rows; defensive tendencies are not of interest here.
onehot = tracking.filter(F.col('isOnOffense') == 1)

# StringIndexer maps each club string to a numeric index ("clubId"),
# which is the input form OneHotEncoder requires.
indexer = StringIndexer(inputCol="club", outputCol="clubId")
indexed = indexer.fit(onehot).transform(onehot)

# Turn the numeric club index into a one-hot vector column ("offenseVec").
encoder = OneHotEncoder(
    inputCols=["clubId"],
    outputCols=["offenseVec"],
)
model = encoder.fit(indexed)
encoded = model.transform(indexed)

## Pivot Original Table

Pivot the original tracking table so that each row holds one player in one frame together with every other player on the field. Additionally, compute the distance from that player to each of the others.

In [None]:
# Keys that identify a single tracking frame.
frame_keys = ['gameId', 'playId', 'frameId']
# Columns copied from the "other" player of each pairing; each is renamed with a
# "2" suffix so it can sit next to the main player's columns after the self-join.
other_player_cols = ['nflId', 'isOnOffense', 'club', 'X_std', 'Y_std', 'dir_std', 'o_std', 's', 'a', 'dis']
other_players = tracking.select(
    *frame_keys,
    *[F.col(name).alias(f'{name}2') for name in other_player_cols],
)

# Self-join on the frame keys: one row per (player, other player) pair in each frame.
df = tracking.join(other_players, on=frame_keys, how='left')

# Euclidean distance between the main player and the paired player.
delta_x = F.col('X_std') - F.col('X_std2')
delta_y = F.col('Y_std') - F.col('Y_std2')
df = df.withColumn('dist', F.sqrt(F.pow(delta_x, 2) + F.pow(delta_y, 2)))

# Drop the pairing of a player with itself.
# NOTE(review): `!=` evaluates to NULL when either id is NULL and filter() drops such
# rows - confirm the football rows carry a non-null nflId, otherwise the 'football'
# label assigned below can never appear.
df = df.filter(F.col('nflId') != F.col('nflId2'))

# Rank the paired players by distance (1 = closest) separately for each
# main player, frame, side (isOnOffense2) and club of the paired player.
window_spec = Window.partitionBy(*frame_keys, 'nflId', 'isOnOffense2', 'club2').orderBy('dist')
df = df.withColumn('row_number', F.row_number().over(window_spec))

# Label each paired row: 'football' for the ball, '<rank>_def' for defenders,
# '<rank>_off' for everything else. This label becomes the pivot column later on.
paired_is_football = F.col('club2') == 'football'
paired_is_defender = (F.col('isOnOffense2') == '0') & (F.col('club2') != 'football')
df = df.withColumn(
    'offDef',
    F.when(paired_is_football, 'football')
     .when(paired_is_defender, F.concat(F.col('row_number'), F.lit('_def')))
     .otherwise(F.concat(F.col('row_number'), F.lit('_off'))),
)

In [None]:
# Build one wide feature table per week: for every route-running offensive player and
# frame, the distance to (and motion of) each other player, written to one parquet per week.
start = time.time()

# Restrict the run to weeks 2 and 4.
games = games.filter((games.week=='2') | (games.week=='4')).sort("week", "gameId", ascending=[True, True])

# Loop-invariant inputs, defined once instead of being rebuilt for every game.
# only look at offensive players since they are the ones running the routes
offense_frames = tracking.filter(tracking.isOnOffense==1)
# only look at the players that ran routes
route_runners = player_play.filter(player_play.wasRunningRoute == '1').\
                    select('gameId','playId','nflId','routeRan')

# distinct() gives no ordering guarantee, so sort the weeks to process them deterministically.
weeks = sorted((row[0] for row in games.select('week').distinct().collect()), key=int)

for week in weeks:
  week_start = time.time()
  print(f'Week {week} start')
  # None marks "no game accumulated yet". The previous `final_df.count()==0` test was a
  # Spark action that re-executed the whole accumulated union on every iteration.
  final_df = None
  game_ids = [row[0] for row in games.filter(games.week==week).select('gameId').collect()]
  for game_id in game_ids:
    # pivot table to get each frame on the same row for each player
    tracking_dist = df.filter(df.gameId == game_id).groupBy(['gameId','playId','frameId','nflId']).\
                        pivot('offDef').\
                        agg(F.first('dist'),F.first('dir_std2'),F.first('o_std2'),F.first('s2'),F.first('a2'),F.first('dis2'))
    # join the pivoted distances back to the offensive players' own tracking rows
    offensive_tracking = offense_frames.join(tracking_dist,on=['gameId','playId','frameId','nflId'])
    # keep only route runners and attach the route they ran
    offensive_tracking = offensive_tracking.join(route_runners,on=['gameId','playId','nflId'],how='inner')
    if final_df is None:
      final_df = offensive_tracking
      print('Initialized data')
    else:
      # NOTE(review): union() matches columns by position, so this relies on every game's
      # pivot producing the same set of offDef columns - TODO confirm for all weeks.
      final_df = final_df.union(offensive_tracking)
      print('Unioned data')
    # track games
    print(game_id)
  # The plan is only executed here, once per week, when the result is written out.
  final_df.write.mode('overwrite').parquet(f"./data/games/tracking_week_{week}.parquet")
  print(f'Week {week} complete')
  # Per-week duration (previously this printed the cumulative time since `start`).
  print(f'total time elapsed for week {week}: {time.time()-week_start}')

end = time.time()
print(f'total time elapsed: {end-start}')

Week 4 start
Initialized data
2022092900
Unioned data
2022100200
Unioned data
2022100201
Unioned data
2022100202
Unioned data
2022100203
Unioned data
2022100204
Unioned data
2022100205
Unioned data
2022100206
Unioned data
2022100207
Unioned data
2022100208
Unioned data
2022100209
Unioned data
2022100210
Unioned data
2022100211
Unioned data
2022100212
Unioned data
2022100213
Unioned data
2022100300
Week 4 complete
total time elapsed for week 4: 5601.552707195282
Week 2 start
Initialized data
2022091500
Unioned data
2022091800
Unioned data
2022091801
Unioned data
2022091802
Unioned data
2022091803
Unioned data
2022091804
Unioned data
2022091805
Unioned data
2022091806
Unioned data
2022091807
Unioned data
2022091808
Unioned data
2022091809
Unioned data
2022091810
Unioned data
2022091811
Unioned data
2022091812
Unioned data
2022091900
Unioned data
2022091901
Week 2 complete
total time elapsed for week 2: 11491.418672800064
total time elapsed: 11491.420011281967


In [None]:
# add time remaining to the plays table using pandas
# `plays` was loaded earlier as a Spark DataFrame, which has no `.str` accessor and does
# not support column assignment, so reload the table with pandas here. pandas also infers
# numeric dtypes from the CSV, which the integer comparison on `quarter` below relies on.
plays = pd.read_csv('./data/plays.csv')

# time remaining
# include seconds remaining in the half because plays at the end of the quarter vs beginning of the quarter in the same half will behave the same
# whereas plays at the end of a half and beginning of the next half will behave differently
# gameClock is split at the first ':' into its minutes and seconds parts
splitTime = plays['gameClock'].str.split(':', n=1, expand=True).rename(columns={0:'minutes',1:'seconds'})
plays['secRemainHalf'] = splitTime['minutes'].astype(int) * 60 + splitTime['seconds'].astype(int)
# quarters 1 and 3 still have a full 15-minute (900 s) quarter to come in their half
plays['secRemainHalf'] = np.where(plays['quarter'].isin([1,3]),plays['secRemainHalf']+900,plays['secRemainHalf'])

In [None]:
# Write the plays table back to ./data/plays.csv (overwriting the file), without the index column.
plays.to_csv('./data/plays.csv', index=False)