# PRÁCTICA 1: **DATOS ESTRUCTURADOS**

**GRUPO:** GESTDB_2

**MIEMBROS:** 
- JAIME ALVAREZ URUEÑA
- ÁLVARO FRAILE CARMENA 
- ALEJANDRO MENDOZA MEDINA
- JAVIER QUESADA PAJARES
  
----

In [1]:
import pandas as pd
import mariadb
import psycopg2
import sys
import numpy as np

Para esta práctica hemos seleccionado unas pocas tablas del siguiente dataset [**Formula 1 World Championship (1950 - 2024)**](https://www.kaggle.com/datasets/rohanrao/formula-1-world-championship-1950-2020?resource=download&select=drivers.csv). Tomaremos estas tablas y las pre-procesaremos ligeramente y las insertaremos sobre unos servicios de MariaDB y Postgre. Estos dos sistemas se unirán mediante un sistema de Hive para crear el Data Warehouse.

## Lectura de datos

Vamos a leer los datos de los csvs usando pandas.

In [2]:
drivers = pd.read_csv("./data/structured/drivers.csv")
results = pd.read_csv("./data/structured/results.csv")
races = pd.read_csv("./data/structured/races.csv")
constructors = pd.read_csv("./data/structured/constructors.csv")

In [3]:
drivers.head()

Unnamed: 0,driverId,driverRef,number,code,forename,surname,dob,nationality,url
0,1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton
1,2,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld
2,3,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg
3,4,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso
4,5,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen


In [4]:
results.head()

Unnamed: 0,resultId,raceId,driverId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId
0,1,18,1,1,22,1,1,1,1,10.0,58,1:34:50.616,5690616,39,2,1:27.452,218.3,1
1,2,18,2,2,3,5,2,2,2,8.0,58,+5.478,5696094,41,3,1:27.739,217.586,1
2,3,18,3,3,7,7,3,3,3,6.0,58,+8.163,5698779,41,5,1:28.090,216.719,1
3,4,18,4,4,5,11,4,4,4,5.0,58,+17.181,5707797,58,7,1:28.603,215.464,1
4,5,18,5,1,23,3,5,5,5,4.0,58,+18.014,5708630,43,1,1:27.418,218.385,1


In [5]:
races.head()

Unnamed: 0,raceId,year,round,circuitId,name,date,time,url,fp1_date,fp1_time,fp2_date,fp2_time,fp3_date,fp3_time,quali_date,quali_time,sprint_date,sprint_time
0,1,2009,1,1,Australian Grand Prix,2009-03-29,06:00:00,http://en.wikipedia.org/wiki/2009_Australian_G...,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
1,2,2009,2,2,Malaysian Grand Prix,2009-04-05,09:00:00,http://en.wikipedia.org/wiki/2009_Malaysian_Gr...,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
2,3,2009,3,17,Chinese Grand Prix,2009-04-19,07:00:00,http://en.wikipedia.org/wiki/2009_Chinese_Gran...,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
3,4,2009,4,3,Bahrain Grand Prix,2009-04-26,12:00:00,http://en.wikipedia.org/wiki/2009_Bahrain_Gran...,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
4,5,2009,5,4,Spanish Grand Prix,2009-05-10,12:00:00,http://en.wikipedia.org/wiki/2009_Spanish_Gran...,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N


In [6]:
constructors.head()

Unnamed: 0,constructorId,constructorRef,name,nationality,url
0,1,mclaren,McLaren,British,http://en.wikipedia.org/wiki/McLaren
1,2,bmw_sauber,BMW Sauber,German,http://en.wikipedia.org/wiki/BMW_Sauber
2,3,williams,Williams,British,http://en.wikipedia.org/wiki/Williams_Grand_Pr...
3,4,renault,Renault,French,http://en.wikipedia.org/wiki/Renault_in_Formul...
4,5,toro_rosso,Toro Rosso,Italian,http://en.wikipedia.org/wiki/Scuderia_Toro_Rosso


Para algunos datos tendremos que hacer algún ligero preprocesamiento sobre el formato.

In [7]:
# Reemplazar valores nulos o no válidos ('\N') con NaN y luego con None
results.replace(r'\\N', np.nan, regex=True, inplace=True)
results.fillna(value=0, inplace=True)

drivers.replace(r'\\N', np.nan, regex=True, inplace=True)
drivers.fillna(value=0, inplace=True)

In [8]:
# Convertir la columna 'fastestlapspeed' (u otras columnas) a un formato numérico sin comillas
results['fastestLapSpeed'] = results['fastestLapSpeed'].replace({r'"': ''}, regex=True).astype(float)

# Guardar el archivo limpio
results.to_csv('./data/structured/results_cleaned.csv', index=False)

## Inserción en MariaDB

Vamos a insertar una parte de los datos en MariaDB.

<div class="alert alert-block alert-info"> 
<b>NOTA</b> Para esta sección será necesario tener levantado el contenedor de Docker de MariaDB.
</div>

In [9]:
host = "localhost"
port = 3307

# Connect to MariaDB Platform
try:
    conn = mariadb.connect(
        user="root",
        password="my_password",
        host=host,
        port=port
    )

    print(f"Succesfully connected to MariaDB on {host}:{port}")
except mariadb.Error as e:
    print(f"Error connecting to MariaDB: {e}")
    sys.exit(1)

# Create a cursor object 
cur  = conn.cursor() 

Succesfully connected to MariaDB on localhost:3307


In [10]:
# Creación de la bbdd
cur.execute("CREATE DATABASE IF NOT EXISTS F1_STRUCTURED_DATA")  
  
cur.execute("SHOW DATABASES") 
databaseList = cur.fetchall() 

print(databaseList)

# Seleccionar la base de datos
cur.execute("USE F1_STRUCTURED_DATA")

[('F1_STRUCTURED_DATA',), ('information_schema',), ('my_mariadb_db',), ('mysql',), ('performance_schema',), ('sys',)]


In [11]:
# Crear tablas
cur.execute("""
    CREATE TABLE IF NOT EXISTS drivers (
        driverId INT  PRIMARY KEY,
        driverRef VARCHAR(100),
        number INT,
        code VARCHAR(100),
        forename VARCHAR(100),
        surname VARCHAR(100),
        dob DATE,
        nationality VARCHAR(100),
        url VARCHAR(100)
    )
    """)

cur.execute("""
    CREATE TABLE IF NOT EXISTS constructors (
        constructorId INT PRIMARY KEY,
        constructorRef VARCHAR(50),
        name VARCHAR(100),
        nationality VARCHAR(50),
        url VARCHAR(255)
        )
            """)


# Verificar que la tabla fue creada
cur.execute("SHOW TABLES")
tablesList = cur.fetchall()
print("Tablas disponibles:", tablesList)

Tablas disponibles: [('constructors',), ('drivers',)]


In [12]:
# Insertar los datos

# Insertar los datos del CSV en la tabla
# Suponiendo que ya tienes una conexión a la base de datos y un cursor (`cur`)
for index, row in constructors.iterrows():
    cur.execute('''
        INSERT IGNORE INTO constructors (
            constructorId, constructorRef, name, nationality, url
        ) VALUES (?, ?, ?, ?, ?)
    ''', (
        row['constructorId'], row['constructorRef'], row['name'],
        row['nationality'], row['url']
    ))


for index, row in drivers.iterrows():
    cur.execute('''
        INSERT IGNORE INTO drivers (
            driverId, driverRef, number, code, forename, surname, dob, nationality, url
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        row['driverId'], row['driverRef'], row['number'] if row['number'] != 0 else None,
        row['code'], row['forename'], row['surname'], row['dob'],
        row['nationality'], row['url']
    ))

conn.commit()

In [13]:
conn.close()

## Inserción en Postgre

El resto de datos será insertado en Postgre.

<div class="alert alert-block alert-info"> 
<b>NOTA</b> Para esta sección será necesario tener levantado el contenedor de Docker de Postgre.
</div>

In [22]:
# Connect to Postgre
try:
    conn = psycopg2.connect(dbname="postgres",
                        host="localhost",
                        user="hive",
                        password="password",
                        port="5432")
    print(f"Succesfully connected")
except Exception as e:
    print(f"Error connecting: {e}")
    sys.exit(1)

conn.autocommit = True  # Deshabilitar transacciones automáticas

# Create a cursor object 
cur  = conn.cursor() 

Succesfully connected


In [15]:
from psycopg2 import sql

In [16]:
# Nombre de la nueva base de datos
nombre_db = "F1_STRUCTURED_DATA_POSTGRE"

In [24]:
# Comando SQL para crear la base de datos
cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(nombre_db)))

# Confirma los cambios
conn.commit()

# Dará error si se intenta crear la base de datos dos veces
cur.close()
conn.close()

In [25]:
# Connect to Postgre but on the creared database
try:
    conn = psycopg2.connect(dbname=nombre_db,
                        host="localhost",
                        user="hive",
                        password="password",
                        port="5432")
    print(f"Succesfully connected")
except Exception as e:
    print(f"Error connecting: {e}")
    sys.exit(1)

conn.autocommit = True  # Deshabilitar transacciones automáticas

# Create a cursor object 
cur  = conn.cursor() 

Succesfully connected


In [26]:
# Crear la tabla races
cur.execute("""
    CREATE TABLE races (
        raceId INTEGER PRIMARY KEY,
        year INTEGER,
        round INTEGER,
        circuitId INTEGER,
        name TEXT,
        date DATE,
        time TIME,
        url TEXT,
        fp1_date DATE,
        fp1_time TIME,
        fp2_date DATE,
        fp2_time TIME,
        fp3_date DATE,
        fp3_time TIME,
        quali_date DATE,
        quali_time TIME,
        sprint_date DATE,
        sprint_time TIME
    );
""")


In [27]:
# Leer y subir el archivo CSV
with open('./data/structured/races.csv', 'r') as f:
    # Omitir la cabecera
    next(f)
    # Insertar datos directamente
    cur.copy_from(f, 'races', sep=',')

# Confirmar los cambios
conn.commit()


In [28]:
# Crear la tabla results
cur.execute("""
    CREATE TABLE results (
        resultId INTEGER PRIMARY KEY,
        raceId INTEGER,
        driverId INTEGER,
        constructorId INTEGER,
        number TEXT,
        grid INTEGER,
        position INTEGER,
        positionText TEXT,
        positionOrder INTEGER,
        points NUMERIC,
        laps INTEGER,
        time TEXT,
        milliseconds INTEGER,
        fastestLap INTEGER,
        rank INTEGER,
        fastestLapTime TEXT,
        fastestLapSpeed NUMERIC,
        statusId INTEGER
    );
""")

In [29]:
# Leer y subir el archivo CSV
with open('./data/structured/results_cleaned.csv', 'r') as f:
    # Omitir la cabecera
    next(f)
    # Insertar datos directamente
    cur.copy_from(f, 'results', sep=',')

# Confirmar los cambios
conn.commit()

In [30]:
cur.close()
conn.close()

## Configuración con Hive

Ahora tenemos que configurar las tablas que hemos creado en MariaDB y Postgre para poder acceder a ellas a través de Hive.

<div class="alert alert-block alert-info"> 
<b>NOTA</b> Para esta sección será necesario tener levantado el contenedor de Docker de Hive.
</div>

In [31]:
from pyhive import hive
conn = hive.Connection(host="localhost", username="hive", database="f1_hive")
cursor = conn.cursor()

In [33]:
# TABLA DRIVERS

cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS hive_drivers (
    driverId INT,
    driverRef STRING,
    number INT,
    code STRING,
    forename STRING,
    surname STRING,
    dob DATE,
    nationality STRING,
    url STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
  "hive.sql.database.type" = "MYSQL",
  "hive.sql.jdbc.url" = "jdbc:mariadb://hive4-mariadb:3306/F1_STRUCTURED_DATA",
  "hive.sql.dbcp.username" = "root",
  "hive.sql.dbcp.password" = "my_password",
  "hive.sql.jdbc.driver" = "org.mariadb.jdbc.Driver",
  "hive.sql.table" = "drivers"
)

               ''')

In [34]:
# TABLA CONSTRUCTORS

cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS hive_constructors (
    constructorId INT,
    constructorRef STRING,
    name STRING,
    nationality STRING,
    url STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
  "hive.sql.database.type" = "MYSQL",
  "hive.sql.jdbc.url" = "jdbc:mariadb://hive4-mariadb:3306/F1_STRUCTURED_DATA",
  "hive.sql.dbcp.username" = "root",
  "hive.sql.dbcp.password" = "my_password",
  "hive.sql.jdbc.driver" = "org.mariadb.jdbc.Driver",
  "hive.sql.table" = "constructors"
)

               ''')

In [35]:
# TABLA RESULTS

cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS hive_results (
    resultId INTEGER,
    raceId INTEGER,
    driverId INTEGER,
    constructorId INTEGER,
    number STRING,
    grid INTEGER,
    position INTEGER,
    positionText STRING,
    positionOrder INTEGER,
    points INTEGER,
    laps INTEGER,
    `time` STRING,          -- 'time' entre comillas invertidas
    miliseconds INTEGER,
    fastestLap INTEGER,
    `rank` INTEGER,         -- 'rank' entre comillas invertidas
    fastestLapTime STRING,
    fastestLapSpeed NUMERIC,
    statusId INTEGER
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
  "hive.sql.database.type" = "POSTGRES",
  "hive.sql.jdbc.url" = "jdbc:postgresql://hive4-postgres:5432/F1_STRUCTURED_DATA_POSTGRE",
  "hive.sql.dbcp.username" = "hive",
  "hive.sql.dbcp.password" = "password",
  "hive.sql.jdbc.driver" = "org.postgresql.Driver",
  "hive.sql.table" = "results"
)

               ''')

