In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import udf, monotonically_increasing_id, first, split

In [2]:
# Reuse the active SparkSession if there is one, otherwise create it with default settings.
spark = SparkSession.builder.getOrCreate()
# events.checkpoint() further down writes to this directory, so it must be set first.
spark.sparkContext.setCheckpointDir('Data/checkpoints')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/07 14:15:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Helper Functions

def concat_dataframes(dataframes: list):
    """Union a list of DataFrames into a single DataFrame.

    The last element is used as the base and the remaining elements are
    appended to it in list order, so rows come out as
    ``[last, first, second, ...]``. Each step calls ``union`` on the
    accumulated frame.

    Args:
        dataframes: Non-empty sequence of objects exposing ``union(other)``.
            The sequence itself is not modified.

    Returns:
        The union of every frame in ``dataframes``.

    Raises:
        ValueError: If ``dataframes`` is empty.
    """
    if not dataframes:
        raise ValueError('concat_dataframes expects at least one DataFrame.')
    # Unpack instead of pop() so the caller's list is left untouched.
    *leading, out = dataframes
    for df in leading:
        out = out.union(df)
    return out

def rename_columns(dataframe, col_name_map:dict[str, str]):
    """Rename columns of ``dataframe`` according to ``col_name_map``.

    Every ``{current_name: new_name}`` entry is applied with
    ``withColumnRenamed``, one after another in the mapping's iteration
    order, and the resulting DataFrame is returned.
    """
    renamed = dataframe
    for current_name, new_name in col_name_map.items():
        renamed = renamed.withColumnRenamed(current_name, new_name)
    return renamed

def convert_df_datatypes(df, types:dict[str, list[str]]):
    """Cast columns of ``df`` to the requested data types.

    Args:
        df: DataFrame whose columns are cast.
        types: Mapping of target type to the list of column names to cast to
            it, e.g. ``{'short': ['col1', 'col2'], 'date': ['col3']}``. Each
            key is passed unchanged to ``Column.cast``.

    Returns:
        ``df`` with every listed column replaced by its cast version, or
        ``df`` itself when ``types`` is empty.

    Raises:
        TypeError: If any value in ``types`` is a bare string instead of a
            list of column names.
    """
    if not types:
        # Nothing to cast; avoids calling next() on an empty iterator.
        return df
    # Check every entry: a bare string would otherwise be iterated character
    # by character and produce one "column" per letter.
    if any(isinstance(columns, str) for columns in types.values()):
        raise TypeError("types expects list of columns {'type': ['col1', ...]}")
    # Invert {type: [columns]} into {column: type} under a separate name so the
    # argument keeps its original shape.
    column_types = {column: dtype for dtype, columns in types.items() for column in columns}
    return df.withColumns({column: df[column].cast(dtype)
                           for column, dtype in column_types.items()})

def split_csv_column(dataframe, csv_col:str):
    """Split a comma-separated string column into ``csv0``, ``csv1``, ... columns.

    The number of generated columns is taken from the first row only (its
    comma count plus one), so fields beyond that position in other rows are
    not extracted.

    Args:
        dataframe: DataFrame containing ``csv_col``.
        csv_col: Name of the column holding comma-separated text.

    Returns:
        ``dataframe`` with one added ``csv{i}`` column per field.

    Raises:
        ValueError: If ``dataframe`` has no rows or the first row's
            ``csv_col`` value is NULL, since the field count cannot be
            inferred in either case.
    """
    first_row = dataframe.head()  # None when the DataFrame is empty
    if first_row is None:
        raise ValueError(f"cannot split '{csv_col}': dataframe has no rows")
    sample_value = first_row.asDict()[csv_col]
    if sample_value is None:
        raise ValueError(f"cannot split '{csv_col}': first row value is NULL")
    col_count = sample_value.count(',') + 1
    parts = split(dataframe[csv_col], ',')
    return dataframe.withColumns({f'csv{i}': parts.getItem(i) for i in range(col_count)})

## Source Data - Events

In [4]:
# Root directory of the local Retrosheet data; EVENT_DIRS entries are appended to it.
DATA_PATH = 'Data/retrosheet/alldata/'
# Sub-directories of DATA_PATH that are read as event files.
EVENT_DIRS = ['events/', 'ngldata/events/', 'allstar/']


