# ETL Formula 1 - Luciano Alessi

# **Data Extraction**

Data is extracted from the Ergast API.

### Explanation of the code:

**Extracted Data:**

1. Data on every driver in F1 history.
2. Race results, which become available after each race (races are held weekly or biweekly).
3. Lap times: the time each driver takes to complete each lap in every race of the season.
4. Driver championship standings: the points each driver has accumulated in the championship after each race.

The functions that perform these extractions are defined in `utils_db.py`.
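The extraction cell calls `get_last_extraction_date` and `update_last_extraction_date`, whose implementations live in `utils_db.py` and are not reproduced in this notebook. As a rough, hypothetical sketch of how such helpers could work, assuming the date is kept as a `YYYY-MM-DD` string in a plain-text file (the real helpers may differ, and `should_run` is a name introduced here only for illustration):

```python
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional

DATE_FORMAT = "%Y-%m-%d"  # assumed on-disk format, not confirmed by the notebook


def get_last_extraction_date(path: str) -> Optional[datetime]:
    """Return the stored date, or None if the file is missing or empty."""
    file = Path(path)
    if not file.exists():
        return None
    content = file.read_text(encoding="utf-8").strip()
    return datetime.strptime(content, DATE_FORMAT) if content else None


def update_last_extraction_date(path: str, date: datetime) -> None:
    """Overwrite the marker file, creating parent directories if needed."""
    file = Path(path)
    file.parent.mkdir(parents=True, exist_ok=True)
    file.write_text(date.strftime(DATE_FORMAT), encoding="utf-8")


def should_run(last: Optional[datetime], now: datetime) -> bool:
    """Run on the first execution, or when at least one full day has passed."""
    return last is None or (now - last).days >= 1


# Small demonstration in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    marker = f"{tmp}/bronze/last_extraction_date.txt"
    first_read = get_last_extraction_date(marker)
    update_last_extraction_date(marker, datetime(2024, 5, 19))
    second_read = get_last_extraction_date(marker)
```

Note that `timedelta.days` counts whole 24-hour periods, so a run 23 hours after the stored date is skipped while one 24 hours later proceeds.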

**Storage in Parquet format in the bronze layer of a datalake:**

- Directory creation: Before the data is stored, the target directories are checked and created automatically if they do not exist.
- Data storage: Data is stored in Parquet format in the bronze layer of the data lake. Lap times are partitioned by season, race (round), and driver. Race results and driver standings are partitioned by season and race (round), and the drivers table is written as a single unpartitioned file.
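To make the resulting folder layout concrete, here is a small standard-library sketch of the directory check and of Hive-style `key=value` partition paths. It assumes `save_to_parquet` ends up producing this kind of layout, which is what `DataFrame.to_parquet(path, partition_cols=[...])` writes with the pyarrow engine; the helper names `partition_dir` and `ensure_dir` are hypothetical and not taken from `utils_db.py`.

```python
import os
import tempfile


def partition_dir(base_dir, partitions):
    """Build a Hive-style partition directory such as base/season=2024/round=1."""
    parts = [f"{key}={value}" for key, value in partitions.items()]
    return os.path.join(base_dir, *parts)


def ensure_dir(path):
    """Create the directory (and any missing parents) if it does not exist yet."""
    os.makedirs(path, exist_ok=True)
    return path


# Demonstration in a throwaway directory standing in for datalake/bronze/ergast_api
with tempfile.TemporaryDirectory() as root:
    lap_dir = ensure_dir(
        partition_dir(
            os.path.join(root, "lap_times"),
            {"season": 2024, "round": 1, "driverId": "alonso"},
        )
    )
    created = os.path.isdir(lap_dir)
    relative = os.path.relpath(lap_dir, root)
```

Partitioning lap times by driver as well as by round keeps each file small, at the cost of many more directories per season.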



### Code:

Import libraries and functions from utils_db.py

In [1]:
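# The wildcard import is also expected to expose pd, datetime, and text, which later cells use unqualified
from utils_db import *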

Data Extraction and Storage in the Bronze Layer of the Data Lake:

In [2]:
# Define the API endpoints and other parameters
API_URL = "http://ergast.com/api/f1"
ENDPOINT_DRIVERS = "drivers.json"
LIMIT_PARAMS = {"limit": 1000}
SEASON = datetime.now().year

# Path of the file that stores the date of the last incremental extraction
LAST_EXTRACTION_DATE_FILE = "datalake/bronze/ergast_api/last_extraction_date.txt"

# Get the date of the last incremental extraction
last_extraction_date = get_last_extraction_date(LAST_EXTRACTION_DATE_FILE)
current_date = datetime.now()

# Check whether the incremental extraction should run (at most once per day)
should_run_incremental = last_extraction_date is None or (current_date - last_extraction_date).days >= 1

# Extract driver data and save it in Parquet format
drivers_df = get_drivers(API_URL, ENDPOINT_DRIVERS, LIMIT_PARAMS)
if drivers_df is not None:
    save_to_parquet(drivers_df, "datalake/bronze/ergast_api/drivers/data.parquet")

# Extract lap time data and save it in Parquet format if the incremental extraction should run
if should_run_incremental:
    lap_times_data = get_lap_times(API_URL, SEASON, LIMIT_PARAMS, last_extraction_date)
    lap_times_df = build_table(lap_times_data)
    if not lap_times_df.empty:
        save_to_parquet(lap_times_df, "datalake/bronze/ergast_api/lap_times", ["season", "round", "driverId"])

# Extract race result data and save it in Parquet format if the incremental extraction should run
if should_run_incremental:
    race_results_data = get_race_results(API_URL, SEASON, LIMIT_PARAMS, last_extraction_date)
    race_results_df = build_table(race_results_data)
    if not race_results_df.empty:
        save_to_parquet(race_results_df, "datalake/bronze/ergast_api/race_results", ["season", "round"])

# Extract driver standings and save them in Parquet format if the incremental extraction should run
if should_run_incremental:
    standings_data = get_driver_standings(API_URL, SEASON, last_extraction_date=last_extraction_date)
    standings_df = build_table(standings_data)
    if not standings_df.empty:
        save_to_parquet(standings_df, "datalake/bronze/ergast_api/driver_standings", ["season", "round"])

# Update the date of the last incremental extraction if new data was obtained
if should_run_incremental and not lap_times_df.empty:
    last_extraction_date_str = lap_times_df["date"].max()
    last_extraction_date = datetime.strptime(last_extraction_date_str, '%Y-%m-%d')
    update_last_extraction_date(LAST_EXTRACTION_DATE_FILE, last_extraction_date)

# Print a message indicating that data extraction and storage completed
print("Data extraction and storage completed.")


Data extraction and storage completed.



# **Data Transformation**

### Transformations Applied:

The following base transformations have been applied to the four tables (`drivers`, `lap_times`, `race_results`, `driver_standings`) as appropriate:

1. **Duplicate removal.**
2. **Null value imputation.**
3. **Column type conversion for optimization.**
4. **Column renaming.**
5. **Formatting of date type columns.**

### Lap Times Table `lap_times`

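**Creation of two new boolean columns using `groupby`**:
1. **Driver's Fastest Lap** (`fastest_lap_of_driver`): `True` on the lap where a driver set their best time of the race.
2. **Race Fastest Lap** (`fastest_lap_of_race`): `True` on the lap with the best time among all drivers in the race.

Note: From 2019 through 2024, the driver who set the fastest lap of a race earned an extra championship point, provided they finished in the top ten.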
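One caveat worth illustrating: `lap_time` appears to be kept as text (the load step declares it as `VARCHAR`), and taking `min` over strings compares them character by character, so a lap of ten minutes or more would sort before a 1:34 lap. The sketch below, on made-up data, converts the strings to seconds before applying the same `groupby(...).transform('min')` pattern. The `M:SS.mmm` format and the helper `lap_time_to_seconds` are assumptions made for this example, not part of the notebook.

```python
import pandas as pd


def lap_time_to_seconds(lap_time: str) -> float:
    """Convert an 'M:SS.mmm' string (assumed format) to seconds."""
    minutes, seconds = lap_time.split(":")
    return int(minutes) * 60 + float(seconds)


# Made-up laps for two drivers in one race; the last lap is deliberately very slow
laps = pd.DataFrame({
    "season": [2024, 2024, 2024, 2024],
    "round": [1, 1, 1, 1],
    "driver_id": ["a", "a", "b", "b"],
    "lap_time": ["1:35.100", "1:34.900", "1:34.500", "10:02.000"],
})

laps["lap_seconds"] = laps["lap_time"].map(lap_time_to_seconds)

# Flag each driver's best lap within the race
driver_best = laps.groupby(["driver_id", "season", "round"])["lap_seconds"].transform("min")
laps["fastest_lap_of_driver"] = laps["lap_seconds"] == driver_best

# Flag the best lap of the whole race
race_best = laps.groupby(["season", "round"])["lap_seconds"].transform("min")
laps["fastest_lap_of_race"] = laps["lap_seconds"] == race_best
```

On this data the string minimum of `lap_time` is the ten-minute lap, while the numeric comparison flags the 1:34.500 lap as the fastest of the race.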

### Driver Standings Table `driver_standings`

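Addition of three new columns:

1. **Total Wins** (`victories`): Number of first-place finishes per driver and season, calculated from the `race_results` table and merged into the standings table.
2. **Total Podiums** (`podiums`): Number of top-three finishes per driver and season, calculated from the `race_results` table and merged into the standings table.
3. **Total Fastest Laps** (`fastest_laps`): Number of race fastest laps per driver and season, calculated from the `lap_times` table and merged into the standings table.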
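The aggregate-then-merge pattern behind these columns can be seen on a toy dataset. This is only an illustrative sketch with invented drivers and results; it shows why a left merge followed by `fillna(0)` is needed, since drivers with no wins or podiums have no row in the aggregates.

```python
import pandas as pd

# Invented results for two rounds of one season
results = pd.DataFrame({
    "season": [2024] * 6,
    "round": [1, 1, 1, 2, 2, 2],
    "driver_id": ["a", "b", "c", "b", "a", "d"],
    "position": [1, 2, 3, 1, 2, 3],
})

# Driver "e" never reached the podium
standings = pd.DataFrame({
    "season": [2024] * 5,
    "driver_id": ["a", "b", "c", "d", "e"],
})

keys = ["season", "driver_id"]

# Count first places and top-three finishes per driver and season
victories = (
    results[results["position"] == 1].groupby(keys).size().reset_index(name="victories")
)
podiums = (
    results[results["position"] <= 3].groupby(keys).size().reset_index(name="podiums")
)

# A left merge keeps every driver in the standings, even without matches
standings = standings.merge(victories, on=keys, how="left").merge(podiums, on=keys, how="left")

# Missing aggregates mean zero wins or podiums
standings[["victories", "podiums"]] = standings[["victories", "podiums"]].fillna(0).astype(int)
```

An inner merge would silently drop drivers "c", "d", and "e" from the standings at the first step, which is why `how="left"` matters here.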


### Code:

In [3]:
# 1 - Driver data

# Read the Parquet file generated in part 1
drivers_df = pd.read_parquet("datalake/bronze/ergast_api/drivers/data.parquet")
print(drivers_df)
print(drivers_df.info(memory_usage='deep'))

# Remove duplicates
drivers_df = drivers_df.drop_duplicates()

# Impute null values
imputation_mapping = {
    "permanentNumber": -1,
    "code": "Unknown"
}
drivers_df = drivers_df.fillna(imputation_mapping)

# Column type conversions
conversion_mapping = {
    "nationality": "category",
    "dateOfBirth":"datetime64[ns]",
    "permanentNumber": "int16"
    }
drivers_df = drivers_df.astype(conversion_mapping)

# Rename columns
drivers_df.rename(columns={
    "driverId": "driver_id",
    "url": "profile_url",
    "dateOfBirth": "date_of_birth",
    "permanentNumber": "permanent_number"
}, inplace=True)

# Format date columns
drivers_df['date_of_birth'] = pd.to_datetime(drivers_df['date_of_birth'], format='%Y-%m-%d')

# Print the final result
print(drivers_df.head())
print(drivers_df.info(memory_usage='deep'))


# 2 - Lap times

# Load the data
lap_times_df = pd.read_parquet("datalake/bronze/ergast_api/lap_times")
print(lap_times_df)
print(lap_times_df.info(memory_usage='deep'))

# Remove duplicates
lap_times_df = lap_times_df.drop_duplicates()

# Column type conversions
conversion_mapping = {
    "country": "category",
    "circuit_name": "category",
    "lap_number": "int8",
    "position": "int8",
    "season": "int16",
    "round": "int8",
    "driverId": "category"
    }
lap_times_df = lap_times_df.astype(conversion_mapping)

# Format date columns
lap_times_df['date'] = pd.to_datetime(lap_times_df['date'], format='%Y-%m-%d')

# Rename columns
lap_times_df.rename(columns={
    "driverId": "driver_id",
    "time": "lap_time"
}, inplace=True)

# Create new columns
# Boolean flag: is this the driver's fastest lap in the round?
lap_times_df['fastest_lap_of_driver'] = (lap_times_df['lap_time'] == lap_times_df.groupby(['driver_id', 'season', 'round'])['lap_time'].transform('min'))
# Boolean flag: is this the fastest lap of the race?
lap_times_df['fastest_lap_of_race'] = (lap_times_df['lap_time'] == lap_times_df.groupby(['season', 'round'])['lap_time'].transform('min'))

# Filters for lookups:
# Fastest lap time of each driver in each round
fastest_lap_per_driver = lap_times_df.groupby(['driver_id', 'season', 'round'])['lap_time'].min().reset_index()
# Rows holding the fastest lap of each round
fastest_laps_by_round = lap_times_df[lap_times_df['fastest_lap_of_race']]

# Print the final result
print(lap_times_df)
print(lap_times_df.info(memory_usage='deep'))


# 3- Race results

race_results_df = pd.read_parquet("datalake/bronze/ergast_api/race_results")
print(race_results_df)
print(race_results_df.info(memory_usage='deep'))

# Remove duplicates
race_results_df = race_results_df.drop_duplicates()

# Impute null values
imputation_mapping = {
    "time": "no time"
}
race_results_df = race_results_df.fillna(imputation_mapping)

# Column type conversions
conversion_mapping = {
    "race_name": "category",
    "circuit_name": "category",
    "driver": "category",
    "position": "int8",
    "points": "int8",
    "season": "int16",
    "round": "int8"
    }
race_results_df = race_results_df.astype(conversion_mapping)

# Format date columns
race_results_df['date'] = pd.to_datetime(race_results_df['date'], format='%Y-%m-%d')

# Rename columns
race_results_df.rename(columns={"driver": "driver_id"}, inplace=True)

# Print the final result
print(race_results_df)
print(race_results_df.info(memory_usage='deep'))


# 4 - Driver standings

driver_standings_df = pd.read_parquet("datalake/bronze/ergast_api/driver_standings")
print(driver_standings_df)
print(driver_standings_df.info(memory_usage='deep'))

# Remove duplicates
driver_standings_df = driver_standings_df.drop_duplicates()

# Column type conversions
conversion_mapping = {
    "driver": "category",
    "position": "int8",
    "points": "int16",
    "season": "int16",
    "round": "int8"
    }
driver_standings_df = driver_standings_df.astype(conversion_mapping)

# Rename columns
driver_standings_df.rename(columns={
    "round": "total_rounds",
    "driver": "driver_id"
}, inplace=True)

# Add win and podium columns to the driver standings (totals are computed from race results and merged in)
# 1. Count wins per driver and season
victories_per_driver = race_results_df[race_results_df['position'] == 1].groupby(['season', 'driver_id'])['position'].count().reset_index()
victories_per_driver = victories_per_driver.rename(columns={'position': 'victories'})

# 2. Count podiums per driver and season
podiums_per_driver = race_results_df[race_results_df['position'] <= 3].groupby(['season', 'driver_id'])['position'].count().reset_index()
podiums_per_driver = podiums_per_driver.rename(columns={'position': 'podiums'})

# 3. Merge these race_results_df aggregates into driver_standings_df
driver_standings_df = pd.merge(driver_standings_df, victories_per_driver, on=['season', 'driver_id'], how='left')
driver_standings_df = pd.merge(driver_standings_df, podiums_per_driver, on=['season', 'driver_id'], how='left')

# 4. Fill NaN with 0 because some drivers may have no wins or podiums
driver_standings_df['victories'] = driver_standings_df['victories'].fillna(0).astype(int)
driver_standings_df['podiums'] = driver_standings_df['podiums'].fillna(0).astype(int)


# Add the total fastest laps column from the lap times table to the driver standings table
# 1. Count fastest laps per driver and season
fastest_laps_per_driver = lap_times_df[lap_times_df['fastest_lap_of_race']].groupby(['season', 'driver_id']).size().reset_index(name='fastest_laps')

# 2. Mapping dictionary that standardizes driver names so they match across both tables
driver_mapping = {
    'max_verstappen': 'Verstappen',
    'leclerc': 'Leclerc',
    'norris': 'Norris',
    'sainz': 'Sainz',
    'perez': 'Pérez',
    'piastri': 'Piastri',
    'russell': 'Russell',
    'hamilton': 'Hamilton',
    'alonso': 'Alonso',
    'tsunoda': 'Tsunoda',
    'stroll': 'Stroll',
    'bearman': 'Bearman',
    'hulkenberg': 'Hülkenberg',
    'ricciardo': 'Ricciardo',
    'albon': 'Albon',
    'ocon': 'Ocon',
    'kevin_magnussen': 'Magnussen',
    'gasly': 'Gasly',
    'zhou': 'Zhou',
    'bottas': 'Bottas',
    'sargeant': 'Sargeant'
}

# 3. Function that maps driver names
def map_driver_names(driver):
    return driver_mapping.get(driver, driver)

# 4. Apply the mapping function to the driver_id column of both tables
driver_standings_df['driver_id'] = driver_standings_df['driver_id'].apply(map_driver_names)
fastest_laps_per_driver['driver_id'] = fastest_laps_per_driver['driver_id'].apply(map_driver_names)

# 5. Merge now that the names match
driver_standings_df = pd.merge(driver_standings_df, fastest_laps_per_driver, on=['season', 'driver_id'], how='left')

# 6. Fill NaN with 0 because some drivers may have no fastest laps
driver_standings_df['fastest_laps'] = driver_standings_df['fastest_laps'].fillna(0).astype(int)

# Print the final result
print(driver_standings_df)
print(driver_standings_df.info(memory_usage='deep'))

      driverId                                             url givenName  \
0        abate  http://en.wikipedia.org/wiki/Carlo_Mario_Abate     Carlo   
1    abecassis   http://en.wikipedia.org/wiki/George_Abecassis    George   
2      acheson      http://en.wikipedia.org/wiki/Kenny_Acheson     Kenny   
3        adams     http://en.wikipedia.org/wiki/Philippe_Adams  Philippe   
4         ader          http://en.wikipedia.org/wiki/Walt_Ader      Walt   
..         ...                                             ...       ...   
854     zapico      http://en.wikipedia.org/wiki/Emilio_Zapico    Emilio   
855       zhou        http://en.wikipedia.org/wiki/Zhou_Guanyu    Guanyu   
856      zonta      http://en.wikipedia.org/wiki/Ricardo_Zonta   Ricardo   
857      zorzi        http://en.wikipedia.org/wiki/Renzo_Zorzi     Renzo   
858     zunino     http://en.wikipedia.org/wiki/Ricardo_Zunino   Ricardo   

    familyName dateOfBirth nationality permanentNumber  code  
0        Abate  1932-07-



### **Optional**: Storage of refined data in Parquet format in the silver layer of the datalake


In [4]:
# Save the processed (refined) data to new Parquet files in the silver layer
save_to_parquet(drivers_df, "datalake/silver/ergast_api/drivers/data.parquet")
if should_run_incremental:
    if not lap_times_df.empty:
        save_to_parquet(lap_times_df, "datalake/silver/ergast_api/lap_times", ["season", "round", "driver_id"])
    if not race_results_df.empty:
        save_to_parquet(race_results_df, "datalake/silver/ergast_api/race_results", ["season", "round"])
    if not driver_standings_df.empty:
        save_to_parquet(driver_standings_df, "datalake/silver/ergast_api/driver_standings", ["season", "total_rounds"])

# **Load/Storage** of OLAP Data



In this section, a connection to a PostgreSQL database is established, and the transformed data is stored in specific tables. The general structure of the operations is as follows:

1. **Definition of pipeline.conf**: Parameters necessary to connect to the database are set.
2. **Creation of the connection**: `SQLAlchemy` is used to create a connection engine that allows interaction with the database.
3. **Creation of tables**: SQL queries are defined and executed to create the necessary tables in the database, ensuring they have the appropriate schema.
4. **Data insertion**: The transformed DataFrames are saved into the corresponding tables using the `to_sql` method of `pandas`.


NOTE: To connect to the database without exposing credentials in the notebook, upload or create a configuration file named `pipeline.conf` in this session containing the PostgreSQL connection details and credentials. The `connect_to_db` function reads this file to establish the connection.

Template for creating the file and entering connection data:


```
[postgres]
host=****.aivencloud.com
port=15191
user=avnadmin
pwd=****
dbname=defaultdb
```
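How `connect_to_db` turns this file into a connection is not shown in the notebook. A plausible sketch, using only the standard library and assuming the keys from the template above, is to parse the section with `configparser` and assemble a SQLAlchemy-style URL. The function name `build_db_url` and the sample values are hypothetical.

```python
from configparser import ConfigParser
from urllib.parse import quote_plus


def build_db_url(config_text: str, section: str, driver: str) -> str:
    """Assemble a database URL from an INI-style config section."""
    # interpolation=None keeps '%' characters in passwords from being misread
    parser = ConfigParser(interpolation=None)
    parser.read_string(config_text)
    db = parser[section]
    # Escape characters such as '@' or '/' that would break the URL
    password = quote_plus(db["pwd"])
    return f"{driver}://{db['user']}:{password}@{db['host']}:{db['port']}/{db['dbname']}"


# Placeholder values for illustration only
example_conf = """
[postgres]
host=db.example.com
port=15191
user=avnadmin
pwd=p@ss/word
dbname=defaultdb
"""

url = build_db_url(example_conf, "postgres", "postgresql+psycopg2")
```

The resulting string could then be handed to `sqlalchemy.create_engine(url)`. Reading from a real file would use `parser.read("pipeline.conf")` instead of `read_string`.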

### Code:

In [5]:
# Establish the connection to the database
engine = connect_to_db(
    "pipeline.conf",
    "postgres",
    "postgresql+psycopg2"
    )

# Define the queries that create the tables (creating each table before inserting data gives it an explicit schema)
# Mixed-case column names are quoted so they match the DataFrame columns that to_sql inserts
queries = [
    text("""
    CREATE TABLE IF NOT EXISTS public.drivers (
        driver_id VARCHAR(50) PRIMARY KEY,
        "givenName" VARCHAR(255),
        "familyName" VARCHAR(255),
        nationality VARCHAR(255),
        permanent_number INT,
        code VARCHAR(50),
        date_of_birth DATE,
        profile_url VARCHAR(255)
    );
    """),
    text("""
    CREATE TABLE IF NOT EXISTS public.lap_times (
        season INT,
        date DATE,
        round INT,
        country VARCHAR(255),
        circuit_name VARCHAR(255),
        lap_number INT,
        position INT,
        driver_id VARCHAR(50),
        lap_time VARCHAR(50),
        fastest_lap_of_driver BOOLEAN,
        fastest_lap_of_race BOOLEAN
    );
    """),
    text("""
    CREATE TABLE IF NOT EXISTS public.race_results (
        season INT,
        date DATE,
        round INT,
        race_name VARCHAR(255),
        circuit_name VARCHAR(255),
        driver_id VARCHAR(50),
        position INT,
        time VARCHAR(50),
        points INT
    );
    """),
    text("""
    CREATE TABLE IF NOT EXISTS public.driver_standings (
        season INT,
        total_rounds INT,
        position INT,
        driver_id VARCHAR(50),
        points INT,
        victories INT,
        podiums INT,
        fastest_laps INT
    );
    """)
]

# Run the queries to create the tables; engine.begin() commits the DDL when the block exits
with engine.begin() as conn:
    for query in queries:
        conn.execute(query)

# Save the DataFrames to the database
drivers_df.to_sql('drivers', con=engine, if_exists='append', index=False, method='multi')
lap_times_df.to_sql('lap_times', con=engine, if_exists='append', index=False, method='multi')
race_results_df.to_sql('race_results', con=engine, if_exists='append', index=False, method='multi')
driver_standings_df.to_sql('driver_standings', con=engine, if_exists='append', index=False, method='multi')

# Print a message indicating that the data insertion completed
print("Data insertion into the database completed.")

Data insertion into the database completed.
