# Formula One Analytics: Driving with Delta

In [0]:
# Service-principal credentials come from the Databricks secret scope so that
# nothing sensitive is hard-coded in the notebook.
# NOTE(review): client_id is read from the key "clientsecret" and client_secret
# from the key "client" -- the key names look swapped relative to the variable
# names. Confirm what is actually stored in the scope before renaming anything.
client_id = dbutils.secrets.get(scope="FormulaOneScope", key="clientsecret")
client_secret = dbutils.secrets.get(scope="FormulaOneScope", key="client")
tenant_id = dbutils.secrets.get(scope="FormulaOneScope", key="tenant")

# OAuth client-credentials settings for ABFS; later cells reuse `configs`.
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": client_secret,
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/"+tenant_id+"/oauth2/token"
}

bronze_mount_point = "/mnt/mybronze_mount"

# Mounting a path that is already mounted raises, so only mount when the mount
# point is absent. This keeps the cell safe under "Run All" on a warm workspace.
if not any(m.mountPoint == bronze_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
      source = "abfss://bronzelayer@foneprojectstorage.dfs.core.windows.net/",
      mount_point = bronze_mount_point,
      extra_configs = configs)

In [0]:
# List the entries of the "FormulaOneScope" secret scope (sanity check that the scope is reachable).
dbutils.secrets.list("FormulaOneScope")

In [0]:
# Read the raw CSV from the bronze mount: the first row is treated as the
# header and Spark infers the column types.
raw_csv_path = "/mnt/mybronze_mount/my_rawdata/my_project_data.csv"
df = (
    spark.read
    .options(inferSchema='true', header='true')
    .csv(raw_csv_path)
)

display(df)

In [0]:
# printSchema() writes the schema to stdout itself and returns None, so it must
# not be wrapped in print() (that would append a stray "None" line).
df.printSchema()

In [0]:
# Plain-text preview of the first rows of the raw frame.
df.show()

In [0]:
# Import only the names this cell uses; a star import from
# pyspark.sql.functions would shadow Python builtins such as sum, max and round.
from pyspark.sql.functions import col, count, when

# One output column per input column, holding the number of null values in it.
null = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null.show()

In [0]:
# Discard every row whose 'time' value is null.
df = df.na.drop(how='any', subset=['time'])

In [0]:
# show() prints the rows itself and returns None, so wrapping it in display()
# only adds a display(None) call; call show() directly.
df.show(5)

In [0]:
# Summary statistics (count, mean, stddev, min, max) for the key numeric columns.
summary_columns = ["grid", "finish_position", "points", "qual_position", "q_best", "q_mean"]
display(df.select(*summary_columns).describe())


In [0]:

from pyspark.sql.functions import col, when, count

# Remove the existing position_change column (it is recomputed further down
# from grid and finish_position), then remove exact duplicate rows.
df = df.drop('position_change').dropDuplicates()

# Re-count the nulls per column on the de-duplicated frame.
null_count_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
null = df.select(null_count_exprs)
null.show()

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

