# Dataload pipeline

## Setup

In [None]:
import os
from dotenv import load_dotenv
from ohmysportsfeedspy import MySportsFeeds
import json
from datetime import datetime
import pytz
from tqdm import tqdm
from airflow.models import Variable
import pandas as pd
import sqlalchemy

load_dotenv();

In [None]:
# Connection settings and the API key are read from Airflow Variables.
POSTGRES_USER = Variable.get("POSTGRES_USER")
POSTGRES_PW = Variable.get("POSTGRES_PW")
POSTGRES_HOST = Variable.get("POSTGRES_HOST")
MSF_API_KEY = Variable.get("MSF_API_KEY")
# Use the canonical "postgresql" dialect name: the legacy "postgres" alias was
# deprecated and later removed by SQLAlchemy, where it fails to load a dialect.
# NOTE(review): user and password are interpolated verbatim; values containing
# URL-reserved characters (e.g. "@", "/", ":") would need percent-encoding — confirm.
postgres_connection_str = f'postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PW}@{POSTGRES_HOST}/nba'

engine = sqlalchemy.create_engine(postgres_connection_str)

In [None]:
# Create the MySportsFeeds client for API version "2.1" and authenticate it with MSF_API_KEY.
# NOTE(review): the literal "MYSPORTSFEEDS" is presumably the fixed password that
# accompanies an API key on the v2 API — confirm against the ohmysportsfeedspy docs.
msf = MySportsFeeds(version="2.1")
msf.authenticate(MSF_API_KEY, "MYSPORTSFEEDS")

In [None]:
def flatten(d, sep="_"):
    """Flatten nested dicts and lists into a single-level dict.

    Dict keys and list indices along the path to each leaf are joined with
    *sep*, e.g. ``{"a": {"b": 1}, "c": [5]}`` becomes ``{"a_b": 1, "c_0": 5}``.

    Args:
        d: The dict, list or scalar to flatten.
        sep: Separator placed between path components.

    Returns:
        A new dict mapping each joined path to its leaf value. Empty dicts
        and empty lists contribute no entries; a bare scalar input is stored
        under the empty-string key.
    """
    obj = {}

    def recurse(t, parent_key=""):
        if isinstance(t, list):
            for i, item in enumerate(t):
                # List positions become path components ("0", "1", ...).
                recurse(item, parent_key + sep + str(i) if parent_key else str(i))
        elif isinstance(t, dict):
            for k, v in t.items():
                recurse(v, parent_key + sep + k if parent_key else k)
        else:
            # Leaf value: record it under the accumulated path.
            obj[parent_key] = t

    recurse(d)

    return obj

## Parameters

In [None]:
# Season identifier: passed as `season=` to every MySportsFeeds request in this
# notebook and bound to the `season = %s` filters on the `games` table.
season = "2016-2017-regular"

In [None]:
# Cap on how many of the most recent completed-game dates are considered; bound to
# the `date_rank <= %s` filters of the game-log, lineup and DFS queries.
last_n_days = 1_000

## Season games

### Download games

In [None]:
# Fetch the `seasonal_games` feed for the selected season as JSON; the parse step reads its "games" list.
# NOTE(review): force=True presumably makes the client bypass its local response cache — confirm.
output = msf.msf_get_data(league='nba', season=season, feed='seasonal_games', format='json', force=True)

### Parse output

In [None]:
# One row per game: scalar schedule/score fields are flattened into columns, while
# the nested collections are carried over untouched under their own keys.
upload = [
    {
        "season": season,
        **flatten({
            name: val
            for name, val in entry["schedule"].items()
            if name != "officials" and name != "broadcasters"
        }),
        **flatten({
            name: val
            for name, val in entry["score"].items()
            if name != "quarters"
        }),
        "quarters": entry["score"]["quarters"],
        "officials": entry["schedule"]["officials"],
        "broadcasters": entry["schedule"]["broadcasters"],
    }
    for entry in output["games"]
]