In [5]:
# Every record is read as strings: a record-type tag declared non-nullable,
# followed by six nullable positional fields named col2..col7.
schema = types.StructType(
    [types.StructField('event_type', types.StringType(), False)]
    + [types.StructField(f'col{position}', types.StringType()) for position in range(2, 8)]
)

In [6]:
# Read every event directory with the shared schema, then stack the results
# into a single DataFrame.
event_frames = [spark.read.csv(DATA_PATH + event_dir, schema=schema)
                for event_dir in EVENT_DIRS]
events = concat_dataframes(event_frames)

In [7]:
# Number of games, counted as records whose type is 'id'.
events.where(events['event_type'] == 'id').count()

                                                                                

193453

In [8]:
# used in udf
class GameId:
    """Remembers the most recent game id seen while rows are processed in order.

    NOTE(review): correctness depends on rows being fed in file order, with
    each 'id' row ahead of the rows that belong to it — confirm this holds
    for however the caller iterates the data.
    """

    def __init__(self):
        # No game id is known until the first 'id' row has been seen.
        self.current_id = None

    def set_row_id(self, event_type, col2):
        """Return the game id that applies to the given row.

        An 'id' row replaces the remembered id with ``col2``; any other row
        reuses the remembered one.

        Raises:
            ValueError: If no non-empty id is remembered after handling the row.
        """
        is_game_header = event_type == 'id'
        if is_game_header:
            self.current_id = col2
        game_id = self.current_id
        if game_id:
            return game_id
        raise ValueError('current_id cannot be None.')

In [9]:
# A single GameId instance is captured by the UDF so the last seen game id
# carries over from one row to the next.
# NOTE(review): this relies on rows reaching the UDF in file order with each
# 'id' row ahead of its game's rows — confirm for how the files are partitioned.
gi = GameId()
# The result depends on previously seen rows, not only on the arguments, so the
# UDF is flagged non-deterministic. Spark otherwise assumes UDFs are
# deterministic and may drop duplicate invocations or call the function more
# often than it appears in the query.
set_game_id = udf(gi.set_row_id).asNondeterministic()

events = events.withColumns({'game_id': set_game_id(events['event_type'], events['col2']),
                            'record_id': monotonically_increasing_id()})
events = events.checkpoint()  # checkpoint locks game_id & record_id

                                                                                

In [10]:
# Preview the first 500 rows, including the derived game_id and record_id columns.
events.show(500)

+----------+--------------------+--------------------+--------+----+----+--------------------+------------+---------+
|event_type|                col2|                col3|    col4|col5|col6|                col7|     game_id|record_id|
+----------+--------------------+--------------------+--------+----+----+--------------------+------------+---------+
|        id|        NLS193807060|                NULL|    NULL|NULL|NULL|                NULL|NLS193807060|        0|
|   version|                   1|                NULL|    NULL|NULL|NULL|                NULL|NLS193807060|        1|
|      info|       inputprogvers|version 7RS(19) o...|    NULL|NULL|NULL|                NULL|NLS193807060|        2|
|      info|             visteam|                 ALS|    NULL|NULL|NULL|                NULL|NLS193807060|        3|
|      info|            hometeam|                 NLS|    NULL|NULL|NULL|                NULL|NLS193807060|        4|
|      info|                date|          1938/07/06|  

In [11]:
# Column names and Spark types of the combined events DataFrame.
events.dtypes

[('event_type', 'string'),
 ('col2', 'string'),
 ('col3', 'string'),
 ('col4', 'string'),
 ('col5', 'string'),
 ('col6', 'string'),
 ('col7', 'string'),
 ('game_id', 'string'),
 ('record_id', 'bigint')]

In [12]:
# events.limit(1000).write.csv('out/test.csv')

In [13]:
# Total number of records across all event directories.
events.count()

                                                                                

29632387

## Game Info Table

In [14]:
# Target type -> info keys (column names after the pivot) to cast to it.
type_map = {
    'short': ['number', 'temp', 'timeofgame', 'windspeed'],
    'integer': ['attendance'],
    'date': ['date'],
    'timestamp': ['inputtime'],
}
# also works:
# type_map = {types.ShortType(): ['number', 'temp', 'timeofgame', 'windspeed'],
#             types.IntegerType(): ['attendance'],
#             types.DateType(): ['date'],
#             types.TimestampType(): ['inputtime']}

