In [1]:
import psycopg2
import pandas as pd
import boto3
from io import StringIO
import json
import psycopg2
from config import config
from sqlalchemy import create_engine

In [3]:
my_bucket = 'formula1-project-bucket'
s3_resource = boto3.resource('s3')

# Collect the key of every object stored in the bucket.
bucket = s3_resource.Bucket(my_bucket)
objects = bucket.objects.all()
object_list = [s3_object.key for s3_object in objects]

In [4]:
# # Import all the csv in the bucket as dataframe
# dfs = []
# for file_name in object_list:
#     obj = s3_resource.Object(bucket_name=my_bucket, key=file_name)
#     response = obj.get()
#     data = response['Body'].read().decode('utf-8')
#     df = pd.read_csv(StringIO(data),on_bad_lines='skip')
#     dfs.append(df)

# name_list = [object.split('.')[0] for object in object_list]

# for name,df in zip(name_list,dfs):
#     globals()[name] = df

In [7]:
# Read every object in the bucket as CSV and bind the resulting DataFrame to a
# module-level name taken from the key (the text before its first '.').
for file_name in object_list:
    s3_body = s3_resource.Object(bucket_name=my_bucket, key=file_name).get()['Body']
    csv_text = s3_body.read().decode('utf-8')
    frame_name = file_name.split('.')[0]
    globals()[frame_name] = pd.read_csv(StringIO(csv_text), on_bad_lines='skip')

In [8]:
# Work on a copy of the raw results and turn single quotes into double quotes
# so the dict-like text columns can later be read with json.loads.
race_result_df = race_result.copy()

# These two columns are expected to hold strings on every row.
for column in ('Driver', 'Constructor'):
    race_result_df[column] = race_result_df[column].apply(lambda text: text.replace("'", '"'))

# These two may hold non-string values, so they are stringified first
# (a missing value therefore becomes the literal text 'nan').
for column in ('Time', 'FastestLap'):
    race_result_df[column] = race_result_df[column].apply(lambda value: str(value).replace("'", '"'))

In [9]:
# Expand each Constructor JSON payload into columns, then keep one row per
# distinct combination of values.
constructor_fields = race_result_df['Constructor'].apply(lambda payload: pd.Series(json.loads(payload)))
constructor_df = constructor_fields.drop_duplicates().reset_index(drop=True)

In [10]:
# Expand each Driver JSON payload into columns, then keep one row per
# distinct combination of values.
driver_fields = race_result_df['Driver'].apply(lambda payload: pd.Series(json.loads(payload)))
driver_df = driver_fields.drop_duplicates().reset_index(drop=True)

In [11]:
# Add the slowly-changing-dimension bookkeeping columns with the same default
# values on every driver row.
driver_df = driver_df.assign(
    start_grass_date='2020-01-01',
    end_grass_date='9999-12-31',
    is_current='Yes',
)

In [12]:
# Add a second row for driver code 'VER' carrying driver number 1, valid from
# 2022-01-01. The existing VER row is copied so every other attribute matches.
current_wc = driver_df[driver_df['code']=='VER'].copy()
# Stored as text so the column stays homogeneous: the existing values are
# strings (the next cell matches the old number with == '33').
current_wc['permanentNumber'] = '1'
current_wc['start_grass_date'] = '2022-01-01'
# pd.concat is the public replacement for DataFrame.append; the private
# _append helper is not part of the supported pandas API.
driver_df = pd.concat([driver_df, current_wc], ignore_index=True)

In [13]:
# Close the validity window of the VER row that still carries number '33'.
superseded_ver_row = (driver_df['code']=='VER') & (driver_df['permanentNumber']=='33')
scd_columns = ['start_grass_date','end_grass_date','is_current']
driver_df.loc[superseded_ver_row, scd_columns] = ['2020-01-01','2021-12-31','No']

In [14]:
# Parse each JSON column once, then pull the identifier fields out of the
# parsed payloads.
driver_payloads = race_result_df['Driver'].apply(json.loads)
constructor_payloads = race_result_df['Constructor'].apply(json.loads)

race_result_df['permanentNumber'] = driver_payloads.apply(lambda driver: driver['permanentNumber'])
race_result_df['driverId'] = driver_payloads.apply(lambda driver: driver['driverId'])
race_result_df['constructorId'] = constructor_payloads.apply(lambda constructor: constructor['constructorId'])