### Create table

In [None]:
# with engine.connect() as conn:
#     sqlq = """
#         CREATE TABLE IF NOT EXISTS public.games
#         (
#             season text,
#             id bigint,
#             startTime timestamp with time zone,
#             endedTime timestamp with time zone,
#             awayTeam_id bigint,
#             awayTeam_abbreviation text,
#             homeTeam_id bigint,
#             homeTeam_abbreviation text,
#             venue_id bigint,
#             venue_name text,
#             venueAllegiance text,
#             scheduleStatus text,
#             originalStartTime timestamp with time zone,
#             delayedOrPostponedReason text,
#             playedStatus text,
#             attendance bigint,
#             weather_type text,
#             weather_description text,
#             weather_wind_speed_milesPerHour bigint,
#             weather_wind_speed_kilometersPerHour bigint,
#             weather_wind_direction_degrees bigint,
#             weather_wind_direction_label text,
#             weather_temperature_fahrenheit bigint,
#             weather_temperature_celsius bigint,
#             weather_precipitation_type text,
#             weather_precipitation_percent text,
#             weather_precipitation_amount_millimeters double precision,
#             weather_precipitation_amount_centimeters text,
#             weather_precipitation_amount_inches text,
#             weather_precipitation_amount_feet text,
#             weather_humidityPercent bigint,
#             currentQuarter text,
#             currentQuarterSecondsRemaining text,
#             currentIntermission text,
#             awayScoreTotal bigint,
#             homeScoreTotal bigint,
#             quarters jsonb,
#             officials jsonb,
#             broadcasters jsonb
#         );
#     """
#     conn.execute(sqlq)

### Upload to postgres

In [None]:
# Replace the season's rows: the delete and every insert run inside one transaction.
with engine.begin() as conn:
    conn.execute("delete from games where season = %s", (season,))
    for game in tqdm(upload):
        # Leave out NULL fields so the column list names only populated values.
        game = {column: val for column, val in game.items() if val is not None}
        insert_sql = "insert into games ({columns}) values ({values})".format(
            columns=", ".join(game),
            values=", ".join("%s" for _ in game),
        )
        # Nested dict/list values are serialised to JSON text before binding.
        bound = tuple(
            json.dumps(val) if type(val) in (dict, list) else val
            for val in game.values()
        )
        conn.execute(insert_sql, bound)

## Game logs

### Get game dates

In [None]:
# Distinct calendar dates (evaluated in the 'EST' zone) of the season's COMPLETED
# games, newest first, limited to the `last_n_days` most recent dates.
with engine.connect() as conn:
    sqlq = """
        with completed_games as (
            select
                distinct (starttime at time zone 'EST')::date as game_date
            from games
            where playedStatus = 'COMPLETED'
            and season = %s
        )
        , date_rank as (
            select
                game_date
                , row_number() over(order by game_date desc) as date_rank
            from
                completed_games
        )
        select game_date
        from date_rank
        where date_rank <= %s
        order by 1 desc;
    """
    # Each result row has a single column; keep just that value.
    completed_game_dates = [
        row[0] for row in conn.execute(sqlq, (season, last_n_days)).fetchall()
    ]

### Download game logs

In [None]:
# One `daily_player_gamelogs` payload per completed game date (dates sent as YYYYMMDD).
output = [
    msf.msf_get_data(
        league='nba',
        season=season,
        feed='daily_player_gamelogs',
        date=day.strftime("%Y%m%d"),
        format='json',
        force=True,
    )
    for day in tqdm(completed_game_dates)
]

### Parse output

In [None]:
# Flatten every game log of every downloaded day into one flat row.
upload = [flatten(entry) for payload in output for entry in payload["gamelogs"]]

### Create table

In [None]:
# with engine.connect() as conn:
#     pd.DataFrame(upload).to_sql("gamelogs", conn, index=False, if_exists="replace")

