In [3]:
## import libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import Lasso
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from math import sqrt
from pybaseball import statcast
from pybaseball import playerid_reverse_lookup
from bs4 import BeautifulSoup
import requests
import lxml
import datetime
import selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from pybaseball import batting_stats_bref
from pybaseball import statcast_batter_expected_stats
from pybaseball import statcast_pitcher_expected_stats
from pybaseball import pitching_stats



In [63]:
# functions



# get today's date formatted as yyyy-mm-dd
def get_today_date():
    # get the current date
    today = datetime.date.today()
    # format the date as yyyy-mm-dd
    date = today.strftime('%Y-%m-%d')
    # return the date
    return date

def get_starters():
    driver = webdriver.Chrome()
    # today's date
    date = get_today_date()
    # url for the website
    url = 'https://www.fangraphs.com/livescoreboard.aspx?date=' + date
    # get the website rendered with selenium
    driver.get(url)
    # get the html of the website
    # close any popups
    try:
      driver.find_element(by=By.CLASS_NAME, value='close').click()
    except:
      pass
    
    #driver.implicitly_wait(5)
    tables = driver.find_elements(by=By.CLASS_NAME, value='lineup')
    starting = []
    for table in tables:
        row = table.find_elements(by=By.TAG_NAME, value='tbody')[0].find_elements(by=By.TAG_NAME, value='tr')
        for r in row:
            col = r.find_elements(by=By.TAG_NAME, value='td')
            for c in col:
                a = c.find_elements(by=By.TAG_NAME, value='a')
                if len(a) > 0:
                    for i in a:
                        starting.append(i.text)
    return starting

            
def calculate_batting_fantasy_points(row):
  points = 0
  if row['events']=='single' or row['events'] == 'walk':
    points = points + 3
  elif row['events']=='double':
    points = points + 5
  elif row['events']=='triple':
    points = points + 8
  elif row['events']=='home_run':
    points = points + 10

  points = points + 4 * (row['post_bat_score']- row['bat_score'])

  return points

def calculate_pitching_fantasy_points(row):
  points = 0
  if row['events']=='strikeout':
    points = points + 2.75
  elif row['events']=='double_play' or row['events']=='grounded_into_double_play':
    points = points + 1.5
  elif row['events']=='strikeout_double_play' or row['events']=='strikeout_double_play':
    points = points + 3.5
  elif row['events']=='triple_play':
    points = points + 2.25
  elif row['events']=='single' or row['events'] == 'walk' or row['events'] == 'hit_by_pitch' or row['events'] == 'double' or row['events'] == 'triple' or row['events'] == 'home_run':
    points = points - .6
  else:
    points = points + .75

  points = points - 2 * (row['post_bat_score']- row['bat_score'])
  return points

def total_ball_movement(row):
  return sqrt(row['pfx_x']**2 + row['pfx_z']**2)

def batter_adjusted_plate_x(row):
  if row['stand'] == 'R':
    return row['plate_x']
  else:
    return -row['plate_x']

# sum batter stats per team for each game
def team_batters_games(df):
  df['team'] = df.apply(team, axis=1)
  team_batters = df.groupby(['game_pk', 'team'])['batter_fp'].sum().reset_index()
  return team_batters
    
# determine team based on inning_topbot
def team(row):
  if row['inning_topbot'] == 'Top':
    return row['away_team']
  else:
    return row['home_team']
  
# sum pitcher stats per team for each game
def team_pitchers_games(df):
  df['team'] = df.apply(team, axis=1)
  team_pitchers = df.groupby(['game_pk', 'team', 'player_name'])['pitcher_fp'].sum().reset_index()
  # get batters faced per pitcher
  batters_faced = df.groupby(['game_pk', 'team', 'player_name'])['batter'].count().reset_index()
  # merge the two dataframes
  team_pitchers = pd.merge(team_pitchers, batters_faced, on=['game_pk', 'team', 'player_name'])
  # for each game, select the pitcher with the most batters faced
  team_pitchers = team_pitchers.sort_values(by=['game_pk', 'batter'], ascending=False).drop_duplicates(subset=['game_pk', 'team'])
  return team_pitchers

