# Databricks notebook source

 # Simplified Mining Operations Data Generation
 
 This notebook generates focused synthetic data for mining operations efficiency analysis.
 
 ## Core Data Entities:
 - **Truck Productivity Events**: Operational events with delays and breakdowns (85% productive, 15% non-productive: 12% delays and 3% breakdowns)
 - **Truck Cycles**: Loading and dumping transactions with timing data (no source/destination references)
 - **Time Usage Model**: Event categorization for operational efficiency analysis
 
 ## Geographic Context:
 - **Location**: Pilbara region of Western Australia
 - **Coordinates**: Latitude 21.5°S to 23.5°S (-21.5° to -23.5°), Longitude 115.0°E to 125.0°E
 - **Coverage**: Major mining areas including Karratha, Port Hedland, Newman, and surrounding regions
 
 ## Data Volume:
 - 50 trucks
 - 30 days of operational data
 - 30-60 events per truck per day (67,500 total events estimated)
 - Realistic breakdown/delay patterns (3% breakdown, 12% delay, 85% productive)
 - Time usage model with operational efficiency categorization
 
 ## Realistic Data Features:
 - **Payload Utilization**: 80-110% of truck capacity (realistic loading scenarios)
 - **Truck Capacities**: 280-500 tonnes (realistic mining truck range)
 - **Geographic Accuracy**: Pilbara region coordinates for authentic mining context

In [None]:
from pyspark.sql.functions import current_timestamp, col, when, isnan, isnull
from pyspark.sql.types import *
import random
from builtins import round as py_round, min as py_min, max as py_max
from datetime import datetime, timedelta
import logging

# Route INFO-and-above log records through the root handler and create a
# module-level logger for this notebook.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# No session is created here: Databricks already provides `spark`.

# Select the Unity Catalog context. The catalog is switched before the schema
# so the schema name resolves inside mining_operations.
for context_statement in ("USE CATALOG mining_operations", "USE SCHEMA production"):
    spark.sql(context_statement)

In [None]:
# Configuration parameters
NUM_TRUCKS = 50
DAYS_TO_GENERATE = 30

# Variable activity parameters for realistic daily variation.
# NOTE: despite the *_EVENTS_* names, generate_truck_cycles() uses these three
# values to bound the fleet-wide number of truck cycles generated per day.
MIN_EVENTS_PER_DAY = 800   # Low activity days
MAX_EVENTS_PER_DAY = 2000  # High activity days
BASE_EVENTS_PER_DAY = 1400 # Average baseline

# Productivity parameters (adjusted for more realistic distribution)
PRODUCTIVE_EVENT_PROBABILITY = 0.85  # 85% of events are productive
BREAKDOWN_PROBABILITY = 0.03         # 3% breakdowns (1-5% range)
DELAY_PROBABILITY = 0.12             # 12% delays (10-15% range)

# Pilbara region coordinate bounds (Western Australia), in signed decimal degrees
PILBARA_LAT_MIN = -23.5   # Southern boundary
PILBARA_LAT_MAX = -21.5   # Northern boundary
PILBARA_LON_MIN = 115.0   # Western boundary
PILBARA_LON_MAX = 125.0   # Eastern boundary

# Payload utilization parameters
MIN_PAYLOAD_UTILIZATION = 0.80  # 80% of truck capacity
MAX_PAYLOAD_UTILIZATION = 1.10  # 110% of truck capacity (overloaded)

# Date range: DAYS_TO_GENERATE whole calendar days ending at the most recent
# midnight (END_DATE is exclusive).
#
# Both bounds come from ONE clock reading. Reading the clock twice made
# END_DATE a few microseconds later than START_DATE + DAYS_TO_GENERATE days,
# so every `while current_time < END_DATE` day loop ran one extra iteration
# and produced a day of data in the future. Truncating to midnight also keeps
# each generated "day" inside a single calendar date.
END_DATE = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
START_DATE = END_DATE - timedelta(days=DAYS_TO_GENERATE)

