In [None]:
# import necessary libraries
import airflow
import requests
import json

from airflow import DAG
from airflow.operators.python import PythonOperator


# testing out the functions for our DAG
# GOAL: Get pitcher and hitter stats for the Dodgers (only bring back relevant attributes) and load to Postgres


In [None]:
## FIRST - USE TEAM ID TO GET THE TEAM'S PLAYERS' INFO (X)

# grab team_id
# I will be using the Dodgers in this scenario (GO BLUE)

response = requests.get("http://lookup-service-prod.mlb.com/json/named.team_all_season.bam?sports_code='mlb'&season='2022'")
teams = response.json()['team_all_season']['queryResults']['row']
print(f"MLB lookup service API response code for team data: {response.status_code}")

team_id = list(x['team_id'] for x in teams if x['name_display_full'] == "Los Angeles Dodgers")[0]


# pulls a list of dictionaries, one for each Dodgers player
response = requests.get(f"https://lookup-service-prod.mlb.com/json/named.roster_40.bam?team_id='{team_id}'")
players = response.json()['roster_40']['queryResults']['row']
print(f"MLB lookup service API response code for players data: {response.status_code}")

# We want only items in player_attributes
player_attributes = ['player_id', 'name_display_first_last', 'birth_date', 'bats',  'position_txt', 'primary_position', 'starter_sw', 'start_date', 'end_date', 'throws', 'team_id']

# retrieve only the player_ids from the last players request
players_list = []
for count, dict in enumerate(players):
    new_dict = {}
    print(f"Now running {count}, '{dict['name_display_first_last']}'")
    new_dict.update({k: v for k, v in players[count].items() if k in player_attributes})
    players_list.append(new_dict)

players_ids = list(dict['player_id'] for dict in players_list)


In [None]:
## SECOND - USE PLAYER ID'S TO GET THEIR HITTING AND PITCHING STATS (X)

# retrieving stats for each player

# we want these stats attributes 
stats_attributes = ['team_id', 'hr', 'season', 'ab', 'hldr', 'ao', 'slg', 'ops', 'hbp', 'rbi', 'go_ao', 'hfly', 'lob', 'xbh', 'end_date', 'bb', 'np', 'hgnd', 'roe', 'sb', 'player_id', 'avg', 'sf', 'sac', 'wo', 'hpop', 'so', 'gidp_opp', 'gidp', 'ppa', 'd', 'tpa', 'g', 'h', 'ibb', 'go', 'team_seq', 'tb', 'cs', 'r', 't', 'babip', 'obp']


# create a loop that gets every players stats from id's list
# mlb api only lets me run two at a time
hit_stats = []
pitch_stats = []
for count, id in enumerate(players_ids[:2]):
    print(f"Stats running {count}, '{id}'")
    hit_response = requests.get(f"http://lookup-service-prod.mlb.com/json/named.sport_hitting_tm.bam?league_list_id='mlb'&game_type='R'&season='2022'&player_id='{id}'")
    stats = hit_response.json()['sport_hitting_tm']['queryResults']['row']
    hit_stats.append({k:v for k,v in stats.items() if k in stats_attributes})

    pitch_response = requests.get(f"http://lookup-service-prod.mlb.com/json/named.sport_hitting_tm.bam?league_list_id='mlb'&game_type='R'&season='2022'&player_id='{id}'")
    stats = pitch_response.json()['sport_hitting_tm']['queryResults']['row']
    pitch_stats.append({k:v for k,v in stats.items() if k in stats_attributes})


In [None]:
# THIRD - INSERT VALUES TO POSTGRES TABLES
# players_list, hit_stats, pitch_stats



conn = psycopg2.connect(
    dbname='',
    user='',
    password='',
    host='',
    port=''
)
cursor = conn.cursor()

# Creating the placeholders for the SQL query
columns = ', '.join(data[0].keys())
placeholders = ', '.join(['%s'] * len(data[0]))

insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

values = [tuple(d.values()) for d in data]

# Execute the SQL query
cursor.executemany(insert_query, values)

# Commit changes and close connection
conn.commit()
cursor.close()
conn.close()


# Inserting player information into PostgreSQL
insert_into_db(players_list, 'players_table')

# Inserting hitting stats into PostgreSQL
insert_into_db(hit_stats, 'hitting_stats_table')

# Inserting pitching stats into PostgreSQL
insert_into_db(pitch_stats, 'pitching_stats_table')

