## Read Data

### Steps:
###    1. Read the data 
###    2. Rename columns
###    3. Join the data frames (races to circuits is a left join; the remaining joins are inner joins) and keep only the columns needed. Multiple data frames can be joined together in one statement.

In [0]:
from pyspark.sql.functions import *

In [0]:
%run "../includes/configurations"

In [0]:
# Load each processed table, keeping only the columns used downstream.
# Generic column names (`name`, `number`, `time`, ...) are renamed here so
# they stay distinguishable once the tables are joined together.
races_df = (
    spark.read.parquet(f"{processed_folder_path}/races")
    .select(
        "race_id",
        "circuit_id",
        "race_year",
        col("name").alias("race_name"),
        col("race_timestamp").alias("race_date"),
    )
)

circuits_df = (
    spark.read.parquet(f"{processed_folder_path}/circuits")
    .select(
        "circuit_id",
        col("location").alias("circuit_location"),
    )
)

drivers_df = (
    spark.read.parquet(f"{processed_folder_path}/drivers")
    .select(
        "driver_id",
        col("number").alias("driver_number"),
        col("name").alias("driver_name"),
        col("nationality").alias("driver_nationality"),
    )
)

constructors_df = (
    spark.read.parquet(f"{processed_folder_path}/constructors")
    .select(
        "constructor_id",
        col("name").alias("team"),
    )
)

results_df = (
    spark.read.parquet(f"{processed_folder_path}/results")
    .select(
        "race_id",
        "driver_id",
        "constructor_id",
        "fastest_lap",
        "grid",
        "points",
        col("time").alias("race_time"),
        "position",
    )
)

## Join Data

In [0]:
# Attach the circuit location to every race. This is a LEFT join, so races
# without a matching circuit row are kept (with a null circuit_location).
# The join key is not needed afterwards and is dropped.
races_df_with_circuits = (
    races_df
    .join(circuits_df, on="circuit_id", how="left")
    .drop("circuit_id")
)

In [0]:
# Inner join (the default join type) of race/circuit rows with their results.
races_df_with_results = races_df_with_circuits.join(
    results_df,
    on="race_id",
    how="inner",
)
# Preview the intermediate result in the notebook.
display(races_df_with_results)

In [0]:
# Inner join (the default join type) to add the constructor's team name.
races_df_with_constructors = races_df_with_results.join(
    constructors_df,
    on="constructor_id",
    how="inner",
)
# Preview the intermediate result in the notebook.
display(races_df_with_constructors)

In [0]:
# Inner join (the default join type) to add driver number, name and nationality.
races_df_with_drivers = races_df_with_constructors.join(
    drivers_df,
    on="driver_id",
    how="inner",
)
# Preview the intermediate result in the notebook.
display(races_df_with_drivers)

In [0]:
# Build the presentation data set: remove the join keys, which are no longer
# needed, and stamp every row with the time this data set was created.
final_df = (
    races_df_with_drivers
    .drop("driver_id", "constructor_id", "race_id")
    .withColumn("created_date", current_timestamp())
)
display(final_df)

In [0]:
# Persist the result to the presentation layer as parquet, replacing any
# data already present at that path.
final_df.write.parquet(
    f"{presentation_folder_path}/race_results",
    mode="overwrite",
)

In [0]:
# Spot-check the output for a single race.
display(
    final_df.filter("race_name = 'Abu Dhabi Grand Prix' and race_year = 2020")
)