In [None]:
from datetime import date
from os.path import basename, splitext
import re

from pyspark.sql import Row
from pyspark.sql.types import *


# TODO figure out why when scanning all records grep got a count of
# 120924 but the data frame has 120700.
# TODO make a SQL comment for dataframes for defaulted ints (like -1)


In [None]:
# Glob matching the Retrosheet event files (*.EVA / *.EVN) one directory level
# below retrosheet-data/. Passed to sc.newAPIHadoopFile() further down.
retroFilePath = "retrosheet-data/*/*.EV[AN]"
## Narrower alternative for quicker runs (only directories starting with "2010"):
## retroFilePath = "retrosheet-data/2010*/*.EV[AN]" # just the 10s

# Glob for the roster files (*.ROS), read with sc.wholeTextFiles().
rosterPath = "retrosheet-data/*/*.ROS"
# Glob for the team files: the literal prefix TEAM followed by four digits.
teamPath = "retrosheet-data/*/TEAM[0-9][0-9][0-9][0-9]"

# Output prefix for the parquet export. The table name is appended with plain
# string concatenation, so the trailing slash is required.
DIST_PARQUET_DIR = "dist/parquet/"


In [None]:

# EVENT FILES: the following is generic code to process
# the event files, i.e. *.EV[AN]


# proc to process a whole game's events.
# because the RDD created below uses '\nid,' as its record delimiter, this is
# called with a whole game's events at once. we re-split them on newlines so
# each line can be parsed by an event-type-specific parser, but first prepend
# the game_id and the line's sequence number within the game to each record.
def processRecord(rec):
    """Expand one delimiter-separated chunk of an event file into per-line tuples.

    Parameters
    ----------
    rec : tuple
        A (key, text) pair. The key is ignored; the text is everything found
        between two "id," delimiters, i.e. normally one whole game.

    Returns
    -------
    An empty list when the chunk is empty or is a non-game preamble (its first
    line starts with "com,"). Otherwise a generator yielding, for every line of
    the chunk, the tuple
    (game_id, seq, homeTeamCode, gameDate, fields) where fields is the line
    split on commas and seq is the 0-based position of the line in the chunk.

    Raises
    ------
    IndexError / ValueError if the first line does not carry a game id of the
    form TTTYYYYMMDD... (three characters followed by a numeric date).
    """
    (_, recordsBetweenIdLines) = rec

    # first, resplit the events
    events = recordsBetweenIdLines.splitlines()

    # an empty chunk has no first line to inspect; emit nothing instead of
    # failing on events[0]
    if not events:
        return []

    # discard non-game related records, ie they dont appear
    # after an "id," token. examples:
    #
    # com,"Copyright 2001 by Stats Inc."
    # com,"All Rights Reserved."
    if events[0].startswith("com,"):
        return []

    # every chunk except the first one in a file has had its "id," token
    # consumed as the delimiter; restore it so the id line looks like the others
    if not events[0].startswith("id,"):
        events[0] = "id," + events[0]

    # the game id is prepended to each record; its layout is
    # <3-char home team><yyyy><mm><dd>...
    game_id = events[0].split(",")[1]
    homeTeamCode = game_id[:3]
    gameDate = date(
        int(game_id[3:7]),
        int(game_id[7:9]),
        int(game_id[9:11]),
    )

    # emit each line prefixed with the per-game columns; the original line is
    # kept as a list of its comma-separated fields
    return (
        (game_id, seq, homeTeamCode, gameDate, record.split(","))  ## TODO consider flattening this in
        for seq, record in enumerate(events)
    )

## some stuff to hide details of the global columns prepended 
## to each event in processRecord() so the mappers below dont
## need to know about them

