In [33]:
import requests
from datetime import datetime
import json
import psycopg2 
import pandas as pd 
from sqlalchemy import create_engine

In [34]:
# adapter Layer

def read_json_to_df(raw_json):
    """Flatten one API response page into a DataFrame with one row per team.

    Every element of ``raw_json["response"]`` must contain a ``"team"`` and a
    ``"venue"`` mapping. Both are flattened with ``pd.json_normalize`` and
    joined on the team id. Column names present on both sides (for example
    ``id`` and ``name``) receive pandas' default ``_x`` (team side) and ``_y``
    (venue side) suffixes; the venue side also carries an ``id_team`` column
    holding the team id used for the join.

    Args:
        raw_json: Decoded JSON payload (dict) with a ``"response"`` list.

    Returns:
        pandas.DataFrame with a fresh 0..n-1 index, or an empty DataFrame
        when the ``"response"`` list is empty.

    Raises:
        KeyError: if ``"response"``, ``"team"``, ``"venue"`` or the team
            ``"id"`` key is missing.
    """
    team_frames = []
    for team in raw_json["response"]:
        df_info = pd.json_normalize(team["team"])
        df_ven = pd.json_normalize(team["venue"])
        # Tag the venue row with its team id so the two frames can be joined.
        df_ven["id_team"] = team["team"]["id"]
        team_frames.append(
            pd.merge(df_info, df_ven, left_on='id', right_on='id_team', how='left')
        )

    # pd.concat raises on an empty list; keep returning an empty frame instead.
    if not team_frames:
        return pd.DataFrame()

    # Concatenate once: growing a frame inside the loop re-copies all rows
    # collected so far on every iteration.
    return pd.concat(team_frames, ignore_index=True)

def return_objects(url, querystring, headers, timeout=30):
    """Fetch every page of a paginated API endpoint.

    The first request is sent with ``querystring`` exactly as given. The
    number of pages is read from ``["paging"]["total"]`` of that first
    response, and each following page is requested with an added ``page``
    query parameter (sent as a string).

    Args:
        url: Endpoint URL.
        querystring: Dict of query parameters. It is never modified; each
            page request uses its own copy.
        headers: Dict of HTTP headers sent with every request.
        timeout: Timeout in seconds handed to ``requests.get`` so a stalled
            connection cannot hang the run indefinitely.

    Returns:
        List of decoded JSON payloads, one per page, in page order. The list
        is empty when the reported page total is below 1.

    Raises:
        requests.HTTPError: if any response has a 4xx/5xx status.
        KeyError: if the first payload has no ``["paging"]["total"]``.
    """
    def fetch_page(params):
        # Fail loudly on HTTP errors instead of parsing an error body as data.
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        response.raise_for_status()
        return response.json()

    first_page = fetch_page(querystring)
    max_page = first_page["paging"]["total"]

    objects = []
    if max_page >= 1:
        objects.append(first_page)

    current_page = 2
    while current_page <= max_page:
        # Build a fresh dict per page so the caller's querystring is untouched.
        page_params = {**querystring, 'page': str(current_page)}
        print(page_params)
        objects.append(fetch_page(page_params))
        current_page += 1
    return objects

def write_df_to_postgres(df, conn_string, table_name, schema_name):
    """Write a DataFrame to a database table, replacing any existing table.

    Args:
        df: pandas.DataFrame to persist (the index is not written).
        conn_string: SQLAlchemy connection URL.
        table_name: Name of the destination table.
        schema_name: Schema that holds the destination table.

    Returns:
        True once the write has been committed.

    Raises:
        Any exception raised by SQLAlchemy or pandas while connecting or
        writing; the transaction is rolled back in that case.
    """
    engine = create_engine(conn_string)
    try:
        # engine.begin() commits on success, rolls back on error, and always
        # returns the connection to the pool - no manual close() needed.
        with engine.begin() as conn:
            df.to_sql(table_name, con=conn, schema=schema_name, if_exists='replace', index=False)
    finally:
        # Release pooled connections; a new engine is created on every call.
        engine.dispose()
    return True

In [35]:
# application Layer

def extract(objects):
    """Convert every raw page payload to a DataFrame and stack them.

    Args:
        objects: Iterable of decoded JSON page payloads.

    Returns:
        A single pandas.DataFrame holding the rows of all pages, re-indexed
        from 0.
    """
    page_frames = []
    for raw_json in objects:
        page_frames.append(read_json_to_df(raw_json))
    return pd.concat(page_frames, ignore_index=True)

def transform(df):
    """Shape the extracted frame for loading.

    Removes the columns listed in ``unwanted``, renames the id/name columns
    (``id_x`` -> ``team_id``, ``name_x`` -> ``name``, ``id_y`` -> ``venue_id``)
    and adds a ``load_timestamp`` column set to the current local time.

    Args:
        df: pandas.DataFrame containing at least the columns being dropped.

    Returns:
        A new pandas.DataFrame; the input frame is left unchanged.
    """
    unwanted = ['name_y', 'address', 'city', 'capacity', 'surface', 'image', 'id_team']
    renames = {'id_x': 'team_id', 'name_x': 'name', 'id_y': 'venue_id'}
    return (
        df.drop(columns=unwanted)
        .rename(columns=renames)
        .assign(load_timestamp=datetime.now())
    )

def load(df, conn_string, table_name, schema_name):
    """Persist the frame by delegating to ``write_df_to_postgres``.

    Args:
        df: pandas.DataFrame to write.
        conn_string: Database connection string.
        table_name: Destination table name.
        schema_name: Destination schema name.

    Returns:
        True after the write call returns.
    """
    write_df_to_postgres(
        df=df,
        conn_string=conn_string,
        table_name=table_name,
        schema_name=schema_name,
    )
    return True

def etl(objects, conn_string, table_name, schema_name):
    """Run extract, transform and load in sequence.

    Args:
        objects: Iterable of decoded JSON page payloads.
        conn_string: Database connection string.
        table_name: Destination table name.
        schema_name: Destination schema name.

    Returns:
        True once all three stages have finished.
    """
    staged = transform(extract(objects))
    load(staged, conn_string, table_name, schema_name)
    return True

In [36]:
# main application entrypoint

def main():
    """Run the teams pipeline: fetch all API pages, then load them into Postgres.

    Secrets are read from the environment instead of being hardcoded in the
    notebook:

    * ``RAPIDAPI_KEY`` - value sent as the ``X-RapidAPI-Key`` header.
    * ``API_FOOTBALL_DB_URL`` - SQLAlchemy connection URL of the target
      database (it contains the database password).

    Raises:
        RuntimeError: if either environment variable is unset or empty.
    """
    import os  # local import: only this entrypoint reads the environment

    # params/configs
    # later read config
    required = ("RAPIDAPI_KEY", "API_FOOTBALL_DB_URL")
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError(
            "missing required environment variable(s): " + ", ".join(missing)
        )

    headers = {
        "X-RapidAPI-Key": os.environ["RAPIDAPI_KEY"],
        "X-RapidAPI-Host": "api-football-v1.p.rapidapi.com",
    }
    url = "https://api-football-v1.p.rapidapi.com/v3/teams"
    conn_string = os.environ["API_FOOTBALL_DB_URL"]
    table_name = 'dim_teams'
    schema_name = 'Staging'
    querystring = {"league":"39","season":"2023"}

    # init

    # run application
    objects = return_objects(url, querystring, headers)
    etl(objects, conn_string, table_name, schema_name)

In [37]:
# run: executing this cell performs the HTTP requests and the database write via main()

main()