### Upload to postgres

In [None]:
with engine.begin() as conn:
    for gamelog in tqdm(upload):
        # Skip NULL fields so only populated columns are named in the insert.
        gamelog = {column: stat for column, stat in gamelog.items() if stat is not None}
        identity = (gamelog["game_id"], gamelog["player_id"], gamelog["team_id"])
        # Delete-then-insert: remove any existing row for this game/player/team first.
        conn.execute(
            "delete from player_gamelogs where game_id = %s and player_id = %s and team_id = %s",
            identity,
        )
        insert_sql = "insert into player_gamelogs ({columns}) values ({values})".format(
            columns=", ".join(gamelog),
            values=", ".join("%s" for _ in gamelog),
        )
        conn.execute(insert_sql, tuple(gamelog.values()))

## Game lineup

### Get game ids

In [None]:
# Ids of all season games dated on or after the oldest of the `last_n_days` most
# recent completed-game dates (1970-01-01 when no game is completed), highest id first.
with engine.connect() as conn:
    sqlq = """
        with season_games as (
            select *
            from games
            where season = %s
        )
        , completed_games as (
            select
                distinct (starttime at time zone 'EST')::date as game_date
            from season_games
            where playedStatus = 'COMPLETED'
        )
        , date_rank as (
            select
                game_date
                , row_number() over(order by game_date desc) as date_rank
            from
                completed_games
        )
        select distinct id
        from season_games
        where
            (starttime at time zone 'EST')::date
                >= coalesce(
                    (select min(game_date) from date_rank where date_rank <= %s),
                    '1970-01-01'
                )
        order by 1 desc;
    """
    # Each result row has a single column; keep just that value.
    game_ids = [
        row[0] for row in conn.execute(sqlq, (season, last_n_days)).fetchall()
    ]

### Download game lineups

In [None]:
# One `game_lineup` payload per selected game id.
output = [
    msf.msf_get_data(
        league='nba',
        season=season,
        feed='game_lineup',
        game=gid,
        format='json',
        force=True,
    )
    for gid in tqdm(game_ids)
]

### Parse output

In [None]:
def parse_game(game):
    """Flatten a game record and expose its ``id`` as ``game_id``.

    The ``broadcasters`` and ``officials`` entries are left out before flattening.
    """
    skipped = ("broadcasters", "officials")
    flat = flatten({name: game.get(name) for name in game if name not in skipped})
    flat["game_id"] = flat.pop("id")
    return flat

def parse_team(team):
    """Flatten a team record, renaming ``id`` and ``abbreviation`` with a ``team_`` prefix."""
    flat = flatten(team)
    for field in ("id", "abbreviation"):
        flat[f"team_{field}"] = flat.pop(field)
    return flat

def parse_lineup(lineup):
    """Return the flattened ``team`` record of a lineup entry as a new dict."""
    # flatten() already builds a fresh dict, so no extra copy is needed.
    return flatten(lineup["team"])

# One row per listed player: game fields + team fields + lineup type + the
# flattened lineup position. Missing lineups and empty positions are skipped.
upload = [
    {
        **parse_game(payload["game"]),
        **parse_team(side["team"]),
        "type": lineup_type,
        **flatten(slot),
    }
    for payload in output
    for side in payload["teamLineups"]
    for lineup_type in ("expected", "actual")
    if side[lineup_type] is not None
    for slot in side[lineup_type]["lineupPositions"]
    if slot["player"] is not None
]

### Create table

In [None]:
# with engine.connect() as conn:
#     pd.DataFrame(upload).to_sql("lineups", conn, index=False, if_exists="replace")

### Upload to postgres