In [15]:
# Start the fastest-lap table from an independent (deep) copy of the results.
fastest_lap_df = race_result_df.copy(deep=True)

def is_json(x):
    """Return True when `x` can be decoded by json.loads, otherwise False.

    Only the errors json.loads raises for bad input are treated as "not JSON":
    TypeError for values that are not str/bytes/bytearray (e.g. None or a
    float NaN) and ValueError for malformed text (json.JSONDecodeError is a
    subclass of ValueError). Anything else is left to propagate instead of
    being silently reported as False.
    """
    try:
        json.loads(x)
    except (TypeError, ValueError):
        return False
    return True

# Keep only the rows whose FastestLap text parses as JSON and drop the
# result columns that do not belong in the fastest-lap table. The explicit
# copy makes the frame independent of the filtered slice, so the column
# assignments below do not trigger pandas' SettingWithCopyWarning.
has_fastest_lap = fastest_lap_df['FastestLap'].apply(is_json)
fastest_lap_df = (
    fastest_lap_df[has_fastest_lap]
    .drop(columns=['position','positionText','Driver','Constructor','grid','laps','status','Time'])
    .copy()
)

# Parse every payload once instead of once per extracted field.
fastest_lap_payloads = fastest_lap_df['FastestLap'].apply(json.loads)
fastest_lap_df['rank_fastest_lap'] = fastest_lap_payloads.apply(lambda lap: lap['rank'])
fastest_lap_df['lap'] = fastest_lap_payloads.apply(lambda lap: lap['lap'])
fastest_lap_df['time'] = fastest_lap_payloads.apply(lambda lap: lap['Time']['time'])
fastest_lap_df['average_speed'] = fastest_lap_payloads.apply(lambda lap: lap['AverageSpeed']['speed'])

In [49]:
# The raw payload column is no longer needed once its fields are extracted.
fastest_lap_df = fastest_lap_df.drop(columns=['FastestLap'])

In [16]:
# Fix invalid values in column 'Time': rows without a time were stringified
# to the literal 'nan' by the quote-normalisation step. Mark them as missing
# explicitly so the pd.notnull guard used when parsing 'Time' skips them.
# (Assigning a dict through .loc does not store the dict: pandas aligns it on
# the row index like a Series, so it only blanked these rows as a side effect.)
race_result_df['Time'] = race_result_df['Time'].mask(race_result_df['Time']=='nan')

In [78]:
# def parse_json(x):
#     try:
#         if pd.notna(x):
#             return json.loads(x)
#         else:
#             None
#     except (json.JSONDecodeError, TypeError, KeyError):
#         return None

# race_result_df['absolute_millisecond'] = race_result_df['Time'].apply(lambda x: parse_json(x)['millis'] if isinstance(x, dict) and 'millis' in x.keys() else None)
# race_result_df['relative_time'] = race_result_df['Time'].apply(lambda x: parse_json(x)['relative_time'] if isinstance(x, dict) and 'time' in x.keys() else None)

In [17]:
def _time_field(raw_time, key):
    """Return `key` from the JSON text `raw_time`, or None when it is missing."""
    if pd.isnull(raw_time):
        return None
    return json.loads(raw_time)[key]


race_result_df['absolute_millisecond'] = race_result_df['Time'].apply(lambda raw: _time_field(raw, 'millis'))
race_result_df['relative_time'] = race_result_df['Time'].apply(lambda raw: _time_field(raw, 'time'))

In [18]:
# Remove the raw JSON text columns now that their fields have been extracted.
parsed_payload_columns = ['Driver','Constructor','Time','FastestLap']
race_result_df = race_result_df.drop(columns=parsed_payload_columns)

In [19]:
# Stringify each dict-like column and swap single quotes for double quotes so
# the text can be read with json.loads.
json_like_columns = ['Circuit','FirstPractice','SecondPractice','ThirdPractice','Qualifying','Sprint','Location']
for column in json_like_columns:
    season_list[column] = season_list[column].apply(lambda value: str(value).replace("'", '"'))

In [20]:
# Parse the Circuit payload once and lift its fields into their own columns.
# 'Location' is overwritten with the value nested inside the circuit payload.
circuit_payloads = season_list['Circuit'].apply(json.loads)
season_list['circuitId'] = circuit_payloads.apply(lambda circuit: circuit['circuitId'])
season_list['circuit_url'] = circuit_payloads.apply(lambda circuit: circuit['url'])
season_list['circuitName'] = circuit_payloads.apply(lambda circuit: circuit['circuitName'])
season_list['Location'] = circuit_payloads.apply(lambda circuit: circuit['Location'])