# Schema fields for the columns that processRecord() prepends to every event
# line, listed in the same order getBaseColumns() returns them.
# TODO this should probably be a func, not a global
BASE_GLOBAL_COLUMNS_SCHEMA = [    
    StructField("game_id",StringType(),False), 
    StructField("seq",ShortType(),False), # position of the line within its game (from enumerate)
    StructField("homeTeamCode",StringType(),False), 
    StructField("gameDate",DateType(),False), 
]
def getBaseColumns(record):
    """Return the four leading per-game columns of a processed record as a list.

    The second column (the sequence number) is coerced to int; the other
    three are passed through untouched.
    """
    gameId = record[0]
    sequence = int(record[1])
    homeTeam = record[2]
    gameDate = record[3]
    return [gameId, sequence, homeTeam, gameDate]
def getFields(record):
    """Return the last element of a processed record: the split fields of the file line."""
    fields = record[-1]
    return fields


#### Here is where the game event processing begins

# first, make an RDD that has all of a game's records combined.
# The custom record delimiter '\nid,' makes the input format hand back
# everything between two "id," lines as a single record instead of one record
# per line. The delimiter text itself is not part of the record, which is why
# processRecord() puts the "id," prefix back.
# NOTE(review): `sc` is not defined in this notebook; presumably the
# SparkContext injected by the kernel -- confirm.

retrosheet = sc.newAPIHadoopFile( 
    retroFilePath, 
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 
    'org.apache.hadoop.io.LongWritable', 
    'org.apache.hadoop.io.Text', 
    conf={
        'textinputformat.record.delimiter':'\nid,'
    }
)

# then, flatMap each per-game record through processRecord so the result holds
# one (game_id, seq, homeTeamCode, gameDate, fields) tuple per event line
combinedFlattenedEvents = retrosheet.flatMap(processRecord)
# bare expression: shows the RDD's repr as the cell output
combinedFlattenedEvents


In [None]:

## EVENT TYPE SPECIFIC STUFF


# a record mapper for key, [value] types such as "info", etc
def multiValueRecordMapper(record):
    """Map a "key, value, value, ..." style record (e.g. "info") to a row.

    The row is the base columns, followed by the key (field 1) and then a
    single list holding every remaining field (field 2 onwards).
    """
    row = getBaseColumns(record)
    fields = getFields(record)
    row.extend([fields[1], fields[2:]])
    return row

# a util to provide fields for the dataframe schemas
def baseSchemaFields(stringColumnsToAdd=[]):
    """Return a fresh list of schema fields: the base columns plus extra string columns.

    stringColumnsToAdd -- names of additional non-nullable StringType columns,
    appended after the base columns in the order given. The default list is
    only read, never modified.
    """
    extraFields = [
        StructField(colName, StringType(), False)
        for colName in stringColumnsToAdd
    ]
    # list concatenation builds a new list, so the global is left untouched
    return BASE_GLOBAL_COLUMNS_SCHEMA + extraFields


# utility to return schema/mapper handler for the "adjustment" records
# ie 'padj', 'badj','ladj'
def getAdjustmentHandler():
    """Build the {'schema', 'mapper'} handler shared by the adjustment record types.

    The schema is the base columns plus the string columns "who" and "what";
    the mapper returns the base columns followed by every field after the
    record-type token.
    """
    adjustmentSchema = StructType(baseSchemaFields(["who", "what"]))

    def adjustmentMapper(rec):
        return getBaseColumns(rec) + getFields(rec)[1:]

    return {'schema': adjustmentSchema, 'mapper': adjustmentMapper}