In [None]:
with engine.begin() as conn:
    # Drop NULL fields once so the deletes and inserts work on the same cleaned rows.
    rows = [
        {key: value for key, value in player.items() if value is not None}
        for player in upload
    ]
    # Purge existing rows first, once per (game, player, team). Deleting inside the
    # insert loop would erase the "expected" row just written as soon as the same
    # player's "actual" row is processed, because both share this key.
    stale_keys = list(dict.fromkeys(
        (row["game_id"], row["player_id"], row["team_id"]) for row in rows
    ))
    for stale_key in tqdm(stale_keys):
        conn.execute(
            "delete from lineups where game_id = %s and player_id = %s and team_id = %s",
            stale_key
        )
    for player in tqdm(rows):
        conn.execute(
            "insert into lineups ({columns}) values ({values})".format(
                columns=", ".join(player.keys()),
                values=", ".join(["%s"] * len(player))
            ),
            tuple(player.values())
        )

## DFS

### Get game dates

In [None]:
# Distinct dates (in the 'EST' zone) of all season games on or after the oldest of the
# `last_n_days` most recent completed-game dates (1970-01-01 when none), newest first.
with engine.connect() as conn:
    sqlq = """
        with season_games as (
            select *
            from games
            where season = %s
        )
        , completed_games as (
            select
                distinct (starttime at time zone 'EST')::date as game_date
            from season_games
            where playedStatus = 'COMPLETED'
        )
        , date_rank as (
            select
                game_date
                , row_number() over(order by game_date desc) as date_rank
            from
                completed_games
        )
        select distinct (starttime at time zone 'EST')::date as game_date
        from season_games
        where
            (starttime at time zone 'EST')::date
                >= coalesce(
                    (select min(game_date) from date_rank where date_rank <= %s),
                    '1970-01-01'
                )
        order by 1 desc;
    """
    # Each result row has a single column; keep just that value.
    game_dates = [
        row[0] for row in conn.execute(sqlq, (season, last_n_days)).fetchall()
    ]

### Download DFS

In [None]:
# One `daily_dfs` payload per selected game date (dates sent as YYYYMMDD).
output = [
    msf.msf_get_data(
        league='nba',
        season=season,
        feed='daily_dfs',
        date=day.strftime("%Y%m%d"),
        format='json',
        force=True,
    )
    for day in tqdm(game_dates)
]
# Keep only the payloads that actually carry a "sources" entry.
output = [payload for payload in output if payload.get("sources") is not None]

### Parse output

In [None]:
# One row per (source, slate, player); entries without a player record are skipped.
upload = [
    {
        "source": provider["source"],
        "slate_minGameStart": contest["minGameStart"],
        "slate_date": contest["forDate"],
        # The slate identifier is stored in its string form.
        "slate_id": str(contest["identifier"]),
        "label": contest["label"],
        "player_id": entry["player"]["id"],
        "rosterSlots": entry["rosterSlots"],
    }
    for payload in output
    for provider in payload["sources"]
    for contest in provider["slates"]
    for entry in contest["players"]
    if entry["player"] is not None
]

### Create table

In [None]:
# with engine.connect() as conn:
#     sqlq = """
#         create table if not exists public.dfs
#         (
#             source text,
#             slate_minGameStart timestamp with time zone,
#             slate_date timestamp with time zone,
#             slate_id bigint,
#             label text,
#             player_id bigint,
#             rosterSlots text[]
#         );
#     """
#     conn.execute(sqlq)

### Upload to postgres

In [None]:
with engine.begin() as conn:
    for player in tqdm(upload):
        # Identity of one slate entry; any existing copy is removed before re-inserting.
        slate_key = (
            player["source"],
            player["slate_id"],
            player["player_id"],
            player["slate_minGameStart"],
            player["slate_date"],
            player["label"],
        )
        conn.execute(
            """
                delete from dfs
                where
                    source = %s and slate_id = %s and player_id = %s 
                    and slate_minGameStart = %s and slate_date = %s and label = %s
            """,
            slate_key
        )
        insert_sql = "insert into dfs ({columns}) values ({values})".format(
            columns=", ".join(player),
            values=", ".join("%s" for _ in player),
        )
        # List values are rendered as a brace-delimited, comma-separated literal.
        bound = [
            "{" + ",".join(value) + "}" if type(value) is list else value
            for value in player.values()
        ]
        conn.execute(insert_sql, bound)

