## 6. Ingest Results.json file

In [0]:
%fs 
ls /mnt/prisha12/raw/

### 1. Reading the JSON file

In [0]:
# Explicit schema for the results data: one entry per column giving its name,
# Spark type and nullable flag.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType

results_schema = (
    StructType()
    .add("resultId", IntegerType(), False)  # the only column declared non-nullable
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("grid", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("points", FloatType(), True)
    .add("laps", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("fastestLapSpeed", FloatType(), True)
    .add("statusId", IntegerType(), True)
)

In [0]:
# Load the raw results file with the explicit schema, then preview ten rows.
df = (
    spark.read
    .schema(results_schema)
    .json("dbfs:/mnt/prisha12/raw/results.json")
)
display(df.limit(10))

In [0]:
# Summary statistics for the freshly loaded DataFrame.
df_summary = df.describe()
display(df_summary)

In [0]:
# printSchema() writes the schema tree to the cell output itself and returns
# None, so it is called directly rather than being wrapped in display().
df.printSchema()

In [0]:
# Keep every schema column except statusId, in the original column order.
df_selected = df.select(
    "resultId",
    "raceId",
    "driverId",
    "constructorId",
    "number",
    "grid",
    "position",
    "positionText",
    "positionOrder",
    "points",
    "laps",
    "time",
    "milliseconds",
    "fastestLap",
    "rank",
    "fastestLapTime",
    "fastestLapSpeed",
)
display(df_selected.limit(10))

### 2. Renaming fields and adding the current timestamp as ingestion_date

In [0]:
from pyspark.sql.functions import current_timestamp

# Convert the camelCase source column names to snake_case, then stamp each row
# with the time of ingestion in a new "ingestion_date" column.
df_renamed = (
    df_selected
    .withColumnRenamed("resultId", "result_id")
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("driverId", "driver_id")
    .withColumnRenamed("constructorId", "constructor_id")
    .withColumnRenamed("positionText", "position_text")
    .withColumnRenamed("positionOrder", "position_order")
    .withColumnRenamed("fastestLap", "fastest_lap")
    .withColumnRenamed("fastestLapTime", "fastest_lap_time")
    .withColumnRenamed("fastestLapSpeed", "fastest_lap_speed")
    .withColumn("ingestion_date", current_timestamp())
)
display(df_renamed.limit(10))

In [0]:
# Drop race_date and race_time from the renamed DataFrame.
# NOTE(review): neither column is declared in results_schema or produced by the
# select/rename steps in this notebook, so this presumably drops nothing
# (DataFrame.drop is documented to ignore absent column names). Looks like a
# leftover from another ingestion notebook — confirm and remove.
df_renamed = df_renamed.drop("race_date","race_time")

In [0]:
# df_renamed already carries the ingestion timestamp in its "ingestion_date"
# column (added in the rename step), so no second, differently named timestamp
# column is added here. df_final simply names the finished DataFrame.
df_final = df_renamed

In [0]:
### 3. Write data in Parquet format and partitionBy race_id

In [0]:
# Persist the processed results to the data lake as Parquet, partitioned by
# race_id, overwriting whatever is already at the target path.
(
    df_renamed.write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("race_id")
    .save("/mnt/prisha12/processed/results")
)