## TODO get consistent with underscore vs camelCase for columns
# Maps each event record type (the first token of a line) to a handler dict:
#   'schema' -- the StructType of the resulting DataFrame
#   'mapper' -- a function turning one processed record (see processRecord)
#               into a row matching that schema
SCHEMA_BY_TYPE = {

    # sample line: start,howar001,"Ryan Howard",0,4,3
    # sample line: sub,waldj001,"Jordan Walden",0,0,1
    'start': {
        'schema': StructType(baseSchemaFields() + [
            StructField("player_id",StringType(),False),
            StructField("playerName",StringType(),False),
            StructField("home",BooleanType(),False),
            StructField("battingOrder",ByteType(),False),
            StructField("position",ByteType(),False)
        ]),
        'mapper': lambda rec: getBaseColumns(rec) + [
            getFields(rec)[1],
            getFields(rec)[2],
            # Retrosheet encodes the team as 0 = visiting, 1 = home, so the
            # "home" flag is true only for "1"
            ("1" == getFields(rec)[3]),
            int(getFields(rec)[4]),
            # strip any non-digit characters before converting the position
            int(re.sub('[^0-9]','',getFields(rec)[5])) # make this a reusable proc
        ]
    },

    # sample line: play,6,1,bondb001,02,CFX,HR/9.3-H;2-H;1-H
    'play': {
        'schema': StructType(baseSchemaFields() + [
            StructField("inning",ByteType(),False),
            StructField("topOfInning",BooleanType(),False),
            StructField("player_id",StringType(),False),
            StructField("count",ArrayType(ByteType(), False),True),
            StructField("pitch_seq",StringType(),False),
            StructField("description",StringType(),False)
        ]),
        'mapper': lambda rec: getBaseColumns(rec) + [
            int(getFields(rec)[1]),
            # team 0 is the visiting team, which bats in the top of the inning
            ("0" == getFields(rec)[2]),
            getFields(rec)[3],
            # the count is one digit per element; anything non-numeric
            # becomes an empty list
            [int(c) for c in getFields(rec)[4]] if getFields(rec)[4].isdigit() else [],
            getFields(rec)[5],
            getFields(rec)[6]
        ]
    },

    # sample line:
    # com,"$Career homer 587 to pass Frank Robinson for 4th all-time"

    # or sample multi-line comment:
    # com,"$Hall caught in rundown while Winn advanced to 3B; both players"
    # com,"ended up on 3B and Winn is tagged out; Hall thought he was the one"
    # com,"who was out and stepped off the bag and is tagged out"
    ## TODO collapse all of these com records should be one comment
    'com': {
        'schema': StructType(baseSchemaFields(["comment"])),
        'mapper': lambda rec: getBaseColumns(rec) + [
            # the line was split on every comma, including commas inside the
            # comment text; join the pieces back so nothing is lost
            ",".join(getFields(rec)[1:])
        ]
    },

    # sample line: info,attendance,41128
    # most have the type a key and value, but there are multiple values
    # so: info,scorer,96,387,269,107,80,104,163,274,395
    'info': {
        'schema': StructType(
            baseSchemaFields(["key"]) + [StructField("values",ArrayType(StringType()),False)]
        ),
        'mapper': multiValueRecordMapper
    },

    # sample line: data,er,fyhrm001,0
    'data': {
        'schema': StructType(baseSchemaFields(["type", "key", "value"])),
        'mapper': lambda rec: getBaseColumns(rec) + getFields(rec)[1:]
    },

    # lastly, the adjustment records
    #
    # sample line: padj,harrg001,L
    # sample line: ladj,0,9
    # sample line: badj,bonib001,R
    'padj': getAdjustmentHandler(),
    'badj': getAdjustmentHandler(),
    'ladj': getAdjustmentHandler(),
}

# "sub" records are the same as "start" records
SCHEMA_BY_TYPE['sub'] = SCHEMA_BY_TYPE['start']

print("done")


In [None]:
# One DataFrame (and one temp table of the same name) per event record type.
dfByType = {}

# To debug a single type, iterate over a subset instead, e.g.
# {col: SCHEMA_BY_TYPE[col] for col in ['info']}.items()
for recType, typeInfo in SCHEMA_BY_TYPE.items():
    print("processing %s ..." % (recType))

    # Bind the current type as a default argument. A plain closure would look
    # up the loop variable whenever the function is serialised, which is only
    # correct as long as that happens inside this very iteration.
    curRdd = combinedFlattenedEvents.filter(
        lambda rec, wantedType=recType: rec[-1][0] == wantedType
    )

    # skip types that never occur in the input; no table is registered for them
    if curRdd.isEmpty():
        continue

    dfByType[recType] = sqlContext.createDataFrame(
        curRdd.map(typeInfo['mapper']), typeInfo['schema']
    )

    dfByType[recType].registerTempTable(recType)
    print("%s has %d records" % (recType, dfByType[recType].count()))