## Play-by-play

### Get game ids

In [None]:
# Ids of every COMPLETED game of the season, highest id first.
with engine.connect() as conn:
    sqlq = """
        select distinct id
        from games
        where playedStatus = 'COMPLETED'
        and season = %s
        order by 1 desc;
    """
    # Each result row has a single column; keep just that value.
    completed_game_ids = [row[0] for row in conn.execute(sqlq, (season,)).fetchall()]

### Download play by play

In [None]:
# One `game_playbyplay` payload per completed game id.
output = [
    msf.msf_get_data(
        league='nba',
        season=season,
        feed='game_playbyplay',
        game=gid,
        format='json',
        force=True,
    )
    for gid in tqdm(completed_game_ids)
]

### Parse output

In [None]:
# One row per play. Besides "description" and "playStatus", a play is expected to
# carry one more key; the first such key is taken as the play type and its value
# is stored as the play payload.
upload = [
    {
        "game_id": game["game"]["id"],
        "playStatus": play["playStatus"],
        "description": play["description"],
        "playType": play_type,
        "play": play[play_type],
    }
    for game in output
    for play in game["plays"]
    # Single-element loop: binds the play-type key once instead of rescanning the keys per field.
    for play_type in [[x for x in play.keys() if x not in ("description", "playStatus")][0]]
]

### Create table

In [None]:
# with engine.connect() as conn:
#     sqlq = """
#         create table if not exists public.playbyplay
#         (
#             game_id bigint,
#             description text,
#             playType text,
#             playStatus json,
#             play json
#         );
#     """
#     conn.execute(sqlq)

### Upload to postgres

In [None]:
with engine.begin() as conn:
    # Purge each game's existing plays exactly once, before any insert. Issuing the
    # delete ahead of every single play would also remove the plays of the same game
    # inserted moments earlier in this transaction, leaving only the last play per game.
    reloaded_game_ids = list(dict.fromkeys(play["game_id"] for play in upload))
    for reloaded_game_id in tqdm(reloaded_game_ids):
        conn.execute(
            "delete from playbyplay where game_id = %s", (reloaded_game_id,)
        )
    for play in tqdm(upload):
        conn.execute(
            "insert into playbyplay ({columns}) values ({values})".format(
                columns=", ".join(play.keys()),
                values=", ".join(["%s"] * len(play))
            ),
            # Nested dict values are serialised to JSON text before binding.
            tuple(json.dumps(x) if isinstance(x, dict) else x for x in play.values())
        )

## Players

### Download players

In [None]:
# Fetch the `players` feed for the selected season as JSON; the parse step reads its "players" list.
# NOTE(review): force=True presumably makes the client bypass its local response cache — confirm.
output = msf.msf_get_data(league='nba', season=season, feed='players', format='json', force=True)

### Parse output

In [None]:
# One row per player: scalar fields are flattened into columns; the two list-valued
# fields are collapsed into lookup dicts (source -> id, mediaType -> value).
upload = [
    {
        **flatten({
            field: val
            for field, val in entry["player"].items()
            if field != "externalMappings" and field != "socialMediaAccounts"
        }),
        "externalMappings": {
            mapping["source"]: mapping["id"]
            for mapping in entry["player"].get("externalMappings", [])
        },
        "socialMediaAccounts": {
            account["mediaType"]: account["value"]
            for account in entry["player"].get("socialMediaAccounts", [])
        },
    }
    for entry in output["players"]
]

### Create table

