### Step 1: Mount the storage containers

In [0]:
# Expose the data-source label as a notebook widget (empty by default) and
# read back whatever value is currently set on it.
data_source_widget = "param_data_source"
dbutils.widgets.text(data_source_widget, "")
var_data_source = dbutils.widgets.get(data_source_widget)

In [0]:
# The file date selects which dated subfolder of the raw blob storage is read;
# the widget default is "2021-03-21".
file_date_widget = "param_file_date"
dbutils.widgets.text(file_date_widget, "2021-03-21")
var_file_date = dbutils.widgets.get(file_date_widget)

In [0]:
%run "../includes/configuration"

In [0]:
%run "../includes/utils"

In [0]:

# Mount the storage containers. The helper and both constants are not defined in
# this notebook — presumably they come from the %run includes above; confirm there.
# NOTE(review): verify ACCOUNT_KEY is read from a secret scope rather than hard-coded.
mounting_all_containers(STORAGE_ACCOUNT, ACCOUNT_KEY)

### Step 2: read the csv file

In [0]:
# First look at circuits.csv for the selected file date: the header row supplies
# the column names and no schema is given.
circuits_csv_path = f"{RAW_FOLDER_PATH}/{var_file_date}/circuits.csv"
circuits_df = spark.read.csv(circuits_csv_path, header=True)

In [0]:
# Preview the first five rows. limit() lets Spark fetch only the rows needed,
# instead of collect()-ing the entire DataFrame onto the driver and slicing it there.
display(circuits_df.limit(5))

In [0]:
# Inspect the column names and types of the frame that was read without a schema.
circuits_df.printSchema()

In [0]:
# Re-read the same file, this time asking Spark to infer each column's data type,
# then print the resulting schema.
circuits_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{RAW_FOLDER_PATH}/{var_file_date}/circuits.csv")
)
circuits_df.printSchema()

### Define the schema

In [0]:
# In production you should not rely on inference: define the schema (column types) explicitly.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType


In [0]:
# Explicit schema for circuits.csv. The StructType describes one row and each
# StructField one column (name, type, nullable). Only circuitId is declared
# non-nullable.
circuits_schema = StructType(
    [
        StructField("circuitId", IntegerType(), nullable=False),
        StructField("circuitRef", StringType(), nullable=True),
        StructField("name", StringType(), nullable=True),
        StructField("location", StringType(), nullable=True),
        StructField("country", StringType(), nullable=True),
        StructField("lat", DoubleType(), nullable=True),
        StructField("lng", DoubleType(), nullable=True),
        StructField("alt", IntegerType(), nullable=True),
        StructField("url", StringType(), nullable=True),
    ]
)



In [0]:
# Final read of circuits.csv: header row for column names, types taken from the
# explicit circuits_schema.
circuits_df = spark.read.csv(
    f"{RAW_FOLDER_PATH}/{var_file_date}/circuits.csv",
    schema=circuits_schema,
    header=True,
)

In [0]:
# printSchema() writes the schema tree to the cell output itself and returns None,
# so there is nothing to hand to display().
circuits_df.printSchema()

In [0]:
# First way: select by column name. Every schema column is kept except `url`.
circuit_columns = ["circuitId", "circuitRef", "name", "location", "country", "lat", "lng", "alt"]
selected_circuits_df = circuits_df.select(*circuit_columns)


# Second way: col() objects, which also allow renaming in the same step via alias().
# from pyspark.sql.functions import col
# selected_circuits_df = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"), col("country"),
#                                           col("lat").alias("latitude"), col("lng").alias("longitude"), col("alt").alias("altitude"))


In [0]:
# drop the URL column as we don't need it (the select above already leaves it out)
# circuits_df = circuits_df.drop("url")

### Rename the columns using the API 

In [0]:
from pyspark.sql.functions import current_timestamp, lit, date_trunc, trunc,from_utc_timestamp