In [21]:
# The raw Circuit payload is redundant once its fields are separate columns.
season_list = season_list.drop(columns=['Circuit'])

In [24]:
# Order meetings chronologically within each year and number them by their
# position in that year.
# NOTE(review): cumcount() starts at 0 here, whereas session_info uses
# cumcount()+1 — presumably the first meeting of each year is not a race
# round; confirm against the data.
meeting_info = (
    meeting_info
    .sort_values(by=['year', 'date_start'])
    .assign(round=lambda frame: frame.groupby('year').cumcount())
)

In [25]:
# Attach meeting_key to each result by (season, round); results without a
# matching meeting are discarded.
merged_df = race_result_df.merge(
    meeting_info[['year','round','meeting_key']],
    how='left',
    left_on=['season','round'],
    right_on=['year','round'],
)
results_with_meeting = merged_df.dropna(subset=['meeting_key'])
race_result_df = results_with_meeting.astype({'meeting_key':'int64'}).drop(columns=['year'])

In [26]:
# Order sessions chronologically within each year, number them from 1, and
# drop the circuit/country columns.
unused_session_columns = ['circuit_key','circuit_short_name','country_key','country_code','country_name']
session_info = (
    session_info
    .sort_values(by=['year','date_start'])
    .assign(round=lambda frame: frame.groupby('year').cumcount() + 1)
    .drop(columns=unused_session_columns)
)

In [27]:
# Attach session_key to each result by (season, round).
# NOTE(review): the row filter and the integer cast below target meeting_key,
# not the session_key that was just merged in, so unmatched session_key
# values are kept as NaN — confirm whether session_key was intended.
merged_df2 = race_result_df.merge(
    session_info[['year','round','session_key']],
    how='left',
    left_on=['season','round'],
    right_on=['year','round'],
)
results_with_meeting = merged_df2.dropna(subset=['meeting_key'])
race_result_df = results_with_meeting.astype({'meeting_key':'int64'}).drop(columns=['year'])

In [28]:
# Attach meeting_key to the season calendar by (season, round); calendar rows
# without a matching meeting are discarded.
merged_df = season_list.merge(
    meeting_info[['year','round','meeting_key']],
    how='left',
    left_on=['season','round'],
    right_on=['year','round'],
)
calendar_with_meeting = merged_df.dropna(subset=['meeting_key'])
season_list = calendar_with_meeting.astype({'meeting_key':'int64'}).drop(columns=['year'])

In [3]:
from urllib.parse import quote

dfs = [race_result_df,
        driver_df,
        constructor_df,
        meeting_info,
        session_info,
        weather_info,
        laps_data,
        season_list]



# import configuration
params = config()
# create connection
conn = psycopg2.connect(**params)
# create cursor
cur = conn.cursor()

# Build the SQLAlchemy URL from the same settings psycopg2 uses.
# Credentials are percent-encoded so characters such as '@', ':' or '/' in a
# user name or password cannot be mistaken for URL delimiters, and the port is
# included when the configuration provides one so both connections reach the
# same server.
db_user = quote(str(params['user']), safe='')
db_password = quote(str(params['password']), safe='')
db_port = f":{params['port']}" if 'port' in params and params['port'] else ""
conn_string = f"postgresql://{db_user}:{db_password}@{params['host']}{db_port}/{params['database']}"
# NOTE: despite its name, `engine` is an open SQLAlchemy Connection, not an Engine.
engine = create_engine(conn_string).connect()


def create_table(sql_query):
    """Execute a DDL statement on the shared cursor and commit it.

    If execution fails, the error and the offending statement are printed and
    the transaction on `conn` is rolled back; nothing is raised to the caller.
    """
    try:
        cur.execute(sql_query)
    except Exception as err:
        print(f"Error: {err}")
        print(f"Query: {sql_query}")
        conn.rollback()
        return
    conn.commit()
    print("table has been created")
        
