In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType
from pyspark.sql.functions import col, lit, concat, current_timestamp
from pathlib import Path

# Input file and output directory, both resolved against the current
# working directory at the time this cell runs.
results_bronze_path = Path.cwd().joinpath('bronze', 'results.json')
results_silver_path = Path.cwd().joinpath('silver', 'results')

# Reuse an already-running session if there is one; otherwise start a new
# one registered under the application name 'f1Practice'.
spark = (
    SparkSession.builder
    .appName('f1Practice')
    .getOrCreate()
)

In [21]:
# Explicit schema for results.json. Every field is declared nullable.
# Field order here is the column order of the DataFrame that is read below.
results_schema = (
    StructType()
    .add('constructorId', IntegerType(), True)
    .add('driverId', IntegerType(), True)
    .add('fastestLap', IntegerType(), True)
    .add('fastestLapSpeed', FloatType(), True)
    .add('fastestLapTime', StringType(), True)
    .add('grid', IntegerType(), True)
    .add('laps', IntegerType(), True)
    .add('milliseconds', IntegerType(), True)
    .add('number', IntegerType(), True)
    .add('points', FloatType(), True)
    .add('position', IntegerType(), True)
    .add('positionOrder', IntegerType(), True)
    .add('positionText', StringType(), True)
    .add('raceId', IntegerType(), True)
    .add('rank', IntegerType(), True)
    .add('resultId', IntegerType(), True)
    .add('statusId', IntegerType(), True)
    .add('time', StringType(), True)
)

# Load the JSON file with the schema above rather than letting Spark infer one.
# The path is converted to str before being handed to the reader.
df = (
    spark.read
    .schema(results_schema)
    .json(str(results_bronze_path))
)


In [23]:
# Project the raw columns into the output layout:
#   * camelCase source names are renamed to snake_case,
#   * columns whose names are already lowercase are passed through by name,
#   * statusId is not selected, so it does not appear in the result.
# The order of arguments below is the output column order.
final_df = (
    df
    .select(
        col('resultId').alias('result_id'),
        col('raceId').alias('race_id'),
        col('driverId').alias('driver_id'),
        col('constructorId').alias('constructor_id'),
        'number',
        'grid',
        'position',
        col('positionText').alias('position_text'),
        col('positionOrder').alias('position_order'),
        'points',
        'laps',
        'time',
        'milliseconds',
        col('fastestLap').alias('fastest_lap'),
        'rank',
        col('fastestLapTime').alias('fastest_lap_time'),
        col('fastestLapSpeed').alias('fastest_lap_speed'),
    )
    # Stamp every row with the timestamp of this load.
    .withColumn('ingestion_date', current_timestamp())
)

In [24]:
# Write the projected results as parquet, partitioned by race_id.
# 'overwrite' mode replaces whatever already exists at the target path.
(
    final_df.write
    .partitionBy('race_id')
    .mode('overwrite')
    .parquet(str(results_silver_path))
)

                                                                                