# batter fantasy points by game
def batter_fp(df):
  df['batter_fp'] = df.apply(calculate_batting_fantasy_points, axis=1)
  batter_game_fp = df.groupby(['game_pk', 'batter'])['batter_fp'].sum().reset_index()
  return batter_game_fp

# get number of plate appearances for each batter
def get_plate_appearances(df):
  plate_appearances = df.groupby(['batter'])['batter'].count().reset_index(name='pa')
  return plate_appearances

# filter the df to only the last x days
def last_x_days(df, x):
  # get the current date
  today = datetime.date.today()
  # get the date x days ago
  x_days_ago = today - datetime.timedelta(days=x)
  # convert to date
  x_days_ago = x_days_ago.strftime('%Y-%m-%d')
  # filter the df to only the last x days
  df = df[df['game_date'] > x_days_ago]
  return df

# fix player names to remove latin characters
def fix_player_names(df, column_name):
  df[column_name] = df[column_name].str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8')
  return df

# fix players names to move first name to the front and remove commas
def fix_player_names2(df, column_name):
  df[column_name] = df[column_name].str.split(',').str[1] + ' ' + df[column_name].str.split(',').str[0]
  return df

# get the frequency that a player has a stat between two values
def player_stat_frequency(df, player_name, column_name, lower, upper, is_batter=True):
  # filter the df to the selected player depending on if they are a batter or pitcher
  if is_batter:
    df = df[df['batter'] == player_name]
  else:
    df = df[df['player_name'] == player_name]

  # fix for when tyring to divide by zero
  if df.shape[0] == 0:
    return 0
  # find how often column name is between lower and upper
  freq = df[(df[column_name] >= lower) & (df[column_name] <= upper)].shape[0] / df.shape[0]
  return freq

# reformatting for merge
def convert_to_player_name(row):
  name = row['Name']
  # handle case for names with two spaces
  if len(name.split(' ')) > 2:
    return  name.split(' ')[1] + ' ' + name.split(' ')[2] + ', ' + name.split(' ')[0]
  else:
    return name.split(' ')[1] + ', ' + name.split(' ')[0]
  

In [74]:
# get starters and convert starters to csv
starting = get_starters()
starting_df = pd.DataFrame(starting)
starting_df.to_csv('C:\\Users\\jorda\\Downloads\\starting_lineups.csv', index=False)
starting_df

Unnamed: 0,0
0,Ronel Blanco
1,Marcus Stroman
2,Jose Altuve
3,Kyle Tucker
4,Yordan Alvarez
...,...
57,Gary Sanchez
58,Jake Bauers
59,Joey Ortiz
60,Michael Wacha


In [5]:
df = statcast(start_dt='2024-03-28', end_dt='2024-05-06')
df.head()

This is a large query, it may take a moment to complete


  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_cop

Unnamed: 0,pitch_type,game_date,release_speed,release_pos_x,release_pos_z,player_name,batter,pitcher,events,description,...,fld_score,post_away_score,post_home_score,post_bat_score,post_fld_score,if_fielding_alignment,of_fielding_alignment,spin_axis,delta_home_win_exp,delta_run_exp
1472,FF,2024-05-06,99.3,-1.13,6.17,"Suarez, Robert",666624,663158,field_out,hit_into_play,...,6,6,3,3,6,Standard,Standard,206,-0.005,-0.092
1530,FF,2024-05-06,99.8,-1.29,6.22,"Suarez, Robert",664023,663158,strikeout,called_strike,...,6,6,3,3,6,Infield shade,Standard,205,-0.012,-0.119
1550,FF,2024-05-06,100.2,-1.12,6.26,"Suarez, Robert",664023,663158,,foul,...,6,6,3,3,6,Infield shade,Standard,200,0.0,0.0
1609,FF,2024-05-06,100.0,-1.05,6.32,"Suarez, Robert",664023,663158,,foul_tip,...,6,6,3,3,6,Standard,Standard,206,0.0,-0.042
1648,FF,2024-05-06,99.6,-1.22,6.31,"Suarez, Robert",664023,663158,,ball,...,6,6,3,3,6,Standard,Standard,204,0.0,0.024


