In [None]:
import pandas as pd
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

class GasAnomalyDetector:
    """
    Predictive analytics pipeline for detecting gas usage anomalies
    """

    def __init__(self):
        self.model = None
        self.feature_columns = []
        self.scaler = None

    def load_data(self, meter_logs_path, units_registry_path, anomaly_log_path=None):
        """
        Load all datasets and merge
        """
        # Load meter logs
        self.meter_logs = pd.read_csv(meter_logs_path)
        self.meter_logs['datetime'] = pd.to_datetime(
            self.meter_logs['date'] + ' ' + self.meter_logs['time']
        )

        # Load units registry
        self.units = pd.read_csv(units_registry_path)

        # Load anomaly labels (ground truth)
        if anomaly_log_path:
            self.anomalies = pd.read_csv(anomaly_log_path)
            self.anomalies['start_time'] = pd.to_datetime(self.anomalies['start_time'])
        else:
            self.anomalies = None

        print(f"Loaded {len(self.meter_logs)} meter readings")
        print(f"Loaded {len(self.units)} units")
        if self.anomalies is not None:
            print(f"Loaded {len(self.anomalies)} labeled anomalies")

    def create_labels(self):
        """
        Create binary labels from anomaly log (ground truth)
        """
        self.meter_logs['label'] = 0
        self.meter_logs['anomaly_type'] = None

        if self.anomalies is None:
            print("No anomaly labels provided - unsupervised mode")
            return

        # Mark anomalous periods
        for _, anomaly in self.anomalies.iterrows():
            mask = (
                (self.meter_logs['date'].str.contains(anomaly['meter_id'])) &
                (self.meter_logs['datetime'] >= anomaly['start_time']) &
                (self.meter_logs['datetime'] < anomaly['start_time'] +
                 pd.Timedelta(hours=anomaly['duration_hours']))
            )
            self.meter_logs.loc[mask, 'label'] = 1
            self.meter_logs.loc[mask, 'anomaly_type'] = anomaly['type']

        print(f"Labeled data: {self.meter_logs['label'].sum()} anomalous hours, "
              f"{(self.meter_logs['label']==0).sum()} normal hours")
        print(f"Anomaly rate: {self.meter_logs['label'].mean()*100:.2f}%")

    def engineer_features(self):
        """
        Create ML features from raw meter data
        """
        print("\nEngineering features...")

        # Merge with units registry
        df = self.meter_logs.merge(
            self.units[['meter_id', 'home_sqft', 'occupancy']],
            left_on='date',  # Assuming 'date' contains meter_id info
            right_on='meter_id',
            how='left'
        )

        # Time-based features
        df['hour'] = df['datetime'].dt.hour
        df['day_of_week'] = df['datetime'].dt.dayofweek
        df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)
        df['month'] = df['datetime'].dt.month
        df['is_winter'] = df['month'].isin([12, 1, 2]).astype(int)

        # Usage pattern features
        df = df.sort_values(['date', 'datetime'])

        # Rolling statistics (by meter)
        df['usage_ma_3h'] = df.groupby('date')['usage_therms'].transform(
            lambda x: x.rolling(3, min_periods=1).mean()
        )
        df['usage_ma_6h'] = df.groupby('date')['usage_therms'].transform(
            lambda x: x.rolling(6, min_periods=1).mean()
        )
        df['usage_ma_24h'] = df.groupby('date')['usage_therms'].transform(
            lambda x: x.rolling(24, min_periods=1).mean()
        )
        df['usage_std_6h'] = df.groupby('date')['usage_therms'].transform(
            lambda x: x.rolling(6, min_periods=1).std()
        ).fillna(0)
        df['usage_std_24h'] = df.groupby('date')['usage_therms'].transform(
            lambda x: x.rolling(24, min_periods=1).std()
        ).fillna(0)

        # Delta features
        df['usage_delta'] = df.groupby('date')['usage_therms'].diff().fillna(0)
        df['usage_delta_abs'] = df['usage_delta'].abs()

        # Consecutive non-zero hours (for continuous leak detection)
        df['is_nonzero'] = (df['usage_therms'] > 0).astype(int)
        df['nonzero_run'] = df.groupby('date')['is_nonzero'].transform(
            lambda x: x.rolling(6, min_periods=1).sum()
        )

        # Temperature correlation (if temp available)
        if 'temp' in df.columns:
            df['temp_usage_interaction'] = df['temp'] * df['usage_therms']
            df['heating_expected'] = (df['temp'] < 65).astype(int)
            df['usage_vs_temp'] = np.where(
                df['heating_expected'] == 1,
                df['usage_therms'] / (70 - df['temp'].clip(upper=65)),
                0
            )

        # Household context features
        df['usage_per_sqft'] = df['usage_therms'] / df['home_sqft']
        df['usage_per_person'] = df['usage_therms'] / df['occupancy']

        # Time-of-day anomaly features
        df['is_overnight'] = df['hour'].between(0, 5).astype(int)
        df['is_cooking_time'] = df['hour'].isin([7, 11, 18, 19]).astype(int)
        df['unusual_time_usage'] = np.where(
            (df['is_overnight'] == 1) & (df['usage_therms'] > 0.05),
            1, 0
        )

        # Statistical anomaly score (burst detection)
        df['zscore_24h'] = np.where(
            df['usage_std_24h'] > 0,
            (df['usage_therms'] - df['usage_ma_24h']) / df['usage_std_24h'],
            0
        )
        df['is_statistical_outlier'] = (df['zscore_24h'].abs() > 3).astype(int)

        self.features_df = df

        self.feature_columns = [
            'hour', 'day_of_week', 'is_weekend', 'month', 'is_winter',
            'usage_therms', 'usage_ma_3h', 'usage_ma_6h', 'usage_ma_24h',
            'usage_std_6h', 'usage_std_24h', 'usage_delta', 'usage_delta_abs',
            'nonzero_run', 'usage_per_sqft', 'usage_per_person',
            'is_overnight', 'is_cooking_time', 'unusual_time_usage',
            'zscore_24h', 'is_statistical_outlier',
            'home_sqft', 'occupancy'
        ]

        if 'temp' in df.columns:
            self.feature_columns.extend(['temp', 'heating_expected', 'usage_vs_temp'])

        print(f"Created {len(self.feature_columns)} features")
        return self.features_df

    def train_model(self, test_size=0.2):
        """
        Train LightGBM anomaly detection model
        """
        print("\nTraining anomaly detection model...")

        # Prepare data
        X = self.features_df[self.feature_columns].fillna(0)
        y = self.features_df['label']

        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=42, stratify=y
        )

        # Handle class imbalance
        scale_pos_weight = (y_train == 0).sum() / (y_train == 1).sum()

        # LightGBM parameters
        params = {
            'objective': 'binary',
            'metric': 'auc',
            'boosting_type': 'gbdt',
            'num_leaves': 31,
            'learning_rate': 0.05,
            'feature_fraction': 0.8,
            'bagging_fraction': 0.8,
            'bagging_freq': 5,
            'scale_pos_weight': scale_pos_weight,
            'verbose': -1
        }

        # Train
        train_data = lgb.Dataset(X_train, label=y_train)
        valid_data = lgb.Dataset(X_test, label=y_test, reference=train_data)

        self.model = lgb.train(
            params,
            train_data,
            num_boost_round=200,
            valid_sets=[train_data, valid_data],
            valid_names=['train', 'valid'],
            callbacks=[lgb.early_stopping(stopping_rounds=20)]
        )

        # Evaluate
        y_pred_proba = self.model.predict(X_test, num_iteration=self.model.best_iteration)
        y_pred = (y_pred_proba > 0.5).astype(int)

        print("\n" + "="*60)
        print("MODEL PERFORMANCE")
        print("="*60)
        print(classification_report(y_test, y_pred, target_names=['Normal', 'Anomaly']))
        print(f"\nROC-AUC Score: {roc_auc_score(y_test, y_pred_proba):.4f}")

        # Confusion matrix
        cm = confusion_matrix(y_test, y_pred)
        print("\nConfusion Matrix:")
        print(cm)

        # Feature importance
        self.plot_feature_importance()

        return X_test, y_test, y_pred_proba

    def plot_feature_importance(self, top_n=20):
        """
        Visualize most important features
        """
        importance = self.model.feature_importance(importance_type='gain')
        feature_importance = pd.DataFrame({
            'feature': self.feature_columns,
            'importance': importance
        }).sort_values('importance', ascending=False)

        plt.figure(figsize=(10, 8))
        plt.barh(feature_importance['feature'].head(top_n),
                 feature_importance['importance'].head(top_n))
        plt.xlabel('Importance (Gain)')
        plt.title(f'Top {top_n} Most Important Features')
        plt.gca().invert_yaxis()
        plt.tight_layout()
        plt.savefig('feature_importance.png', dpi=300, bbox_inches='tight')
        print("\n✓ Feature importance plot saved as 'feature_importance.png'")

    def detect_anomalies_by_type(self, threshold=0.5):
        """
        Classify detected anomalies by type using rule-based logic
        """
        print("\nClassifying anomaly types...")

        # Get predictions
        X = self.features_df[self.feature_columns].fillna(0)
        anomaly_scores = self.model.predict(X, num_iteration=self.model.best_iteration)

        self.features_df['anomaly_score'] = anomaly_scores
        self.features_df['predicted_anomaly'] = (anomaly_scores > threshold).astype(int)

        # Classify by type using rule-based logic
        def classify_anomaly_type(row):
            if row['predicted_anomaly'] == 0:
                return 'normal'

            # Continuous use leak
            if row['nonzero_run'] >= 6 and row['usage_std_6h'] < 0.02:
                return 'continuous_leak'

            # Burst leak
            if row['zscore_24h'] > 3 and row['usage_therms'] > 0.8:
                return 'burst_leak'

            # Time anomaly
            if row['unusual_time_usage'] == 1:
                return 'time_anomaly'

            # Stove left on (evening + sustained)
            if (row['hour'] >= 17) and (row['nonzero_run'] >= 3) and (row['usage_therms'] > 0.3):
                return 'stove_left_on'

            return 'general_anomaly'

        self.features_df['predicted_type'] = self.features_df.apply(classify_anomaly_type, axis=1)

        # Summary
        type_counts = self.features_df[
            self.features_df['predicted_anomaly'] == 1
        ]['predicted_type'].value_counts()

        print("\nPredicted Anomaly Types:")
        print(type_counts)

        return self.features_df

    def score_to_snowflake(self, output_path='anomaly_scores.csv'):
        """
        Export anomaly scores for Snowflake ingestion
        """
        output = self.features_df[[
            'date', 'datetime', 'usage_therms',
            'anomaly_score', 'predicted_anomaly', 'predicted_type'
        ]].copy()

        output.columns = ['meter_id', 'timestamp', 'usage_therms',
                         'score', 'is_anomaly', 'anomaly_type']

        output.to_csv(output_path, index=False)
        print(f"\n✓ Anomaly scores exported to '{output_path}'")
        print("Ready for Snowflake ingestion into ANOMALY_SCORES_GAS table")

    def visualize_predictions(self, meter_id=None, days=7):
        """
        Visualize predictions for a specific meter
        """
        if meter_id:
            df = self.features_df[self.features_df['date'].str.contains(meter_id)]
        else:
            df = self.features_df

        df = df.head(days * 24)  # First N days

        fig, axes = plt.subplots(3, 1, figsize=(15, 10), sharex=True)

        # Usage pattern
        axes[0].plot(df['datetime'], df['usage_therms'], label='Actual Usage', alpha=0.7)
        axes[0].fill_between(df['datetime'], 0, df['usage_therms'],
                             where=(df['predicted_anomaly']==1),
                             color='red', alpha=0.3, label='Predicted Anomaly')
        axes[0].set_ylabel('Usage (therms/hour)')
        axes[0].set_title('Gas Usage Pattern with Anomaly Detection')
        axes[0].legend()
        axes[0].grid(alpha=0.3)

        # Anomaly score
        axes[1].plot(df['datetime'], df['anomaly_score'], color='orange', label='Anomaly Score')
        axes[1].axhline(y=0.5, color='red', linestyle='--', label='Threshold')
        axes[1].set_ylabel('Anomaly Score')
        axes[1].set_title('Model Confidence Score')
        axes[1].legend()
        axes[1].grid(alpha=0.3)

        # Temperature (if available)
        if 'temp' in df.columns:
            axes[2].plot(df['datetime'], df['temp'], color='blue', label='Temperature')
            axes[2].set_ylabel('Temperature (°F)')
            axes[2].set_xlabel('Time')
            axes[2].set_title('Temperature Context')
            axes[2].legend()
            axes[2].grid(alpha=0.3)

        plt.tight_layout()
        plt.savefig('anomaly_predictions.png', dpi=300, bbox_inches='tight')
        print("\n✓ Prediction visualization saved as 'anomaly_predictions.png'")

# Example usage
if __name__ == "__main__":
    # Initialize detector
    detector = GasAnomalyDetector()

    # Load data
    detector.load_data(
        meter_logs_path='data/synthetic/meter_logs_synthetic.csv',
        units_registry_path='data/synthetic/units_gas_synthetic.csv',
        anomaly_log_path='data/synthetic/anomaly_log.csv'
    )

    # Create labels from ground truth
    detector.create_labels()

    # Engineer features
    detector.engineer_features()

    # Train model
    X_test, y_test, y_pred = detector.train_model(test_size=0.2)

    # Detect and classify anomalies
    detector.detect_anomalies_by_type(threshold=0.5)

    # Export scores for Snowflake
    detector.score_to_snowflake('anomaly_scores_for_snowflake.csv')

    # Visualize results
    detector.visualize_predictions(days=7)

    print("\n" + "="*60)
    print("PREDICTIVE ANALYTICS PIPELINE COMPLETE")
    print("="*60)