In [None]:
import numpy as np
import pandas as pd

# initialize list of all pitches
df = pd.read_csv('cleanedPitches.csv')
pitchList = df['pitch_type'].tolist()

# initialize count lists (same row order as pitchList)
ballList = df['b_count'].tolist()
strikeList = df['s_count'].tolist()

# initialize pitch types (fill in the pitch_type values that make up the arsenal)
pitchTypes = []
numStates = len(pitchTypes)

# initialize matrices
# emission rows are count codes (balls * 4 + strikes), columns are pitch types
emissionMatrix = np.zeros((20, numStates), dtype=int)
transitionMatrix = np.zeros((numStates, numStates))

# initialize probabilities
# np.array(numStates) would create a 0-d array holding the number itself,
# which cannot be indexed per state; allocate one float slot per state instead
startProb = np.zeros(numStates)
transitionProb = None
emissionProb = None

# add pitches that are in arsenal
# NOTE: after filtering, necessaryPitches is no longer index-aligned with
# ballList / strikeList whenever a pitch outside pitchTypes was dropped
necessaryPitches = [pitch for pitch in pitchList if pitch in pitchTypes]

The observation sequence is entered manually.

In [None]:
# Observation sequence read by viterbi(); each entry is used there as an
# index into emissionProb.
# NOTE(review): presumably count codes (balls * 4 + strikes) — confirm.
obsSequence = []

The transition matrix will hold the probabilities of transitioning between different pitches. The dataset of all pitches thrown in the 2019 MLB season will be used.

In [None]:
def createTransitionMatrix(pitchTypes, pitches=None):
    """Build the row-normalised pitch-to-pitch transition matrix.

    Counts how often each pitch type is followed by each other pitch type in
    the pitch sequence, then divides every row by its total so that entry
    [i, j] is the fraction of pitches of type i that were followed by type j.

    Parameters
    ----------
    pitchTypes : list
        Pitch types that form the states; row/column k corresponds to
        pitchTypes[k].
    pitches : sequence, optional
        Chronological pitch sequence. Defaults to the module-level
        ``necessaryPitches``. Pitches not in ``pitchTypes`` are skipped.

    Returns
    -------
    numpy.ndarray
        Array of shape (len(pitchTypes), len(pitchTypes)). Rows for pitch
        types that were never followed by another pitch are all zeros.
        The result is also stored in the module-level ``transitionMatrix``.
    """
    global transitionMatrix

    if pitches is None:
        pitches = necessaryPitches

    # size from the argument, not from the module-level numStates, so the
    # matrix always matches the pitch types actually passed in
    numStates = len(pitchTypes)

    # first occurrence wins, matching list.index semantics
    stateIndex = {}
    for idx, pitch in enumerate(pitchTypes):
        stateIndex.setdefault(pitch, idx)

    sequence = [pitch for pitch in pitches if pitch in stateIndex]

    # count consecutive (current, next) pairs
    transitionMatrix_counts = np.zeros((numStates, numStates))
    for currentState, nextState in zip(sequence, sequence[1:]):
        transitionMatrix_counts[stateIndex[currentState], stateIndex[nextState]] += 1

    # change counts to percentages; rows without data stay 0 instead of NaN
    transition_rowSums = transitionMatrix_counts.sum(axis=1, keepdims=True)
    transitionMatrix = np.divide(
        transitionMatrix_counts,
        transition_rowSums,
        out=np.zeros_like(transitionMatrix_counts),
        where=transition_rowSums > 0,
    )
    return transitionMatrix

The emission matrix holds, for each ball-strike count, the proportion of each pitch type thrown in that count.