In [6]:
hrs = df[df['events']=='home_run']
hrs.to_csv('C:\\Users\\jorda\\Downloads\\hrs.csv', index=False)

In [76]:
# get base df


# Get all Statcast data from 2024
df = statcast(start_dt='2024-03-28', end_dt=get_today_date())

# Only include events
df = df.dropna(subset=['events']).dropna(subset=['pitch_type'])
df = fix_player_names(df, 'player_name')

# get more info
df['pitcher_fp'] = df.apply(lambda row: calculate_pitching_fantasy_points(row), axis=1)
df['batter_fp'] = df.apply(lambda row: calculate_batting_fantasy_points(row), axis=1)
df['total_ball_movement'] = df.apply(lambda row: total_ball_movement(row), axis=1)
df['batter_adjusted_plate_x'] = df.apply(lambda row: batter_adjusted_plate_x(row), axis=1)

# get team batters
team_batters = team_batters_games(df)

# get team pitchers
team_pitchers = team_pitchers_games(df)

# get game_pks and game_dates
game_info = df[['game_pk', 'game_date', 'away_team', 'home_team']].drop_duplicates()

# combine team batters and team pitchers
team_stats = pd.merge(team_batters, team_pitchers, on=['game_pk', 'team'])
# merge with game info
team_stats = pd.merge(team_stats, game_info, on='game_pk')
team_stats.head()

# get 75th percentile outcomes for batter_fp and pitcher_fp for pitchers and teams
team_stats['batter_fp'] = team_stats['batter_fp'].astype(float)
team_stats['pitcher_fp'] = team_stats['pitcher_fp'].astype(float)

# use a dataframe for team-based stats that only uses data from the last 10 days
team_info_df = last_x_days(team_stats, 10)
team_info_df['team_fp_75'] = team_info_df.groupby('team')['batter_fp'].transform(lambda x: x.quantile(.75))
team_info_df['opp_pitcher_fp_75'] = team_info_df.groupby('team')['pitcher_fp'].transform(lambda x: x.quantile(.75))
team_info_df['team_fp_75'] = team_info_df['team_fp_75'].fillna(0)
team_info_df['opp_pitcher_fp_75'] = team_info_df['opp_pitcher_fp_75'].fillna(0)

# pitchers can have full season stats
team_stats['pitcher_fp_75'] = team_stats.groupby('player_name')['pitcher_fp'].transform(lambda x: x.quantile(.75))
team_stats['opp_team_fp_75'] = team_stats.groupby('player_name')['batter_fp'].transform(lambda x: x.quantile(.75))
team_stats['pitcher_fp_75'] = team_stats['pitcher_fp_75'].fillna(0)
team_stats['opp_team_fp_75'] = team_stats['opp_team_fp_75'].fillna(0)

# separate teams and pitchers 75th percentiles, and drop duplicates
pitcher_outliers = team_stats[['player_name', 'pitcher_fp_75', 'opp_team_fp_75']].drop_duplicates()
pitcher_outliers = fix_player_names(pitcher_outliers, 'player_name')
team_outliers = team_info_df[['team', 'team_fp_75', 'opp_pitcher_fp_75']].drop_duplicates()

# change team from AZ to ARI and WSH to WAS
team_outliers['team'] = team_outliers['team'].replace({'AZ': 'ARI', 'WSH': 'WAS'})

#pitcher_outliers.to_csv('C:\\Users\\jorda\\Downloads\\pitcher_outliers.csv', index=False)
team_outliers.to_csv('C:\\Users\\jorda\\Downloads\\team_outliers.csv', index=False)

# use last_x_days to filter df for recency
batter_recent_info = last_x_days(df, 10)

# batter game fp
batter_game_fp = batter_fp(batter_recent_info)

# get plate appearances for each batter
plate_appearances = get_plate_appearances(batter_recent_info)

# get batter names
batter_ids = batter_game_fp['batter'].unique()
batter_name_id_pairs = playerid_reverse_lookup(batter_ids, key_type='mlbam')

