In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('results').getOrCreate()

In [16]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [17]:
results_schema = StructType([StructField("resultId", IntegerType(), False),
                             StructField("raceId", IntegerType(), False),
                             StructField("driverId", IntegerType(), False),
                             StructField("constructorId", IntegerType(), False),
                             StructField("number", IntegerType(), False),
                             StructField("grid", IntegerType(), False),
                             StructField("position", IntegerType(), False),
                             StructField("positionText", StringType(), False),
                             StructField("positionOrder", IntegerType(), False),
                             StructField("points", FloatType(), False),
                             StructField("laps", IntegerType(), False),
                             StructField("time", StringType(), False),
                             StructField("milliseconds", IntegerType(), False),
                             StructField("fastestLap", IntegerType(), False),
                             StructField("rank", IntegerType(), False),
                             StructField("fastestLapTime", StringType(), False),
                             StructField("fastestLapSpeed", FloatType(), False),
                             StructField("statusId", StringType(), False)])

In [18]:
results_df = spark.read.json('../../raw/results.json', schema=results_schema)

In [19]:
results_df.show(10)

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|          218.3|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

In [20]:
from pyspark.sql.functions import current_timestamp

In [21]:
results_df = results_df.withColumnRenamed('resultId', 'result_id') \
                        .withColumnRenamed('raceId', 'race_id') \
                        .withColumnRenamed('driverId', 'driver_id') \
                        .withColumnRenamed('constructorId', 'constructor_id') \
                        .withColumnRenamed('positionText', 'position_text') \
                        .withColumnRenamed('positionOrder', 'position_order') \
                        .withColumnRenamed('fastestLapTime', 'fastest_lap_time') \
                        .withColumnRenamed('fastestLapSpeed', 'fastest_lap_speed') \
                        .withColumn('ingestion_date', current_timestamp()) \
                        .drop('statusId')

In [22]:
results_df.show(5)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+----------+----+----------------+-----------------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastestLap|rank|fastest_lap_time|fastest_lap_speed|      ingestion_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+----------+----+----------------+-----------------+--------------------+
|        1|     18|        1|             1|    22|   1|       1|            1|             1|  10.0|  58|1:34:50.616|     5690616|        39|   2|        1:27.452|            218.3|2022-03-14 16:51:...|
|        2|     18|        2|             2|     3|   5|       2|            2|             2|   8.0|  58|     +5.478|     5696094|        41|   3|        1:27.739|          217.586|20

In [23]:
results_df.write.mode('overwrite').parquet('../../processed/results')

# Test
results = spark.read.parquet('../../processed/results/')
results.show(5)

+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+----------+----+----------------+-----------------+--------------------+
|result_id|race_id|driver_id|constructor_id|number|grid|position|position_text|position_order|points|laps|       time|milliseconds|fastestLap|rank|fastest_lap_time|fastest_lap_speed|      ingestion_date|
+---------+-------+---------+--------------+------+----+--------+-------------+--------------+------+----+-----------+------------+----------+----+----------------+-----------------+--------------------+
|        1|     18|        1|             1|    22|   1|       1|            1|             1|  10.0|  58|1:34:50.616|     5690616|        39|   2|        1:27.452|            218.3|2022-03-14 16:51:...|
|        2|     18|        2|             2|     3|   5|       2|            2|             2|   8.0|  58|     +5.478|     5696094|        41|   3|        1:27.739|          217.586|20