def insert_to_table(dataframe,table_name):
    """Write `dataframe` to the SQL table `table_name`, replacing any existing table.

    The write goes through the module-level SQLAlchemy handle `engine`, so the
    transaction is finished on that handle (not on the separate psycopg2
    `conn`): committed on success, rolled back on failure. Errors are printed
    rather than raised so one failing table does not abort a batch of loads.
    """
    try:
        dataframe.to_sql(name=table_name,con=engine,if_exists='replace',index=False)
    except Exception as e:
        print(f"Error: {e}")
        # Discard the failed transaction so the handle stays usable for the
        # next table. Guarded because Engine objects and legacy 1.x
        # connections do not expose rollback().
        if hasattr(engine, 'rollback'):
            engine.rollback()
    else:
        # SQLAlchemy 2.x connections are "commit as you go": work done on the
        # connection is not persisted until commit() is called on it.
        if hasattr(engine, 'commit'):
            engine.commit()
        print(f"{table_name} has been inserted")

# if __name__ == "__main__":

    # cur.close()
    # conn.close()

In [112]:
# (DataFrame, destination table name) for every table to load, in load order.
pairs = [
    (race_result_df, 'race_result'),
    (laps_data, 'laps_data'),
    (weather_info, 'weather_data'),
    (fastest_lap_df, 'fastest_lap'),
    (driver_df, 'dim_driver'),
    (constructor_df, 'dim_constructor'),
    (season_list, 'dim_season'),
    (session_info, 'dim_session'),
    (meeting_info, 'dim_meeting'),
]

for frame, target_table in pairs:
    insert_to_table(dataframe=frame, table_name=target_table)

race_result has been inserted
laps_data has been inserted
weather_data has been inserted
fastest_lap has been inserted
dim_driver has been inserted
dim_constructor has been inserted
dim_season has been inserted
dim_session has been inserted
dim_meeting has been inserted


In [100]:
# create_table(dim_driver)
# create_table(dim_constructor)
# create_table(dim_meeting)
# create_table(dim_season)
# create_table(dim_session)
# create_table(race_result)
# create_table(laps)
# create_table(weather_data)
# create_table(fastest_lap)

table has been created


In [99]:
# DDL for the driver dimension table.
# NOTE(review): insert_to_table writes with if_exists='replace', so this
# schema only applies if create_table(dim_driver) is run instead.
# NOTE(review): driver_id is the PRIMARY KEY and code is UNIQUE, yet driver_df
# holds two rows for code 'VER' (one per driver number) — confirm the key
# design before using this statement.
dim_driver = """
    CREATE TABLE dim_driver (
        driver_id VARCHAR(100) PRIMARY KEY,
        driver_number INTEGER NOT NULL,
        code VARCHAR(50) UNIQUE NOT NULL,
        url VARCHAR(200),
        given_name VARCHAR(50) NOT NULL,
        family_name VARCHAR(50) NOT NULL,
        birthdate DATE NOT NULL,
        nationality VARCHAR(50) NOT NULL,
        start_grass_date DATE NOT NULL,
        end_grass_date DATE NOT NULL,
        is_current VARCHAR(3) NOT NULL
        );
        """

# DDL for the constructor dimension table, keyed by constructor_id.
dim_constructor = """
    CREATE TABLE dim_constructor (
        constructor_id VARCHAR(100) PRIMARY KEY,
        url VARCHAR(200),
        name VARCHAR(50) NOT NULL,
        nationality VARCHAR(50) NOT NULL
        );
        """

# DDL for the meeting dimension table, keyed by meeting_key. The other
# dimension and fact statements reference this table, so it must be created
# before them.
dim_meeting = """
    CREATE TABLE dim_meeting (
        meeting_name VARCHAR(100),
        meeting_official_name VARCHAR(100) NOT NULL,
        city VARCHAR(50) NOT NULL,
        race_name VARCHAR(50),
        country_key INTEGER,
        country_code VARCHAR(50),
        country_name VARCHAR(50),
        circuit_key INTEGER,
        circuit_short_name VARCHAR(50),
        date_start DATE,
        gmt_offset TIME,
        meeting_key INTEGER PRIMARY KEY,
        year INTEGER,
        meeting_code VARCHAR(10),
        round INTEGER
        );
        """

# DDL for the season calendar table. The practice, qualifying, sprint and
# location payloads are declared as JSONB; meeting_key references dim_meeting.
# No primary key is declared.
dim_season = """
    CREATE TABLE dim_season (
        season INTEGER NOT NULL,
        round INTEGER NOT NULL,
        url VARCHAR(200),
        race_name VARCHAR(100) NOT NULL,
        date DATE,
        time TIME,
        first_practice JSONB,
        second_practice JSONB,
        third_practice JSONB,
        qualifying JSONB,
        sprint JSONB,
        circuit_id VARCHAR(50),
        circuit_url VARCHAR(200),
        circuit_name VARCHAR(50),
        location JSONB,
        meeting_key INTEGER REFERENCES dim_meeting(meeting_key)
        );
        """