# rename key_mlbam to batter; capitalize first letter of  combine name_first and name_last, then combine name_first and name_last to batter_name
batter_name_id_pairs = batter_name_id_pairs.rename(columns={'key_mlbam':'batter'})
batter_name_id_pairs['batter_name'] = batter_name_id_pairs['name_first'].str.capitalize() + ' ' + batter_name_id_pairs['name_last'].str.capitalize()

# merge batter names with batter game fp and plate_appearances
batter_game_fp = pd.merge(batter_game_fp, batter_name_id_pairs, on='batter')
batter_game_fp = pd.merge(batter_game_fp, plate_appearances, on='batter')

# filter out batters with less than 14 plate appearances
batter_game_fp = batter_game_fp[batter_game_fp['pa'] >= 14]

# get 25th and 75th percentile outcomes for batter_fp for batters
batter_game_fp['batter_fp'] = batter_game_fp['batter_fp'].astype(float)
batter_game_fp['batter_fp_25'] = batter_game_fp.groupby('batter')['batter_fp'].transform(lambda x: x.quantile(.25))
batter_game_fp['batter_fp_25'] = batter_game_fp['batter_fp_25'].fillna(0)
batter_game_fp['batter_fp_75'] = batter_game_fp.groupby('batter')['batter_fp'].transform(lambda x: x.quantile(.75))
batter_game_fp['batter_fp_75'] = batter_game_fp['batter_fp_75'].fillna(0)

# select only batter_name and batter_fp_75 and drop duplicates
batter_outliers = batter_game_fp[['batter_name', 'batter_fp_25', 'batter_fp_75', 'batter']].drop_duplicates()
# batter_outliers.to_csv('C:\\Users\\jorda\\Downloads\\batter_outliers.csv', index=False)



# get data for all qualified pitchers in 2024
pitcher_exp = pitching_stats(2024, qual=10)
pitcher_exp = pitcher_exp[['Name', 'WHIP', 'Stuff+', 'Location+', 'Pitching+']]

# apply convert_to_player_name to Name column
pitcher_exp['player_name'] = pitcher_exp.apply(lambda row: convert_to_player_name(row), axis=1)

# get pitcher frequency of plate_z between 2 and 3, release_speed between 90 and 95, and release_spin_rate between 2090 and 2470
pitcher_exp['hr_friendly_launch_speed_pitcher'] = pitcher_exp.apply(lambda row: player_stat_frequency(df, row['player_name'], 'launch_speed', 100, 109, is_batter=False), axis=1)
pitcher_exp['hr_friendly_launch_angle_pitcher'] = pitcher_exp.apply(lambda row: player_stat_frequency(df, row['player_name'], 'launch_angle', 25, 31.25, is_batter=False), axis=1)


# merge with pitcher_outliers
pitcher_exp = pd.merge(pitcher_exp, pitcher_outliers, on='player_name')
pitcher_exp.to_csv('C:\\Users\\jorda\\Downloads\\pitcher_exp.csv', index=False)


# get all of this season's expected batting data so far
batter_exp = statcast_batter_expected_stats(2024)
batter_exp = fix_player_names(batter_exp, 'last_name, first_name')
batter_exp = fix_player_names2(batter_exp, 'last_name, first_name')
# rename first column to batter_name and player_id to batter
batter_exp = batter_exp.rename(columns={batter_exp.columns[0]: 'batter_name'})
batter_exp = batter_exp.rename(columns={'player_id': 'batter'})

# get batter frequency of launch speed between 100 and 109 and launch angle between 25 and 31.25
batter_exp['hr_friendly_launch_speed_freq'] = batter_exp.apply(lambda row: player_stat_frequency(df, row['batter'], 'launch_speed', 100, 109), axis=1)
batter_exp['hr_friendly_launch_angle_freq'] = batter_exp.apply(lambda row: player_stat_frequency(df, row['batter'], 'launch_angle', 25, 31.25), axis=1)
# merge with batter_outliers

batter_exp = pd.merge(batter_exp, batter_outliers, on='batter')
batter_exp.to_csv('C:\\Users\\jorda\\Downloads\\batter_exp.csv', index=False)

This is a large query, it may take a moment to complete


