# Ingest the races.csv file from the source folder

In [0]:
# Fetch the OAuth client credentials from the Databricks secret scope so that
# no secret value is written into the notebook itself.
secret_scope = "formula1-scope"
client_id = dbutils.secrets.get(scope=secret_scope, key="client-id")
tenant_id = dbutils.secrets.get(scope=secret_scope, key="tenant-id")
client_secret = dbutils.secrets.get(scope=secret_scope, key="client-secret")

In [0]:
# Session-level OAuth (client credentials) settings for the
# databrickspracticesa storage account, applied in the listed order.
oauth_settings = {
    "fs.azure.account.auth.type.databrickspracticesa.dfs.core.windows.net": "OAuth",
    "fs.azure.account.oauth.provider.type.databrickspracticesa.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.databrickspracticesa.dfs.core.windows.net": client_id,
    "fs.azure.account.oauth2.client.secret.databrickspracticesa.dfs.core.windows.net": client_secret,
    "fs.azure.account.oauth2.client.endpoint.databrickspracticesa.dfs.core.windows.net": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for conf_key, conf_value in oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
# List the raw container; this also confirms the OAuth settings above work.
raw_container_files = dbutils.fs.ls("abfss://raw@databrickspracticesa.dfs.core.windows.net")
display(raw_container_files)

In [0]:
# Show what is currently stored under processed/race.
# On a first run this folder does not exist yet (it is created by the parquet
# write further down), and dbutils.fs.ls raises for a missing path. Tolerate
# only that case so the notebook still runs top to bottom; any other failure
# (e.g. an authentication problem) is re-raised unchanged.
processed_race_dir = "abfss://processed@databrickspracticesa.dfs.core.windows.net/race"
try:
    display(dbutils.fs.ls(processed_race_dir))
except Exception as ls_error:
    # NOTE(review): assumes the missing-path error text mentions
    # java.io.FileNotFoundException - confirm on the target runtime.
    if "FileNotFoundException" not in str(ls_error):
        raise
    print(f"{processed_race_dir} does not exist yet; it will be created by the write step.")

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DoubleType


In [0]:
# Explicit schema for races.csv, one entry per column:
# (column name, Spark type, nullable).
race_columns = [
    ("raceId", IntegerType(), False),
    ("year", IntegerType(), True),
    ("round", IntegerType(), True),
    ("circuitId", IntegerType(), True),
    ("name", StringType(), True),
    ("date", StringType(), True),
    ("time", StringType(), True),
    ("url", StringType(), True),
]
race_schema = StructType(
    fields=[
        StructField(column_name, column_type, is_nullable)
        for column_name, column_type, is_nullable in race_columns
    ]
)

In [0]:

# Load races.csv from the raw container using the explicit race_schema
# (rather than inferSchema=True); the first line of the file is a header.
races_csv_path = "abfss://raw@databrickspracticesa.dfs.core.windows.net/races.csv"
race_df = (
    spark.read
    .option("header", True)
    .schema(race_schema)
    .csv(races_csv_path)
)



In [0]:
# Inspect the freshly loaded data: schema, rows, then summary statistics.
race_df.printSchema()
race_df.display()
# describe() is lazy and only returns a new DataFrame; wrap it in display() so
# the summary statistics are actually computed and rendered.
display(race_df.describe())


In [0]:
# Only the last expression of a cell is echoed, so a bare type(...) on the
# first line is evaluated and thrown away; print it explicitly instead.
print(type(race_df))
race_df.show()

# Select only the required columns


In [0]:
# Select columns by plain string name: every column of race_df except "url".
wanted_columns = ["raceId", "year", "round", "circuitId", "name", "date", "time"]
race_select_df = race_df.select(*wanted_columns)
race_select_df.show()


In [0]:
# Same selection, this time referring to each column through DataFrame
# attribute access (df.column_name).
race_select_df = race_df.select(
    race_df.raceId,
    race_df.year,
    race_df.round,
    race_df.circuitId,
    race_df.name,
    race_df.date,
    race_df.time,
)
preview_df = race_select_df.limit(10)
display(preview_df)

In [0]:
# Same selection, this time referring to each column through bracket
# indexing (df["column_name"]).
race_select_df = race_df.select(
    race_df["raceId"],
    race_df["year"],
    race_df["round"],
    race_df["circuitId"],
    race_df["name"],
    race_df["date"],
    race_df["time"],
)
preview_df = race_select_df.limit(10)
display(preview_df)

In [0]:
from pyspark.sql.functions import col;
from pyspark.sql.functions import current_timestamp;

In [0]:
# Same selection, this time using the col() function, which builds column
# references that are not tied to a particular DataFrame variable.
race_select_df = race_df.select(
    col("raceId"),
    col("year"),
    col("round"),
    col("circuitId"),
    col("name"),
    col("date"),
    col("time"),
)
preview_df = race_select_df.limit(10)
display(preview_df)

# Rename raceId, year and circuitId to more meaningful names

In [0]:
# Rename the source columns to snake_case names: old name -> new name.
column_renames = {
    "raceId": "race_id",
    "year": "race_year",
    "circuitId": "circuit_id",
}
race_renamed_df = race_select_df
for old_name, new_name in column_renames.items():
    race_renamed_df = race_renamed_df.withColumnRenamed(old_name, new_name)
display(race_renamed_df.limit(10))


# Add a new column holding the current timestamp (ingestion date)

In [0]:

# current_timestamp is a column function: it must be called and evaluated
# inside a DataFrame to yield a value. Preview it on a one-row DataFrame
# rather than handing the bare function object to display().
display(spark.range(1).select(current_timestamp().alias("current_timestamp")))

In [0]:
from pyspark.sql.functions import lit;
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import concat


In [0]:
# Derive the final columns:
#   race_timestamp - "date" and "time" joined by a space, parsed as yyyy-MM-dd HH:mm:ss
#   ingestion_date - timestamp taken when this notebook runs
#   environment    - constant tag "ITG"
race_final_df = (
    race_renamed_df
    .withColumn(
        "race_timestamp",
        to_timestamp(concat(col("date"), lit(" "), col("time")), "yyyy-MM-dd HH:mm:ss"),
    )
    .withColumn("ingestion_date", current_timestamp())
    .withColumn("environment", lit("ITG"))
)
race_final_df.printSchema()
# describe() only builds a lazy DataFrame; display() it so the summary
# statistics are actually computed and shown.
display(race_final_df.describe())


In [0]:
display(race_final_df.limit(10))

# Write data to parquet

In [0]:
#Drop unused columns
# The raw "time" and "date" strings have been combined into race_timestamp,
# so remove both in a single drop call.
race_final_df = race_final_df.drop("time", "date")

In [0]:
#Write without Partition
# Persist the result as parquet, partitioned by race_year; "overwrite" mode
# replaces whatever a previous run left at this location.
# (Unpartitioned alternative: race_final_df.write.parquet(race_output_path))
race_output_path = "abfss://processed@databrickspracticesa.dfs.core.windows.net/race"
(
    race_final_df.write
    .partitionBy("race_year")
    .mode("overwrite")
    .parquet(race_output_path)
)

In [0]:
# Read the parquet output back and preview the rows whose race_year is 2021
# as a quick check that the write succeeded.
race_processed_df = spark.read.parquet(
    "abfss://processed@databrickspracticesa.dfs.core.windows.net/race"
)
races_2021_df = race_processed_df.filter(race_processed_df["race_year"] == '2021')
display(races_2021_df.limit(10))