print(f"Generating {DAYS_TO_GENERATE} days of simplified mining operations data")
print(f"Date range: {START_DATE.strftime('%Y-%m-%d')} to {END_DATE.strftime('%Y-%m-%d')} (end exclusive)")
print(f"Variable events per day: {MIN_EVENTS_PER_DAY}-{MAX_EVENTS_PER_DAY}")
print(f"Productivity breakdown: {PRODUCTIVE_EVENT_PROBABILITY*100:.0f}% productive, {BREAKDOWN_PROBABILITY*100:.0f}% breakdowns, {DELAY_PROBABILITY*100:.0f}% delays")
print(f"Geographic region: Pilbara, Western Australia")
print(f"Coordinate bounds: Lat {PILBARA_LAT_MIN}° to {PILBARA_LAT_MAX}°S, Lon {PILBARA_LON_MIN}° to {PILBARA_LON_MAX}°E")
print(f"Payload utilization: {MIN_PAYLOAD_UTILIZATION*100:.0f}% to {MAX_PAYLOAD_UTILIZATION*100:.0f}% of truck capacity")
# Calculate estimated total events (30-60 events per truck per day)
avg_events_per_truck_per_day = 45  # Average of 30-60
estimated_total_events = DAYS_TO_GENERATE * NUM_TRUCKS * avg_events_per_truck_per_day
print(f"Estimated total events: {estimated_total_events:,} ({avg_events_per_truck_per_day} events per truck per day average)")

