### Step1 ==> Reading CSV file

In [0]:
# Shows the built-in help for the dbutils.widgets utility. Informational only:
# no later cell in this notebook uses its output.
# NOTE(review): looks like leftover exploration - consider removing for scheduled runs.
dbutils.widgets.help()

In [0]:
# Declare a text widget named "p_data_source" (empty string as default) so the
# caller can pass the data source in as a notebook parameter, then read its
# current value. v_data_source is later written into the "data_source" column.
dbutils.widgets.text("p_data_source", "")
v_data_source = dbutils.widgets.get("p_data_source")

In [0]:
%run "/Users/nishchalvaishnav2403@gmail.com/Formula1-project/includes/configuration"

In [0]:
%run "/Users/nishchalvaishnav2403@gmail.com/Formula1-project/includes/common_functions"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType


# Explicit schema for circuits.csv, one .add(name, type, nullable) per column in
# file order. Only circuitId is declared non-nullable.
circuits_schema = (
    StructType()
    .add("circuitId", IntegerType(), False)
    .add("circuitRef", StringType(), True)
    .add("name", StringType(), True)
    .add("location", StringType(), True)
    .add("country", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("lng", DoubleType(), True)
    .add("alt", IntegerType(), True)
    .add("url", StringType(), True)
)

In [0]:
# Read circuits.csv from the raw folder with the explicit circuits_schema; the
# first line of the file is treated as a header row, not as data.
# raw_folder_path is not defined in this notebook - presumably it comes from the
# included configuration notebook (verify there).
circuits_df = (
    spark.read
    .option("header", True)
    .schema(circuits_schema)
    .csv(f"{raw_folder_path}/circuits.csv")
)

In [0]:
# Render the freshly loaded DataFrame to check that the columns parsed as expected.
display(circuits_df)

### Step2 ==> Select the Required Columns

In [0]:
# circuits_selected_df = circuits_df.select('circuitId', 'circuitRef', 'name', 'location', 'country', 'lat', 'lng', 'alt')
# With this string-based form of select() we cannot apply any column functions.

In [0]:
import pyspark.sql.functions as f

In [0]:
# Select through Column objects (f is the alias for pyspark.sql.functions).
# With this form - and the other two Column-based forms - column functions can
# be applied to the selected columns. The "url" column is intentionally left out.
circuits_selected_df = circuits_df.select(
    *(
        f.col(column_name)
        for column_name in (
            'circuitId',
            'circuitRef',
            'name',
            'location',
            'country',
            'lat',
            'lng',
            'alt',
        )
    )
)

### Step3 ==> Renaming columns as per the requirement

In [0]:
from pyspark.sql.functions import lit

# Rename the source columns to the names required downstream and tag every row
# with the data source received through the p_data_source widget.
# "lng" is renamed to "longitude"; it was previously misspelled "logitude",
# which would have been persisted into the parquet output. Any consumer that
# read the misspelled column must switch to "longitude".
circuits_renamed_df = (
    circuits_selected_df
    .withColumnRenamed("circuitId", "circuits_id")
    .withColumnRenamed("circuitRef", "circuits_ref")
    .withColumnRenamed("lat", "latitude")
    .withColumnRenamed("lng", "longitude")
    .withColumnRenamed("alt", "altitude")
    .withColumn("data_source", lit(v_data_source))  # constant column from the widget value
)

### Step4 ==> Add "ingestion date" column to the dataframe

In [0]:
# add_ingestion_date is not defined in this notebook - presumably it comes from
# the included common_functions notebook and appends an ingestion-date column
# (per the step title); verify against that notebook.
circuits_final_df = add_ingestion_date(circuits_renamed_df)

In [0]:
# Preview the first 20 rows of the final DataFrame. display() has no documented
# row-count argument, so the cap is applied with limit(20) and the result is
# rendered with the same display(df) form used for the raw DataFrame.
display(circuits_final_df.limit(20))

### Step5 ==> Write data to DataLake as Parquet File

In [0]:
# Persist the final DataFrame as parquet under the processed folder, replacing
# whatever an earlier run wrote there.
# processed_folder_path is not defined in this notebook - presumably it comes
# from the included configuration notebook (verify there).
circuits_final_df.write.parquet(
    f"{processed_folder_path}/circuits",
    mode="overwrite",
)

In [0]:
# End the notebook run and hand the string "Success" back as its exit value,
# so a calling notebook or job can check the outcome.
dbutils.notebook.exit("Success")