In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from scipy.stats import binomtest
from sklearn.model_selection import train_test_split
import requests
from bs4 import BeautifulSoup
import re
from contextlib import redirect_stdout
import io
import sys
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import PowerTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from tensorflow.keras import layers
from joblib import load


# In-memory text buffer. getNNpreds() redirects stdout into it so that whatever
# model.predict() prints is not shown. The visible code never reads it back or
# clears it.
stopper = io.StringIO()


# Full franchise name -> three-letter team code. The codes are the keys of
# `totals` and `ratings`, and get_last_ratings() matches them against the
# Home_ID / Away_ID columns of the historical dataframe.
mlb_teams = {
    'Arizona Diamondbacks': 'ARI',
    'Atlanta Braves': 'ATL',
    'Baltimore Orioles': 'BAL',
    'Boston Red Sox': 'BOS',
    'Chicago White Sox': 'CHW',
    'Chicago Cubs': 'CHC',
    'Cincinnati Reds': 'CIN',
    'Cleveland Guardians': 'CLE',
    'Colorado Rockies': 'COL',
    'Detroit Tigers': 'DET',
    'Houston Astros': 'HOU',
    'Kansas City Royals': 'KCR',
    'Los Angeles Angels': 'LAA',
    'Los Angeles Dodgers': 'LAD',
    'Miami Marlins': 'MIA',
    'Milwaukee Brewers': 'MIL',
    'Minnesota Twins': 'MIN',
    'New York Yankees': 'NYY',
    'New York Mets': 'NYM',
    'Oakland Athletics': 'OAK',
    'Philadelphia Phillies': 'PHI',
    'Pittsburgh Pirates': 'PIT',
    'San Diego Padres': 'SDP',
    'San Francisco Giants': 'SFG',
    'Seattle Mariners': 'SEA',
    'St. Louis Cardinals': 'STL',
    'Tampa Bay Rays': 'TBR',
    'Texas Rangers': 'TEX',
    'Toronto Blue Jays': 'TOR',
    'Washington Nationals': 'WSN'
}
# Per-team running tally keyed by team code: [runs scored, games played].
# populateTotals() adds to it and promptForData() divides the two entries to get
# a team's average runs. The comprehension gives every team its own list object.
totals = {
    code: [0, 0]
    for code in (
        'ARI', 'ATL', 'BAL', 'BOS', 'CHW', 'CHC', 'CIN', 'CLE', 'COL', 'DET',
        'HOU', 'KCR', 'LAA', 'LAD', 'MIA', 'MIL', 'MIN', 'NYY', 'NYM', 'OAK',
        'PHI', 'PIT', 'SDP', 'SFG', 'SEA', 'STL', 'TBR', 'TEX', 'TOR', 'WSN',
    )
}


def extract_numbers(string):
    """Return the first two integers found in `string`.

    Every maximal run of digits counts as one number. If fewer than two runs
    are present the result is ``(None, None)``.
    """
    digit_runs = re.findall(r'\d+', string)
    if len(digit_runs) < 2:
        return None, None
    first, second = digit_runs[:2]
    return int(first), int(second)

def readInHistoricalData(path=r"C:\Users\Jack Leitzell\Documents\MLBDataframeWithRatingsAndScores.csv"):
    """Load the historical games CSV and discard unusable rows.

    Parameters
    ----------
    path : str or path-like, optional
        Location of the CSV. Defaults to the author's original local file so
        existing zero-argument calls keep working; pass another path to run the
        notebook on a different machine.

    Returns
    -------
    pandas.DataFrame
        The rows that have no missing value and in which no cell, rendered as
        text, contains the placeholder ``'Not Found'``. The index labels of the
        surviving rows are kept as read from the file, so they can have gaps.
    """
    games_raw = pd.read_csv(path)

    # Rows with any missing value are treated as faulty.
    games = games_raw.dropna()

    # Scan column by column rather than row by row: same result as the original
    # per-row apply, without building one Series per game.
    as_text = games.astype(str)
    has_placeholder = as_text.apply(
        lambda column: column.str.contains('Not Found', regex=False)
    ).any(axis=1)

    return games[~has_placeholder]

