### Creating initial resources:
- source data
- pipelines
- dashboards
- genie space
- databricks app


In [0]:
%pip install databricks-sdk>=0.57.0 
dbutils.library.restartPython() 

In [0]:
%run ../00.set_variables

In [0]:
# Provision the Unity Catalog objects used by the rest of the setup. The statements
# run in order: catalog, then the schema inside it, then the volume inside the schema.
uc_bootstrap_statements = [
    f"CREATE CATALOG IF NOT EXISTS {catalog_name}",
    f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}",
    f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_name}",
    # Grant all permissions on the catalog to all account users
    # (crude, but it avoids permission issues in the downstream steps).
    f"GRANT ALL PRIVILEGES ON CATALOG {catalog_name} TO `account users`",
]
for statement in uc_bootstrap_statements:
    spark.sql(statement)

In [0]:
# Create the source data files in the UC volume from the Kaggle dataset

import requests
import zipfile
import io

def create_mock_plant_equipment():
    """Return a small Spark DataFrame of mock plant equipment.

    The DataFrame has the columns ``name``, ``id`` and ``is_active``. One row
    ("Thermometer") carries ``None`` in ``is_active``.
    """
    schema = ["name", "id", "is_active"]
    rows = [
        ("Spectrophotometer", 1, True),
        ("pH Meter", 2, True),
        ("Oven", 3, True),
        ("Remote Arm", 4, False),
        ("Thermometer", 5, None),
    ]
    return spark.createDataFrame(rows, schema)

# Download the zip file
url = "https://www.kaggle.com/api/v1/datasets/download/edumagalhaes/quality-prediction-in-a-mining-process"
# A timeout keeps the setup from hanging indefinitely on a stalled connection.
response = requests.get(url, timeout=120)
# Fail right here with the HTTP status instead of later with an opaque BadZipFile
# when the server answers with an error page rather than the archive.
response.raise_for_status()

# Extract the CSV file to a UC volume
volume_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/"
csv_filename = "MiningProcess_Flotation_Plant_Database.csv"
# The context manager closes the in-memory archive once the file is extracted.
with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
    zip_file.extract(csv_filename, volume_path)

# Define the file path (the location the CSV was just extracted to)
file_path = f"{volume_path}{csv_filename}"

# Read the CSV file into a DataFrame, taking column names from the header row and
# letting Spark infer the column types.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_path)
)

from pyspark.sql.functions import col, hour, when, concat_ws, lit, avg
from pyspark.sql import functions as F
import random
import pandas as pd

# Rename columns to remove invalid characters (spaces -> "_", "%" -> "Percent").
# The iteration variables are deliberately NOT called `col`: a module-level
# `for col in ...` loop would rebind the `col` function imported from
# pyspark.sql.functions to a plain string and break later `col(...)` calls.
df = df.select(
    [
        F.col(raw_name).alias(raw_name.replace(" ", "_").replace("%", "Percent"))
        for raw_name in df.columns
    ]
)

# Every column except "date" uses a decimal comma: swap it for a dot and cast to
# double. A single projection is used instead of one withColumn call per column;
# the column order is unchanged.
df = df.select(
    [
        F.col(column_name)
        if column_name == "date"
        else F.regexp_replace(column_name, r",", ".").cast("double").alias(column_name)
        for column_name in df.columns
    ]
)

# Columns written to the flotation data set: the timestamp, the feed/pulp
# measurements, and the air flow and level of each of the seven flotation columns.
feed_and_pulp_columns = [
    "Starch_Flow",
    "Amina_Flow",
    "Ore_Pulp_Flow",
    "Ore_Pulp_pH",
    "Ore_Pulp_Density",
]
air_flow_columns = [
    "Flotation_Column_01_Air_Flow",
    "Flotation_Column_02_Air_Flow",
    "Flotation_Column_03_Air_Flow",
    "Flotation_Column_04_Air_Flow",
    "Flotation_Column_05_Air_Flow",
    "Flotation_Column_06_Air_Flow",
    "Flotation_Column_07_Air_Flow",
]
level_columns = [
    "Flotation_Column_01_Level",
    "Flotation_Column_02_Level",
    "Flotation_Column_03_Level",
    "Flotation_Column_04_Level",
    "Flotation_Column_05_Level",
    "Flotation_Column_06_Level",
    "Flotation_Column_07_Level",
]
flotation_columns = ["date", *feed_and_pulp_columns, *air_flow_columns, *level_columns]

# Columns that make up the lab data set: the timestamp plus the iron/silica
# percentages of the feed and of the concentrate.
lab_data_columns = [
    "date",
    "Percent_Iron_Feed",
    "Percent_Silica_Feed",
    "Percent_Iron_Concentrate",
    "Percent_Silica_Concentrate",
]

# Persist the flotation subset as parquet in the volume, replacing any previous run.
flotation_output_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/flotation_data/"
df_flotation = df.select(*flotation_columns)
(
    df_flotation.write.format("parquet")
    .mode("overwrite")
    .save(flotation_output_path)
)