# One row per game: each 'info' key (col2) becomes a column holding the first
# value (col3) found for that key within the game.
info_rows = events.where(events['event_type'] == 'info').select('game_id', 'col2', 'col3')
info_by_game = info_rows.groupBy('game_id').pivot('col2').agg(first('col3'))
game_info_table = convert_df_datatypes(info_by_game, types=type_map)



CodeCache: size=131072Kb used=34648Kb max_used=34658Kb free=96424Kb
 bounds [0x00000001071e8000, 0x0000000109408000, 0x000000010f1e8000]
 total_blobs=13411 nmethods=12447 adapters=876
 compilation: disabled (not enough contiguous free space left)


                                                                                

In [15]:
# Preview the pivoted per-game info table.
game_info_table.show()

24/07/07 14:15:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------------+----------+----+--------+--------+---------+------------+-----+--------+----------+---------+----+-------+--------------------+--------------------+---------+--------+------+--------+-------+-------+--------+------------------+-----+--------+---------+----+----------+----------+--------------------+--------+--------+--------+--------+--------+--------+-----+-------+-------+---------+--------+
|     game_id|attendance|date|daynight|edittime|fieldcond|    gametype|gwrbi|hometeam|howentered|howscored|htbf|innings|       inputprogvers|            inputter|inputtime|      lp|number| oscorer|pitches| precip|    save|            scorer| site|     sky|starttime|temp|tiebreaker|timeofgame|          translator|   ump1b|   ump2b|   ump3b| umphome|   umplf|   umprf|usedh|visteam|winddir|windspeed|      wp|
+------------+----------+----+--------+--------+---------+------------+-----+--------+----------+---------+----+-------+--------------------+--------------------+---------+--------

In [16]:
# Check that the casts requested in type_map were applied.
game_info_table.dtypes

[('game_id', 'string'),
 ('attendance', 'int'),
 ('date', 'date'),
 ('daynight', 'string'),
 ('edittime', 'string'),
 ('fieldcond', 'string'),
 ('gametype', 'string'),
 ('gwrbi', 'string'),
 ('hometeam', 'string'),
 ('howentered', 'string'),
 ('howscored', 'string'),
 ('htbf', 'string'),
 ('innings', 'string'),
 ('inputprogvers', 'string'),
 ('inputter', 'string'),
 ('inputtime', 'timestamp'),
 ('lp', 'string'),
 ('number', 'smallint'),
 ('oscorer', 'string'),
 ('pitches', 'string'),
 ('precip', 'string'),
 ('save', 'string'),
 ('scorer', 'string'),
 ('site', 'string'),
 ('sky', 'string'),
 ('starttime', 'string'),
 ('temp', 'smallint'),
 ('tiebreaker', 'string'),
 ('timeofgame', 'smallint'),
 ('translator', 'string'),
 ('ump1b', 'string'),
 ('ump2b', 'string'),
 ('ump3b', 'string'),
 ('umphome', 'string'),
 ('umplf', 'string'),
 ('umprf', 'string'),
 ('usedh', 'string'),
 ('visteam', 'string'),
 ('winddir', 'string'),
 ('windspeed', 'smallint'),
 ('wp', 'string')]

In [17]:
# Number of distinct game_id values that have at least one 'info' record.
game_info_table.count()

                                                                                

193415

## Game Data Table

In [18]:
# 'data' records: keep the game id plus the three positional fields in use,
# give them descriptive names, and cast the metric value to a short.
data_rows = events.where(events['event_type'] == 'data').select('game_id', 'col2', 'col3', 'col4')
data_column_names = {'col2': 'metric_type', 'col3': 'player_code', 'col4': 'metric_value'}
game_data_table = convert_df_datatypes(rename_columns(data_rows, data_column_names),
                                       {'short': ['metric_value']})

In [19]:
# Preview the game data table.
game_data_table.show()