def get_last_ratings(df_historical):
    """Return each team's rating from its last game in `df_historical`.

    Rows are scanned by position, from the last row to the first. For every
    code in ``mlb_teams`` the first row met in which the team appears supplies
    the rating: ``'Home Rating'`` if the code is in ``'Home_ID'``, otherwise
    ``'Away Rating'`` if it is in ``'Away_ID'``.

    Scanning by position matters because readInHistoricalData() drops rows
    without renumbering the index. The previous label-based search therefore
    never looked at the newest rows, always skipped the final row, and looped
    forever when a team was absent.

    Parameters
    ----------
    df_historical : pandas.DataFrame
        Must contain the columns ``Home_ID``, ``Away_ID``, ``Home Rating`` and
        ``Away Rating``.

    Returns
    -------
    dict
        Team code -> rating, in the order of ``mlb_teams``.

    Raises
    ------
    ValueError
        If a code from ``mlb_teams`` never appears in the frame.
    """
    team_codes = list(mlb_teams.values())
    outstanding = set(team_codes)
    latest = {}

    home_ids = df_historical["Home_ID"].to_numpy()
    away_ids = df_historical["Away_ID"].to_numpy()
    home_ratings = df_historical["Home Rating"].to_numpy()
    away_ratings = df_historical["Away Rating"].to_numpy()

    for position in range(len(df_historical) - 1, -1, -1):
        if not outstanding:
            break  # every team already has its most recent rating

        # The home side is checked first, as in the original search.
        home_id = home_ids[position]
        if home_id in outstanding:
            latest[home_id] = home_ratings[position]
            outstanding.discard(home_id)

        away_id = away_ids[position]
        if away_id in outstanding:
            latest[away_id] = away_ratings[position]
            outstanding.discard(away_id)

    if outstanding:
        raise ValueError(
            "No historical game found for team code(s): "
            + ", ".join(sorted(outstanding))
        )

    return {code: latest[code] for code in team_codes}

def regressLastYearRatingsToMean():
    """Move every value in the global `ratings` dict halfway toward 1500, in place."""
    baseline = 1500
    for team_code in ratings:
        carried_over = ratings[team_code]
        ratings[team_code] = carried_over - (carried_over - baseline) / 2