In [None]:
# Build the truck master table: one row per truck in the fleet
def generate_trucks():
    """Return a Spark DataFrame holding NUM_TRUCKS synthetic truck records.

    Each record has a sequential id ("T001", "T002", ...), a model picked at
    random from a fixed list, a random integer capacity between 280 and 500
    tonnes (both inclusive) and a creation time formatted as
    "%Y-%m-%d %H:%M:%S".
    """
    models = ["CAT 797F", "Komatsu 930E", "Liebherr T 284", "Belaz 75710", "Hitachi EH5000"]

    # Per truck, the model is drawn before the capacity.
    fleet = [
        {
            "truck_id": f"T{number:03d}",
            "truck_type": random.choice(models),
            "capacity_tonnes": random.randint(280, 500),
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        for number in range(1, NUM_TRUCKS + 1)
    ]

    return spark.createDataFrame(fleet)


# Materialise the fleet, report its size and preview the first rows.
trucks_df = generate_trucks()
print(f"✅ Generated {trucks_df.count()} trucks")
trucks_df.show(5)

In [None]:
# Generate truck cycles (simplified transactions)
def generate_truck_cycles(truck_capacities=None):
    """Build a Spark DataFrame of synthetic load / travel / dump cycles.

    One batch of cycles is generated for every whole day between START_DATE
    and END_DATE. The daily volume is BASE_EVENTS_PER_DAY scaled by a random
    multiplier (0.6-0.8 on Saturday/Sunday, 0.8-1.2 otherwise) and clamped to
    [MIN_EVENTS_PER_DAY, MAX_EVENTS_PER_DAY]. Each cycle is assigned to a
    random truck and starts at a random minute offset within the day.

    Args:
        truck_capacities: Optional mapping of truck id (e.g. "T001") to
            capacity in tonnes. When omitted, the mapping is read from the
            notebook-level ``trucks_df`` so that payloads are consistent with
            the capacities stored in the truck table.

    Returns:
        A Spark DataFrame with one row per cycle. All timestamps are strings
        formatted as "%Y-%m-%d %H:%M:%S".
    """
    if truck_capacities is None:
        # Re-drawing random capacities here would disagree with trucks_df and
        # push the effective payload utilisation outside the configured
        # MIN/MAX_PAYLOAD_UTILIZATION window, so reuse the generated fleet.
        truck_capacities = {
            row["truck_id"]: row["capacity_tonnes"]
            for row in trucks_df.select("truck_id", "capacity_tonnes").collect()
        }

    timestamp_format = "%Y-%m-%d %H:%M:%S"
    cycles = []
    used_cycle_ids = set()

    # Iterate over whole days only. A `while current_time < END_DATE` loop
    # runs one extra (future) day whenever END_DATE is even slightly later
    # than START_DATE plus a whole number of days.
    total_days = (END_DATE - START_DATE).days

    for day_offset in range(total_days):
        day_start = START_DATE + timedelta(days=day_offset)

        # Weekend effect (reduced activity): weekday() is 5 for Saturday, 6 for Sunday
        if day_start.weekday() >= 5:
            daily_multiplier = random.uniform(0.6, 0.8)
        else:
            daily_multiplier = random.uniform(0.8, 1.2)

        # Scale the baseline, then clamp to the configured daily bounds
        daily_cycles = py_max(
            MIN_EVENTS_PER_DAY,
            py_min(MAX_EVENTS_PER_DAY, int(BASE_EVENTS_PER_DAY * daily_multiplier)),
        )

        print(f"Day {day_start.strftime('%Y-%m-%d')}: {daily_cycles} cycles")

        for _ in range(daily_cycles):
            # Distribute cycles throughout the day (1440 minutes)
            timestamp = day_start + timedelta(minutes=random.randint(0, 1439))

            # Randomly select truck
            truck_id = f"T{random.randint(1, NUM_TRUCKS):03d}"

            # Phase durations in minutes; the phases run back to back
            loading_duration = random.randint(5, 15)
            travel_duration = random.randint(15, 45)
            dumping_duration = random.randint(3, 10)

            loading_end = timestamp + timedelta(minutes=loading_duration)
            travel_end = loading_end + timedelta(minutes=travel_duration)
            dumping_end = travel_end + timedelta(minutes=dumping_duration)

            # Payload is a random share of this truck's capacity, truncated to whole tonnes
            payload_utilization = random.uniform(MIN_PAYLOAD_UTILIZATION, MAX_PAYLOAD_UTILIZATION)
            payload_tonnes = int(truck_capacities[truck_id] * payload_utilization)

            # The id is a minute stamp plus a 4-digit random suffix. Several
            # cycles can start in the same minute, so redraw the suffix until
            # the id is unused (9000 suffixes per minute is far above the
            # configured daily volume).
            minute_stamp = timestamp.strftime('%Y%m%d%H%M')
            cycle_id = f"CYC{minute_stamp}{random.randint(1000, 9999)}"
            while cycle_id in used_cycle_ids:
                cycle_id = f"CYC{minute_stamp}{random.randint(1000, 9999)}"
            used_cycle_ids.add(cycle_id)

            cycles.append({
                "cycle_id": cycle_id,
                "truck_id": truck_id,
                "payload_tonnes": payload_tonnes,
                "loading_start_time": timestamp.strftime(timestamp_format),
                "loading_end_time": loading_end.strftime(timestamp_format),
                # Travel starts when loading ends; dumping starts when travel ends
                "travel_start_time": loading_end.strftime(timestamp_format),
                "travel_end_time": travel_end.strftime(timestamp_format),
                "dumping_start_time": travel_end.strftime(timestamp_format),
                "dumping_end_time": dumping_end.strftime(timestamp_format),
                "total_cycle_duration_minutes": loading_duration + travel_duration + dumping_duration,
                "cycle_status": "Completed",
                "created_at": timestamp.strftime(timestamp_format),
            })

    return spark.createDataFrame(cycles)


print("🔄 Generating truck cycles data...")
truck_cycles_df = generate_truck_cycles()
print(f"✅ Generated {truck_cycles_df.count():,} truck cycles")
truck_cycles_df.show(5)

In [None]:
# Generate truck productivity events (85% productive with occasional issues)
def generate_truck_productivity_events():
    """Build a Spark DataFrame of synthetic per-truck operational events.

    For every whole day between START_DATE and END_DATE and every truck,
    30-60 events are generated at random times of day. Each event is
    classified from a single uniform draw:

    * below PRODUCTIVE_EVENT_PROBABILITY: "Productive", 5-60 minutes
    * next BREAKDOWN_PROBABILITY of the range: "Breakdown", 30-240 minutes
    * everything else: "Delay", 15-120 minutes

    The delay share is therefore whatever remains after the other two
    probabilities; DELAY_PROBABILITY itself is not read here.

    Returns:
        A Spark DataFrame with one row per event. Timestamps are strings
        formatted as "%Y-%m-%d %H:%M:%S"; location_x is a longitude and
        location_y a latitude drawn uniformly from the Pilbara bounds.
    """
    productive_events = [
        "Loading", "Traveling", "Dumping", "Available", "En Route"
    ]

    breakdown_events = [
        "Engine Failure", "Hydraulic Issue", "Tire Puncture", "Electrical Problem",
        "Transmission Issue", "Brake Malfunction", "Fuel System Problem"
    ]

    delay_events = [
        "Weather Delay", "Traffic Delay", "Loading Delay", "Dumping Delay",
        "Maintenance Delay", "Driver Break", "Safety Check"
    ]

    # Upper edge of the breakdown band on the [0, 1) draw
    breakdown_threshold = PRODUCTIVE_EVENT_PROBABILITY + BREAKDOWN_PROBABILITY
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    events = []
    used_event_ids = set()

    # Iterate over whole days only. A `while current_time < END_DATE` loop
    # runs one extra (future) day whenever END_DATE is even slightly later
    # than START_DATE plus a whole number of days.
    total_days = (END_DATE - START_DATE).days

    for day_offset in range(total_days):
        day_start = START_DATE + timedelta(days=day_offset)

        for truck_number in range(1, NUM_TRUCKS + 1):
            truck_id = f"T{truck_number:03d}"

            # Generate 30-60 events per truck per day
            for _ in range(random.randint(30, 60)):
                # Random time during the day
                event_time = day_start + timedelta(
                    hours=random.randint(0, 23),
                    minutes=random.randint(0, 59)
                )

                # Classify the event from one uniform draw
                draw = random.random()
                if draw < PRODUCTIVE_EVENT_PROBABILITY:
                    event_type = random.choice(productive_events)
                    duration_minutes = random.randint(5, 60)
                    event_category = "Productive"
                elif draw < breakdown_threshold:
                    event_type = random.choice(breakdown_events)
                    duration_minutes = random.randint(30, 240)
                    event_category = "Breakdown"
                else:
                    event_type = random.choice(delay_events)
                    duration_minutes = random.randint(15, 120)
                    event_category = "Delay"

                # The id is a minute stamp plus a 4-digit random suffix. Many
                # events share a minute across the fleet, so redraw the suffix
                # until the id is unused.
                minute_stamp = event_time.strftime('%Y%m%d%H%M')
                event_id = f"EVT{minute_stamp}{random.randint(1000, 9999)}"
                while event_id in used_event_ids:
                    event_id = f"EVT{minute_stamp}{random.randint(1000, 9999)}"
                used_event_ids.add(event_id)

                event_time_text = event_time.strftime(timestamp_format)
                events.append({
                    "event_id": event_id,
                    "truck_id": truck_id,
                    "event_timestamp": event_time_text,
                    "event_type": event_type,
                    "event_category": event_category,
                    "duration_minutes": duration_minutes,
                    "event_details": f"{event_type} - {event_category}",
                    # Pilbara region coordinates (Western Australia), signed decimal degrees
                    "location_x": py_round(random.uniform(PILBARA_LON_MIN, PILBARA_LON_MAX), 6),  # Longitude (East)
                    "location_y": py_round(random.uniform(PILBARA_LAT_MIN, PILBARA_LAT_MAX), 6),  # Latitude (South)
                    "created_at": event_time_text,
                })

    return spark.createDataFrame(events)


print("🔄 Generating truck productivity events (85% productive, 3% breakdown, 12% delay)...")
truck_productivity_events_df = generate_truck_productivity_events()
print(f"✅ Generated {truck_productivity_events_df.count():,} productivity events")
truck_productivity_events_df.show(5)

In [None]:
# Build the time usage reference table (one row per event type)
def generate_time_usage_model():
    """Return a Spark DataFrame classifying each of the 19 event types.

    Every row carries the event type, its category and subcategory, a short
    description, a target efficiency percentage, a productive flag and a
    creation time formatted as "%Y-%m-%d %H:%M:%S".
    """
    column_names = (
        "event_type",
        "category",
        "subcategory",
        "description",
        "target_efficiency_percentage",
        "is_productive",
    )

    # Rows are positional: they follow the order of column_names above.
    catalogue = [
        # Productive Events
        ("Loading", "Productive", "Operations", "Loading materials into truck", 90.0, True),
        ("Traveling", "Productive", "Operations", "Moving between locations", 85.0, True),
        ("Dumping", "Productive", "Operations", "Unloading materials at destination", 90.0, True),
        ("Available", "Productive", "Standby", "Truck available for assignment", 100.0, True),
        ("En Route", "Productive", "Operations", "Traveling to destination", 85.0, True),

        # Breakdown Events
        ("Engine Failure", "Breakdown", "Mechanical", "Engine mechanical failure", 0.0, False),
        ("Hydraulic Issue", "Breakdown", "Mechanical", "Hydraulic system problem", 0.0, False),
        ("Tire Puncture", "Breakdown", "Mechanical", "Tire damage requiring repair", 0.0, False),
        ("Electrical Problem", "Breakdown", "Mechanical", "Electrical system malfunction", 0.0, False),
        ("Transmission Issue", "Breakdown", "Mechanical", "Transmission system failure", 0.0, False),
        ("Brake Malfunction", "Breakdown", "Mechanical", "Brake system problem", 0.0, False),
        ("Fuel System Problem", "Breakdown", "Mechanical", "Fuel system malfunction", 0.0, False),

        # Delay Events
        ("Weather Delay", "Delay", "External", "Weather-related operational delay", 0.0, False),
        ("Traffic Delay", "Delay", "External", "Traffic congestion delay", 0.0, False),
        ("Loading Delay", "Delay", "Operational", "Loading process delay", 0.0, False),
        ("Dumping Delay", "Delay", "Operational", "Dumping process delay", 0.0, False),
        ("Maintenance Delay", "Delay", "Planned", "Scheduled maintenance delay", 0.0, False),
        ("Driver Break", "Delay", "Personal", "Driver rest and meal break", 0.0, False),
        ("Safety Check", "Delay", "Safety", "Safety inspection and check", 0.0, False),
    ]

    # Pair each positional row with its column names and stamp it individually.
    records = [
        {
            **dict(zip(column_names, entry)),
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        for entry in catalogue
    ]

    return spark.createDataFrame(records)


print("🔄 Generating simplified time usage model...")
time_usage_model_df = generate_time_usage_model()
print(f"✅ Generated {time_usage_model_df.count()} time usage model records")
time_usage_model_df.show(10)

In [None]:
# Persist every generated dataset as parquet under the Unity Catalog volume
print("💾 Saving simplified data to Unity Catalog volume...")

# Volume coordinates; the path is /Volumes/<catalog>/<schema>/<volume>
CATALOG_NAME = "mining_operations"
SCHEMA_NAME = "production"
VOLUME_NAME = "raw_data"
VOLUME_PATH = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}"

# (target folder, DataFrame, label used in the message, format spec for the count)
# The two large datasets print their counts with thousands separators.
datasets_to_save = (
    ("trucks", trucks_df, "trucks", ""),
    ("truck_cycles", truck_cycles_df, "truck cycles", ","),
    ("truck_productivity_events", truck_productivity_events_df, "truck productivity events", ","),
    ("time_usage_model", time_usage_model_df, "time usage model", ""),
)

# Write each dataset (replacing any previous output), then report its row count.
for folder_name, dataset_df, dataset_label, count_spec in datasets_to_save:
    dataset_df.write.mode("overwrite").parquet(f"{VOLUME_PATH}/{folder_name}")
    print(f"✅ Saved {dataset_label} data: {dataset_df.count():{count_spec}} records")

print("\n🎉 All simplified data successfully saved to Unity Catalog volume!")
print(f"Volume location: {CATALOG_NAME}.{SCHEMA_NAME}.{VOLUME_NAME}")

In [None]:
# Final report: record counts followed by the generation parameters
truck_total = trucks_df.count()
cycle_total = truck_cycles_df.count()
event_total = truck_productivity_events_df.count()
model_total = time_usage_model_df.count()

# Record counts per dataset and overall
summary_lines = [
    "\n📊 Simplified Data Summary:",
    f"Trucks: {truck_total}",
    f"Truck Cycles: {cycle_total:,}",
    f"Productivity Events: {event_total:,}",
    f"Time Usage Model: {model_total}",
    f"Total records: {truck_total + cycle_total + event_total + model_total:,}",
]
print("\n".join(summary_lines))

# Configured (not measured) productivity split
productivity_lines = [
    "\n📈 Productivity Breakdown:",
    f"Productive events: ~{PRODUCTIVE_EVENT_PROBABILITY*100:.0f}%",
    f"Breakdown events: ~{BREAKDOWN_PROBABILITY*100:.0f}%",
    f"Delay events: ~{DELAY_PROBABILITY*100:.0f}%",
    "Events per truck per day: 30-60 (average: 45)",
]
print("\n".join(productivity_lines))

# Geographic bounds used for event locations
geography_lines = [
    "\n🗺️ Geographic Context:",
    "Region: Pilbara, Western Australia",
    f"Coordinate bounds: Latitude {PILBARA_LAT_MIN}° to {PILBARA_LAT_MAX}°S",
    f"Longitude bounds: {PILBARA_LON_MIN}° to {PILBARA_LON_MAX}°E",
    "Coverage: Major mining areas including Karratha, Port Hedland, Newman",
]
print("\n".join(geography_lines))

# Payload window implied by the capacity range and the utilisation bounds
payload_lines = [
    "\n🚛 Payload Utilization:",
    f"Range: {MIN_PAYLOAD_UTILIZATION*100:.0f}% to {MAX_PAYLOAD_UTILIZATION*100:.0f}% of truck capacity",
    "Truck capacity range: 280-500 tonnes",
    f"Expected payload range: {int(280*MIN_PAYLOAD_UTILIZATION)}-{int(500*MAX_PAYLOAD_UTILIZATION)} tonnes",
]
print("\n".join(payload_lines))