In [0]:
%run "/Users/ovidiumtoma@gmail.com/wind_turbine_project/src/wt_logger"

In [0]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (mean, stddev, col, when, abs as spark_abs,
                                  lit, pow, monotonically_increasing_id, count, sum as spark_sum,
                                  min as spark_min, max as spark_max, avg as spark_avg, round, window)
from sklearn.ensemble import IsolationForest

class DataTransformer:
    """
    Gold-layer transformations for wind-turbine telemetry.

    Each step takes and returns a Spark DataFrame:
    expected-power modelling, z-score and Isolation Forest anomaly flags,
    per-turbine anomaly classification, filtering of faulty turbines,
    24-hour summary statistics, and Delta-table persistence into ``gold_data``.
    """

    # Schema that every Gold-layer table is written to.
    GOLD_SCHEMA = "gold_data"

    # Feature columns fed to the Isolation Forest (tuple so it cannot be mutated).
    _ISO_FEATURES = ("wind_speed", "expected_power", "power_output")

    def __init__(self, spark):
        """
        Initialize the transformer.

        Args:
            spark: Active SparkSession, used to create DataFrames and run SQL.
        """
        self.spark = spark
        # LoggerUtility is not defined here; presumably it comes from the
        # wt_logger notebook pulled in via %run at the top of the file.
        self.logger = LoggerUtility.setup_logging()
        self.logger.info("DataTransformer initialized.")

    def compute_expected_power(
        self,
        df: DataFrame,
        air_density: float = 1.2,
        swept_area: float = 5024,
        power_coefficient: float = 0.45,
    ) -> DataFrame:
        """
        Add an ``expected_power`` column (MW) from a simplified wind power equation.

        Formula: P = 0.5 * ρ * A * V³ * Cp, converted from watts to MW
        (divided by 1,000,000).

        Args:
            df: Input DataFrame with a numeric ``wind_speed`` column (m/s).
            air_density: ρ in kg/m³ (default 1.2).
            swept_area: Rotor swept area A in m². The default of 5024 m² equals
                π·40², i.e. a rotor of roughly 80 m diameter.
            power_coefficient: Cp, the power coefficient (default 0.45).

        Returns:
            ``df`` with an added ``expected_power`` column. No cut-in, cut-out
            or rated-power cap is applied, so the value grows with V³ unbounded.
        """
        self.logger.info("Computing expected power output...")

        df = df.withColumn(
            "expected_power",
            0.5 * lit(air_density) *
            lit(swept_area) *
            pow(col("wind_speed"), 3) *
            lit(power_coefficient) / lit(1_000_000)
        )

        self.logger.info("Expected power computation complete.")
        return df

    def detect_zscore_anomalies(self, df: DataFrame, threshold: float = 2.0) -> DataFrame:
        """
        Flag records whose power error lies outside mean ± ``threshold`` · stddev.

        error = power_output - expected_power

        Args:
            df: DataFrame with ``power_output`` and ``expected_power`` columns.
            threshold: Number of standard deviations defining the normal band
                (default 2.0).

        Returns:
            ``df`` with added ``error`` and ``z_anomaly`` columns. ``z_anomaly``
            is 1 when |error - mean| > threshold * stddev, else 0. Rows with a
            null error get 0.

        Note:
            Triggers one Spark job (``collect``) to compute the global statistics.
        """
        self.logger.info("Detecting anomalies using ±2σ on the error (z-score method)...")

        # 1) Residual between measured and modelled power.
        df = df.withColumn("error", col("power_output") - col("expected_power"))

        # 2) Global mean and sample stddev of the residual (single driver round-trip).
        stats = df.select(
            mean(col("error")).alias("mean_error"),
            stddev(col("error")).alias("std_error"),
        ).collect()[0]
        mean_err = stats["mean_error"]
        std_err = stats["std_error"]

        if mean_err is None or std_err is None:
            # e.g. empty input or fewer than two non-null errors: no band can be
            # defined, so nothing is flagged (same result as comparing to NULL).
            self.logger.info("Not enough data for z-score statistics; z_anomaly set to 0.")
            df = df.withColumn("z_anomaly", lit(0))
        else:
            # 3) Flag anomalies outside mean ± threshold * std.
            df = df.withColumn(
                "z_anomaly",
                when(
                    spark_abs(col("error") - lit(mean_err)) > threshold * lit(std_err),
                    lit(1)
                ).otherwise(lit(0))
            )

        self.logger.info("Z-score anomaly detection complete.")
        return df

    def detect_record_anomalies(
        self,
        df: DataFrame,
        contamination: float = 0.15,
        n_estimators: int = 200,
        random_state: int = 42,
    ) -> DataFrame:
        """
        Flag multivariate outliers with scikit-learn's Isolation Forest.

        Features: wind_speed, expected_power, power_output.

        Args:
            df: DataFrame containing the feature columns.
            contamination: Expected outlier fraction passed to IsolationForest
                (default 0.15).
            n_estimators: Number of trees (default 200).
            random_state: Seed for reproducible fits (default 42).

        Returns:
            ``df`` with added ``record_id`` and ``iso_anomaly`` columns.
            ``iso_anomaly`` is 1 for outliers and 0 for inliers. It is null for
            rows that could not be scored because a feature value is missing.

        Note:
            The feature columns are collected to the driver via ``toPandas()``,
            so this only scales to datasets that fit in driver memory. The
            DataFrame with ``record_id`` is persisted; callers may unpersist
            the result's lineage when done.
        """
        self.logger.info("Detecting record-level anomalies with Isolation Forest...")

        features = list(self._ISO_FEATURES)

        # monotonically_increasing_id() depends on partition layout and row order.
        # The IDs are consumed twice (toPandas and the join below), so persist to
        # pin one assignment instead of letting each action recompute it.
        df = df.withColumn("record_id", monotonically_increasing_id()).persist()

        pdf = df.select("record_id", *features).toPandas()

        # IsolationForest rejects NaN input, so fit only on complete rows.
        # Incomplete rows receive a null iso_anomaly through the left join.
        scorable = pdf.dropna(subset=features)
        if scorable.empty:
            self.logger.info("No complete feature rows to score; iso_anomaly set to null.")
            df = df.withColumn("iso_anomaly", lit(None).cast("bigint"))
            self.logger.info("Isolation Forest anomaly detection complete.")
            return df

        iso_forest = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            max_samples="auto",
            random_state=random_state,
        )
        predictions = iso_forest.fit_predict(scorable[features])

        # fit_predict returns -1 for outliers and 1 for inliers; map to 1 / 0.
        anomaly_pdf = scorable[["record_id"]].assign(
            iso_anomaly=(predictions == -1).astype("int64")
        )

        anomaly_sdf = self.spark.createDataFrame(anomaly_pdf)
        df = df.join(anomaly_sdf, on="record_id", how="left")

        self.logger.info("Isolation Forest anomaly detection complete.")
        return df

    def combine_anomalies(self, df: DataFrame) -> DataFrame:
        """
        Add ``combined_anomaly`` = z_anomaly OR iso_anomaly (as 1 / 0).

        All existing anomaly columns are kept. A null ``iso_anomaly`` (row not
        scored) counts as not anomalous, so the result is then driven by
        ``z_anomaly`` alone.
        """
        self.logger.info("Combining anomalies with OR logic (z_anomaly OR iso_anomaly).")
        df = df.withColumn(
            "combined_anomaly",
            when((col("z_anomaly") == 1) | (col("iso_anomaly") == 1), lit(1)).otherwise(lit(0))
        )
        return df

    def detect_turbine_anomalies(
        self,
        df: DataFrame,
        faulty_threshold: float = 0.6,
        review_threshold: float = 0.3,
    ) -> DataFrame:
        """
        Classify each turbine by the fraction of its records flagged as anomalous.

        Args:
            df: DataFrame with ``turbine_id`` and ``combined_anomaly`` columns.
            faulty_threshold: Rate strictly above which a turbine is
                FAULTY_SENSOR (default 0.6).
            review_threshold: Rate strictly above which a turbine is
                REVIEW_REQUIRED (default 0.3).

        Returns:
            One row per turbine_id with ``total_anomalies``, ``total_records``,
            ``anomaly_rate`` and ``turbine_status``:
                rate >  0.6        → FAULTY_SENSOR
                0.3 < rate <= 0.6  → REVIEW_REQUIRED
                rate <= 0.3        → NORMAL
        """
        self.logger.info("Detecting turbines with high anomaly rates...")

        turbine_anomaly_df = (
            df.groupBy("turbine_id")
            .agg(
                spark_sum("combined_anomaly").alias("total_anomalies"),
                count("*").alias("total_records")
            )
            .withColumn("anomaly_rate", col("total_anomalies") / col("total_records"))
        )

        turbine_anomaly_df = turbine_anomaly_df.withColumn(
            "turbine_status",
            when(col("anomaly_rate") > faulty_threshold, "FAULTY_SENSOR")
            .when(col("anomaly_rate") > review_threshold, "REVIEW_REQUIRED")
            .otherwise("NORMAL")
        )

        self.logger.info("Turbine anomaly detection complete.")
        return turbine_anomaly_df

    def apply_smart_filtering(self, df: DataFrame, turbine_anomaly_df: DataFrame) -> DataFrame:
        """
        Attach each record's turbine_status and drop FAULTY_SENSOR turbines.

        - FAULTY_SENSOR turbines are removed entirely.
        - REVIEW_REQUIRED and NORMAL turbines remain, carrying their status.
        - Records whose turbine has no status (no match in ``turbine_anomaly_df``)
          are kept with a null ``turbine_status``.

        Args:
            df: Record-level DataFrame with a ``turbine_id`` column.
            turbine_anomaly_df: Output of ``detect_turbine_anomalies``.

        Returns:
            ``df`` with a ``turbine_status`` column, minus FAULTY_SENSOR rows.
        """
        self.logger.info("Applying smart filtering...")

        # Join status onto each record.
        df = df.join(turbine_anomaly_df.select("turbine_id", "turbine_status"),
                     on="turbine_id", how="left")

        # A plain `!=` evaluates to NULL for unmatched rows and would silently
        # drop them; the null-safe comparison removes only FAULTY_SENSOR rows.
        df = df.filter(~col("turbine_status").eqNullSafe("FAULTY_SENSOR"))

        self.logger.info("Smart filtering complete.")
        return df

    def calculate_summary_statistics(self, df: DataFrame) -> DataFrame:
        """
        Compute min / max / avg power output per turbine over 24-hour windows.

        Windows are tumbling (no slide interval) and use Spark's default
        alignment for ``window``.

        Args:
            df: DataFrame with ``turbine_id``, ``timestamp`` and ``power_output``.

        Returns:
            DataFrame with turbine_id, window_start, min_power, max_power and
            avg_power. The power values are rounded to 2 decimals.
        """
        self.logger.info("Calculating summary statistics over a 24-hour period...")

        summary_df = (
            df.groupBy("turbine_id", window(col("timestamp"), "24 hours"))
            .agg(
                round(spark_min("power_output"), 2).alias("min_power"),
                round(spark_max("power_output"), 2).alias("max_power"),
                round(spark_avg("power_output"), 2).alias("avg_power")
            )
            .select(
                col("turbine_id"),
                col("window.start").alias("window_start"),
                col("min_power"),
                col("max_power"),
                col("avg_power")
            )
        )

        self.logger.info("Summary statistics calculation complete.")
        return summary_df

    def _write_gold_table(self, df: DataFrame, table_name: str) -> None:
        """Create the Gold schema if needed and overwrite ``table_name`` as a Delta table."""
        self.spark.sql(f"CREATE SCHEMA IF NOT EXISTS {self.GOLD_SCHEMA}")
        df.write.mode("overwrite").format("delta").saveAsTable(f"{self.GOLD_SCHEMA}.{table_name}")

    def save_turbine_analysis(self, df: DataFrame, table_name: str = "gold_turbine_analysis"):
        """
        Save the record-level turbine dataset to the Gold layer.

        Creates the ``gold_data`` schema if it does not exist, then overwrites
        ``gold_data.<table_name>`` as a Delta table.
        """
        self.logger.info(f"Saving results to gold_data.{table_name}")
        self._write_gold_table(df, table_name)
        self.logger.info(f"Successfully saved to gold_data.{table_name}")

    def save_summary_table(self, df: DataFrame, table_name: str = "gold_turbine_summary"):
        """
        Save summary statistics to the Gold layer.

        Creates the ``gold_data`` schema if it does not exist, then overwrites
        ``gold_data.<table_name>`` as a Delta table.
        """
        self.logger.info(f"Saving summary statistics to gold_data.{table_name}")
        self._write_gold_table(df, table_name)
        self.logger.info(f"Successfully saved summary statistics to gold_data.{table_name}")
