## Ingest circuits.csv file

In [0]:
# Notebook parameter: label for where the data came from (empty by default).
dbutils.widgets.text(name="p_data_source", defaultValue="")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
# Notebook parameter: the dated sub-folder of the raw folder to ingest from.
dbutils.widgets.text(name="p_file_date", defaultValue="2021-03-21")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

##### Step 1 - Read the CSV file using the Spark DataFrame reader

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [0]:
# Explicit schema for circuits.csv: (column name, Spark type, nullable).
# Only circuitId is declared non-nullable.
circuits_schema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ("circuitId", IntegerType(), False),
        ("circuitRef", StringType(), True),
        ("name", StringType(), True),
        ("location", StringType(), True),
        ("country", StringType(), True),
        ("lat", DoubleType(), True),
        ("lng", DoubleType(), True),
        ("alt", IntegerType(), True),
        ("url", StringType(), True),
    )
])

In [0]:
# Read the circuits.csv file for the selected file date, treating the first
# row as a header and applying circuits_schema instead of inferring types.
circuits_df = spark.read.csv(
    f'{raw_folder_path}/{v_file_date}/circuits.csv',
    schema=circuits_schema,
    header=True,
)

In [0]:
# Preview the raw DataFrame as read from circuits.csv.
display(circuits_df)

In [0]:
# circuits_df.printSchema()

In [0]:
# display(circuits_df.describe())

##### Step 2 - Select only the required columns

In [0]:
# circuits_selected_df = circuits_df.select("circuitId", "circuitRef", "name", "location", "country", "lat", "lng", "alt")

In [0]:
# circuits_selected_df = circuits_df.select(circuits_df.circuitId, circuits_df.circuitRef, circuits_df.name, circuits_df.location, circuits_df.country, circuits_df.lat, circuits_df.lng, circuits_df.alt)

In [0]:
# circuits_selected_df = circuits_df.select(circuits_df["circuitId"], circuits_df["circuitRef"], circuits_df["name"], circuits_df["location"], circuits_df["country"], circuits_df["lat"], circuits_df["lng"], circuits_df["alt"])

In [0]:
from pyspark.sql.functions import col

In [0]:
# Keep the columns needed downstream, in this order; "url" is dropped here.
circuits_selected_df = circuits_df.select(
    *[col(column_name) for column_name in
      ("circuitId", "circuitRef", "name", "location", "country", "lat", "lng", "alt")]
)

In [0]:
# Preview the DataFrame after column selection (url removed).
display(circuits_selected_df)

##### Step 3 - Rename the columns as required and add the data_source and file_date columns

In [0]:
from pyspark.sql.functions import lit

In [0]:
# Rename source columns to snake_case / spelled-out names, then append two
# constant columns carrying the notebook's p_data_source and p_file_date values.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(v_file_date))
)

In [0]:
# Preview the renamed DataFrame, including the data_source and file_date columns.
display(circuits_renamed_df)

##### Step 4 - Add ingestion date to the dataframe

In [0]:
# Add an ingestion-date column (per the Step 4 header) using add_ingestion_date.
# NOTE(review): add_ingestion_date is presumably defined in ../includes/common_functions
# (loaded by %run above); confirm the column name and type it adds there.
circuits_final_df = add_ingestion_date(circuits_renamed_df)

In [0]:
# Preview the final DataFrame that will be written out.
display(circuits_final_df)

##### Step 5 - Write data to the data lake as a Delta table

In [0]:
# Save the final DataFrame as the Delta table f1_processed.circuits.
# mode("overwrite") replaces the table's existing contents on every run.
# (Plain string literal: the table name has no interpolated values, so no f-string.)
circuits_final_df.write.mode("overwrite").format("delta").saveAsTable("f1_processed.circuits")

In [0]:
# List the files under the processed folder's circuits directory.
# NOTE(review): this only shows the table written above if the f1_processed
# database's location is processed_folder_path; confirm against the database DDL.
display(dbutils.fs.ls(f"{processed_folder_path}/circuits"))

In [0]:
%sql
-- Query the f1_processed.circuits table to verify the write.
SELECT * FROM f1_processed.circuits

In [0]:
# End the notebook, returning "Success" as its exit value
# (presumably checked by a calling/orchestrating notebook; confirm).
dbutils.notebook.exit("Success")