# Keep only the lab columns, then collapse the rows that share a "date" value into
# one row holding the average of each measurement (aliased back to its own name).
df_lab = df.select(*lab_data_columns)

lab_measurement_columns = [name for name in lab_data_columns if name != "date"]
lab_averages = [avg(name).alias(name) for name in lab_measurement_columns]

df_lab_hourly = df_lab.groupBy("date").agg(*lab_averages)

# Mock shift operators for PII demo

# Fake operator names; one is drawn at random for every shift.
operator_names = [
    "Alice Johnson", "Ben Carter", "Cindy Lee", "David Smith", "Emma Wright",
    "Frank Miller", "Grace Kim", "Henry Jones", "Isla Clarke", "Jack White"
]

# Label every row with its shift: hours 06-17 are "day", all other hours "night".
df_with_shift = df_lab_hourly.withColumn(
    "shift_type",
    when((hour("date") >= 6) & (hour("date") < 18), "day").otherwise("night"),
)

# Create a shift identifier (e.g., "2024-05-13_day").
# A night shift spans midnight (18:00 until 05:59 the next morning). Using the plain
# calendar date would merge the hours 00-05 with the night shift that only starts
# the following evening, so the shift date is taken from the timestamp moved back by
# six hours: 00:00-05:59 then belongs to the shift that began the previous day.
shift_start_date = F.to_date(F.col("date").cast("timestamp") - F.expr("INTERVAL 6 HOURS"))
df_with_shift = df_with_shift.withColumn(
    "shift_id",
    concat_ws("_", shift_start_date, F.col("shift_type")),
)

# Get distinct shifts (collected to the driver; there is one row per shift)
distinct_shifts = df_with_shift.select("shift_id").distinct().collect()

# Assign a random operator to each shift. The choice is not seeded, so the
# assignment differs between runs.
shift_operator_map = {row["shift_id"]: random.choice(operator_names) for row in distinct_shifts}

# Convert to a DataFrame for joining (materialised as a list of (shift_id, name) pairs)
shift_df = spark.createDataFrame(list(shift_operator_map.items()), ["shift_id", "operator_name"])

# Join operator name back to the main DataFrame; shift_id is only a join helper
# and is dropped from the output.
df_with_operator = df_with_shift.join(shift_df, on="shift_id", how="left").drop("shift_id")

(
    df_with_operator.write.format("parquet")
    .mode("overwrite")
    .save(f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/lab_data_hourly/")
)

# Mock equipment data; one row has a None is_active field that we can use to demo
# expectations. Written as parquet to the volume, replacing any previous run.
equipment_output_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/equipment/"
df_equipment = create_mock_plant_equipment()
(
    df_equipment.write.format("parquet")
    .mode("overwrite")
    .save(equipment_output_path)
)

#display(df_with_operator)
#display(df_flotation)


In [0]:
%run ./01.create_pipeline

In [0]:
%run ./02.create_dashboard

In [0]:
%run ./model_deploy_jobs/create-deployment-job

In [0]:
%run "../notebooks/01b. Unity Catalog, Governance and Auditability"

In [0]:
# Manually create feature store table ahead of time

# `col` is imported in this cell so the filters below do not depend on whatever the
# global name `col` happens to be bound to by earlier cells or %run notebooks.
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

gold_df = spark.sql(f"SELECT * FROM {catalog_name}.{schema_name}.gold_iron_ore_prediction_dataset")

# Filter out rows between the specified dates (each range is removed in turn).
# NOTE(review): the bounds are date-only strings compared against the `date` column;
# confirm whether rows later on the end date are meant to be excluded as well.
excluded_date_ranges = [
    ("2017-05-12", "2017-06-14"),
    ("2017-07-23", "2017-08-03"),
    ("2017-08-07", "2017-08-14"),
]
df_filtered = gold_df
for start_date, end_date in excluded_date_ranges:
    df_filtered = df_filtered.filter(~(col("date").between(start_date, end_date)))

# Keep only rows with a positive iron feed percentage.
df_filtered = df_filtered.filter(col("Percent_Iron_Feed") > 0)

feature_table_name = "fs_gold_iop_features"
feature_table_full_name = f"{catalog_name}.{schema_name}.{feature_table_name}"

# Replace the table contents and schema with the filtered data set.
(
    df_filtered.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(feature_table_full_name)
)

# Mark `date` as NOT NULL first, then declare it as the table's primary key.
spark.sql(f"ALTER TABLE {feature_table_full_name} ALTER COLUMN date SET NOT NULL")
spark.sql(f"ALTER TABLE {feature_table_full_name} ADD CONSTRAINT pk_date PRIMARY KEY (date)")

In [0]:
%run "../notebooks/03. Model Training and Experimentation" 

In [0]:
%run ./03.create_genie_space

In [0]:
%run ./04.create_app

In [0]:
%run ./05.create_serving_endpoints

In [0]:
%run "../notebooks/05. Optimisation" 

In [0]:
%run ./06.enable_data_classification_and_anomaly_detection