+------------+-----------+-----------+------------+
|     game_id|metric_type|player_code|metric_value|
+------------+-----------+-----------+------------+
|NLS193807060|         er|   gomel102|           0|
|NLS193807060|         er|   allej102|           1|
|NLS193807060|         er|   grovl101|           0|
|NLS193807060|         er|   vandj101|           0|
|NLS193807060|         er|   lee-b103|           0|
|NLS193807060|         er|   browm103|           1|
|ASW193808210|         er|   walke103|           5|
|ASW193808210|         er|   browb107|           0|
|ASW193808210|         er|   taylj108|           0|
|ASW193808210|         er|   cornw101|           3|
|ASW193808210|         er|   smith111|           1|
|ASW193808210|         er|   radct101|           0|
|NNL193809240|         er|   barnv103|           9|
|NNL193809240|         er|   haysc101|           2|
|NNL193809240|         er|   mcdut101|           3|
|NNL193809250|         er|   trent101|           4|
|NNL19380925

In [20]:
# Number of 'data' records.
game_data_table.count()

                                                                                

1115275

## Game Rosters Table

In [21]:
# 'start' and 'sub' records share one layout, so both feed a single roster table.
roster_rows = (events.where(events['event_type'].isin(['start', 'sub']))
                     .select('game_id', 'record_id', 'event_type',
                             'col2', 'col3', 'col4', 'col5', 'col6'))
roster_column_names = {'col2': 'player_id', 'col3': 'player_name',
                       'col4': 'is_home_team', 'col5': 'batting_order',
                       'col6': 'fielding_position'}
roster_column_types = {'boolean': ['is_home_team'],
                       'short': ['batting_order', 'fielding_position']}
game_rosters_table = convert_df_datatypes(rename_columns(roster_rows, roster_column_names),
                                          roster_column_types)

In [22]:
# Preview the starters / substitutions table.
game_rosters_table.show()

+------------+---------+----------+---------+------------------+------------+-------------+-----------------+
|     game_id|record_id|event_type|player_id|       player_name|is_home_team|batting_order|fielding_position|
+------------+---------+----------+---------+------------------+------------+-------------+-----------------+
|NLS193807060|       34|     start| kreem101|     Mike Kreevich|       false|            1|                7|
|NLS193807060|       35|     start| gehrc101| Charlie Gehringer|       false|            2|                4|
|NLS193807060|       36|     start| avere101|      Earl Averill|       false|            3|                8|
|NLS193807060|       37|     start| foxxj101|       Jimmie Foxx|       false|            4|                3|
|NLS193807060|       38|     start| dimaj101|      Joe DiMaggio|       false|            5|                9|
|NLS193807060|       39|     start| dickb101|       Bill Dickey|       false|            6|                2|
|NLS193807

In [23]:
#TODO: dtypes

## Umpire Change Events Table

In [24]:
# Umpire changes are comment ('com') records whose text is itself a CSV string;
# csv0 is the leading token and csv1..csv3 are renamed below.
# Restricting to 'com' rows makes this table the exact complement of the
# comments table, which is defined as 'com' rows that do not contain 'umpchange'.
umpchange_events_table = events.filter((events.event_type == 'com')
                                       & events.col2.contains('umpchange'))
umpchange_events_table = split_csv_column(umpchange_events_table, 'col2')
umpchange_events_table = umpchange_events_table.select(['record_id', 'game_id', 
                                                        'csv1', 'csv2', 'csv3'])
umpchange_events_table = rename_columns(umpchange_events_table, {'csv1': 'inning',
                                                                 'csv2': 'position',
                                                                 'csv3': 'umpire_id'})
umpchange_events_table = convert_df_datatypes(umpchange_events_table, {'short': ['inning']})

In [25]:
# Preview the parsed umpire-change rows.
umpchange_events_table.show()

+---------+------------+------+--------+---------+
|record_id|     game_id|inning|position|umpire_id|
+---------+------------+------+--------+---------+
|      843|NLS196107110|     5| umphome| runge901|
|      844|NLS196107110|     5|   ump1b| craws901|
|      845|NLS196107110|     5|   ump2b| umonf901|
|      846|NLS196107110|     5|   ump3b| lands901|
|     1415|NLS194207060|     5| umphome| mcgob901|
|     1416|NLS194207060|     5|   ump1b| balll901|
|     1417|NLS194207060|     5|   ump2b| stewe901|
|     1418|NLS194207060|     5|   ump3b| barla901|
|     1946|NLS194707080|     5| umphome| passa901|
|     1947|NLS194707080|     5|   ump1b| henlb101|
|     1948|NLS194707080|     5|   ump2b| boyej901|
|     1949|NLS194707080|     5|   ump3b| conlj102|
|     2496|ALS194607090|     5| umphome| goetl901|
|     2497|ALS194607090|     5|   ump1b| romme101|
|     2498|ALS194607090|     5|   ump2b| boggd901|
|     2499|ALS194607090|     5|   ump3b| summb901|
|     3005|NLS194407110|     5|