# DDL for the session dimension table, keyed by session_key and linked to
# dim_meeting through meeting_key. date_start/date_end are declared as text.
dim_session = """
    CREATE TABLE dim_session (
        location VARCHAR(50),
        session_type VARCHAR(50),
        session_name VARCHAR(50),
        date_start VARCHAR(50),
        date_end VARCHAR(50),
        gmt_offset TIME,
        session_key INTEGER PRIMARY KEY,
        meeting_key INTEGER REFERENCES dim_meeting(meeting_key),
        year INTEGER,
        round INTEGER
        );
        """

# DDL for the race result fact table; references dim_driver, dim_constructor,
# dim_session and dim_meeting.
# NOTE(review): this rebinds the name `race_result`, which the transformation
# cells use for the raw results DataFrame (`race_result.copy()`); re-running
# those cells after this one would operate on this string instead.
race_result = """
    CREATE TABLE race_result (
        driver_number INTEGER,
        position INTEGER,
        position_text VARCHAR(50),
        points DECIMAL,
        grid INTEGER,
        laps INTEGER,
        status VARCHAR(50),
        season INTEGER,
        round INTEGER,
        permanent_number VARCHAR(50),
        driver_id VARCHAR(100) REFERENCES dim_driver(driver_id), 
        constructor_id VARCHAR(100) REFERENCES dim_constructor(constructor_id),
        absolute_millisecond VARCHAR(50),
        relative_time VARCHAR(50),
        session_key INTEGER REFERENCES dim_session(session_key),
        meeting_key INTEGER REFERENCES dim_meeting(meeting_key)
        );
        """

# DDL for the per-lap data table (created as laps_data); references
# dim_meeting and dim_session.
laps = """
    CREATE TABLE laps_data (
        meeting_key INTEGER REFERENCES dim_meeting(meeting_key),
        session_key INTEGER REFERENCES dim_session(session_key),
        driver_number INTEGER,
        i1_speed DECIMAL,
        i2_speed DECIMAL,
        st_speed DECIMAL,
        date_start VARCHAR(50),
        lap_duration FLOAT,
        is_pit_out_lap BOOLEAN,
        duration_sector_1 DECIMAL,
        duration_sector_2 DECIMAL,
        duration_sector_3 DECIMAL,
        segments_sector_1 VARCHAR(100),
        segments_sector_2 VARCHAR(100),
        segments_sector_3 VARCHAR(100),
        lap_number INTEGER
        );
        """

# DDL for the weather readings table; references dim_meeting and dim_session.
# The reading timestamp column `date` is declared as text.
weather_data = """
    CREATE TABLE weather_data (
        meeting_key INTEGER REFERENCES dim_meeting(meeting_key),
        session_key INTEGER REFERENCES dim_session(session_key),
        date VARCHAR(50),
        air_temperature DECIMAL,
        humidity DECIMAL,
        pressure DECIMAL,
        rainfall INTEGER,
        track_temperature DECIMAL,
        wind_direction INTEGER,
        wind_speed DECIMAL
        );
        """

# DDL for the fastest-lap fact table; references dim_driver and
# dim_constructor. Rank, lap, time and average speed are declared as text.
fastest_lap = """
    CREATE TABLE fastest_lap (
        driver_number INTEGER,
        points DECIMAL,
        season INTEGER,
        round INTEGER,
        permanent_number VARCHAR(50),
        driver_id VARCHAR(100) REFERENCES dim_driver(driver_id),
        constructor_id VARCHAR(100) REFERENCES dim_constructor(constructor_id),
        rank_fastest_lap VARCHAR(50),
        lap VARCHAR(50),
        time VARCHAR(50),
        average_speed VARCHAR(50)
        );
        """

In [113]:
# Release every database handle opened for the load: the psycopg2 cursor and
# connection, plus the SQLAlchemy connection (`engine`) that to_sql wrote
# through, which would otherwise stay open.
cur.close()
conn.close()
engine.close()

In [None]:
# def connect():
#     try:
#         conn = None
#         params = config()
#         conn = psycopg2.connect(**params)
#         conn.set_session(autocommit=True)

#         # create a cursor
#         cur = conn.cursor() 

#         # close connection to f1_db database
#         conn.close()

#         return cur, conn