# In [None]:
import pandas as pd
import numpy as np
from datetime import datetime

# --- AZURE CONFIGURATION ---
# Storage account plus the containers used as input (Silver) and output (Gold).
STORAGE_ACCOUNT = "beehavenstorage"
CONTAINER_SILVER = "silver"
CONTAINER_GOLD = "gold"

# Root abfss:// URI of each container: abfss://<container>@<account>.dfs.core.windows.net
SILVER_URI = "abfss://{}@{}.dfs.core.windows.net".format(CONTAINER_SILVER, STORAGE_ACCOUNT)
GOLD_URI = "abfss://{}@{}.dfs.core.windows.net".format(CONTAINER_GOLD, STORAGE_ACCOUNT)

def load_silver_data(entity_name):
    """
    Load the Silver-layer Parquet data stored under ``<SILVER_URI>/<entity_name>/``.

    The whole entity folder is handed to ``pd.read_parquet``. This is a
    best-effort read: any exception is reported on stdout and an empty
    DataFrame is returned instead of propagating the error.
    """
    source = f"{SILVER_URI}/{entity_name}/"
    print(f"Loading {entity_name} from {source}...")
    try:
        # Per the original author's note: the executing identity needs RBAC
        # permissions on the Lake for this read to succeed.
        frame = pd.read_parquet(source)
    except Exception as err:
        print(f"Warning: Could not read {entity_name}. Error: {err}")
        frame = pd.DataFrame()
    return frame

def _index_by_timestamp(df):
    """
    Return a frame indexed by its parsed 'timestamp' column, or None if ``df`` is empty.

    The input is never modified: the conversion happens on a copy. A non-empty
    frame without a 'timestamp' column is returned unchanged, so it is only
    usable downstream if it already carries a datetime index.
    """
    if df.empty:
        return None
    if 'timestamp' not in df.columns:
        return df
    return df.assign(timestamp=pd.to_datetime(df['timestamp'])).set_index('timestamp')


def process_daily_metrics(df_flow, df_weight, df_temp, df_humidity, df_weather):
    """
    Aggregates all disparate time-series into a unified DAILY Hive Health Table.

    Args:
        df_flow: readings with 'timestamp' and 'flow' columns.
        df_weight: readings with 'timestamp' and 'weight' columns.
        df_temp: readings with 'timestamp' and 'temperature' columns.
        df_humidity: readings with 'timestamp' and 'humidity' columns.
        df_weather: external weather with a 'timestamp' column; its numeric
            columns are averaged per day and left-joined onto the sensor data.

    Any input may be an empty DataFrame (e.g. a failed load); its output
    columns are then present but filled with NaN. Inputs are not mutated.

    Returns:
        DataFrame with one row per calendar day containing total_flow_count,
        daily_weight_change_kg, avg/max/min_temp_c, avg_humidity_percent, any
        daily weather columns, and activity_per_degree. The day index is
        returned as a regular column via ``reset_index``.
    """
    print("Aggregating data to Daily frequency...")

    # 1. Standardize Timestamps (on copies, so callers keep their frames intact)
    flow = _index_by_timestamp(df_flow)
    weight = _index_by_timestamp(df_weight)
    temp = _index_by_timestamp(df_temp)
    humidity = _index_by_timestamp(df_humidity)
    weather = _index_by_timestamp(df_weather)

    # 2. Resample & Aggregate (The Core Logic); sources with no data are skipped
    daily_parts = []

    if flow is not None:
        # Flow: Sum of flow indicates total activity
        daily_parts.append(flow.resample('D')['flow'].sum().rename("total_flow_count"))

    if weight is not None:
        # Weight: we care about the CHANGE within the day (max minus min),
        # not the absolute weight.
        weight_by_day = weight.resample('D')['weight']
        daily_parts.append(
            (weight_by_day.max() - weight_by_day.min()).rename("daily_weight_change_kg")
        )

    if temp is not None:
        # Temperature: mean plus the daily extremes
        daily_temp = temp.resample('D')['temperature'].agg(['mean', 'max', 'min'])
        daily_temp.columns = ['avg_temp_c', 'max_temp_c', 'min_temp_c']
        daily_parts.append(daily_temp)

    if humidity is not None:
        # Humidity: Average
        daily_parts.append(
            humidity.resample('D')['humidity'].mean().rename("avg_humidity_percent")
        )

    # 3. Merge All into One "Golden Table" with a stable column set
    sensor_columns = [
        'total_flow_count',
        'daily_weight_change_kg',
        'avg_temp_c',
        'max_temp_c',
        'min_temp_c',
        'avg_humidity_percent',
    ]
    if daily_parts:
        # reindex adds NaN columns for any sensor that had no data
        gold_df = pd.concat(daily_parts, axis=1).reindex(columns=sensor_columns)
    else:
        gold_df = pd.DataFrame(
            columns=sensor_columns,
            index=pd.DatetimeIndex([], name='timestamp'),
            dtype='float64',
        )

    # 4. Join external weather (Left join because we prioritize our sensor data)
    if weather is not None:
        # numeric_only avoids a TypeError on text columns (e.g. station names)
        daily_weather = weather.resample('D').mean(numeric_only=True)
        if not daily_weather.empty:
            gold_df = gold_df.join(daily_weather, rsuffix='_external')

    # 5. Feature Engineering (Metrics for Data Science)
    # "Efficiency": flow per degree of temperature. A 0 degree average is
    # masked to NaN so the metric is missing rather than +/-inf.
    avg_temp = gold_df['avg_temp_c']
    gold_df['activity_per_degree'] = gold_df['total_flow_count'] / avg_temp.where(avg_temp != 0)

    return gold_df.reset_index()

def main():
    """Run the Silver -> Gold job: load each entity, aggregate to daily rows, write Parquet."""
    # 1. Load Data (order matches the positional parameters of process_daily_metrics)
    entities = ("flow", "weight", "temperature", "humidity", "weather")
    frames = [load_silver_data(entity) for entity in entities]

    # 2. Transform
    gold_df = process_daily_metrics(*frames)

    # 3. Write to Gold, one file per run date
    run_date = datetime.now().strftime("%Y-%m-%d")
    output_path = f"{GOLD_URI}/hive_health_daily/hive_health_{run_date}.parquet"

    print(f"Writing {len(gold_df)} rows to Gold Layer: {output_path}")
    gold_df.to_parquet(output_path, index=False)

    print("Gold transformation complete.")


if __name__ == "__main__":
    main()