# Same null tally as above, expressed as the sum of a 0/1 "is null" indicator.
null_indicator_sums = [
    spark_sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(null_indicator_sums).show()


In [0]:
from pyspark.sql.types import DoubleType, IntegerType

# List of numerical columns that need to be converted
numerical_columns = ['points', 'grid', 'q_best', 'q_worst', 'q_mean']

# Only 'grid' is kept as an integer. Casting the other columns to IntegerType
# would truncate any fractional value, and a later cell casts those same
# columns ('points', 'q_best', 'q_worst', 'q_mean') to double anyway -- so they
# are cast to DoubleType here to avoid losing precision in between.
integer_columns = {'grid'}

# Convert columns to numeric types (if they are in string format)
for column in numerical_columns:
    target_type = IntegerType() if column in integer_columns else DoubleType()
    df = df.withColumn(column, df[column].cast(target_type))


In [0]:
# `F` is also relied on by the cells that follow.
from pyspark.sql import functions as F

# Fill numerical columns with median (if they are still null).
# approxQuantile accepts a list of columns, so all medians are computed in a
# single pass; relativeError=0.0 requests exact quantiles.
median_lists = df.approxQuantile(numerical_columns, [0.5], 0.0)

# A column containing only nulls yields an empty quantile list; skip it rather
# than failing on an index lookup.
median_fill = {
    column: quantiles[0]
    for column, quantiles in zip(numerical_columns, median_lists)
    if quantiles
}

if median_fill:
    df = df.fillna(median_fill)


In [0]:
categorical_columns = ['weather', 'status', 'constructor', 'driverId']

# Replace nulls in each categorical column with that column's most frequent
# non-null value. Ties are broken arbitrarily by Spark.
for column in categorical_columns:
    top_row = (
        df.where(F.col(column).isNotNull())
        .groupBy(column)
        .count()
        .orderBy(F.desc('count'))
        .first()
    )
    # first() is None when the column has no non-null value at all; there is
    # nothing to impute with in that case, so leave the column untouched.
    if top_row is None:
        continue
    mode_value = top_row[0]  # Get the mode
    df = df.fillna({column: mode_value})


In [0]:
# Count the nulls left in every column after imputation.
remaining_nulls = df.select(
    [F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]
)
display(remaining_nulls)


In [0]:
# Drop every row that still has a null in any column.
df = df.na.drop()

In [0]:
# printSchema() prints the schema and returns None; passing that None to
# display() adds nothing, so call it directly.
df.printSchema()

In [0]:
# Cast every measurement column to double so downstream arithmetic and
# aggregations work on a uniform numeric type.
double_columns = [
    "distance", "finish_position", "points", "qual_position",
    "q_best", "q_worst", "q_mean", "lat", "long", "length",
]
for column_name in double_columns:
    df = df.withColumn(column_name, df[column_name].cast("double"))

# Confirm the resulting column types.
df.printSchema()


In [0]:
# Remove rows with a null in any column (values that could not be cast to
# double become null in the previous step).
df = df.na.drop()

In [0]:
from pyspark.sql import functions as F

# Fill missing numerical columns with the mean value
numerical_columns = ['finish_position', 'points', 'qual_position', 'q_best', 'q_worst', 'q_mean']

# Compute all column means in one aggregation instead of one Spark job per column.
means_row = df.agg(*[F.mean(F.col(c)).alias(c) for c in numerical_columns]).first()

# A mean is None when the column has no non-null value (e.g. an empty frame);
# such columns are skipped because there is nothing to fill with.
mean_fill = {c: means_row[c] for c in numerical_columns if means_row[c] is not None}
if mean_fill:
    df = df.fillna(mean_fill)

# Verify if there are any remaining null values
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()


In [0]:
# Drop the "_c0" column -- presumably the unnamed index column carried over from the CSV; verify against the raw file.
df = df.drop("_c0")

In [0]:
# position_change = starting grid slot minus finishing position.
positions_gained = F.col("grid") - F.col("finish_position")
df = df.withColumn("position_change", positions_gained)

In [0]:
# Inspect the cleaned frame, now including position_change.
display(df)

In [0]:
# Column names of the cleaned frame, for reference when building the smaller tables below.
df.columns

## Mounting the Silver Layer

In [0]:
silver_mount_point = "/mnt/mysilver_mount"

# Mounting a path that is already mounted raises, so only mount when the mount
# point is absent; this keeps the cell re-runnable.
if not any(m.mountPoint == silver_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
      source = "abfss://silver@foneprojectstorage.dfs.core.windows.net/",
      mount_point = silver_mount_point,
      extra_configs = configs)

In [0]:
# Persist the cleaned frame to the silver layer as a Delta table, replacing any previous version.
clean_silver_path = "/mnt/mysilver_mount/clean"
df.write.mode("overwrite").format("delta").save(clean_silver_path)

In [0]:
# Read the Delta table back to confirm the write succeeded.
df1 = spark.read.load("/mnt/mysilver_mount/clean", format="delta")
display(df1)

## Transforming the Data into Small Tables


In [0]:
# Driver table: one row per distinct combination of the driver attributes.
driver_columns = ['driverId', 'name', 'nationality', 'code', 'dateOfBirth']
driver_df = df.select(driver_columns).distinct()
driver_df.show()



In [0]:
# Race table: one row per distinct combination of the race attributes.
race_columns = ['season', 'round', 'race_name', 'circuitId', 'date', 'distance', 'weather']
race_df = df.select(race_columns).distinct()
race_df.show()



In [0]:
# Result table: the per-driver, per-race outcome columns, de-duplicated.
result_columns = [
    'season', 'round', 'driverId', 'constructor',
    'finish_position', 'grid', 'points',
    'qual_position', 'q_best', 'q_worst', 'q_mean',
    'time', 'status',
]
result_df = df.select(result_columns).distinct()
result_df.show()



In [0]:
# Constructor table: distinct (constructor, status) pairs, so a constructor
# appears once per status value it occurs with.
constructor_columns = ['constructor', 'status']
constructor_df = df.select(constructor_columns).distinct()
constructor_df.show()


In [0]:
# Circuit table: one row per distinct combination of the circuit attributes.
circuit_columns = [
    'circuitId', 'circuitName', 'lat', 'long',
    'locality', 'country', 'type', 'direction', 'length',
]
circuit_df = df.select(circuit_columns).distinct()
circuit_df.show()


In [0]:
# Position-change table keyed by season, round and driver.
position_change_columns = ['season', 'round', 'driverId', 'position_change']
position_change_df = df.select(position_change_columns).distinct()
position_change_df.show()


In [0]:
# Save all 3NF tables into the Silver layer in Delta format.

silver_base_path = "/mnt/mysilver_mount"

# Sub-folder name -> frame; written in this order, each replacing any previous version.
silver_tables = {
    "race": race_df,
    "driver": driver_df,
    "constructor": constructor_df,
    "result": result_df,
    "circuit": circuit_df,
    "position_change": position_change_df,
}

for table_name, table_df in silver_tables.items():
    table_df.write.format("delta").mode("overwrite").save(f"{silver_base_path}/{table_name}")

## Star Schema

In [0]:
# Load from Silver
race_df = spark.read.format("delta").load("/mnt/mysilver_mount/race")
driver_df = spark.read.format("delta").load("/mnt/mysilver_mount/driver")
constructor_df = spark.read.format("delta").load("/mnt/mysilver_mount/constructor")
result_df = spark.read.format("delta").load("/mnt/mysilver_mount/result")
circuit_df = spark.read.format("delta").load("/mnt/mysilver_mount/circuit")

# Build fact table: one row per race result with its measures and the keys
# (season/round, driverId, constructor) that point at the dimensions.
fact_race_results = result_df.select(
    "season", "round", "driverId", "constructor", "grid", "time", "finish_position",
    "points", "qual_position", "q_best", "q_worst", "q_mean", "status"
)

# Build dimension tables
dim_race = race_df.select("season", "round", "race_name", "date", "circuitId", "weather")
dim_driver = driver_df.select("driverId", "name", "nationality", "code", "dateOfBirth")
# constructor_df holds distinct (constructor, status) pairs, so projecting only
# the constructor column repeats each constructor once per status value.
# De-duplicate so the dimension has a single row per constructor.
dim_constructor = constructor_df.select("constructor").distinct()
dim_circuit = circuit_df.select("circuitId", "circuitName", "lat", "long", "locality", "country", "type", "direction", "length")


## Storing the Star Schema in Gold Layer

In [0]:
gold_mount_point = "/mnt/mygold_mount"

# Mounting a path that is already mounted raises, so only mount when the mount
# point is absent; this keeps the cell re-runnable.
if not any(m.mountPoint == gold_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
      source = "abfss://gold@foneprojectstorage.dfs.core.windows.net/",
      mount_point = gold_mount_point,
      extra_configs = configs)

In [0]:
gold_path = "/mnt/mygold_mount/Star_Schema"

# Sub-folder name -> frame; each is written as a Delta table, replacing any previous version.
star_schema_tables = {
    "fact_race_results": fact_race_results,
    "dim_race": dim_race,
    "dim_driver": dim_driver,
    "dim_constructor": dim_constructor,
    "dim_circuit": dim_circuit,
}

for table_name, table_df in star_schema_tables.items():
    table_df.write.format("delta").mode("overwrite").save(f"{gold_path}/{table_name}")


In [0]:
gold_path = "/mnt/mygold_mount/Star_Schema"


def _read_gold_table(table_name):
    """Load the Delta table stored under ``gold_path/table_name``."""
    return spark.read.format("delta").load(f"{gold_path}/{table_name}")


# Re-bind the star-schema names to the persisted gold tables so the insight
# queries below run against what was actually written.
fact_race_results = _read_gold_table("fact_race_results")
dim_race = _read_gold_table("dim_race")
dim_driver = _read_gold_table("dim_driver")
dim_constructor = _read_gold_table("dim_constructor")
dim_circuit = _read_gold_table("dim_circuit")



### Most Wins

In [0]:
# Root folder on the gold mount under which every insight table below is saved.
insights_path = "/mnt/mygold_mount/Insights"


In [0]:
from pyspark.sql.functions import col, count

# Wins per driver: keep first-place results, count them per driverId, attach
# the driver attributes and rank by number of wins (highest first).
race_winners = fact_race_results.where(col("finish_position") == 1)
wins_by_driver = race_winners.groupBy("driverId").agg(count("*").alias("wins"))
wins_df = wins_by_driver.join(dim_driver, "driverId").orderBy(col("wins").desc())

wins_df.select("name", "wins").show()


In [0]:
# Persist the wins-per-driver insight as a Delta table, replacing any previous version.
most_wins_path = f"{insights_path}/most_wins_by_driver"
wins_df.write.mode("overwrite").format("delta").save(most_wins_path)

## Average Qualifying Time per Driver

In [0]:
from pyspark.sql.functions import avg

# Average of q_mean per driver, joined to the driver attributes and sorted
# ascending by that average.
mean_q_by_driver = fact_race_results.groupBy("driverId").agg(
    avg("q_mean").alias("avg_qual_time")
)
avg_qual_time = mean_q_by_driver.join(dim_driver, "driverId").orderBy("avg_qual_time")

avg_qual_time.select("name", "avg_qual_time").show()

avg_qual_time.write.format("delta").mode("overwrite").save(f"{insights_path}/avg_qual_time_by_driver")

## Who Gained the Most Positions in a Race

In [0]:
# Import Spark's max under an alias: importing it as `max` would shadow the
# Python builtin for every cell that runs afterwards.
from pyspark.sql.functions import col, desc
from pyspark.sql.functions import max as spark_max

# Largest single-race gain per driver: grid slot minus finishing position,
# maximised per driverId, joined to the driver attributes, best gain first.
position_gain = (
    fact_race_results
    .withColumn("position_change", col("grid") - col("finish_position"))
    .groupBy("driverId")
    .agg(spark_max("position_change").alias("max_gain"))
    .join(dim_driver, "driverId")
    .orderBy(desc("max_gain"))
)

position_gain.select("name", "max_gain").show()
position_gain.write.format("delta").mode("overwrite").save(f"{insights_path}/max_position_change_by_driver")


## Seasonal Performance – Points by Driver

In [0]:
# Explicit imports instead of `import *`, which would overwrite many Python
# builtins (max, min, round, abs, ...) in the notebook namespace. `sum` is
# still imported by name so that a bare sum(...) call resolves to the Spark
# aggregate, exactly as it did under the star import.
from pyspark.sql.functions import col, sum

# Total points per driver per season, joined to the driver attributes and
# ordered by season, then by points (highest first).
season_points = (
    fact_race_results
    .groupBy("season", "driverId")
    .agg(sum("points").alias("season_points"))
    .join(dim_driver, "driverId")
    .orderBy("season", col("season_points").desc())
)

season_points.select("season", "name", "season_points").show(50)
season_points.write.format("delta").mode("overwrite").save(f"{insights_path}/season_points_by_driver")


## Constructors with Most Points

In [0]:
# Total points per constructor, highest first. `sum` here is the Spark
# aggregate imported earlier in the notebook, not the Python builtin.
points_by_constructor = fact_race_results.groupBy("constructor").agg(
    sum("points").alias("total_points")
)
constructor_points = points_by_constructor.orderBy(col("total_points").desc())

constructor_points.show()
constructor_points.write.format("delta").mode("overwrite").save(f"{insights_path}/constructor_points")


## Races by Country & Circuit

In [0]:
# Attach circuit details to each race and keep the distinct
# (season, race, locality, country) combinations.
races_with_circuit = dim_race.join(dim_circuit, "circuitId")
race_location = races_with_circuit.select(
    "season", "race_name", "locality", "country"
).dropDuplicates()

race_location.orderBy("season").show()
race_location.write.format("delta").mode("overwrite").save(f"{insights_path}/race_location")


## Podium Finishes (Top 3) by Driver

In [0]:
# Podium count per driver: results with finish_position <= 3, counted per
# driverId, joined to the driver attributes, most podiums first.
top_three_results = fact_race_results.where(col("finish_position") <= 3)
podiums_by_driver = top_three_results.groupBy("driverId").agg(
    count("*").alias("podium_finishes")
)
podiums = podiums_by_driver.join(dim_driver, "driverId").orderBy(col("podium_finishes").desc())

podiums.select("name", "podium_finishes").show()
podiums.write.format("delta").mode("overwrite").save(f"{insights_path}/podiums_by_driver")

## Did Not Finish Counts by Driver

In [0]:
# DNF count per driver: every result whose status is not exactly "Finished".
# NOTE(review): if the data contains classified-but-lapped statuses (e.g.
# "+1 Lap"), they are counted as DNFs by this filter -- confirm that is intended.
unfinished_results = fact_race_results.where(col("status") != "Finished")
dnf_by_driver = unfinished_results.groupBy("driverId").agg(count("*").alias("dnf_count"))
dnf = dnf_by_driver.join(dim_driver, "driverId").orderBy(col("dnf_count").desc())

dnf.select("name", "dnf_count").show()
dnf.write.format("delta").mode("overwrite").save(f"{insights_path}/dnf_by_driver")

## First Place Finishes Over the Seasons

In [0]:
# Wins per driver per season, ordered by season (ascending) and then by wins
# (descending) within each season.
season_winners = fact_race_results.where(col("finish_position") == 1)
season_win_counts = season_winners.groupBy("season", "driverId").agg(count("*").alias("wins"))
wins_per_season = (
    season_win_counts
    .join(dim_driver, "driverId")
    .orderBy(col("season").asc(), col("wins").desc())
)

wins_per_season.select("season", "name", "wins").show()
wins_per_season.write.format("delta").mode("overwrite").save(f"{insights_path}/wins_per_season_by_driver")

## Most Poles (Qualifying Position = 1)

In [0]:
# Pole count per driver: results with qual_position == 1, counted per
# driverId, joined to the driver attributes, most poles first.
pole_results = fact_race_results.where(col("qual_position") == 1)
poles_by_driver = pole_results.groupBy("driverId").agg(count("*").alias("pole_positions"))
pole_positions = poles_by_driver.join(dim_driver, "driverId").orderBy(col("pole_positions").desc())

pole_positions.select("name", "pole_positions").show()
pole_positions.write.format("delta").mode("overwrite").save(f"{insights_path}/pole_positions_by_driver")



## Average Points per Race by Driver

In [0]:
from pyspark.sql.functions import avg

# Mean points per race result for each driver, joined to the driver
# attributes, highest average first.
mean_points_by_driver = fact_race_results.groupBy("driverId").agg(
    avg("points").alias("avg_points_per_race")
)
avg_points = mean_points_by_driver.join(dim_driver, "driverId").orderBy(
    col("avg_points_per_race").desc()
)

avg_points.select("name", "avg_points_per_race").show()
avg_points.write.format("delta").mode("overwrite").save(f"{insights_path}/avg_points_by_driver")

## Top Circuits by Number of Races Held

In [0]:
# Number of dim_race rows per circuit, joined to the circuit attributes,
# most-used circuit first.
races_per_circuit = dim_race.groupBy("circuitId").agg(count("*").alias("race_count"))
top_circuits = races_per_circuit.join(dim_circuit, "circuitId").orderBy(col("race_count").desc())

top_circuits.select("circuitName", "country", "race_count").show()
top_circuits.write.format("delta").mode("overwrite").save(f"{insights_path}/top_circuits")


In [0]:
# List the distinct raw weather strings (untruncated) to decide how to bucket them below.
df.select("weather").distinct().show(truncate=False)


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql import functions as F

def classify_weather(weather):
    """Map a free-text weather description to a coarse category.

    The description is lower-cased and checked for keywords, in priority
    order: "Wet" keywords first, then "Dry", then "Cloudy". A missing/empty
    description, or one matching no keyword, is classified as "Mixed".

    Args:
        weather: Raw weather description, or None / empty string.

    Returns:
        One of "Wet", "Dry", "Cloudy" or "Mixed".
    """
    text = weather.lower() if weather else ""

    # Order matters: the first group with a matching keyword wins.
    keyword_groups = (
        ("Wet", ("rain", "wet", "drizzle", "showers")),
        ("Dry", ("clear", "sunny", "dry")),
        ("Cloudy", ("cloudy", "overcast")),
    )
    for category, keywords in keyword_groups:
        if any(keyword in text for keyword in keywords):
            return category
    return "Mixed"

# Wrap the Python classifier as a string-returning Spark UDF and use it to
# derive weather_category from the weather column.
classify_weather_udf = udf(classify_weather, StringType())
weather_category_col = classify_weather_udf(F.col("weather"))
df = df.withColumn("weather_category", weather_category_col)


## Average Points by Weather Category

In [0]:
# Per weather category: the average points scored and the number of result
# rows behind that average.
results_by_weather = df.groupBy("weather_category")
win_avg_wet = results_by_weather.agg(
    F.avg("points").alias("avg_points"),
    F.count("*").alias("num_race_results"),
)

win_avg_wet.show()
win_avg_wet.write.format("delta").mode("overwrite").save(f"{insights_path}/win_avg_wet")

## Win Rate by Weather Category:

In [0]:
# 1 for a first-place result, 0 otherwise.
won_flag = F.when(F.col("finish_position") == 1, 1).otherwise(0)

# Per weather category: number of wins, number of result rows, and wins as a
# percentage of rows; highest percentage first.
win_rate = (
    df.withColumn("won_race", won_flag)
    .groupBy("weather_category")
    .agg(
        F.sum("won_race").alias("wins"),
        F.count("*").alias("total_races"),
        (F.sum("won_race") / F.count("*") * 100).alias("win_rate_percent"),
    )
    .orderBy(F.col("win_rate_percent").desc())
)
win_rate.show()
win_rate.write.format("delta").mode("overwrite").save(f"{insights_path}/win_rate")


## Visualizations

In [0]:
# Most wins per driver
display(wins_df)

Databricks visualization. Run in Databricks to view.

In [0]:
# Average points per race by driver
display(avg_points)

Databricks visualization. Run in Databricks to view.

In [0]:
# Average qualifying time per driver
display(avg_qual_time)

Databricks visualization. Run in Databricks to view.

In [0]:
# Pole positions per driver
display(pole_positions)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Season points per driver
display(season_points)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Wins per season by driver
display(wins_per_season)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Circuits ranked by number of races held
display(top_circuits)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Average points by weather category
display(win_avg_wet)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

In [0]:
# Win rate (percent) by weather category
display(win_rate)

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.