def scrapeLatestData(url="https://www.baseball-reference.com/leagues/majors/2024-schedule.shtml", timeout=30):
    """Download a season schedule page and return its completed games.

    Each ``<p class="game">`` element is one game. The text of its first two
    links is taken as the two team names and the first two integers in the
    paragraph text as their scores. A paragraph whose text contains ``':'`` is
    treated as a game that has not been played yet and is skipped.

    Parameters
    ----------
    url : str, optional
        Schedule page to scrape. Defaults to the 2024 page used originally.
    timeout : float, optional
        Seconds to wait for the server before giving up.

    Returns
    -------
    pandas.DataFrame
        Columns ``'Home Team'``, ``'Away Team'``, ``'R Home'``, ``'R Away'``,
        one row per completed game, indexed 0..n-1 with no gaps.

    Raises
    ------
    requests.RequestException
        On a network failure, a timeout, or an HTTP error status. Previously an
        error page was parsed silently and produced an empty frame.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    records = []
    for game in soup.find_all('p', class_='game'):
        links = game.find_all('a')
        if len(links) < 2:
            continue  # not a regular game entry; avoids an IndexError

        game_text = game.text
        if ':' in game_text:
            continue  # game is still in the future

        first_score, second_score = extract_numbers(game_text)
        if first_score is None:
            continue  # no score pair in the text

        # NOTE(review): the first link is stored as the home team. If the page
        # lists games as "visitor @ home", these two labels are swapped;
        # confirm against the page markup. The Elo update and run totals in
        # this file treat both sides the same way, so they do not depend on it.
        records.append({
            'Home Team': links[0].text.strip(),
            'Away Team': links[1].text.strip(),
            'R Home': first_score,
            'R Away': second_score,
        })

    # Incomplete rows are never appended, so no dropna() is needed and the
    # index has no gaps for the label-based lookups done by callers.
    return pd.DataFrame(records, columns=['Home Team', 'Away Team', 'R Home', 'R Away'])

def populateTotals(dfnew):
    """Add every game in `dfnew` to the global per-team `totals` tally.

    For each row the home and away names are mapped to team codes through
    ``mlb_teams`` (with ``"Arizona D'Backs"`` treated as
    ``"Arizona Diamondbacks"``). Each team's runs are then added to
    ``totals[code][0]`` and its game count ``totals[code][1]`` is incremented.

    Rows are read directly, so the result does not depend on the frame's index
    labels. A row is skipped only when a team name is unknown or a score is
    missing. The previous version probed labels ``0 .. len+49`` and swallowed
    every exception, which silently lost rows with larger labels.

    `totals` is not reset here, so calling this again adds to earlier tallies.

    Parameters
    ----------
    dfnew : pandas.DataFrame
        Needs the columns ``'Home Team'``, ``'Away Team'``, ``'R Home'`` and
        ``'R Away'``.
    """
    aliases = {"Arizona D'Backs": "Arizona Diamondbacks"}

    games = zip(dfnew['Home Team'], dfnew['Away Team'], dfnew['R Home'], dfnew['R Away'])
    for home_team, away_team, home_runs, away_runs in games:
        home_team = aliases.get(home_team, home_team)
        away_team = aliases.get(away_team, away_team)

        # Best effort, as before: leave out games that cannot be attributed.
        if home_team not in mlb_teams or away_team not in mlb_teams:
            continue
        # A missing score would otherwise turn the running sum into NaN.
        if pd.isna(home_runs) or pd.isna(away_runs):
            continue

        home_code = mlb_teams[home_team]
        away_code = mlb_teams[away_team]

        # Sum the runs of each team
        totals[home_code][0] += home_runs
        totals[away_code][0] += away_runs
        # Count the game for each team
        totals[home_code][1] += 1
        totals[away_code][1] += 1
        

def expectation(elo1, elo2):
    """Elo expected score of player 1 against player 2.

    Evaluates ``1 / (1 + 10 ** ((elo2 - elo1) / 400))``: 0.5 for equal ratings,
    larger when `elo1` exceeds `elo2`.
    """
    rating_gap = (elo2 - elo1) / 400
    odds_against = 10 ** rating_gap
    return (1 + odds_against) ** -1

def update_rating(expectation, player_rating, result, score1, score2, k):
    """Return `player_rating` after one game, scaled by the margin of victory.

    The usual Elo step ``k * (result - expectation)`` is multiplied by
    ``sqrt(|score1 - score2| / 3.55)``. The square root pulls the multiplier
    toward 1, so blowouts move ratings more than close games without scaling
    linearly with the margin. A margin of 0 leaves the rating unchanged.

    Parameters
    ----------
    expectation : float
        Expected score of this player before the game.
    player_rating : float
        Rating before the game.
    result : float
        Actual score of this player; callers pass 1, 0 or 0.5.
    score1, score2 : number
        Runs of the two sides; only their absolute difference is used.
    k : number
        Step size.
    """
    # 3.55 is, per the original author's note, the average winning margin of a
    # baseball game.
    typical_margin = 3.55
    run_gap = abs(score1 - score2)
    margin_weight = (run_gap / typical_margin) ** 0.5

    return player_rating + k * margin_weight * (result - expectation)

def resolveOneGame(i,df,k):
    """Apply the result of the game at index label `i` to the global `ratings`.

    Parameters
    ----------
    i : index label
        Row of `df` to process; looked up with ``df.loc``.
    df : pandas.DataFrame
        Needs ``'Home Team'``, ``'Away Team'``, ``'R Home'`` and ``'R Away'``.
    k : number
        Elo step size passed on to update_rating().

    Returns
    -------
    tuple
        ``(home rating, away rating)`` as they were before this game.
    """
    def _full_name(name):
        # The schedule page abbreviates this one franchise.
        return "Arizona Diamondbacks" if name == "Arizona D'Backs" else name

    home_name = _full_name(df.loc[i, 'Home Team'])
    away_name = _full_name(df.loc[i, 'Away Team'])

    home_code = mlb_teams[home_name]
    away_code = mlb_teams[away_name]

    # Ratings going into the game; also the function's return value.
    home_before = ratings[home_code]
    away_before = ratings[away_code]
    home_expected = expectation(home_before, away_before)

    home_runs = df.loc[i, 'R Home']
    away_runs = df.loc[i, 'R Away']

    # Actual score per side: win = 1, loss = 0, anything else = 0.5 each.
    if home_runs > away_runs:
        home_result, away_result = 1, 0
    elif home_runs < away_runs:
        home_result, away_result = 0, 1
    else:
        home_result, away_result = 0.5, 0.5

    ratings[home_code] = update_rating(home_expected, home_before, home_result, home_runs, away_runs, k)
    ratings[away_code] = update_rating(1 - home_expected, away_before, away_result, home_runs, away_runs, k)

    return home_before, away_before

def resolve_all_games(df_new, recent_games=60, base_k=20, boosted_k=40):
    """Feed every game of `df_new`, in row order, through resolveOneGame().

    When the global ``k_boost`` flag is on, the last `recent_games` rows use
    the step size `boosted_k` so recent results weigh more; all other rows,
    and every row when the flag is off, use `base_k`.

    Rows are addressed by their real index labels. The previous version passed
    ``0 .. len-1`` as labels, which fails or hits the wrong rows whenever the
    index is not a gap-free range (for example after rows were dropped).

    Parameters
    ----------
    df_new : pandas.DataFrame
        Games to process; see resolveOneGame() for the columns it reads.
    recent_games : int, optional
        Number of trailing rows that get the boosted step size.
    base_k, boosted_k : number, optional
        Normal and boosted Elo step sizes.
    """
    first_boosted = len(df_new) - recent_games
    for position, label in enumerate(df_new.index):
        if k_boost and position >= first_boosted:
            k = boosted_k
        else:
            k = base_k
        resolveOneGame(label, df_new, k)


def trainNueralNet(df):
    """Fit the feature scaler, the target transformer and the Keras score model.

    Sets three globals that getNNpreds() relies on: ``scaler`` (MinMaxScaler
    fitted on the training features), ``scalerY`` (Yeo-Johnson
    PowerTransformer fitted on the training targets) and ``model``.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``'R Home'``, ``'R Away'`` and the six feature columns listed in
        the body.

    Returns
    -------
    tuple
        ``(x_train, x_test, y_train, y_test)``, all unscaled. Each target row
        is ``[home runs, away runs, total runs, home minus away]``.

    Notes
    -----
    The printed test loss and MAE are computed in the transformed target space
    the network was trained in, not in runs.
    """
    global scaler, scalerY, model

    # Targets: both scores plus their sum and difference
    y_tuple = np.array([[home, away, home + away, home - away]
                        for home, away in df[['R Home', 'R Away']].values])

    # Features, in the order getNNpreds() passes them
    feature_columns = ['Home Rating', 'Away Rating', 'HomePitcherERA', 'AwayPitcherERA',
                       'Home avg Runs', 'Away avg Runs']
    x_tuple = np.array([list(row) for row in df[feature_columns].values.astype(float)])

    # Train/test split; the seed is drawn at random and printed so the split
    # can be reproduced afterwards.
    ts = 0.2
    rs = np.random.randint(20,60)
    print(f"Random seed = {rs}")
    x_train, x_test, y_train, y_test = train_test_split(x_tuple, y_tuple, test_size=ts, random_state=rs)

    # Scalers are fitted on the training portion only
    scaler = MinMaxScaler()
    X_normalized = scaler.fit_transform(x_train)

    scalerY = PowerTransformer(method = 'yeo-johnson')
    Y_normalized = scalerY.fit_transform(y_train)

    model = tf.keras.Sequential([
        tf.keras.layers.Dense(16, activation='relu', input_shape=(6,)),  # Input layer
        layers.Dropout(0.5),  # dropout rate of 0.5
        tf.keras.layers.Dense(48, activation='relu'),  # Hidden layer
        tf.keras.layers.Dense(32, activation='relu'),  # Hidden layer
        tf.keras.layers.Dense(16, activation='relu'),  # Hidden layer
        tf.keras.layers.Dense(4)  # Output layer: one neuron per target column
    ])

    # Compile the model
    model.compile(optimizer='adam', loss=tf.keras.losses.Huber(), metrics=['mae'])

    # Train the model
    model.fit(X_normalized, Y_normalized, epochs=20, batch_size=32, validation_split=0.2)

    # Evaluate on the held-out data, put through the same transforms as the
    # training data. Feeding the raw arrays (as before) scores the network on
    # inputs and targets in a scale it never saw.
    loss, mae = model.evaluate(scaler.transform(x_test), scalerY.transform(y_test))

    print(f"Test Loss: {loss}")
    print(f"Test MAE: {mae}")

    return x_train, x_test, y_train, y_test

def getNNpreds(home_rating, away_rating, homeERA, awayERA,homeAvgRun, awayAvgRun):
    """Predict one game with the global `model`, `scaler` and `scalerY`.

    The six inputs form a single feature row, are scaled with ``scaler``, run
    through ``model``, mapped back with ``scalerY.inverse_transform`` and then
    multiplied by 1.0625.

    Returns
    -------
    array of shape (1, n_outputs)
        Callers read ``[0][0]`` as the home score and ``[0][1]`` as the away
        score.
    """
    features = np.array([[home_rating, away_rating, homeERA, awayERA,homeAvgRun, awayAvgRun]])
    features_norm = scaler.transform(features)

    # verbose=0 silences Keras' per-call progress line. Redirecting stdout
    # into the shared StringIO did the same but grew that buffer on every call.
    prediction = model.predict(features_norm, verbose=0)
    prediction = scalerY.inverse_transform(prediction)

    # Fixed upward correction of the predicted values; its rationale is not
    # recorded in this file.
    return prediction * 1.0625

def _show_confusion_matrix(matrix, title, xlabel, ylabel):
    """Draw one 2x2 confusion matrix with its counts written in the cells."""
    plt.imshow(matrix, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar()
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    for row in range(2):
        for col in range(2):
            plt.text(col, row, str(matrix[row][col]), ha='center', va='center', color='orange')

    plt.show()


def evaluateModel(x_test, y_test):
    """Score getNNpreds() on held-out games and plot two confusion matrices.

    Two questions are checked for every game: who won (home vs away), and was
    the combined score above 8.5 runs. A predicted tie, or a predicted total
    of exactly 8.5, always counts as a miss.

    Parameters
    ----------
    x_test : sequence of rows
        The six getNNpreds() inputs per game.
    y_test : sequence of rows
        Actual results; element 0 is the home score, element 1 the away score.

    Returns
    -------
    tuple
        ``(winners right, winners wrong, score_dif, outcome)``. ``score_dif``
        holds the predicted home-minus-away margin per game and ``outcome``
        holds 1 when the home team actually won, else 0.
    """
    # Both tables are indexed [true class][predicted class].
    # Winner table: 0 = home, 1 = away. Scoring table: 0 = high, 1 = low.
    winner_counts = [[0, 0], [0, 0]]
    scoring_counts = [[0, 0], [0, 0]]

    score_dif = []
    outcome = []

    n_games = len(y_test)
    for i in range(n_games):
        features = x_test[i]
        actual_home, actual_away = y_test[i][0], y_test[i][1]

        pred = getNNpreds(*features[:6])
        pred_home, pred_away = pred[0][0], pred[0][1]
        score_dif.append(pred_home - pred_away)

        # Home vs away
        if actual_home > actual_away:
            true_side = 0
            outcome.append(1)
            hit = pred_home > pred_away
        else:
            true_side = 1
            outcome.append(0)
            hit = pred_home < pred_away
        winner_counts[true_side][true_side if hit else 1 - true_side] += 1

        # High scoring vs low scoring
        true_score = actual_home + actual_away
        pred_score = pred_home + pred_away
        if true_score > 8.5:
            true_band = 0
            hit = pred_score > 8.5
        else:
            true_band = 1
            hit = pred_score < 8.5
        scoring_counts[true_band][true_band if hit else 1 - true_band] += 1

        # Give feedback on how long it might take
        if i % 100 == 0:
            print(f"The model has run prediction {i} out of {n_games}")

    # Plotting
    _show_confusion_matrix(
        np.array(winner_counts),
        'Confusion Matrix (Home vs away)',
        'Winner prediction: Home (left) vs Away (right)',
        'True Outcome: Home Win (top) vs Away Win (bottom)',
    )
    _show_confusion_matrix(
        np.array(scoring_counts),
        'Confusion Matrix (High vs Low Scoring)',
        'Score prediction: High (left) vs Low (right)',
        'True Outcome: High (top) vs Low (bottom)',
    )

    # Return how many winners the model did and did not predict
    winners_right = winner_counts[0][0] + winner_counts[1][1]
    winners_wrong = winner_counts[0][1] + winner_counts[1][0]
    return winners_right, winners_wrong, score_dif, outcome

def binomialTest(right,wrong):
    """Test whether the share of correct picks is better than a coin flip.

    Runs a one-sided exact binomial test of H0 "success probability = 0.5"
    against H1 "success probability > 0.5". The one-sided alternative matters
    because a two-sided test also rejects H0 when the model is significantly
    *worse* than chance, and the printed verdict would then wrongly say
    "better".

    Parameters
    ----------
    right : int
        Number of correct predictions.
    wrong : int
        Number of incorrect predictions.

    Returns
    -------
    float
        The one-sided p-value.

    Raises
    ------
    ValueError
        If ``right + wrong`` is not positive.
    """
    # Total number of trials
    n_trials = right + wrong
    if n_trials <= 0:
        raise ValueError("binomialTest needs at least one prediction (right + wrong must be > 0).")
    print(f"Correct Fraction: {right/n_trials}")

    # Probability of success for a fair coin flip
    p_null = 0.5

    result = binomtest(right, n_trials, p_null, alternative='greater')
    p_value = result.pvalue

    print(f"The p-value for the binomial test is: {p_value}")

    # Check if the process guesses better than a coin flip
    alpha = 0.05  # significance level
    if p_value < alpha:
        print("The nueral net guesses the winner better than a coin flip (reject the null hypothesis).")
    else:
        print("The nueral net does cannot be said to guess better than a coin flip (fail to reject the null hypothesis at a significance level of 0.05).")
    return p_value

def logistic_regression(score_dif, outcome):
    """Report how well predicted score margins separate home wins from losses.

    When the global ``trainer`` equals ``'train'``, a new LogisticRegression is
    created, stored in the global ``score2prob`` and fitted on the data given
    here. Otherwise the existing ``score2prob`` is used as is. In both cases
    accuracy and a classification report for the same data are printed.

    Parameters
    ----------
    score_dif : sequence of float
        Predicted home-minus-away margin per game.
    outcome : sequence of int
        1 if the home team won that game, else 0.
    """
    global score2prob

    retrain = trainer == 'train'
    if retrain:
        score2prob = LogisticRegression()

    margins = np.array(score_dif).reshape(-1, 1)
    labels = np.array(outcome)

    if retrain:
        score2prob.fit(margins, labels)

    # Column 1 is the probability of class 1 (home team wins)
    home_win_prob = score2prob.predict_proba(margins)[:, 1]

    # Turn probabilities into hard picks
    cutoff = 0.5
    picks = (home_win_prob > cutoff).astype(int)

    print("Logistic Regression Training Report")
    print("Accuracy:", accuracy_score(labels, picks))
    print(classification_report(labels, picks))
    

def initialize_k_booster():
    """Ask whether to turn on k boosting and store the answer in the global `k_boost`.

    `k_boost` becomes True only when the reply is exactly ``"k"``; any other
    reply sets it to False.
    """
    global k_boost
    reply = input("Enter k to turn on k boosting. This makes the model more sensitive to the results of recent games. ")
    k_boost = reply == "k"


def promptForData():
    """Ask for one matchup and print the model's view of it.

    Reads the home team, away team and both starting pitchers' ERAs from the
    user, then prints the Elo win probability, the network's predicted scores
    and the win probability that ``score2prob`` derives from the predicted
    margin. If the global ``kel`` flag is set, kelly() is called for whichever
    side the logistic model favours.

    Any exception (unknown team name, non-numeric ERA, a team with no games in
    ``totals``, ...) is caught and reported on one line so the caller's prompt
    loop keeps running.
    """
    try:
        # Inputs; each team name is validated as soon as it is entered
        home_name = input("Enter the home team: ")
        home_code = mlb_teams[home_name]
        away_name = input("Enter the away team: ")
        away_code = mlb_teams[away_name]
        home_pitcher_era = float(input("Enter the ERA of the home pitcher: "))
        away_pitcher_era = float(input("Enter the ERA of the away pitcher: "))

        away_elo = float(ratings[away_code])
        home_elo = float(ratings[home_code])
        # Average runs per game = runs scored / games played
        home_avg_runs = totals[home_code][0]/totals[home_code][1]
        away_avg_runs = totals[away_code][0]/totals[away_code][1]

        print(f"{home_name}: {home_elo} {away_name}: {away_elo}")

        # Probability of a home win from the ratings alone
        elo_home_prob = expectation(home_elo, away_elo)
        print(f"According to the elos, {home_name} have a {elo_home_prob} chance of winning while {away_name} have a {1 - elo_home_prob} chance. This does not take into acount additional information like ERAs.")

        scores = getNNpreds(home_elo, away_elo, home_pitcher_era, away_pitcher_era, home_avg_runs, away_avg_runs)
        print(f"{home_name} score prediction: {scores[0][0]}")
        print(f"{away_name} score prediction: {scores[0][1]}")

        # One sample with one feature: the predicted home-minus-away margin
        margin = [[ scores[0][0]-scores[0][1] ]]
        win_probs = score2prob.predict_proba(margin)
        print(f"Running logistic regression on the score difference suggests that {home_name} have a {win_probs[0][1]} chance of winning while {away_name} have a {win_probs[0][0]} chance.")
        print()

        if kel:
            home_prob, away_prob = win_probs[0][1], win_probs[0][0]
            if home_prob > away_prob:
                favourite = (home_prob, home_name)
            else:
                favourite = (away_prob, away_name)
            kelly(*favourite)

    except Exception as e:
        # Handle the user entering something that would cause an error
        print(f"An error occurred: {e}")
        print()

def imports(df):
    """Load the saved model, logistic regression and scalers into globals.

    Sets ``model``, ``score2prob``, ``scaler`` and ``scalerY``. The user is
    then asked whether to see evaluations; on ``'y'`` a fresh random 80/20
    split of `df` is made and its test part is run through evaluateModel(),
    binomialTest() and logistic_regression().

    NOTE(review): the split made here is unrelated to the one the saved model
    was trained with, so the evaluated rows may include games the model was
    trained on. Confirm before reading the numbers as held-out performance.

    Parameters
    ----------
    df : pandas.DataFrame
        Historical games; only read when evaluations are requested.
    """
    global model, score2prob, scaler, scalerY

    model = tf.keras.models.load_model(r'C:\Users\Jack Leitzell\Downloads\MLBPredictor5.7.2024')

    # joblib files are pickles: only load files this notebook wrote itself.
    score2prob = load('score2prob.joblib')
    scaler = load('scaler.joblib')
    scalerY = load('scalerY.joblib')

    wants_evaluation = input("Do you want to see model evaluations? (y/n) ")
    if wants_evaluation != 'y':
        return

    # Targets: both scores plus their sum and difference
    y_tuple = np.array([[home, away, home + away, home - away]
                        for home, away in df[['R Home', 'R Away']].values])

    # Features, in the order getNNpreds() expects them
    feature_columns = ['Home Rating', 'Away Rating', 'HomePitcherERA', 'AwayPitcherERA',
                       'Home avg Runs', 'Away avg Runs']
    x_tuple = np.array([list(features) for features in df[feature_columns].values.astype(float)])

    # Train/test split with a randomly drawn, printed seed
    ts = 0.2
    rs = np.random.randint(20,60)
    print(f"Random seed = {rs}")
    x_train, x_test, y_train, y_test = train_test_split(x_tuple, y_tuple, test_size=ts, random_state=rs)

    right, wrong, score_dif, outcome = evaluateModel(x_test, y_test)
    binomialTest(right, wrong)
    logistic_regression(score_dif, outcome)

def kel_setup():
    """Ask for the account value and decide whether Kelly sizing is active.

    A reply that parses as a float is stored in the global ``act_val`` and
    switches the global ``kel`` flag on. Any other reply (including just
    pressing enter) switches ``kel`` off and leaves ``act_val`` untouched.
    """
    global kel, act_val
    reply = input("Enter the value of your account if you wish to run Kelly fractions on the predictions. Otherwise click enter. ")
    try:
        act_val = float(reply)
    except ValueError:
        kel = False
    else:
        kel = True

def american_to_decimal(american_odds):
    """Convert American (moneyline) odds to net odds: profit per unit staked.

    Despite the name, the result is the net payout ratio (decimal odds minus
    1), which is the ``b`` that the Kelly formula in kelly() needs:

    * positive odds ``+X``: a stake of 100 wins X, so the result is ``X / 100``;
    * negative odds ``-X``: a stake of X wins 100, so the result is ``100 / X``.

    Parameters
    ----------
    american_odds : number
        Moneyline such as ``150`` or ``-200``.

    Returns
    -------
    float

    Raises
    ------
    ValueError
        If `american_odds` is 0, which is not a valid moneyline. This used to
        surface as a bare ZeroDivisionError.
    """
    if american_odds == 0:
        raise ValueError("American odds cannot be 0.")
    if american_odds > 0:
        return american_odds / 100
    return 100 / abs(american_odds)
    
def kelly(winprob, name):
    """Ask for the offered odds on `name` and print the Kelly stake.

    The Kelly fraction is ``p - (1 - p) / b``, where ``p`` is `winprob` and
    ``b`` is the net payout per unit staked returned by american_to_decimal().
    A positive fraction is printed as a percentage, as a dollar amount of the
    global ``act_val`` and as the half-Kelly amount; otherwise the user is told
    there is no profitable bet.

    Odds that are not a number, or that cannot be converted, produce a short
    message instead of an exception. The previous handler named an undefined
    ``Error`` class, so any bad entry raised NameError.

    Parameters
    ----------
    winprob : float
        Estimated probability that `name` wins.
    name : str
        Team name, used in the prompt.
    """
    try:
        american_odds = float(input(f"Enter the American odds that the {name} win: "))
        net_odds = american_to_decimal(american_odds)
    except (ValueError, ZeroDivisionError):
        print("It seems like you entered the odds incorrectly.")
    else:
        q = 1 - winprob
        frac = winprob - q/net_odds
        if frac > 0:
            print(f"The Kelly fraction would be to bet {frac*100}% of your money.")
            print(f"Given your current account value this ammounts to ${frac * act_val}.")
            print(f"At 50% Kelly you should instead bet ${frac * act_val*0.50}.")
        else:
            print("There is no profitable bet availible on this game")
    print()
            
            

def main():
    """Run the whole pipeline, then prompt for matchups forever.

    Steps, in order: ask whether to retrain (stored in the global ``trainer``),
    set up Kelly sizing, load the historical games, ask about k boosting,
    seed the global ``ratings`` from the historical data and regress them
    toward the mean, scrape the current season, tally runs, replay the season
    through the Elo update, and then either train and evaluate new models
    (``trainer == 'train'``) or load the saved ones. The closing loop never
    returns on its own.
    """
    global trainer, ratings

    trainer = input('Enter "train" to retrain the models. Otherwise defaults to models trained on 5/7/2024. ')
    kel_setup()
    df_hist = readInHistoricalData()
    initialize_k_booster()

    ratings = get_last_ratings(df_hist)
    regressLastYearRatingsToMean()

    season_games = scrapeLatestData()
    populateTotals(season_games)
    resolve_all_games(season_games)

    if trainer != 'train':
        imports(df_hist)
    else:
        # Only the held-out part of the split is needed for evaluation.
        _, x_test, _, y_test = trainNueralNet(df_hist)
        right, wrong, score_dif, outcome = evaluateModel(x_test, y_test)
        binomialTest(right, wrong)
        logistic_regression(score_dif, outcome)

    while True:
        promptForData()

# Start the interactive predictor. main() ends in an endless prompt loop, so
# this cell only finishes when it is interrupted or an uncaught error is raised.
main()



ModuleNotFoundError: No module named 'tensorflow'

In [8]:
# Persist the trained artefacts so imports() can reload them instead of
# retraining. `model` and `score2prob` are globals set inside trainNueralNet()
# and logistic_regression(); they must already exist in the kernel.
model.save(r'C:\Users\Jack Leitzell\Downloads\MLBPredictor5.7.2024')
from joblib import dump

# score2prob is the LogisticRegression fitted in logistic_regression();
# imports() reloads it from this file name.
dump(score2prob, 'score2prob.joblib')

INFO:tensorflow:Assets written to: C:\Users\Jack Leitzell\Downloads\MLBPredictor5.7.2024\assets


['score2prob.joblib']

In [23]:
from joblib import dump

# scaler (MinMaxScaler for the features) and scalerY (PowerTransformer for the
# targets) are the globals fitted in trainNueralNet(); imports() reloads them
# from these file names.
dump(scaler, 'scaler.joblib')
dump(scalerY, 'scalerY.joblib')

['scalerY.joblib']