In [26]:
# Number of umpire-change rows.
umpchange_events_table.count()

                                                                                

1553

In [27]:
# Check that inning was cast to a short while the other fields stay strings.
umpchange_events_table.dtypes

[('record_id', 'bigint'),
 ('game_id', 'string'),
 ('inning', 'smallint'),
 ('position', 'string'),
 ('umpire_id', 'string')]

## Comments Table

In [28]:
# Free-text comments: 'com' records, minus the ones that carry an umpire change.
is_comment = events['event_type'] == 'com'
is_umpchange = events['col2'].contains('umpchange')
comments_table = (events.where(is_comment & ~is_umpchange)
                        .select('record_id', 'game_id', 'col2'))

In [29]:
# Show 20 comments without truncating the text column.
comments_table.show(20, False)

+---------+------------+------------------------------------------------------------------------+
|record_id|game_id     |col2                                                                    |
+---------+------------+------------------------------------------------------------------------+
|52       |NLS193807060|$Managers: Bill Terry (NL New York), Joe McCarthy (AL New York)         |
|130      |NLS193807060|$Charlie Gehringer failed to cover 1B on the attempted sacrifice;       |
|131      |NLS193807060|Jimmie Foxx's throw went into RF and Joe DiMaggio overthrew HP          |
|352      |NNL193809240|$Play-by-play deduced from newspaper game stories and box score         |
|504      |NNL193809250|$Play-by-play deduced from newspaper game stories and box score         |
|658      |SAS193810020|$Play-by-play deduced from newspaper game stories and box score         |
|797      |NLS196107110|$Managers: Danny Murtaugh (NL Pittsburgh), Paul Richards (AL Baltimore);|
|798      |NLS196107

In [30]:
# Number of comment records left after excluding umpire changes.
comments_table.count()

                                                                                

227801

## Adjustments Table

In [31]:
@udf(returnType=types.MapType(types.StringType(), types.StringType()))
def _adjustment_values_map(event_type, col2, col3):
    """Build a labelled map for one adjustment record.

    Args:
        event_type: Record type; one of 'badj', 'padj', 'ladj', 'radj', 'presadj'.
        col2: First positional field of the record.
        col3: Second positional field of the record.

    Returns:
        A dict with an 'event_name' entry plus ``col2`` and ``col3`` stored
        under the keys that apply to ``event_type``, or None for any other
        record type.
    """
    # event_type -> (event_name, key for col2, key for col3)
    layouts = {
        'badj': ('Batter Adjustment', 'player_id', 'hand'),
        'padj': ('Pitcher Adjustment', 'player_id', 'hand'),
        'ladj': ('Lineup Adjustment', 'batting_team', 'batting_order_position'),
        'radj': ('Runner Adjustment', 'player_id', 'base'),
        'presadj': ('Pitcher Responsibility Adjustment', 'player_id', 'base'),
    }
    layout = layouts.get(event_type)
    if layout is None:
        return None
    event_name, col2_key, col3_key = layout
    return {'event_name': event_name, col2_key: col2, col3_key: col3}


In [32]:
# Keep the five adjustment record types and attach the labelled map built by
# _adjustment_values_map from each row's type and first two positional fields.
adjustment_rows = events.where(
    events['event_type'].isin(['badj', 'padj', 'ladj', 'radj', 'presadj']))
adjustment_map_col = _adjustment_values_map(adjustment_rows['event_type'],
                                            adjustment_rows['col2'],
                                            adjustment_rows['col3'])
adjustments_table = (adjustment_rows.withColumn('adjustment_events_map', adjustment_map_col)
                                    .select('record_id', 'game_id',
                                            'event_type', 'adjustment_events_map'))

In [33]:
# Show 20 adjustment rows without truncating the map column.
adjustments_table.show(20, False)

