In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, to_timestamp, concat
from config_file import raw_path, processed_path
from common_func import add_timestamp

In [2]:
# Get or create the Spark session for this ETL notebook; the bare `spark`
# expression on the last line renders the session summary as the cell output.
spark = (
    SparkSession.builder
    .appName('ETL')
    .getOrCreate()
)
spark

## Results Data Ingestion

In [3]:
# Load the raw race results from JSON; no explicit schema is given, so Spark
# infers column types from the records themselves.
result_df = spark.read.format('json').load(f"{raw_path}/results.json")
# Preview the first rows to sanity-check the load.
result_df.show()

+-------------+--------+----------+---------------+--------------+----+----+------------+------+------+--------+-------------+------------+------+----+--------+--------+-----------+
|constructorId|driverId|fastestLap|fastestLapSpeed|fastestLapTime|grid|laps|milliseconds|number|points|position|positionOrder|positionText|raceId|rank|resultId|statusId|       time|
+-------------+--------+----------+---------------+--------------+----+----+------------+------+------+--------+-------------+------------+------+----+--------+--------+-----------+
|            1|       1|        39|          218.3|      1:27.452|   1|  58|     5690616|    22|  10.0|       1|            1|           1|    18|   2|       1|       1|1:34:50.616|
|            2|       2|        41|        217.586|      1:27.739|   5|  58|     5696094|     3|   8.0|       2|            2|           2|    18|   3|       2|       1|     +5.478|
|            3|       3|        41|        216.719|      1:28.090|   7|  58|     5698779| 

In [4]:
# Inspect the schema Spark inferred from the raw JSON. In the output below,
# several numeric-looking fields (fastestLap, fastestLapSpeed, milliseconds,
# number, position, rank) come through as strings; they are cast explicitly in
# a later cell.
# NOTE(review): presumably the raw file holds non-numeric placeholders in those
# fields, which forces string inference — confirm against the source data.
result_df.printSchema()

root
 |-- constructorId: long (nullable = true)
 |-- driverId: long (nullable = true)
 |-- fastestLap: string (nullable = true)
 |-- fastestLapSpeed: string (nullable = true)
 |-- fastestLapTime: string (nullable = true)
 |-- grid: long (nullable = true)
 |-- laps: long (nullable = true)
 |-- milliseconds: string (nullable = true)
 |-- number: string (nullable = true)
 |-- points: double (nullable = true)
 |-- position: string (nullable = true)
 |-- positionOrder: long (nullable = true)
 |-- positionText: string (nullable = true)
 |-- raceId: long (nullable = true)
 |-- rank: string (nullable = true)
 |-- resultId: long (nullable = true)
 |-- statusId: long (nullable = true)
 |-- time: string (nullable = true)



In [5]:
# Rename the camelCase source columns to the `*_Id` / underscore-separated names
# used by the processed layer (later cells reference e.g. 'race_Id', 'status_Id').
result_df = (
    result_df
    .withColumnRenamed('constructorId', 'constructor_Id')
    .withColumnRenamed('driverId', 'driver_Id')
    .withColumnRenamed('raceId', 'race_Id')
    .withColumnRenamed('resultId', 'result_Id')
    .withColumnRenamed('statusId', 'status_Id')
    .withColumnRenamed('fastestLap', 'fastest_Lap')
    .withColumnRenamed('fastestLapSpeed', 'fastest_Lap_Speed')
    .withColumnRenamed('fastestLapTime', 'fastest_Lap_Time')
    .withColumnRenamed('positionOrder', 'position_Order')
    .withColumnRenamed('positionText', 'position_Text')
)

In [6]:
# Add an ingestion timestamp column via the shared helper from common_func.
# NOTE(review): add_timestamp is defined outside this notebook; the later
# printSchema output shows it appends a non-nullable `ingestion_Date`
# timestamp column — confirm its exact contract in common_func.
result_df = add_timestamp(result_df)

In [7]:
# Cast the string-typed numeric columns (see the first printSchema output) to
# numeric types. Columns are referenced with col(name), which resolves by name
# against the DataFrame being built, instead of attribute access on the
# existing `result_df` binding (e.g. `result_df.rank`). Attribute access would
# silently pick up a DataFrame method/property for any column whose name
# collides with one.
# NOTE(review): values that cannot be parsed become null when ANSI mode is off;
# with spark.sql.ansi.enabled=true the cast raises instead — confirm the setting.
result_df = (
    result_df
    .withColumn('fastest_Lap', col('fastest_Lap').cast('int'))
    .withColumn('fastest_Lap_Speed', col('fastest_Lap_Speed').cast('float'))
    .withColumn('number', col('number').cast('int'))
    .withColumn('position', col('position').cast('int'))
    .withColumn('rank', col('rank').cast('int'))
    .withColumn('milliseconds', col('milliseconds').cast('int'))
)

In [8]:
# Re-check the schema after renaming, timestamping and casting: the output
# should show the renamed columns, the new integer/float types, and the
# `ingestion_Date` column added by add_timestamp.
result_df.printSchema()

root
 |-- constructor_Id: long (nullable = true)
 |-- driver_Id: long (nullable = true)
 |-- fastest_Lap: integer (nullable = true)
 |-- fastest_Lap_Speed: float (nullable = true)
 |-- fastest_Lap_Time: string (nullable = true)
 |-- grid: long (nullable = true)
 |-- laps: long (nullable = true)
 |-- milliseconds: integer (nullable = true)
 |-- number: integer (nullable = true)
 |-- points: double (nullable = true)
 |-- position: integer (nullable = true)
 |-- position_Order: long (nullable = true)
 |-- position_Text: string (nullable = true)
 |-- race_Id: long (nullable = true)
 |-- rank: integer (nullable = true)
 |-- result_Id: long (nullable = true)
 |-- status_Id: long (nullable = true)
 |-- time: string (nullable = true)
 |-- ingestion_Date: timestamp (nullable = false)



In [9]:
# Exclude the status_Id column from the processed output.
# NOTE(review): the reason for dropping it is not stated here — confirm that
# downstream consumers of the processed Results data do not need it.
result_df = result_df.drop('status_Id')

In [10]:
# Write the processed results as Parquet, partitioned by race_Id, replacing any
# existing output at the target path. '/' is used as the path separator, matching
# the raw_path read earlier: it works for Spark/Hadoop paths on every OS, whereas
# a backslash is only a separator on Windows and "\R" is an invalid escape
# sequence in a Python string literal.
result_df.write.mode('overwrite').partitionBy('race_Id').parquet(f"{processed_path}/Results")