In [None]:
def createEmissionMatrix(pitchTypes, pitches=None, balls=None, strikes=None):
    """Build the count-by-pitch emission matrix.

    Every pitch is tallied under the ball-strike count it was thrown in,
    encoded as ``balls * 4 + strikes``; each row is then divided by its total
    so that entry [c, k] is the fraction of pitches thrown in count ``c``
    that were of type ``pitchTypes[k]``.

    Parameters
    ----------
    pitchTypes : list
        Pitch types that form the columns.
    pitches, balls, strikes : sequence, optional
        Index-aligned pitch type, ball count and strike count for each pitch.
        Default to the module-level ``pitchList``, ``ballList`` and
        ``strikeList``. Pitches not in ``pitchTypes`` are skipped together
        with their counts.

    Returns
    -------
    numpy.ndarray
        Array of shape (20, len(pitchTypes)). Rows for counts that never
        occurred are all zeros. The result is also stored in the
        module-level ``emissionMatrix``.

    Raises
    ------
    ValueError
        If an encoded count falls outside 0..19.
    """
    global emissionMatrix

    # Iterate over the unfiltered, aligned lists: indexing ballList/strikeList
    # by position in the filtered necessaryPitches pairs counts with the
    # wrong pitches as soon as one pitch has been filtered out.
    if pitches is None:
        pitches = pitchList
    if balls is None:
        balls = ballList
    if strikes is None:
        strikes = strikeList

    numStates = len(pitchTypes)
    numCounts = 20  # number of rows reserved for balls * 4 + strikes codes

    # first occurrence wins, matching list.index semantics
    stateIndex = {}
    for idx, pitch in enumerate(pitchTypes):
        stateIndex.setdefault(pitch, idx)

    # initialize and fill the (count x pitch type) tally
    countMatrix = np.zeros((numCounts, numStates), dtype=int)

    for pitchThrown, currBalls, currStrikes in zip(pitches, balls, strikes):
        column = stateIndex.get(pitchThrown)
        if column is None:
            continue

        currCount = (currBalls * 4) + currStrikes
        if currCount not in range(numCounts):
            raise ValueError(
                f"count code {currCount!r} is outside 0..{numCounts - 1}"
            )
        countMatrix[int(currCount), column] += 1

    # change counted data to percentages; unseen counts stay 0 instead of NaN
    emission_rowSums = countMatrix.sum(axis=1, keepdims=True)
    emissionMatrix = np.divide(
        countMatrix,
        emission_rowSums,
        out=np.zeros(countMatrix.shape, dtype=float),
        where=emission_rowSums > 0,
    )
    return emissionMatrix

In [None]:
def calculateProbabilities():
    # start probabilities assume every pitch is used equally
    for i in range(numStates):
        startProb[i] = 1/numStates

    # calculate transition probabilities
    transitionProb = transitionMatrix / np.sum(transitionMatrix, axis=1, keepdims=True)

    # calculate emission probabilities
    emissionProb = emissionMatrix / np.sum(emissionMatrix, axis=1, keepdims=True)

The Viterbi algorithm then determines the most likely sequence of pitches given the observation sequence.

In [None]:
def viterbi(observations=None, start=None, transition=None, emission=None):
    """Find the most likely state sequence for an observation sequence.

    Parameters
    ----------
    observations : sequence of int, optional
        Observation indices, one per time step. Defaults to the module-level
        ``obsSequence``.
    start : array-like, optional
        Start probability per state. Defaults to ``startProb``.
    transition : array-like, optional
        ``transition[i, j]`` is the probability of moving from state i to
        state j. Defaults to ``transitionProb``.
    emission : array-like, optional
        Indexed ``emission[observation, state]``, i.e. one row per
        observation and one column per state, which is the layout in which
        the emission matrix is built. Defaults to ``emissionProb``.

    Returns
    -------
    tuple
        ``(bestPath, bestPath_prob)``: the list of state indices of the most
        probable path and that path's probability. Returns ``([], 0.0)`` when
        there are no observations or no states.

    Raises
    ------
    ValueError
        If any of the probability inputs is still ``None``.
    """
    if observations is None:
        observations = obsSequence
    if start is None:
        start = startProb
    if transition is None:
        transition = transitionProb
    if emission is None:
        emission = emissionProb
    if start is None or transition is None or emission is None:
        raise ValueError("start, transition and emission probabilities must be computed first")

    start = np.asarray(start, dtype=float)
    transition = np.asarray(transition, dtype=float)
    emission = np.asarray(emission, dtype=float)

    time = len(observations)
    stateCount = start.shape[0] if start.ndim else 0
    if time == 0 or stateCount == 0:
        return [], 0.0

    # rows are time steps, columns are states
    vitMatrix = np.zeros((time, stateCount))
    backpointers = np.zeros((time, stateCount), dtype=int)

    # initialize first time step (row 0) of viterbi matrix
    vitMatrix[0, :] = start * emission[observations[0], :]

    for t in range(1, time):
        for j in range(stateCount):
            # probability of reaching state j at step t from each previous state
            candidates = vitMatrix[t - 1, :] * transition[:, j] * emission[observations[t], j]
            bestPrev = np.argmax(candidates)

            # update backpointers and viterbi matrix
            vitMatrix[t, j] = candidates[bestPrev]
            backpointers[t, j] = bestPrev

    # start from the most probable final state and walk the backpointers
    bestPath_prob = float(np.max(vitMatrix[time - 1, :]))
    bestPath = [int(np.argmax(vitMatrix[time - 1, :]))]

    for t in range(time - 1, 0, -1):
        bestPath.insert(0, int(backpointers[t, bestPath[0]]))

    return bestPath, bestPath_prob

In [None]:
def main():
    """Run the pipeline steps in order and return viterbi()'s result.

    Calls createTransitionMatrix and createEmissionMatrix with the
    module-level ``pitchTypes``, then calculateProbabilities and viterbi.
    """
    # a bare function name is only an expression that evaluates to the
    # function object; the parentheses are what actually run each step
    createTransitionMatrix(pitchTypes)
    createEmissionMatrix(pitchTypes)
    calculateProbabilities()
    return viterbi()