## Ingest circuits.csv file

1. Read the CSV file using the Spark DataFrame reader

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, lit

In [0]:
# Notebook parameter "p_data_source": a free-text widget whose default is an
# empty string. Its value is later written to every row as the "data_source"
# column.
#dbutils.widgets.help()
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")
#v_data_source

In [0]:
# Notebook parameter "p_file_date": selects which dated sub-folder of the raw
# area is read, and is also written to every row as the "file_date" column.
# Falls back to "2021-03-21" when the caller supplies no value.
dbutils.widgets.text("p_file_date", "2021-03-21")
file_date = dbutils.widgets.get("p_file_date")

In [0]:
#display(dbutils.fs.mounts())

In [0]:
#dbutils.fs.ls('/mnt/formula1jf/raw')

In [0]:
# Explicit schema for circuits.csv, listed in the file's column order.
# Only circuitId is declared non-nullable; every other column may be null.
circuits_schema = StructType([
    StructField("circuitId", IntegerType(), nullable=False),
    StructField("circuitRef", StringType(), nullable=True),
    StructField("name", StringType(), nullable=True),
    StructField("location", StringType(), nullable=True),
    StructField("country", StringType(), nullable=True),
    StructField("lat", DoubleType(), nullable=True),
    StructField("lng", DoubleType(), nullable=True),
    StructField("alt", IntegerType(), nullable=True),
    StructField("url", StringType(), nullable=True),
])

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/common_functions"

In [0]:
# Load circuits.csv from the dated sub-folder of the raw area.
# The first line of the file is treated as a header, and the explicit
# circuits_schema is applied instead of relying on inferSchema.
# raw_folder_path is presumably defined by ../includes/configuration - confirm.
circuits_df = (
    spark.read
    .option("header", True)
    .schema(circuits_schema)
    .csv(f'{raw_folder_path}/{file_date}/circuits.csv')
)

In [0]:
# type(circuits_df)
# circuits_df.show()
# display(circuits_df)
# circuits_df.printSchema()
# circuits_df.describe().show()



2. Select only the required columns

In [0]:
# Keep only the columns that are carried forward; "url" is not selected.
#
# "circuitRef" has to be selected here: the rename step maps it to
# "circuit_ref", and withColumnRenamed silently does nothing when the source
# column is missing, so leaving it out drops the column from the output
# without any error.
#
# NOTE(review): this adds circuit_ref to the written table. If
# f1_processed.circuits already exists as a Delta table without that column,
# the overwrite may need .option("overwriteSchema", "true") - confirm.
#
# Columns can equally be referenced as circuits_df.circuitId,
# circuits_df["circuitId"] or col("circuitId"); the col() form also allows
# inline changes such as col("country").alias("race_country").
circuits_selected_df = circuits_df.select(
    "circuitId",
    "circuitRef",
    "name",
    "location",
    "country",
    "lat",
    "lng",
    "alt",
)
#display(circuits_selected_df)

3. Rename the columns

In [0]:
# Convert the source camelCase / abbreviated column names to snake_case and
# stamp each row with the two notebook parameters.
# withColumnRenamed is a no-op for a column that is not present, so each
# rename only takes effect if the select step kept that column.
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    # Constant columns: the same literal value on every row.
    .withColumn("data_source", lit(v_data_source))
    .withColumn("file_date", lit(file_date))
)

In [0]:
#display(circuits_renamed_df)



4. Add ingestion date to the dataframe

In [0]:
# add_ingestion_date is not defined in this notebook; presumably it comes from
# ../includes/common_functions and appends an ingestion timestamp column,
# like the inline alternative kept below - confirm against that notebook.
circuits_final_df = add_ingestion_date(circuits_renamed_df)
# Inline alternative (would also add a constant "env" column):
#circuits_final_df = circuits_renamed_df.withColumn("ingestion_date", current_timestamp()).withColumn("env", lit("Production"))
#display(circuits_final_df)

5. Write data to the data lake as Parquet

In [0]:
#circuits_final_df.write.parquet("/mnt/formula1jf/processed/circuits")
#circuits_final_df.write.mode("overwrite").parquet(f"{processed_folder_path}/circuits")



In [0]:
# %fs
# ls mnt/formula1jf/processed/circuits



In [0]:
# df = spark.read.parquet("/mnt/formula1jf/processed/circuits")
# display(df)



In [0]:
#dbutils.notebook.exit("Success")



6. Write data to the database as a Delta table

In [0]:
# Persist the final DataFrame as the table f1_processed.circuits in Delta
# format. "overwrite" mode replaces the table's existing contents on each run.
(
    circuits_final_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.circuits")
)

In [0]:
# Stop the notebook here and hand the string "Success" back as its exit value,
# so a calling notebook or job can check the outcome. Cells below this one do
# not run.
dbutils.notebook.exit("Success")



In [0]:
#display(spark.read.parquet(f"{processed_folder_path}/circuits"))