That's a nice request you got there. It'd be a shame if something were to happen to it.
We strongly recommend that you enable caching before running this. It's as simple as `pybaseball.cache.enable()`.
Since the Statcast requests can take a *really* long time to run, if something were to happen, like: a disconnect;
gremlins; computer repair by associates of Rudy Giuliani; electromagnetic interference from metal trash cans; etc.;
you could lose a lot of progress. Enabling caching will allow you to immediately recover all the successful
subqueries if that happens.
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[column].apply(pd.to_datetime, errors='ignore', format=date_format)
  data_copy[column] = data_copy[col

In [52]:
# scrape draftkings and fanduel MLB contest data 
driver = webdriver.Chrome()
url = 'https://www.draftkings.com/lobby#/MLB/0/All'
response = driver.get(url)
soup = BeautifulSoup(response.text, 'lxml')
contests = driver.find_elements(by=By.CLASS_NAME, value='TileSlider_item')
table = driver.find_element(by=By.ID, value='lobby-grid')
contest_info = []
for contest in contests:
    description = contest.find_element(by=By.CLASS_NAME, value='GameSetTile_description-row').text
    date_time = contest.find('div', class_='GameSetTile_date-row').text
    description = date_time + '_' + description
    if('Games' in description):
        # click the contest to filter the table
        contest.click()
        # get contest_id from onclick attribute of the first <a> tag
        contest_id = contest.find('a')['onclick'].split('\'contest_id\': ')[1].split(',')[0]
        # go to the contest page then get the .csv from the <a> tag with title 'Export to CSV'
        contest_url = 'https://www.draftkings.com/draft/contest/' + contest_id
        contest_response = requests.get(contest_url)
        contest_soup = BeautifulSoup(contest_response.text, 'lxml')
        export = contest_soup.find('a', title='Export to CSV')
        export_url = export['href']
        contest_info.append([description, contest_id, export_url])
contest_info_df = pd.DataFrame(contest_info, columns=['description', 'contest_id', 'export_url'])

def download_contest_data(row):
    data = pd.read_csv(row['export_url'])
    id = row['contest_id']
    description = row['description']
    data.to_csv('C:\\Users\\jorda\\Downloads\\' + description + '.csv', index=False)

contest_info_df.apply(download_contest_data, axis=1)


Unnamed: 0,description,contest_id,export_url


In [92]:
# get mean pitch coordinates
mean_pitch_coordinates = df.groupby(['pitcher', 'player_name'])[['plate_x', 'plate_z', 'batter_adjusted_plate_x']].mean()
mean_pitch_coordinates.rename(columns={'plate_x':'mean_pitcher_plate_x', 'plate_z':'mean_pitcher_plate_z', 'batter_adjusted_plate_x':'mean_pitcher_batter_adjusted_plate_x'}, inplace=True)
mean_pitch_coordinates.to_csv("C:\\Users\\jorda\\Downloads\\pitch_coordinates.csv")
mean_pitch_coordinates.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,mean_pitcher_plate_x,mean_pitcher_plate_z,mean_pitcher_batter_adjusted_plate_x
pitcher,player_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
434378,"Verlander, Justin",-0.010952,2.425714,0.340476
445276,"Jansen, Kenley",-0.293714,2.740857,0.144571
445926,"Chavez, Jesse",0.268947,2.49,0.437895
450203,"Morton, Charlie",0.137474,2.151263,0.146316
455119,"Martin, Chris",-0.159474,2.504211,-0.215263


In [50]:
#get pitcher_outliers where player_name contains 'Rod'
rod = check_pitcher_exp[check_pitcher_exp['Name'].str.contains('Simeon')]
# remove accents from player_name
rod = fix_player_names(rod, 'Name')
rod

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[column_name] = df[column_name].str.normalize('NFKD').str.encode('ascii', errors='ignore').str.decode('utf-8')


Unnamed: 0,IDfg,Season,Name,Team,Age,W,L,WAR,ERA,G,...,Pit+ FC,Stf+ FS,Loc+ FS,Pit+ FS,Stuff+,Location+,Pitching+,Stf+ FO,Loc+ FO,Pit+ FO
51,24494,2024,Simeon Woods Richardson,MIN,23,1,0,0.8,1.74,4,...,,,,,85,109,104,,,
