In [33]:
import pandas as pd
import os
import json
import zipfile
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [None]:
# Leer las credenciales del archivo kaggle.json
with open('./kaggle.json', 'r') as f:
    kaggle_creds = json.load(f)

In [None]:
# Establecer las variables de entorno
os.environ['KAGGLE_USERNAME'] = kaggle_creds['username']
os.environ['KAGGLE_KEY'] = kaggle_creds['key']

In [None]:
# Descargar el dataset
!kaggle datasets download -d rohanrao/formula-1-world-championship-1950-2020

In [None]:
# Ruta del archivo .tar.gz
zip_file = './formula-1-world-championship-1950-2020.zip'
extract_folder = './Formula-1-Datasets'

In [None]:
# Abrir el archivo y extraer todo
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
  zip_ref.extractall(extract_folder)

In [5]:
!ls Formula-1-Datasets

circuits.csv		   drivers.csv	   results.csv
constructor_results.csv    lap_times.csv   seasons.csv
constructor_standings.csv  pit_stops.csv   sprint_results.csv
constructors.csv	   qualifying.csv  status.csv
driver_standings.csv	   races.csv


In [10]:
# Directorio donde están los archivos CSV
csv_dir = './Formula-1-Datasets'
# Obtener la lista de archivos CSV en el directorio
csv_files = [f for f in os.listdir(csv_dir) if f.endswith('.csv')]

In [11]:
csv_files

['circuits.csv',
 'constructors.csv',
 'constructor_results.csv',
 'constructor_standings.csv',
 'drivers.csv',
 'driver_standings.csv',
 'lap_times.csv',
 'pit_stops.csv',
 'qualifying.csv',
 'races.csv',
 'results.csv',
 'seasons.csv',
 'sprint_results.csv',
 'status.csv']

In [12]:
spark = SparkSession.builder \
  .master("local[*]") \
  .appName('test') \
  .getOrCreate()

In [None]:
# Crear un diccionario para almacenar DataFrames de Spark
spark_dfs = {}

In [13]:
# Recorrer cada archivo CSV
for file in csv_files:
    # Extraer el nombre del archivo sin la extensión .csv
    df_name = file.replace('.csv', '')
    
    file_path = os.path.join(csv_dir, file)
    # Crear DataFrame de Spark
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    
    # Crear una variable dinámica para el DataFrame
    spark_dfs[df_name] = df

In [14]:
spark_dfs['circuits'].show()

+---------+--------------+--------------------+------------+---------+--------+---------+---+--------------------+
|circuitId|    circuitRef|                name|    location|  country|     lat|      lng|alt|                 url|
+---------+--------------+--------------------+------------+---------+--------+---------+---+--------------------+
|        1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968| 10|http://en.wikiped...|
|        2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738| 18|http://en.wikiped...|
|        3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|  7|http://en.wikiped...|
|        4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|109|http://en.wikiped...|
|        5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|130|http://en.wikiped...|
|        6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347| 

In [16]:
# Función para validar datos nulos y tipos de datos en DataFrames de Spark
def validate_dataframes(dataframes):
    validation_results = {}
    for name, df in spark_dfs.items():
        results = {}
        
        # Verificación de datos nulos
        nulls_summary = {}
        for column in df.columns:
            null_count = df.filter(col(column).isNull()).count()
            if null_count > 0:
                nulls_summary[column] = null_count
        
        # Verificación de tipos de datos
        dtypes = {field.name: field.dataType.simpleString() for field in df.schema.fields}
        
        # Almacenar resultados de validación
        results['nulls'] = nulls_summary
        results['dtypes'] = dtypes
        
        validation_results[name] = results
    
    return validation_results

# Validar DataFrames
validation_results = validate_dataframes(spark_dfs)

# Imprimir resultados de validación
for df_name, results in validation_results.items():
    print(f"\nDataFrame: {df_name}")
    print("Nulls in columns:")
    for column, count in results['nulls'].items():
        print(f"{column}: {count} null values")
    print("\nData types of columns:")
    for column, dtype in results['dtypes'].items():
        print(f"{column}: {dtype}")


DataFrame: circuits
Nulls in columns:

Data types of columns:
circuitId: int
circuitRef: string
name: string
location: string
country: string
lat: double
lng: double
alt: int
url: string

DataFrame: constructor_results
Nulls in columns:

Data types of columns:
constructorResultsId: int
raceId: int
constructorId: int
points: double
status: string

DataFrame: constructor_standings
Nulls in columns:

Data types of columns:
constructorStandingsId: int
raceId: int
constructorId: int
points: double
position: int
positionText: string
wins: int

DataFrame: constructors
Nulls in columns:

Data types of columns:
constructorId: int
constructorRef: string
name: string
nationality: string
url: string

DataFrame: driver_standings
Nulls in columns:

Data types of columns:
driverStandingsId: int
raceId: int
driverId: int
points: double
position: int
positionText: string
wins: int