In [None]:
# with engine.connect() as conn:
#     sqlq = """
#         create table if not exists public.players
#         (
#             id bigint,
#             firstName text,
#             lastName text,
#             primaryPosition text,
#             jerseyNumber bigint,
#             currentTeam_id bigint,
#             currentTeam_abbreviation text,
#             currentRosterStatus text,
#             height text,
#             weight bigint,
#             birthDate date,
#             age bigint,
#             birthCity text,
#             birthCountry text,
#             rookie bool,
#             highSchool text,
#             college text,
#             handedness_shoots text,
#             officialImageSrc text,
#             currentContractYear bigint,
#             drafted_year bigint,
#             drafted_team_id bigint,
#             drafted_team_abbreviation text,
#             drafted_pickTeam_id bigint,
#             drafted_pickTeam_abbreviation text,
#             drafted_round bigint,
#             drafted_roundPick bigint,
#             drafted_overallPick bigint,
#             currentInjury_description text,
#             currentInjury_playingProbability text,
#             externalMappings json,
#             socialMediaAccounts json
#         );
#     """
#     conn.execute(sqlq)

### Upload to postgres

In [None]:
with engine.begin() as conn:
    for player in tqdm(upload):
        # Skip NULL fields so only populated columns are named in the insert.
        player = {column: val for column, val in player.items() if val is not None}
        # Delete-then-insert: remove any existing row for this player id first.
        conn.execute("delete from players where id = %s", (player["id"],))
        insert_sql = "insert into players ({columns}) values ({values})".format(
            columns=", ".join(player),
            values=", ".join("%s" for _ in player),
        )
        # Dict values are serialised to JSON text before binding.
        bound = tuple(
            json.dumps(val) if type(val) is dict else val
            for val in player.values()
        )
        conn.execute(insert_sql, bound)

## Game Lines

### Get game dates

In [None]:
# Every distinct game date (in the 'EST' zone) of the season, newest first.
with engine.connect() as conn:
    sqlq = """
        select distinct (starttime at time zone 'EST')::date
        from games
        where season = %s
        order by 1 desc;
    """
    # Each result row has a single column; keep just that value.
    game_dates = [row[0] for row in conn.execute(sqlq, (season,)).fetchall()]

### Download game lines

In [None]:
# One `daily_game_lines` payload per game date (dates sent as YYYYMMDD).
output = [
    msf.msf_get_data(
        league='nba',
        season=season,
        feed='daily_game_lines',
        date=day.strftime("%Y%m%d"),
        format='json',
        force=True,
    )
    for day in tqdm(game_dates)
]

### Parse output

In [None]:
# One row per (game, line source) with that source's money lines and point spreads.
upload = [
    {
        "game_id": matchup["game"]["id"],
        "source": book["source"]["name"],
        "moneyLines": book["moneyLines"],
        "pointSpreads": book["pointSpreads"],
    }
    for payload in output
    for matchup in payload["gameLines"]
    for book in matchup["lines"]
]

### Create table

In [None]:
# with engine.connect() as conn:
#     sqlq = """
#         create table if not exists public.gamelines
#         (
#             game_id bigint,
#             source text,
#             moneyLines json,
#             pointSpreads json
#         );
#     """
#     conn.execute(sqlq)

### Upload to postgres

In [None]:
with engine.begin() as conn:
    for gameline in tqdm(upload):
        # Skip NULL fields so only populated columns are named in the insert.
        gameline = {column: val for column, val in gameline.items() if val is not None}
        # Delete-then-insert: remove any existing row for this game/source pair first.
        conn.execute(
            "delete from gamelines where game_id = %s and source = %s",
            (gameline["game_id"], gameline["source"]),
        )
        insert_sql = "insert into gamelines ({columns}) values ({values})".format(
            columns=", ".join(gameline),
            values=", ".join("%s" for _ in gameline),
        )
        # List values are serialised to JSON text before binding.
        bound = tuple(
            json.dumps(val) if type(val) is list else val
            for val in gameline.values()
        )
        conn.execute(insert_sql, bound)