+-----------+------------+----------+----------------------------------------------------------------------------------+
|record_id  |game_id     |event_type|adjustment_events_map                                                             |
+-----------+------------+----------+----------------------------------------------------------------------------------+
|68719492828|DET194406160|presadj   |{event_name -> Pitcher Responsiblity Adjustment, player_id -> orrej101, base -> 3}|
|68719494521|DET194408272|presadj   |{event_name -> Pitcher Responsiblity Adjustment, player_id -> eatoz101, base -> 3}|
|68719545278|PHA194505240|ladj      |{batting_order_position -> 6, event_name -> Lineup Adjustment, batting_team -> 1} |
|68719545281|PHA194505240|ladj      |{batting_order_position -> 5, event_name -> Lineup Adjustment, batting_team -> 1} |
|68719545290|PHA194505240|ladj      |{batting_order_position -> 6, event_name -> Lineup Adjustment, batting_team -> 1} |
|68719554816|DET194606262|presad

In [34]:
# Check that the UDF output is typed as map<string,string>.
adjustments_table.dtypes

[('record_id', 'bigint'),
 ('game_id', 'string'),
 ('event_type', 'string'),
 ('adjustment_events_map', 'map<string,string>')]

In [35]:
# Number of adjustment records across all five adjustment types.
adjustments_table.count()

                                                                                

7278

## Events Table

In [36]:
# 'play' records use all six positional fields; the keys of col_names double
# as the list of source columns to select.
col_names = {'col2': 'inning', 'col3': 'is_home_team', 'col4': 'player_id',
             'col5': 'count', 'col6': 'pitches', 'col7': 'play_description'}
play_rows = (events.where(events['event_type'] == 'play')
                   .select('record_id', 'game_id', *col_names))
events_table = convert_df_datatypes(rename_columns(play_rows, col_names),
                                    {'short': ['inning'], 'boolean': ['is_home_team']})

In [37]:
# Preview the play-by-play table.
events_table.show()

+---------+------------+------+------------+---------+-----+-------+----------------+
|record_id|     game_id|inning|is_home_team|player_id|count|pitches|play_description|
+---------+------------+------+------------+---------+-----+-------+----------------+
|       53|NLS193807060|     1|       false| kreem101|   ??|   NULL|             8/F|
|       54|NLS193807060|     1|       false| gehrc101|   ??|   NULL|              13|
|       55|NLS193807060|     1|       false| avere101|   ??|   NULL|              43|
|       56|NLS193807060|     1|        true| hacks101|   ??|   NULL|              S7|
|       57|NLS193807060|     1|        true| hermb101|   ??|   NULL|        E6/G.1-3|
|       58|NLS193807060|     1|        true| goodi101|   ??|   NULL|             K/C|
|       59|NLS193807060|     1|        true| medwj101|   ??|   NULL|       8.3-H(UR)|
|       60|NLS193807060|     1|        true| ott-m101|   ??|   NULL|             8/F|
|       61|NLS193807060|     2|       false| foxxj101|

In [38]:
# Check that inning and is_home_team were cast; the remaining fields stay strings.
events_table.dtypes

[('record_id', 'bigint'),
 ('game_id', 'string'),
 ('inning', 'smallint'),
 ('is_home_team', 'boolean'),
 ('player_id', 'string'),
 ('count', 'string'),
 ('pitches', 'string'),
 ('play_description', 'string')]

In [39]:
# Number of 'play' records.
events_table.count()

                                                                                

17032081

## Test Write to Database

In [44]:
# The Postgres password is imported from the local my_secrets module instead of
# being written into the notebook.
from my_secrets import airflow_pgres_pw


# db_writer = DataFrameWriter(game_info_table)
# JDBC target: the `sports` database on a Postgres server at localhost:5432.
db_url = 'jdbc:postgresql://localhost:5432/sports'
table = 'test.test_spark'
mode = 'overwrite'  # replace whatever the target table already contains
properties = {"user": "airflow", 'password': airflow_pgres_pw, 
              'driver': 'org.postgresql.Driver'}
# Trial write of game_data_table to check the JDBC connection end to end.
game_data_table.write.jdbc(url=db_url, table=table, mode=mode,
                           properties=properties)

                                                                                

In [45]:
# events_table.write.jdbc(url=db_url, table='test.test_events_table', mode=mode,
#                            properties=properties)

docker run -p 9000:9000 -p 9001:9001 --name minio \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  minio/minio server /data --console-address ":9001"


                                                                                