DataFrame: drivers
Nulls in columns:

Data types of columns:
driverId: int
driverRef: string
number: string
code: string


In [17]:
# Imprime el esquema de cada DataFrame
for df_name, df in spark_dfs.items():
    schema = df.schema
    
    # Imprime el nombre del DataFrame y su esquema
    print(f"Schema for DataFrame: {df_name}")
    print(schema)
    print('--------------------------')

Schema for DataFrame: circuits
StructType([StructField('circuitId', IntegerType(), True), StructField('circuitRef', StringType(), True), StructField('name', StringType(), True), StructField('location', StringType(), True), StructField('country', StringType(), True), StructField('lat', DoubleType(), True), StructField('lng', DoubleType(), True), StructField('alt', IntegerType(), True), StructField('url', StringType(), True)])
--------------------------
Schema for DataFrame: constructor_results
StructType([StructField('constructorResultsId', IntegerType(), True), StructField('raceId', IntegerType(), True), StructField('constructorId', IntegerType(), True), StructField('points', DoubleType(), True), StructField('status', StringType(), True)])
--------------------------
Schema for DataFrame: constructor_standings
StructType([StructField('constructorStandingsId', IntegerType(), True), StructField('raceId', IntegerType(), True), StructField('constructorId', IntegerType(), True), StructField(

In [None]:
# Reemplazar '\N' por None en todos los DataFrames
for df_name, df in spark_dfs.items():
    for col_name in df.columns:
        df = df.withColumn(col_name, F.when(F.col(col_name) == r"\N", None).otherwise(F.col(col_name)))
    spark_dfs[df_name] = df

In [49]:
schemas = {
'circuits' : types.StructType([
    types.StructField('circuitId', types.IntegerType(), True), 
    types.StructField('circuitRef', types.StringType(), True), 
    types.StructField('name', types.StringType(), True), 
    types.StructField('location', types.StringType(), True), 
    types.StructField('country', types.StringType(), True), 
    types.StructField('lat', types.DoubleType(), True), 
    types.StructField('lng', types.DoubleType(), True), 
    types.StructField('alt', types.IntegerType(), True), 
    types.StructField('url', types.StringType(), True)
    ]),
'constructor_results' : types.StructType([
    types.StructField('constructorResultsId', types.IntegerType(), True), 
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('constructorId', types.IntegerType(), True), 
    types.StructField('points', types.DoubleType(), True), 
    types.StructField('status', types.StringType(), True)
    ]),
'constructor_standings' : types.StructType([
    types.StructField('constructorStandingsId', types.IntegerType(), True), 
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('constructorId', types.IntegerType(), True), 
    types.StructField('points', types.DoubleType(), True), 
    types.StructField('position', types.IntegerType(), True), 
    types.StructField('positionText', types.StringType(), True), 
    types.StructField('wins', types.IntegerType(), True)
    ]),
'constructors' : types.StructType([
    types.StructField('constructorId', types.IntegerType(), True), 
    types.StructField('constructorRef', types.StringType(), True), 
    types.StructField('name', types.StringType(), True), 
    types.StructField('nationality', types.StringType(), True), 
    types.StructField('url', types.StringType(), True)
    ]),
'driver_standings' : types.StructType([
    types.StructField('driverStandingsId', types.IntegerType(), True), 
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('points', types.DoubleType(), True), 
    types.StructField('position', types.IntegerType(), True), 
    types.StructField('positionText', types.StringType(), True), 
    types.StructField('wins', types.IntegerType(), True)
    ]),
'drivers' : types.StructType([
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('driverRef', types.StringType(), True), 
    types.StructField('number', types.StringType(), True), 
    types.StructField('code', types.StringType(), True), 
    types.StructField('forename', types.StringType(), True), 
    types.StructField('surname', types.StringType(), True), 
    types.StructField('dob', types.TimestampType(), True), 
    types.StructField('nationality', types.StringType(), True), 
    types.StructField('url', types.StringType(), True)
    ]),
'lap_times' : types.StructType([
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('lap', types.IntegerType(), True), 
    types.StructField('position', types.IntegerType(), True), 
    types.StructField('time', types.TimestampType(), True), 
    types.StructField('milliseconds', types.IntegerType(), True)
    ]),
'pit_stops' : types.StructType([
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('stop', types.IntegerType(), True), 
    types.StructField('lap', types.IntegerType(), True), 
    types.StructField('time', types.TimestampType(), True), 
    types.StructField('duration', types.DoubleType(), True), 
    types.StructField('milliseconds', types.IntegerType(), True)
    ]),
'qualifying' : types.StructType([
    types.StructField('qualifyId', types.IntegerType(), True), 
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('constructorId', types.IntegerType(), True), 
    types.StructField('number', types.IntegerType(), True), 
    types.StructField('position', types.IntegerType(), True), 
    types.StructField('q1', types.StringType(), True), 
    types.StructField('q2', types.StringType(), True), 
    types.StructField('q3', types.StringType(), True)
    ]),
'races' : types.StructType([
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('year', types.IntegerType(), True), 
    types.StructField('round', types.IntegerType(), True), 
    types.StructField('circuitId', types.IntegerType(), True), 
    types.StructField('name', types.StringType(), True), 
    types.StructField('date', types.DateType(), True), 
    types.StructField('time', types.StringType(), True), 
    types.StructField('url', types.StringType(), True), 
    types.StructField('fp1_date', types.StringType(), True), 
    types.StructField('fp1_time', types.StringType(), True), 
    types.StructField('fp2_date', types.StringType(), True), 
    types.StructField('fp2_time', types.StringType(), True), 
    types.StructField('fp3_date', types.StringType(), True), 
    types.StructField('fp3_time', types.StringType(), True), 
    types.StructField('quali_date', types.StringType(), True), 
    types.StructField('quali_time', types.StringType(), True), 
    types.StructField('sprint_date', types.StringType(), True), 
    types.StructField('sprint_time', types.StringType(), True)
    ]),
'results' : types.StructType([
    types.StructField('resultId', types.IntegerType(), True), 
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('constructorId', types.IntegerType(), True), 
    types.StructField('number', types.IntegerType(), True), 
    types.StructField('grid', types.IntegerType(), True), 
    types.StructField('position', types.IntegerType(), True), 
    types.StructField('positionText', types.StringType(), True), 
    types.StructField('positionOrder', types.IntegerType(), True), 
    types.StructField('points', types.DoubleType(), True), 
    types.StructField('laps', types.IntegerType(), True), 
    types.StructField('time', types.StringType(), True), 
    types.StructField('milliseconds', types.IntegerType(), True), 
    types.StructField('fastestLap', types.IntegerType(), True), 
    types.StructField('rank', types.IntegerType(), True), 
    types.StructField('fastestLapTime', types.StringType(), True), 
    types.StructField('fastestLapSpeed', types.DoubleType(), True), 
    types.StructField('statusId', types.IntegerType(), True)
    ]),
'seasons' : types.StructType([
    types.StructField('year', types.IntegerType(), True), 
    types.StructField('url', types.StringType(), True)
    ]),
'sprint_results' : types.StructType([
    types.StructField('resultId', types.IntegerType(), True), 
    types.StructField('raceId', types.IntegerType(), True), 
    types.StructField('driverId', types.IntegerType(), True), 
    types.StructField('constructorId', types.IntegerType(), True), 
    types.StructField('number', types.IntegerType(), True), 
    types.StructField('grid', types.IntegerType(), True), 
    types.StructField('position', types.IntegerType(), True), 
    types.StructField('positionText', types.StringType(), True), 
    types.StructField('positionOrder', types.IntegerType(), True), 
    types.StructField('points', types.IntegerType(), True), 
    types.StructField('laps', types.IntegerType(), True), 
    types.StructField('time', types.StringType(), True), 
    types.StructField('milliseconds', types.IntegerType(), True), 
    types.StructField('fastestLap', types.StringType(), True), 
    types.StructField('fastestLapTime', types.StringType(), True), 
    types.StructField('statusId', types.IntegerType(), True)
    ]),
'status' : types.StructType([
    types.StructField('statusId', types.IntegerType(), True), 
    types.StructField('status', types.StringType(), True)
    ])
}

In [52]:
# Función para aplicar el esquema a un DataFrame
def apply_schema(df, schema):
    for field in schema:
        column_name = field.name
        column_type = field.dataType
        # Aplicar el cast si la columna existe en el DataFrame
        if column_name in df.columns:
            df = df.withColumn(column_name, col(column_name).cast(column_type))
    return df

# Iterar sobre el diccionario de DataFrames y aplicar los esquemas
for df_name, df in spark_dfs.items():
    if df_name in schemas:
        schema = schemas[df_name]
        spark_dfs[df_name] = apply_schema(df, schema)

# Imprimir los esquemas modificados para verificar
for df_name, df in spark_dfs.items():
    print(f"Schema for DataFrame: {df_name}")
    df.printSchema()


Schema for DataFrame: circuits
root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)

Schema for DataFrame: constructor_results
root
 |-- constructorResultsId: integer (nullable = true)
 |-- raceId: integer (nullable = true)
 |-- constructorId: integer (nullable = true)
 |-- points: double (nullable = true)
 |-- status: string (nullable = true)

Schema for DataFrame: constructor_standings
root
 |-- constructorStandingsId: integer (nullable = true)
 |-- raceId: integer (nullable = true)
 |-- constructorId: integer (nullable = true)
 |-- points: double (nullable = true)
 |-- position: integer (nullable = true)
 |-- positionText: string (nullable = true)
 |-- wins: integer (nullable = true)

Schema for 