In [0]:
import requests
import json

# Set up the necessary variables
databricks_instance = ""  # Replace with your actual Databricks instance URL
token = ""  # Ensure this is your actual token 
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

# Create the secret scope with initial_manage_principal set to "users"
scope_name = "f1Scope"
create_scope_url = f"{databricks_instance}/api/2.0/secrets/scopes/create"
data = {
    "scope": scope_name,
    "initial_manage_principal": "users"
}

response = requests.post(create_scope_url, headers=headers, data=json.dumps(data))

# Check for errors and print the response
if response.status_code == 200:
    print("Secret scope created successfully.")
else:
    print(f"Failed to create secret scope: {response.status_code}")
    print(response.json())

In [0]:
# List all secret scopes
display(dbutils.secrets.listScopes())

# List all secrets in the specific scope
display(dbutils.secrets.list("f1Scope"))

In [0]:
storage_account_name = ""
storage_account_key = dbutils.secrets.get(scope="f1Scope", key="f1-key")

wasbs_source = f""

configs = {
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
}

dbutils.fs.mount(
    source=wasbs_source,
    mount_point="/mnt/raw-data",
    extra_configs=configs
)

display(dbutils.fs.mounts())

In [0]:
# 1) Define your storage account
storage_account_name = ""

# 2) Pull the real key from Key Vault
storage_account_key = dbutils.secrets.get(
    scope = "f1Scope",    # exactly your scope name
    key   = "f1-Key"       # exactly your secret name
)

# 3) Base64 key
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

# 2) Build WASBS URI
wasbs_path = ""

# 3) Read it
lap_times_df = (
    spark.read
         .format("csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load(wasbs_path+"/lap_times.csv")
)

drivers_df = (
    spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(wasbs_path+"/drivers.csv")
)

races_df = (
    spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(wasbs_path+"/races.csv")
)

circuits_df = (
    spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(wasbs_path+"/circuits.csv")
)


In [0]:
from pyspark.sql import functions as F

#Join races to get year and circuitId
lap_race_df = lap_times_df.join(races_df.select("raceId", "circuitId", "year"),
                              on="raceId", how="inner", )

lap_race_df = lap_race_df.filter(lap_race_df["year"] == 2024)
lap_race_df = lap_race_df.join(circuits_df.select("circuitId", "circuitRef","location", "name", "country"),
                               on="circuitId", how="inner")

overview_table = lap_race_df.join(drivers_df.select("driverId", "driverRef"),
                                  on="driverId", how="inner")

overview_table = overview_table.drop("milliseconds")

overview_table = overview_table.withColumn(
    "lap_sec", (F.split("time", ":")[0].cast("int") * 60 + F.split("time", ":")[1].cast("int")))
overview_table = overview_table.drop("time")
overview_table.show()



In [0]:
from pyspark.sql import functions as F
overview_table.describe(["lap_sec"]).show()

probs = [0.01, 0.25, 0.5, 0.75, 0.90, 0.99]

# 3) Call approxQuantile with a small relative error (e.g. 0.001 for high accuracy)
quantiles = overview_table.approxQuantile(
    "lap_sec",   # column
    probs,       # list of probabilities
    0.001        # relativeError: 0.001 = very accurate
)

for p, q in zip(probs, quantiles):
    print(f"{int(p*100)}th percentile = {q:.3f} seconds")

min_bound, max_bound = 60.0, 200.0

overview_table = overview_table.filter((F.col("lap_sec") >= min_bound) & (F.col("lap_sec") <= max_bound))

overview_table.describe(["lap_sec"]).show()




In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

circuit_stats = overview_table.groupBy("circuitId").agg(F.avg("lap_sec").alias("circuit_mean"))

norm_df = (
    overview_table
        .join(circuit_stats, on="circuitId", how="left")
        .withColumn("lap_diff", F.col("lap_sec") - F.col("circuit_mean"))
        .drop("lap_sec")
)

drv_indexer = StringIndexer(inputCol="driverId", outputCol="drv_idx")
cir_indexer = StringIndexer(inputCol="circuitId", outputCol="cir_idx")

assembler = VectorAssembler(
    inputCols=["drv_idx", "cir_idx", "lap"],
    outputCol="features"
)

prep_pipeline = Pipeline(stages=[drv_indexer, cir_indexer, assembler])
prepared_df = prep_pipeline.fit(norm_df).transform(norm_df)

prepared_df.select(
    "driverRef", "circuitRef", "lap", "features", "lap_diff"
).show(5, truncate=False)

train_df, test_df = prepared_df.randomSplit([0.8, 0.2], seed=42)


In [0]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(
    labelCol="lap_diff",
    featuresCol="features",
    numTrees=50,
    maxDepth=5)
    

In [0]:
from pyspark.ml import Pipeline

model_pipeline = Pipeline(stages=[
    drv_indexer,    
    cir_indexer,
    assembler,
    rf
])

In [0]:
# 1) Split the *prepared* data
train_prep, test_prep = prepared_df.randomSplit([0.8, 0.2], seed=42)

# 2) Just fit the RandomForestRegressor directly
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(labelCol="lap_diff", featuresCol="features", numTrees=50, maxDepth=5)
rf_model = rf.fit(train_prep)

# 3) Predict and evaluate
preds = rf_model.transform(test_prep)
evaluator = RegressionEvaluator(labelCol="lap_diff", predictionCol="prediction", metricName="rmse")
print("Test RMSE =", evaluator.evaluate(preds))


Test RMSE = 8.616095730284968


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_rmse = RegressionEvaluator(
    labelCol="lap_diff", 
    predictionCol="prediction", 
    metricName="rmse"
)

evaluator_r2 = RegressionEvaluator(
    labelCol="lap_diff", 
    predictionCol="prediction", 
    metricName="r2"
)

rmse = evaluator_rmse.evaluate(preds)
r2 = evaluator_r2.evaluate(preds)

print(f"RMSE: {rmse:.3f} seconds")
print(f"R2: {r2:.3f}")

RMSE: 8.616 seconds
R2: 0.216


In [0]:
importances = rf_model.featureImportances.toArray()

for name, imp in zip(["driver","circuit","lap_number"], importances):
    print(f"{name:12s}: {imp:.3f}")