print("done")

In [None]:
# the following utils are for both the roster files, i.e. *.ROS, and the team
# files, TEAMyyyy. they prepend each row of a file with (part of) its filename
# so that things like the year can be extracted


## this proc builds a map function for the output of a wholeTextFiles() call. The returned function
## is meant to be used with flatMap: it returns the rows of the file, each with the last part of
## the filename (extension removed) prepended to the fields of the record. Additionally, a
## transform can be applied to that filename part via the optional pathTransform parameter. It
## defaults to the identity function but can be used to trim the name and cast to an int, say.
## When targetRecords is given, only lines with exactly that many comma-separated fields are kept.
def getWholeTextParserWithPathTransform(pathTransform=lambda x:x, targetRecords=None):
    """Build a parser for the (path, content) pairs produced by wholeTextFiles().

    pathTransform -- applied to the file's base name (extension removed); its
                     result becomes the first column of every emitted row.
    targetRecords -- when not None, only lines with exactly this many
                     comma-separated fields are kept.

    The returned function yields a list of rows, one per kept line, each being
    [pathTransform(fileName)] followed by the line's fields.
    """
    def wholeTextParserWithPath(record):
        path, content = record
        fileName = splitext(basename(path))[0]

        rows = []
        for line in content.splitlines():
            columns = line.split(",")
            if targetRecords is not None and len(columns) != targetRecords:
                continue
            rows.append([pathTransform(fileName)] + columns)
        return rows

    return wholeTextParserWithPath

print("done")


In [None]:

### ROSTER files
# Name of the temp table the roster DataFrame is registered under.
ROSTER = "roster"

# sample greiz001,Greinke,Zack,R,R,LAN,P
# Column order must follow the file: as the sample shows, the last name comes
# before the first name.
rosterSchema = StructType([
    # prepended from filename
    StructField("year",ShortType(),False),

    # rest from record
    StructField("player_id",StringType(),False),
    StructField("lastName",StringType(),False),
    StructField("firstName",StringType(),False),
    StructField("bats",StringType(),False),
    StructField("throws",StringType(),False),
    StructField("team",StringType(),False),
    StructField("position",StringType(),False)
])

# Each roster file is read whole; its rows are emitted with the year taken
# from the file name (everything after the first three characters).
rosterRowsRDD = sc.wholeTextFiles(rosterPath).flatMap(
    getWholeTextParserWithPathTransform(lambda x:int(x[3:])) # trim year part and cast to int
)
rosterDataFrame = sqlContext.createDataFrame(rosterRowsRDD,rosterSchema)
rosterDataFrame.registerTempTable(ROSTER)
print("%s has %d rows" % (ROSTER,rosterDataFrame.count()))



In [None]:

### TEAM files
# Name of the temp table the team DataFrame is registered under.
TEAM = "team"

# sample MIL,N,Milwaukee,Brewers
teamSchema = StructType(
    # prepended from filename
    [StructField("year", ShortType(), False)]
    # rest from record, every one a required string column
    + list(
        StructField(columnName, StringType(), False)
        for columnName in ("team_id", "league", "city", "teamName")
    )
)

# The year is whatever follows the first four characters of the file name;
# only lines with exactly four fields are kept.
teamRowsRDD = sc.wholeTextFiles(teamPath).flatMap(
    getWholeTextParserWithPathTransform(
        lambda fileName: int(fileName[4:]),
        targetRecords=4,
    )
)
teamDataFrame = sqlContext.createDataFrame(teamRowsRDD, teamSchema)
teamDataFrame.registerTempTable(TEAM)
print("%s has %d rows" % (TEAM, teamDataFrame.count()))


print("done")

In [None]:
# Write every table known to the SQL context to its own parquet directory
# under DIST_PARQUET_DIR, replacing any previous export.
for tableName in sqlContext.tableNames():
    path = DIST_PARQUET_DIR + tableName
    print("saving %s to %s" % (tableName, path))
    sqlContext.table(tableName).write.format("parquet").mode("overwrite").save(path)

print("done")
