<a href="https://colab.research.google.com/github/varunpav/ETL_pipeline/blob/main/DS2002_ETL_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Import statements for libs that we need


In [43]:
pip install psycopg2-binary



In [44]:
import pandas as pd
import sqlite3
import requests
import psycopg2

### Create the Target DB Schema


DB SCHEMA:

     Teams - API - https://rapidapi.com/api-sports/api/api-nba/
        TeamID, Name, City, Conference, Division
     
     Players - CSV - https://www.kaggle.com/code/nathanlauga/nba-games-eda-let-s-dive-into-the-data/input?select=teams.csv
        PlayerID, PlayerName

     PlayerSeason - CSV - https://www.kaggle.com/code/nathanlauga/nba-games-eda-let-s-dive-into-the-data/input?select=teams.csv
        ID, PlayerID, TeamID, Season

     Games - CSV - https://www.kaggle.com/code/nathanlauga/nba-games-eda-let-s-dive-into-the-data/input?select=teams.csv
        GameID, GameDate, HomeTeamID, VisitorTeamID, HomeTeamWins

     Stats - CSV - https://www.kaggle.com/code/nathanlauga/nba-games-eda-let-s-dive-into-the-data/input?select=teams.csv
        StatID, GameID, TeamID, PlayerID, Points, FGM, FGA, FGpct, FTpct, FG3M, FG3pct, Assists, Rebounds

---
The tables below could not be populated because the remote DB went offline during development. I did not have time to rebuild it as a local copy and access that, so I populated another table from a CSV instead.

     Rankings - DB - https://github.com/mpope9/nba-sql
        RankingID, PlayerID, Season, gp_rank, fgm_rank, fg3m_rank, reb_rank, ast_rank, blk_rank, dd2_rank, td3_rank, nba_fantasy_pts_rank

     PlayerMapping - DB - https://github.com/mpope9/nba-sql
        PlayerName, ExternalPlayerID, LocalPlayerID
---
Deployment Ideology:
The design changed several times over the course of the project. For example, the Players table was split into the Players and PlayerSeason tables because of repeating primary keys. The core idea of the ETL is to take the CSV data, which comes with its own primary keys, adapt those keys to my DB, and use the related CSVs to populate the tables. The API populates the Teams table, and the TeamID primary key it supplies is what the other tables in the schema reference.

The final piece was meant to be the nba-sql remote DB. It would populate a Rankings table that takes a season and shows what rank a player held in each category. I planned to match those rows on player name rather than PlayerID, but trying to make PlayerName a unique identifier backfired. In the end the remote DB stopped working, so I kept that code in the notebook without using it in the final structure and relied much more on CSV data to populate the rest of my tables. The API works seamlessly to get every NBA team (excluding international teams), but the PlayerSeason table does not work.
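As a minimal sketch of the Players / PlayerSeason split described above, the snippet below uses a made-up roster with the same column names as the Kaggle `players.csv`. The rows and names are illustrative assumptions, not real data.

```python
import pandas as pd

# Toy roster in the shape of players.csv (all values are made up)
roster = pd.DataFrame({
    "PLAYER_NAME": ["A. Guard", "A. Guard", "B. Center"],
    "TEAM_ID": [1, 2, 1],
    "PLAYER_ID": [101, 101, 202],
    "SEASON": [2018, 2019, 2019],
})

# One row per player, so PlayerID can serve as a primary key
players = (
    roster[["PLAYER_ID", "PLAYER_NAME"]]
    .drop_duplicates(subset="PLAYER_ID")
    .rename(columns={"PLAYER_ID": "PlayerID", "PLAYER_NAME": "PlayerName"})
)

# One row per (player, team, season); the repeated PlayerIDs live here instead
player_seasons = roster[["TEAM_ID", "PLAYER_ID", "SEASON"]].rename(
    columns={"TEAM_ID": "TeamID", "PLAYER_ID": "PlayerID", "SEASON": "Season"}
)

print(players.to_dict("records"))
print(len(player_seasons))
```

The repeated `PLAYER_ID` 101 collapses to one row in `players`, while both of that player's seasons survive in `player_seasons`.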

In [45]:
# Function to create database and tables
def create_database_schema():
    conn = sqlite3.connect('nba_data.db')
    c = conn.cursor()

    # Teams
    c.execute('''
    CREATE TABLE IF NOT EXISTS Teams (
        TeamID INTEGER PRIMARY KEY,
        Name TEXT,
        City TEXT,
        Conference TEXT,
        Division TEXT
    )
    ''')

    # Players
    c.execute('''
    CREATE TABLE IF NOT EXISTS Players (
        PlayerID INTEGER PRIMARY KEY,
        PlayerName TEXT
    )
    ''')

    # PlayerSeason
    c.execute('''
    CREATE TABLE IF NOT EXISTS PlayerSeasons (
        ID INTEGER PRIMARY KEY AUTOINCREMENT,
        TeamID INTEGER,
        PlayerID INTEGER,
        Season INTEGER,
        FOREIGN KEY (PlayerID) REFERENCES Players(PlayerID),
        FOREIGN KEY (TeamID) REFERENCES Teams(TeamID)
    )
    ''')


    # Games
    c.execute('''
    CREATE TABLE IF NOT EXISTS Games (
        GameID INTEGER PRIMARY KEY,
        GameDate DATE,
        HomeTeamID INTEGER,
        VisitorTeamID INTEGER,
        HomeTeamWins TEXT,
        FOREIGN KEY (HomeTeamID) REFERENCES Teams(TeamID),
        FOREIGN KEY (VisitorTeamID) REFERENCES Teams(TeamID)
    )
    ''')

    # Stats
    c.execute('''
    CREATE TABLE IF NOT EXISTS Stats (
        StatID INTEGER PRIMARY KEY AUTOINCREMENT,
        GameID INTEGER,
        TeamID INTEGER,
        PlayerID INTEGER,
        Points INTEGER,
        FGM INTEGER,
        FGA INTEGER,
        FGpct REAL,
        FTpct REAL,
        FG3M INTEGER,
        FG3pct REAL,
        Assists INTEGER,
        Rebounds INTEGER,
        FOREIGN KEY (GameID) REFERENCES Games(GameID),
        FOREIGN KEY (TeamID) REFERENCES Teams(TeamID),
        FOREIGN KEY (PlayerID) REFERENCES Players(PlayerID)
    )
    ''')

    # Rankings
    c.execute('''
    CREATE TABLE IF NOT EXISTS Rankings (
        RankingID INTEGER PRIMARY KEY AUTOINCREMENT,
        PlayerID INTEGER,
        Season INTEGER,
        gp_rank INTEGER,
        fgm_rank INTEGER,
        fg3m_rank INTEGER,
        reb_rank INTEGER,
        ast_rank INTEGER,
        blk_rank INTEGER,
        dd2_rank INTEGER,
        td3_rank INTEGER,
        nba_fantasy_pts_rank INTEGER,
        FOREIGN KEY (PlayerID) REFERENCES Players(PlayerID)
    )
    ''')

    # PlayerMap
    c.execute('''
    CREATE TABLE IF NOT EXISTS PlayerMapping (
        PlayerName TEXT PRIMARY KEY,
        ExternalPlayerID INTEGER,
        LocalPlayerID INTEGER
    )
    ''')

    conn.commit()
    conn.close()
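The schema above declares FOREIGN KEY constraints, but SQLite does not enforce them in its default configuration unless `PRAGMA foreign_keys = ON` is issued on each connection. The sketch below shows the difference on a throwaway in-memory database with two cut-down tables. It is an illustration under those assumptions, not part of the pipeline.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # enforcement is off by default
conn.execute("CREATE TABLE Teams (TeamID INTEGER PRIMARY KEY, Name TEXT)")
conn.execute("""
    CREATE TABLE Games (
        GameID INTEGER PRIMARY KEY,
        HomeTeamID INTEGER,
        FOREIGN KEY (HomeTeamID) REFERENCES Teams(TeamID)
    )
""")
conn.execute("INSERT INTO Teams VALUES (1, 'Atlanta Hawks')")
conn.execute("INSERT INTO Games VALUES (10, 1)")  # valid reference

try:
    conn.execute("INSERT INTO Games VALUES (11, 999)")  # no such team
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print("orphan row rejected:", rejected)
conn.close()
```

With enforcement switched on, parent rows have to exist before child rows are inserted, so in this notebook Teams would need to be loaded before Games and Stats.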

### Load data from the CSV

In [None]:
# CSV was taken from: https://www.kaggle.com/code/nathanlauga/nba-games-eda-let-s-dive-into-the-data/input

# Load games, stats, and player data from CSV into the database
def load_csv_into_db(games, stats, players):
    # Load dataframes
    games_df = pd.read_csv(games)
    stats_df = pd.read_csv(stats)
    players_df = pd.read_csv(players)

    # Games table: store the home result as text.
    # insert_games() selects and renames the columns before inserting.
    games_df['HOME_TEAM_WINS'] = games_df['HOME_TEAM_WINS'].map({1: 'Win', 0: 'Loss'})

    # Stats table
    stats_to_insert = stats_df[['GAME_ID', 'TEAM_ID', 'PLAYER_ID', 'PTS', 'FGM', 'FGA', 'FG_PCT', 'FT_PCT', 'FG3M', 'FG3_PCT', 'AST', 'REB']]
    stats_to_insert.columns = ['GameID', 'TeamID', 'PlayerID', 'Points', 'FGM', 'FGA', 'FGpct', 'FTpct', 'FG3M', 'FG3pct', 'Assists', 'Rebounds']

    # Players table
    # Keep one row per PLAYER_ID so the Players primary key stays unique
    unique_players = players_df[['PLAYER_ID', 'PLAYER_NAME']].drop_duplicates(subset='PLAYER_ID')
    unique_players.columns = ['PlayerID', 'PlayerName']

    # PlayerSeason table
    player_seasons = players_df[['TEAM_ID', 'PLAYER_ID', 'SEASON']]
    player_seasons.columns = ['TeamID', 'PlayerID', 'Season']

    conn = sqlite3.connect('nba_data.db')


    # Insert into database
    unique_players.to_sql('Players', conn, if_exists='append', index=False)
    stats_to_insert.to_sql('Stats', conn, if_exists='append', index=False)
    # Moved the games insert into a separate function because it caused many errors
    insert_games(games_df, conn)
    player_seasons.to_sql('PlayerSeasons', conn, if_exists='append', index=False)

    conn.commit()
    conn.close()

def insert_games(games_df, conn):
    # Select the CSV columns and rename them to match the Games table
    games_to_insert = games_df[['GAME_ID', 'GAME_DATE_EST', 'HOME_TEAM_ID', 'VISITOR_TEAM_ID', 'HOME_TEAM_WINS']]
    games_to_insert.columns = ['GameID', 'GameDate', 'HomeTeamID', 'VisitorTeamID', 'HomeTeamWins']

    # INSERT OR IGNORE skips rows whose GameID is already in the table
    insert_query = """
    INSERT OR IGNORE INTO Games (GameID, GameDate, HomeTeamID, VisitorTeamID, HomeTeamWins)
    VALUES (?, ?, ?, ?, ?);
    """

    # itertuples(index=False, name=None) yields one plain tuple per row
    conn.executemany(insert_query, games_to_insert.itertuples(index=False, name=None))
    conn.commit()
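The `INSERT OR IGNORE` used for Games is what makes that load safe to rerun. As a small sketch on an in-memory database with a reduced, made-up Games table, loading the same rows twice leaves the row count unchanged:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE Games (
        GameID INTEGER PRIMARY KEY,
        GameDate DATE,
        HomeTeamWins TEXT
    )
""")

# Made-up rows; the third repeats GameID 1
rows = [
    (1, "2020-01-01", "Win"),
    (2, "2020-01-02", "Loss"),
    (1, "2020-01-01", "Win"),
]
insert_query = "INSERT OR IGNORE INTO Games VALUES (?, ?, ?)"

conn.executemany(insert_query, rows)
conn.executemany(insert_query, rows)  # second run adds nothing
conn.commit()

game_count = conn.execute("SELECT COUNT(*) FROM Games").fetchone()[0]
print(game_count)
conn.close()
```

By contrast, the `to_sql(..., if_exists='append')` calls used for the other tables add the rows again on every run unless the table is dropped first.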




### Load data from the API

In [47]:
# Exploratory helper: fetch the list of seasons from the API and print it
def fetch_and_insert_api_data(api_key):
    # Despite its name, this function only prints the /seasons response.
    # It does not insert anything into the database, and main() never calls it.

    url = "https://api-nba-v1.p.rapidapi.com/seasons"

    headers = {
        "X-RapidAPI-Key": api_key,
        "X-RapidAPI-Host": "api-nba-v1.p.rapidapi.com"
    }

    response = requests.get(url, headers=headers)
    print(response.json())

# Accessing the data
def fetch_teams_data(api_key):
    url = "https://api-nba-v1.p.rapidapi.com/teams"
    headers = {
        'x-rapidapi-key': api_key,
        'x-rapidapi-host': 'api-nba-v1.p.rapidapi.com'
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        print("API response received:", data)
        return data
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
    except Exception as err:
        print(f"An error occurred: {err}")
    return None

# Using accessed data to populate table
def insert_teams_data(conn, teams_data):
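    for team in teams_data:
        # Resolve a label up front so the except blocks below can always report it
        name = team.get('name', 'unknown')
        try:
            # Filter out non-NBA franchise teams and international teams
            if team['nbaFranchise']:
                team_id = team['id']
                name = team['name']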
                city = team.get('city', '')
                conference = team['leagues']['standard'].get('conference', None)
                division = team['leagues']['standard'].get('division', None)

                insert_query = '''INSERT OR IGNORE INTO Teams (TeamID, Name, City, Conference, Division)
                                  VALUES (?, ?, ?, ?, ?);'''
                data_tuple = (team_id, name, city, conference, division)
                conn.execute(insert_query, data_tuple)

        except KeyError as e:
            # Handle cases where the expected keys are not found
            print(f"Key error: {e} - possibly missing data in response for team {name}")
        except Exception as e:
            # Handle other exceptions
            print(f"Failed to insert data for team {name}: {e}")
    conn.commit()
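The field extraction in `insert_teams_data` can also be written as a small pure function, which makes the filtering easy to check without a database or an API call. This is only a sketch: `team_to_row` is a hypothetical helper, the first record is trimmed from the API output shown in this notebook, and the second record is made up.

```python
from typing import Optional, Tuple

TeamRow = Tuple[int, str, str, Optional[str], Optional[str]]

def team_to_row(team: dict) -> Optional[TeamRow]:
    """Return a Teams row for an NBA franchise, or None to skip the record."""
    if not team.get("nbaFranchise"):
        return None
    # Tolerate a missing or empty 'leagues' / 'standard' entry
    standard = (team.get("leagues") or {}).get("standard") or {}
    return (
        team["id"],
        team["name"],
        team.get("city", ""),
        standard.get("conference"),
        standard.get("division"),
    )

# Record trimmed from the API output shown in this notebook
hawks = {
    "id": 1, "name": "Atlanta Hawks", "city": "Atlanta", "nbaFranchise": True,
    "leagues": {"standard": {"conference": "East", "division": "Southeast"}},
}
# Hypothetical non-franchise record, made up for illustration
other = {"id": 99, "name": "Example Team", "nbaFranchise": False, "leagues": {}}

print(team_to_row(hawks))
print(team_to_row(other))
```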


### Load data from local DB

In [48]:
def fetch_remote_player_ids():

    # Establish connection to remote DB: https://github.com/mpope9/nba-sql
    # The connection stopped working on 4/6/24, although it still worked on the night of 4/5/24.
    conn = psycopg2.connect(
        dbname="nba",
        user="nba_sql",
        password="nba_sql",
        host="localhost"
    )

    # Select data that I want from remote players
    cur = conn.cursor()
    cur.execute("SELECT player_name, player_id FROM player")
    remote_players = {name: player_id for name, player_id in cur.fetchall()}
    cur.close()
    conn.close()
    return remote_players

def fetch_local_player_ids():

    # Select local players with the PlayerID to match the foreign ones to
    conn = sqlite3.connect('nba_data.db')
    cur = conn.cursor()
    cur.execute("SELECT PlayerName, PlayerID FROM Players")
    local_players = {name: player_id for name, player_id in cur.fetchall()}
    cur.close()
    conn.close()
    return local_players

def map_and_insert_player_ids():
    # Maps the remote players to the local ones through a player mapping table
    # Essentially a bridge between the remote and local DB
    remote_players = fetch_remote_player_ids()
    local_players = fetch_local_player_ids()

    player_mapping = [
        (name, remote_players[name], local_players[name])
        for name in set(remote_players) & set(local_players)
    ]

    # Add the created mapping to the PlayerMapping table
    conn = sqlite3.connect('nba_data.db')
    c = conn.cursor()
    c.executemany(
        "INSERT OR IGNORE INTO PlayerMapping (PlayerName, ExternalPlayerID, LocalPlayerID) VALUES (?, ?, ?)",
        player_mapping
    )
    conn.commit()
    conn.close()

def rankings_players_data():

    # Connect to the online nba-sql database
    online_conn = psycopg2.connect(
        dbname="nba",
        user="nba_sql",
        password="nba_sql",
        host="localhost"
    )


    # Select then insert data from the DB into our local DB
    online_cursor = online_conn.cursor()

    # Select the player ID, season, and rank columns from the online database.
    # The column order here must match the INSERT column list below.
    online_cursor.execute("""
        SELECT
            pl.player_id,
            pgtt.season_id,
            pgtt.gp_rank,
            pgtt.fgm_rank,
            pgtt.fg3m_rank,
            pgtt.reb_rank,
            pgtt.ast_rank,
            pgtt.blk_rank,
            pgtt.dd2_rank,
            pgtt.td3_rank,
            pgtt.nba_fantasy_points_rank
            FROM player_general_traditional_total pgtt
            JOIN player pl ON pgtt.player_id = pl.player_id;
    """)

    # Fetch the data
    online_data = online_cursor.fetchall()
    online_cursor.close()

    # Now, connect to your local SQLite database
    local_conn = sqlite3.connect('nba_data.db')
    local_cursor = local_conn.cursor()

    # Insert the data into the local database (Rankings has no PlayerName column)
    for row in online_data:
        local_cursor.execute("""
            INSERT INTO Rankings (PlayerID, Season, gp_rank, fgm_rank, fg3m_rank, reb_rank, ast_rank, blk_rank, dd2_rank, td3_rank, nba_fantasy_pts_rank)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """, row)

    # Update PlayerID's
    update_player_id_sql = '''
    UPDATE Players
    SET PlayerID = (SELECT ExternalPlayerID FROM PlayerMapping WHERE PlayerMapping.PlayerName = Players.PlayerName)
    WHERE EXISTS (
    SELECT 1 FROM PlayerMapping WHERE PlayerMapping.PlayerName = Players.PlayerName
    );
    '''

    # Execute the SQL command using your database connection
    local_conn.execute(update_player_id_sql)

    # Commit the changes and close the connections
    local_conn.commit()
    local_conn.close()
    online_conn.close()
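`map_and_insert_player_ids` bridges the two databases by player name, which only works while every name maps to exactly one ID on each side. The sketch below uses made-up names and IDs to show how a repeated name can be detected and left out of the mapping instead of being silently overwritten when the rows are turned into a dict. `names_with_multiple_ids` is a hypothetical helper, not something from the notebook.

```python
from collections import Counter

# Made-up (name, id) pairs standing in for rows from each database
remote_rows = [("Pat Lee", 11), ("Sam Fox", 12), ("Sam Fox", 13)]
local_rows = [("Pat Lee", 501), ("Sam Fox", 502)]

def names_with_multiple_ids(rows):
    """Return the names that appear with more than one distinct ID."""
    counts = Counter(name for name, _ in set(rows))
    return {name for name, n in counts.items() if n > 1}

ambiguous = names_with_multiple_ids(remote_rows) | names_with_multiple_ids(local_rows)

remote = dict(remote_rows)  # a repeated name keeps only its last ID here
local = dict(local_rows)

# Keep only names present on both sides and unambiguous on each
player_mapping = sorted(
    (name, remote[name], local[name])
    for name in (remote.keys() & local.keys()) - ambiguous
)

print(player_mapping)
print(ambiguous)
```

Here "Sam Fox" is dropped because the remote side has two different IDs for that name, which is the kind of collision that makes a name unreliable as a unique identifier.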

### Putting it all together


In [57]:
def main():
    create_database_schema()

    games = 'games.csv'
    players = 'players.csv'
    stats = 'games_details.csv'
    load_csv_into_db(games, stats, players)

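    # Read the RapidAPI key from the environment rather than hard-coding it.
    # The variable name RAPIDAPI_KEY is an assumption; use whatever name you export.
    import os
    api_key = os.environ.get("RAPIDAPI_KEY", "")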
    teams_data = fetch_teams_data(api_key)
    if teams_data and 'response' in teams_data:
        with sqlite3.connect('nba_data.db') as conn:
            insert_teams_data(conn, teams_data['response'])
    else:
        print("Failed to fetch teams data or data structure is unexpected.")

    # I am unable to access the remote DB even though the code itself is functional,
    # so I pivoted and used an extra CSV instead. The connection fails with this message:
    #   OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    #       Is the server running on that host and accepting TCP/IP connections?
    #   connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    #       Is the server running on that host and accepting TCP/IP connections?

    #map_and_insert_player_ids()
    #rankings_players_data()


# Display everything testing method (does not count for my official SQL query requirement)
def display_first_10_rows_of_each_table():
    conn = sqlite3.connect('nba_data.db')
    cursor = conn.cursor()

    # List of tables (Rankings and PlayerMapping are excluded because the remote DB is not functional)
    tables = ['Teams', 'Players', 'Games', 'Stats']

    for table in tables:
        print(f"Displaying first 10 rows from {table} table:")
        try:
            cursor.execute(f"SELECT * FROM {table} LIMIT 10")
            rows = cursor.fetchall()
            for row in rows:
                print(row)
        except Exception as e:
            print(f"An error occurred while fetching data from {table}: {e}")
        print("\n")

    conn.close()

def delete_tables():
    conn = sqlite3.connect('nba_data.db')
    c = conn.cursor()
    c.execute('DROP TABLE IF EXISTS Rankings')
    c.execute('DROP TABLE IF EXISTS PlayerMapping')
    c.execute('DROP TABLE IF EXISTS Players')
    c.execute('DROP TABLE IF EXISTS PlayerSeasons')
    conn.commit()
    conn.close()

def sql():
    # Connect to the SQLite database
    conn = sqlite3.connect('nba_data.db')
    cursor = conn.cursor()

    # Top 20 players by average points, with games played, average assists, and average rebounds
    query = """
    SELECT
      Players.PlayerName,
      COUNT(DISTINCT Games.GameID) AS GamesPlayed,
      AVG(Stats.Points) AS AvgPoints,
      AVG(Stats.Assists) AS AvgAssists,
      AVG(Stats.Rebounds) AS AvgRebounds
    FROM
      Stats
    JOIN
      Players ON Stats.PlayerID = Players.PlayerID
    JOIN
      Games ON Stats.GameID = Games.GameID
    GROUP BY
      Players.PlayerName
    ORDER BY
      AvgPoints DESC
    LIMIT 20;
    """

    try:
        # Execute the SQL query
        cursor.execute(query)

        # Fetch all the results
        results = cursor.fetchall()

        # Print the results
        print(f"{'Player Name':25} | {'Games Played':12} | {'Avg Points':12} | {'Avg Assists':12} | {'Avg Rebounds':12}")
        print("-" * 85)
        for row in results:
            player_name, games_played, avg_points, avg_assists, avg_rebounds = row
            print(f"{player_name:25} | {games_played:<12} | {avg_points:<12.2f} | {avg_assists:<12.2f} | {avg_rebounds:<12.2f}")


    except sqlite3.Error as e:
        print("An error occurred:", e)

    finally:
        # Close the database connection
        conn.close()

# Main script execution
if __name__ == "__main__":
    delete_tables()  # Needed because I kept having issues after changing the schema over and over
    main()
    display_first_10_rows_of_each_table()
    sql()

API response received: {'get': 'teams/', 'parameters': [], 'errors': [], 'results': 66, 'response': [{'id': 1, 'name': 'Atlanta Hawks', 'nickname': 'Hawks', 'code': 'ATL', 'city': 'Atlanta', 'logo': 'https://upload.wikimedia.org/wikipedia/fr/e/ee/Hawks_2016.png', 'allStar': False, 'nbaFranchise': True, 'leagues': {'standard': {'conference': 'East', 'division': 'Southeast'}, 'vegas': {'conference': 'summer', 'division': None}, 'utah': {'conference': 'East', 'division': 'Southeast'}, 'sacramento': {'conference': 'East', 'division': 'Southeast'}}}, {'id': 2, 'name': 'Boston Celtics', 'nickname': 'Celtics', 'code': 'BOS', 'city': 'Boston', 'logo': 'https://upload.wikimedia.org/wikipedia/fr/thumb/6/65/Celtics_de_Boston_logo.svg/1024px-Celtics_de_Boston_logo.svg.png', 'allStar': False, 'nbaFranchise': True, 'leagues': {'standard': {'conference': 'East', 'division': 'Atlantic'}, 'vegas': {'conference': 'summer', 'division': None}, 'utah': {'conference': 'East', 'division': 'Atlantic'}, 'sacra
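
The aggregate query in `sql()` can be sanity-checked without the full dataset. The sketch below runs a trimmed version of it (points only) against an in-memory database holding a few made-up rows, so the expected averages are easy to work out by hand. The table contents are assumptions for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Players (PlayerID INTEGER PRIMARY KEY, PlayerName TEXT);
    CREATE TABLE Games (GameID INTEGER PRIMARY KEY);
    CREATE TABLE Stats (GameID INTEGER, PlayerID INTEGER, Points INTEGER);
    INSERT INTO Players VALUES (1, 'Pat Lee'), (2, 'Sam Fox');
    INSERT INTO Games VALUES (10), (11);
    INSERT INTO Stats VALUES (10, 1, 20), (11, 1, 30), (10, 2, 8);
""")

# Same joins, grouping, and ordering as sql(), limited to the points column
results = conn.execute("""
    SELECT Players.PlayerName,
           COUNT(DISTINCT Games.GameID) AS GamesPlayed,
           AVG(Stats.Points) AS AvgPoints
    FROM Stats
    JOIN Players ON Stats.PlayerID = Players.PlayerID
    JOIN Games ON Stats.GameID = Games.GameID
    GROUP BY Players.PlayerName
    ORDER BY AvgPoints DESC
""").fetchall()

for name, games_played, avg_points in results:
    print(f"{name:10} | {games_played:3} | {avg_points:.2f}")
conn.close()
```

Because the query groups by `PlayerName`, two different players who share a name would be merged into one row. Grouping by `PlayerID` as well would keep them apart.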