In [37]:
# TABLA RACES

cursor.execute('''
CREATE EXTERNAL TABLE IF NOT EXISTS hive_races (
    raceId INT,
    year INT,
    round INT,
    circuitId INT,
    name STRING,
    `date` DATE,            -- 'date' entre comillas invertidas por ser palabra reservada
    `time` STRING,          -- 'time' entre comillas invertidas, mapeado a STRING
    url STRING,
    fp1_date DATE,
    fp1_time STRING,        -- TIME mapeado a STRING
    fp2_date DATE,
    fp2_time STRING,
    fp3_date DATE,
    fp3_time STRING,
    quali_date DATE,
    quali_time STRING,
    sprint_date DATE,
    sprint_time STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
  "hive.sql.database.type" = "POSTGRES",
  "hive.sql.jdbc.url" = "jdbc:postgresql://hive4-postgres:5432/F1_STRUCTURED_DATA_POSTGRE",
  "hive.sql.dbcp.username" = "hive",
  "hive.sql.dbcp.password" = "password",
  "hive.sql.jdbc.driver" = "org.postgresql.Driver",
  "hive.sql.table" = "races"
)
               ''')

Ahora vamos a hacer unas querys sencillas para comprobar que todo funciona correctamente.

In [42]:
cursor.execute('''
SELECT * FROM hive_drivers LIMIT 5
               ''')

results = cursor.fetchall()

for result in results:
    print(result)

(1, 'hamilton', 44, 'HAM', 'Lewis', 'Hamilton', '1985-01-07', 'British', 'http://en.wikipedia.org/wiki/Lewis_Hamilton')
(2, 'heidfeld', None, 'HEI', 'Nick', 'Heidfeld', '1977-05-10', 'German', 'http://en.wikipedia.org/wiki/Nick_Heidfeld')
(3, 'rosberg', 6, 'ROS', 'Nico', 'Rosberg', '1985-06-27', 'German', 'http://en.wikipedia.org/wiki/Nico_Rosberg')
(4, 'alonso', 14, 'ALO', 'Fernando', 'Alonso', '1981-07-29', 'Spanish', 'http://en.wikipedia.org/wiki/Fernando_Alonso')
(5, 'kovalainen', None, 'KOV', 'Heikki', 'Kovalainen', '1981-10-19', 'Finnish', 'http://en.wikipedia.org/wiki/Heikki_Kovalainen')


In [43]:
cursor.execute('''
SELECT count(*) FROM hive_results
               ''')

print(cursor.fetchall())

[(26519,)]


In [44]:
cur.close()
conn.close()