In [0]:
# Switch the camelCase / abbreviated source names to snake_case, then tag every
# row with the two widget values (data source and file date) as literal columns.
renamed_circuits_df = (
    selected_circuits_df
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("circuitRef", "circuit_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("data_source", lit(var_data_source))
    .withColumn("file_date", lit(var_file_date))
)

### Add a new column `ingestion_date` to the DataFrame

In [0]:
# Ref: https://garygregory.wordpress.com/2013/06/18/what-are-the-java-timezone-ids/

# add_ingestion_date is not defined in this notebook — presumably it comes from
# ../includes/utils and appends an `ingestion_date` column; confirm there.
final_circuits_df = add_ingestion_date(renamed_circuits_df)

In [0]:
# Preview the first five rows of the final frame. limit() avoids collect()-ing
# every row onto the driver only to keep five of them.
display(final_circuits_df.limit(5))

In [0]:
# print(final_circuits_df.select("ingestion_date").first())

### Write to the processed layer as a Delta table

In [0]:
# Save the circuits frame as the Delta table f1_processed.circuits; "overwrite"
# replaces whatever the table held before.
(
    final_circuits_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.circuits")
)

In [0]:
%sql
-- Quick sanity check: number of rows now in the circuits table.
SELECT COUNT(1) FROM f1_processed.circuits  

### Now we do the same for the races file
#### rename the columns
#### derive the race_timestamp column
#### add the new column 'ingestion_date'
#### make sure the columns have the correct data types


### Read csv file

In [0]:
# First look at races.csv for the selected file date: header row as column names,
# no schema supplied.
races_csv_path = f"{RAW_FOLDER_PATH}/{var_file_date}/races.csv"
races_df = spark.read.csv(races_csv_path, header=True)


### Define the schema

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Explicit schema for races.csv. The two id columns are declared non-nullable;
# `date` is parsed as a date while `time` is kept as a plain string.
races_schema = StructType(
    [
        StructField("raceId", IntegerType(), nullable=False),
        StructField("year", IntegerType(), nullable=True),
        StructField("round", IntegerType(), nullable=True),
        StructField("circuitId", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=True),
        StructField("date", DateType(), nullable=True),
        StructField("time", StringType(), nullable=True),
    ]
)

# Read races.csv again, now typed by races_schema (header row still supplies names).
races_df = spark.read.csv(
    f"{RAW_FOLDER_PATH}/{var_file_date}/races.csv",
    schema=races_schema,
    header=True,
)


### Select and rename the columns

In [0]:
# Pick the race columns by name (the same seven names declared in races_schema).
race_columns = ["raceId", "year", "round", "circuitId", "name", "date", "time"]
selected_races = races_df.select(*race_columns)

In [0]:
# Rename the id columns to snake_case and `year` to `race_year`.
renamed_races_df = (
    selected_races
    .withColumnRenamed("raceId", "race_id")
    .withColumnRenamed("circuitId", "circuit_id")
    .withColumnRenamed("year", "race_year")
)


### Add a new column (`race_timestamp`) that combines `date` and `time`

In [0]:
from pyspark.sql.functions import concat, col, to_timestamp, from_utc_timestamp, lit, date_trunc, current_timestamp

In [0]:
# Column expressions are built up front so the chain below stays readable.
# "<date> <time>" text, parsed further down with the yyyy-MM-dd HH:mm:ss pattern.
race_datetime_text = concat(col("date"), lit(" "), col("time"))
# Current timestamp shifted from UTC to Europe/Amsterdam.
amsterdam_now = from_utc_timestamp(current_timestamp(), "Europe/Amsterdam")

transformed_races_df = (
    renamed_races_df
    .withColumn("race_timestamp", to_timestamp(race_datetime_text, "yyyy-MM-dd HH:mm:ss"))
    # Ingestion time is truncated to whole seconds.
    .withColumn("ingestion_date", date_trunc("second", amsterdam_now))
    .withColumn("data_source", lit(var_data_source))
    .withColumn("file_date", lit(var_file_date))
)

display(transformed_races_df)

In [0]:
# Save the races frame as the Delta table f1_processed.races; "overwrite"
# replaces whatever the table held before.
(
    transformed_races_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("f1_processed.races")
)

In [0]:
%sql
-- Quick sanity check: number of rows now in the races table.
SELECT COUNT(1) FROM f1_processed.races

In [0]:
# End the notebook run and return the string "Success" as its exit value
# (e.g. to a parent notebook that started this one via dbutils.notebook.run).
dbutils.notebook.exit("Success")