## Data Ingestion and Setup

This block of code sets up the Spark environment and reads in the initial purchase order dataset for anomaly detection.

- **Spark Session**: Starts a Spark session named "AnomalyDetection".
- **Imports**: Brings in essential PySpark functions (e.g. aggregation, transformation) and external libraries (Pandas, NumPy, Scikit-learn) for machine learning.
- **File Loading**: Reads a CSV file from a local data directory as the primary dataset (`main_df`). The source can be swapped for other formats or connectors (e.g. Parquet, JDBC) by changing the reader call.
- **Flexibility**: Reading with `option("header", True)` and `csv(...)` takes column names from the file's header row and needs no company-specific configuration, so the notebook can be pointed at other datasets with the same columns.

This block forms the foundation for subsequent data cleaning and anomaly detection steps.

In [None]:
from pyspark.sql import SparkSession, DataFrame
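# Note: sum, min, max and abs imported here replace the Python builtins of the same name for the rest of the notebook
from pyspark.sql.functions import col, sum, min, max, countDistinct, count, when, abs, expr, desc, mean, stddev, current_timestamp, lit, udf, struct, first, monotonically_increasing_id, format_number, coalesce, log1p, regexp_replace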
from pyspark.sql.types import DoubleType, StringType
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Initialize Spark session
spark = SparkSession.builder.appName("AnomalyDetection").getOrCreate()

# Path to the input CSV file
file_pattern = "sample_data.csv"

# Read the CSV with a header row; columns load as strings and are cast later
main_df = spark.read.option("header", True).csv(file_pattern)
print(f"Using file: {file_pattern} for anomaly detection.")

## Cleaning the `TOTAL_SPEND_USD` Field

This block focuses on cleaning and standardising the `TOTAL_SPEND_USD` field in the dataset, which is crucial for consistent analysis.

- **Step 1**: Removes all characters except digits, dots, and parentheses using regular expressions. This step ensures only numeric values and potential negative indicators (parentheses) remain.
- **Step 2**: Removes any trailing periods that might interfere with numeric conversion.
- **Step 3**: Strips parentheses without yet negating the value. This simplifies further processing.
- **Step 4**: Replaces empty strings with `null` to properly handle missing values in downstream analysis.
- **Step 5**: Casts the cleaned value to `DoubleType`. If the original value had parentheses (indicating a negative value), it multiplies the value by -1 to reflect that negative sign.
- **Step 6**: Drops the original `TOTAL_SPEND_USD` column and renames the cleaned version to preserve the column name consistency.

This systematic cleaning process ensures that the spend data is properly formatted and ready for accurate anomaly detection.
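
The same six steps can be traced on a single string in plain Python. This is a minimal sketch for illustration only, not part of the pipeline: `clean_spend` is a hypothetical helper, and it uses Python's `re` module and `float()` where the notebook uses `regexp_replace` and a Spark cast, so edge-case behaviour may differ.

```python
import re
from typing import Optional


def clean_spend(raw: Optional[str]) -> Optional[float]:
    """Hypothetical single-value mirror of the Spark cleaning steps."""
    if raw is None:
        return None
    # Parentheses around the value mark it as negative (accounting notation).
    is_negative = re.search(r"\(.*\)", raw) is not None
    # Step 1: keep only digits, dots, and parentheses.
    cleaned = re.sub(r"[^0-9().]", "", raw)
    # Step 2: drop a trailing period.
    cleaned = re.sub(r"\.$", "", cleaned)
    # Step 3: strip the parentheses, without negating yet.
    if is_negative:
        cleaned = re.sub(r"[()]", "", cleaned)
    # Step 4: treat an empty string as missing.
    if cleaned == "":
        return None
    # Step 5: convert to float and negate if needed; unparseable text becomes None.
    try:
        value = float(cleaned)
    except ValueError:
        return None
    return -value if is_negative else value


print(clean_spend("$1,234.50"))  # 1234.5
print(clean_spend("(500)"))      # -500.0
print(clean_spend("N/A"))        # None
```

A value with only one parenthesis, such as `"(500"`, is not treated as negative and fails the numeric conversion, so it ends up missing rather than silently positive.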

In [None]:
# Cleaning on TOTAL_SPEND_USD

# Step 1: Remove all characters except digits, dots, and parentheses
main_df = main_df.withColumn(
    "TOTAL_SPEND_USD_CLEAN",
    regexp_replace(col("TOTAL_SPEND_USD"), "[^0-9().]", "")
)

# Step 2: Remove trailing period
main_df = main_df.withColumn(
    "TOTAL_SPEND_USD_CLEAN",
    regexp_replace(col("TOTAL_SPEND_USD_CLEAN"), "\\.$", "")
)

# Step 3: Remove parentheses if present (but don’t negate yet)
main_df = main_df.withColumn(
    "TOTAL_SPEND_USD_CLEAN",
    when(
        col("TOTAL_SPEND_USD").rlike("\\(.*\\)"),
        regexp_replace(col("TOTAL_SPEND_USD_CLEAN"), "[()]", "")
    ).otherwise(col("TOTAL_SPEND_USD_CLEAN"))
)

# Step 4: Replace empty strings with null
main_df = main_df.withColumn(
    "TOTAL_SPEND_USD_CLEAN",
    when(col("TOTAL_SPEND_USD_CLEAN") == "", None)
    .otherwise(col("TOTAL_SPEND_USD_CLEAN"))
)

# Step 5: Cast to Double and apply negation if originally in parentheses
main_df = main_df.withColumn(
    "TOTAL_SPEND_USD_CLEAN",
    when(
        col("TOTAL_SPEND_USD").rlike("\\(.*\\)"),
        col("TOTAL_SPEND_USD_CLEAN").cast(DoubleType()) * -1
    ).otherwise(col("TOTAL_SPEND_USD_CLEAN").cast(DoubleType()))
)

# Step 6: Replace original column
main_df = main_df.drop("TOTAL_SPEND_USD") \
                 .withColumnRenamed("TOTAL_SPEND_USD_CLEAN", "TOTAL_SPEND_USD")

## Binning and Visualising Spend Data

This block bins the cleaned spend data (`TOTAL_SPEND_USD`) into predefined ranges and visualises its distribution.

- **Binning**: The `TOTAL_SPEND_USD` column is categorised into bins (e.g. `< $1K`, `$1K–$5K`, etc.) using PySpark’s `when` and `otherwise` functions. This approach helps to summarise spending behaviour at different levels.
- **Grouping and Counting**: The dataset is grouped by spend bins, and the count of records in each bin is computed. The results are sorted in descending order to highlight the most populated bins.
- **Data Conversion**: The grouped Spark DataFrame is converted into Pandas for easy plotting.
- **Visualisation**: A bar chart using Matplotlib shows the distribution of records across spend bins. This helps identify spend concentration and provides an overview of data spread, essential for anomaly detection insights.

This step helps in understanding the data distribution before applying any anomaly detection techniques.

In [None]:
# Binning
import matplotlib.pyplot as plt

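# Define spend bins. Null spend gets its own bin: a comparison with null is
# never true, so nulls would otherwise fall through to the ">= $100K" bin.
main_df = main_df.withColumn("Spend_Bin", when(col("TOTAL_SPEND_USD").isNull(), "Unknown")
    .when(col("TOTAL_SPEND_USD") < 1000, "< $1K")
    .when(col("TOTAL_SPEND_USD") < 5000, "$1K–$5K")
    .when(col("TOTAL_SPEND_USD") < 10000, "$5K–$10K")
    .when(col("TOTAL_SPEND_USD") < 50000, "$10K–$50K")
    .when(col("TOTAL_SPEND_USD") < 100000, "$50K–$100K")
    .otherwise(">= $100K"))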

# Group by bin
binned_spend_df = main_df.groupBy("Spend_Bin").count().orderBy("count", ascending=False)

# Convert to pandas df
binned_pd = binned_spend_df.toPandas()

spend_pd = main_df.select("TOTAL_SPEND_USD").dropna().toPandas()

# Bar chart to see distribution of data
plt.figure(figsize=(8, 5))
plt.bar(binned_pd["Spend_Bin"], binned_pd["count"])
plt.xlabel("Spend Bin")
plt.ylabel("Number of Records")
plt.title("Distribution of TOTAL_SPEND_USD")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## Data Aggregation

This block focuses on aggregating transactional data at the purchase order (PO) level, preparing it for anomaly detection.

- **Casting Spend to Numeric**: Ensures that the `TOTAL_SPEND_USD` column is properly cast to a `DoubleType`, which is essential for accurate aggregation.
- **Distinct PO Count**: Counts the number of unique purchase orders in the dataset by considering `PURCHASE_ORDER_NUM` and `SUPPLIER_NAME`. This helps establish a baseline for overall data volume.
- **Total Spend Calculation**: Computes the total spend across all records, providing context for spend distribution analysis.
- **PO-Level Aggregation**: Groups the data by `PURCHASE_ORDER_NUM` and `SUPPLIER_NAME`, aggregating:
  - Number of distinct line items (`LINE_ITEM_COUNT`).
  - Total spend for each PO (`SUM_SPEND_USD`).
  - Earliest and latest purchase and received dates.
- **Natural Log Transformation**: Adds a `LOG_SUM_SPEND_USD` column using `log1p`, i.e. ln(1 + x), to reduce skew in the spend distribution. Spark's `log1p` returns `null` for inputs at or below -1, so POs with net-negative spend get no log value.
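
To make the grouping concrete, the sketch below reproduces the PO-level aggregation on a handful of made-up rows in plain Python. The records and the `po_summary` structure are assumptions for illustration; the date columns are left out to keep it short.

```python
import math
from collections import defaultdict

# Made-up line-level records: (PO number, supplier, line number, spend in USD).
rows = [
    ("PO-1", "Acme", 1, 100.0),
    ("PO-1", "Acme", 2, 250.0),
    ("PO-1", "Acme", 2, 50.0),    # repeated line number is counted once
    ("PO-2", "Globex", 1, 900.0),
]

line_numbers = defaultdict(set)
spend_totals = defaultdict(float)
for po_num, supplier, line_num, spend in rows:
    key = (po_num, supplier)          # same grouping key as the Spark groupBy
    line_numbers[key].add(line_num)   # distinct line items
    spend_totals[key] += spend        # total spend per PO

po_summary = {
    key: {
        "LINE_ITEM_COUNT": len(line_numbers[key]),
        "SUM_SPEND_USD": spend_totals[key],
        "LOG_SUM_SPEND_USD": math.log1p(spend_totals[key]),
    }
    for key in spend_totals
}

for key, summary in po_summary.items():
    print(key, summary)
```

Here `PO-1` ends up with two distinct line items and a total of 400.0, even though three rows contribute to it.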

In [None]:
# Aggregate data

# Change spend column to numeric type
main_df = main_df.withColumn("TOTAL_SPEND_USD", col("TOTAL_SPEND_USD").cast(DoubleType()))

# Count distinct POs based on PURCHASE_ORDER_NUM and SUPPLIER_NAME
distinct_po_count = main_df.select("PURCHASE_ORDER_NUM", "SUPPLIER_NAME") \
    .distinct() \
    .count()

# Calculate total spend in file
total_spend_in_file = main_df.select(sum(col("TOTAL_SPEND_USD"))).collect()[0][0]

# Print distinct purchase order count and total spend
print(f"Count of distinct POs in file (aggregated by PURCHASE_ORDER_NUM AND SUPPLIER_NAME): {distinct_po_count}")
print(f"Total spend in file: ${total_spend_in_file}")

# PO-Level Aggregation
po_agg_df = main_df.groupBy("PURCHASE_ORDER_NUM", "SUPPLIER_NAME").agg(
    countDistinct(col("PURCHASE_ORDER_LINE_NUM")).alias("LINE_ITEM_COUNT"),
    sum(col("TOTAL_SPEND_USD")).alias("SUM_SPEND_USD"),
    min(col("PURCHASE_ORDER_DATE")).alias("EARLIEST_PO_DATE"),
    max(col("PURCHASE_ORDER_DATE")).alias("LATEST_PO_DATE"),
    min(col("RECEIVED_DATE")).alias("EARLIEST_RECEIVED_DATE"),
    max(col("RECEIVED_DATE")).alias("LATEST_RECEIVED_DATE")
)

# Add natural log transformation column to reduce skew in spend distribution
po_agg_df = po_agg_df.withColumn("LOG_SUM_SPEND_USD", log1p(col("SUM_SPEND_USD")))

## Z-Score Anomaly Detection

This block applies Z-Score anomaly detection to the aggregated spend data.

- **Threshold Configuration**: Defines a threshold value for the Z-Score, typically around 3.0 for strong outlier detection.
- **Mean and Standard Deviation Calculation**: Computes the mean and standard deviation of the target column (`LOG_SUM_SPEND_USD`) to understand its distribution.
- **Z-Score Calculation**: Calculates the Z-Score for each record, measuring how many standard deviations a point is from the mean.
- **Anomaly Flagging**: Flags records as anomalies if their absolute Z-Score exceeds the defined threshold. This helps identify unusually high or low spend amounts.
- **Execution**: Applies the function to the aggregated PO dataset, enriching it with Z-Score values and anomaly flags.

Z-Score is a simple statistical method for flagging outliers. It works best when the values are roughly symmetric, which is one reason the log-transformed spend is used as input.
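
As a worked example of the calculation, the sketch below computes Z-Scores for a small made-up sample using only the standard library. The values are assumptions chosen so that one point clearly exceeds the threshold; `statistics.stdev` is the sample standard deviation, which is also what PySpark's `stddev` computes.

```python
import math
import statistics

# Made-up values standing in for LOG_SUM_SPEND_USD: 19 typical POs and one extreme PO.
values = [10.0] * 19 + [20.0]
threshold = 3.0

mean_val = statistics.mean(values)
stddev_val = statistics.stdev(values)  # sample standard deviation

# Z-Score: distance from the mean in units of standard deviation.
z_scores = [(v - mean_val) / stddev_val for v in values]
# math.fabs is used because the notebook rebinds abs to the PySpark function.
flags = [math.fabs(z) > threshold for z in z_scores]

print(round(mean_val, 2), round(stddev_val, 3))  # 10.5 2.236
print(round(z_scores[-1], 2), flags[-1])         # 4.25 True
print(flags.count(True))                         # 1
```

With the sample standard deviation, the largest possible absolute Z-Score in a sample of n values is (n - 1) / sqrt(n), so a threshold of 3.0 can only be exceeded when there are at least 11 records.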

In [None]:
from pyspark.sql.functions import col, mean, stddev, when

# Define Z-score threshold (configurable)
z_score_threshold = 3.0 # default threshold

# Z-score Anomaly Detection Function
def zscore_anomaly_detection(df, column):
    stats_df = df.select(mean(col(column)).alias("mean"), stddev(col(column)).alias("stddev"))
    stats = stats_df.collect()[0] 
    
    mean_val, stddev_val = stats["mean"], stats["stddev"]
    
    if mean_val is None or stddev_val is None or stddev_val == 0:
        print(f"Skipping Z-score anomaly detection for {column} (Invalid mean/stddev).")
        return df
    
    # Apply Z-score calculations
    df = df.withColumn("Z_Score", (col(column) - mean_val) / stddev_val)
    df = df.withColumn("is_anomaly_zscore", when(abs(col("Z_Score")) > z_score_threshold, True).otherwise(False))
    print(f"✅ Z-score anomaly detection completed.")
    return df

# Execution
po_agg_df = zscore_anomaly_detection(po_agg_df, "LOG_SUM_SPEND_USD")


## Z-Score Summary and Visualisation

This block summarises the results of Z-Score anomaly detection and visualises the Z-Score distribution.

- **Summary Function**: 
  - Calculates total record count, the number of anomalies (records flagged as anomalies by the Z-Score method), and non-anomalies.
  - Computes minimum, maximum, and mean Z-Score values to understand the data spread.
  - Returns a summary DataFrame showing anomaly statistics, helpful for both quantitative and qualitative analysis.

- **Display Results**: Prints the summary DataFrame for review.

- **Visualisation**: 
  - Plots a histogram of Z-Scores using Matplotlib to show the distribution of values.
  - Helps identify where anomalies lie in the context of the overall distribution.

This step gives both a tabular and a visual view of how many records the Z-Score method flagged and where they sit in the overall distribution.

In [None]:
# Z-Score Summary
def summarize_zscore_anomalies(df, column):
    anomaly_col = "is_anomaly_zscore"
    z_score_col = "Z_Score"

    if anomaly_col in df.columns and z_score_col in df.columns:
        total_count = df.count()
        anomaly_count = df.filter(col(anomaly_col) == True).count()
        non_anomaly_count = total_count - anomaly_count
        anomaly_percentage = (anomaly_count / total_count) * 100 if total_count > 0 else 0

        # Aggregate statistics
        stats = df.agg(
            min(col(z_score_col)).alias("Min Z-Score"),
            max(col(z_score_col)).alias("Max Z-Score"),
            mean(col(z_score_col)).alias("Mean Z-Score")
        ).collect()[0]

        summary_data = [{
            "Column": column,
            "Total Records": total_count,
            "Anomalies": anomaly_count,
            "Non-Anomalies": non_anomaly_count,
            "Anomaly Percentage (%)": anomaly_percentage,
            "Min Z-Score": stats["Min Z-Score"] if stats["Min Z-Score"] is not None else "N/A",
            "Max Z-Score": stats["Max Z-Score"] if stats["Max Z-Score"] is not None else "N/A",
            "Mean Z-Score": stats["Mean Z-Score"] if stats["Mean Z-Score"] is not None else "N/A"
        }]

        summary_df = spark.createDataFrame(summary_data)
        summary_df.show(truncate=False)
        return summary_df
    else:
        print(f"Missing required columns in DataFrame: {anomaly_col}, {z_score_col}")
        return spark.createDataFrame([], schema=[
            "Column", "Total Records", "Anomalies", "Non-Anomalies",
            "Anomaly Percentage (%)", "Min Z-Score", "Max Z-Score", "Mean Z-Score"
        ])

# Generate summary
anomaly_summary_df = summarize_zscore_anomalies(po_agg_df, "LOG_SUM_SPEND_USD")

# Visualise Z-Score Distribution
z_scores = po_agg_df.select("Z_Score").dropna().toPandas()
plt.hist(z_scores["Z_Score"], bins=50)
plt.title("Z-Score Distribution")
plt.xlabel("Z-Score")
plt.ylabel("Frequency")
plt.show()


## IQR Anomaly Detection

This block applies the Interquartile Range (IQR) method to detect outliers in the spend data.

- **Threshold Definition**: Sets the IQR threshold, typically 1.5, but adjustable based on business needs. This threshold determines the cutoff for identifying points as potential anomalies.
- **Quantile Calculation**: Uses the approximate quantile method to efficiently calculate the first (Q1) and third quartiles (Q3) of the target column (`LOG_SUM_SPEND_USD`).
- **IQR Score Calculation**: Computes a relative score that measures the distance from a point to the nearest quartile, scaled by the IQR. This provides a sense of how extreme an outlier is.
- **Anomaly Flagging**: Flags records as anomalies if they fall below the lower bound (Q1 - threshold × IQR) or above the upper bound (Q3 + threshold × IQR).
- **Execution**: Applies the function to the aggregated PO-level DataFrame, adding both the `IQR_Score` and `is_anomaly_iqr` columns for downstream analysis.

IQR is a robust, non-parametric method that effectively identifies anomalies in skewed or non-normal data distributions.
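
The score and the flag can be checked by hand for a single value. The sketch below is a plain-Python illustration under assumed quartiles; `iqr_score_and_flag` is a hypothetical helper, not a function from the notebook.

```python
def iqr_score_and_flag(value, q1, q3, k=1.5):
    """Hypothetical helper: return (IQR score, anomaly flag) for one value."""
    iqr = q3 - q1
    if iqr <= 0:
        raise ValueError("IQR must be positive to scale the score")
    lower_bound = q1 - k * iqr
    upper_bound = q3 + k * iqr
    # Score: distance to the nearest quartile in units of IQR, 0 inside [Q1, Q3].
    if value < q1:
        score = (q1 - value) / iqr
    elif value > q3:
        score = (value - q3) / iqr
    else:
        score = 0.0
    return score, (value < lower_bound or value > upper_bound)


# Assumed quartiles: Q1 = 8 and Q3 = 10, so IQR = 2 and the bounds are 5 and 13.
for v in (9.0, 6.0, 14.0):
    print(v, iqr_score_and_flag(v, q1=8.0, q3=10.0))
# 9.0 (0.0, False)
# 6.0 (1.0, False)
# 14.0 (2.0, True)
```

Because both the bounds and the score are scaled by the same IQR, a record is flagged exactly when its score exceeds the threshold `k`.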


In [None]:
# Define IQR threshold
iqr_threshold = 1.5 # configurable

# IQR Anomaly Detection Function
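def iqr_anomaly_detection(df: DataFrame, column: str) -> DataFrame:
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.01)

    # approxQuantile returns an empty list when the column has no non-null values
    if len(quantiles) < 2:
        print(f"Skipping IQR anomaly detection for {column} (no quantiles available).")
        return df

    Q1, Q3 = quantiles[0], quantiles[1]
    IQR = Q3 - Q1

    # A zero IQR would make the score undefined (division by zero)
    if IQR == 0:
        print(f"Skipping IQR anomaly detection for {column} (IQR is zero).")
        return df

    lower_bound, upper_bound = Q1 - iqr_threshold * IQR, Q3 + iqr_threshold * IQR

    # Compute IQR Score (distance from nearest quartile divided by IQR)
    df = df.withColumn(
        "IQR_Score",
        when(col(column) < Q1, (Q1 - col(column)) / IQR)
        .when(col(column) > Q3, (col(column) - Q3) / IQR)
        .otherwise(0)
    )

    # Flag anomalies
    df = df.withColumn("is_anomaly_iqr", (col(column) < lower_bound) | (col(column) > upper_bound))
    print("✅ IQR anomaly detection completed.")
    return df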

# Execution
po_agg_df = iqr_anomaly_detection(po_agg_df, "LOG_SUM_SPEND_USD")


## IQR Anomaly Detection Summary

This block summarises the results of the IQR-based anomaly detection.

- **Summary Function**:
  - Calculates the total number of records, the number of flagged anomalies, and the number of non-anomalies.
  - Computes the anomaly rate as a percentage of the dataset.
  - Aggregates key statistics including minimum, maximum, and mean IQR scores to provide insights into anomaly severity.

- **Data Display**:
  - Prints the summary DataFrame using `show()` for a clear tabular view of results.

- **Execution**:
  - Applies the summary function to the aggregated dataset, adding context to the IQR anomaly detection step.

This step helps evaluate the effectiveness of the IQR method in capturing outliers and provides quick visibility into the distribution of anomaly scores.

In [None]:
# Function to summarise IQR anomaly detection results
def summarize_iqr_anomalies(df, column):
    anomaly_col = "is_anomaly_iqr"
    iqr_score_col = "IQR_Score"

    if anomaly_col in df.columns and iqr_score_col in df.columns:
        total_count = df.count()
        if total_count == 0:
            print("No records found.")
            return spark.createDataFrame([], schema=[
                "Column", "Total Records", "Anomalies", "Non-Anomalies",
                "Anomaly Percentage (%)", "Min IQR Score", "Max IQR Score", "Mean IQR Score"
            ])
        
        anomaly_count = df.filter(col(anomaly_col) == True).count()
        non_anomaly_count = total_count - anomaly_count
        anomaly_percentage = (anomaly_count / total_count) * 100

        # Aggregate statistics
        stats = df.agg(
            min(col(iqr_score_col)).alias("Min IQR Score"),
            max(col(iqr_score_col)).alias("Max IQR Score"),
            mean(col(iqr_score_col)).alias("Mean IQR Score")
        ).collect()[0]

        summary_data = [{
            "Column": column,
            "Total Records": total_count,
            "Anomalies": anomaly_count,
            "Non-Anomalies": non_anomaly_count,
            "Anomaly Percentage (%)": anomaly_percentage,
            "Min IQR Score": stats["Min IQR Score"] if stats["Min IQR Score"] is not None else "N/A",
            "Max IQR Score": stats["Max IQR Score"] if stats["Max IQR Score"] is not None else "N/A",
            "Mean IQR Score": stats["Mean IQR Score"] if stats["Mean IQR Score"] is not None else "N/A"
        }]

        summary_df = spark.createDataFrame(summary_data)
        summary_df.show(truncate=False)
        return summary_df
    else:
        print(f"Missing required columns in DataFrame: {anomaly_col}, {iqr_score_col}")
        return spark.createDataFrame([], schema=[
            "Column", "Total Records", "Anomalies", "Non-Anomalies",
            "Anomaly Percentage (%)", "Min IQR Score", "Max IQR Score", "Mean IQR Score"
        ])

# Generate IQR anomaly summary
anomaly_summary_df = summarize_iqr_anomalies(po_agg_df, "LOG_SUM_SPEND_USD")


## Isolation Forest Anomaly Detection

This block applies the Isolation Forest algorithm to detect outliers using a combination of numerical features.

- **Feature Engineering**:
  - `LOG_SUM_SPEND_USD`: A log transformation of spend to handle skewness.
  - `SPEND_PER_LINE_ITEM`: Calculated spend per line item.
  - `IS_LARGE_PO`: A binary indicator for purchase orders with exceptionally high spend (> $1,000,000).

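- **Unique Identifier**: Adds a `unique_id` column to facilitate merging results back into the Spark DataFrame. Because `monotonically_increasing_id()` is evaluated lazily, the IDs may not be stable if the DataFrame is recomputed between the Pandas export and the join; caching the DataFrame first or joining on the PO key are possible safeguards.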

- **Pandas Conversion and Scaling**:
  - Converts Spark DataFrame to Pandas for compatibility with Scikit-learn.
  - Scales the features using `StandardScaler` for consistent performance.

- **Model Training and Scoring**:
  - Fits an Isolation Forest model with the specified contamination rate.
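  - Scores each record with the negated `score_samples` output, so that higher values mean more anomalous, and flags a record when its score exceeds the `1 - contamination` quantile of all scores.

- **Merge Results**:
  - Joins the results back into the Spark DataFrame, adding `IF_score_value` and `is_anomaly_isoforest` columns. Records dropped for missing feature values keep `null` in both columns because the join is a left join.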

This process leverages machine learning to detect multivariate anomalies that may not be evident with simpler methods like Z-Score or IQR.


In [None]:
from sklearn.preprocessing import StandardScaler

# Isolation Forest Function
def isolation_forest_anomaly_detection_multi(df, features, contamination=0.01):
    
    # Step 1: Feature Engineering
    df = df.withColumn("LOG_SUM_SPEND_USD", log1p(col("SUM_SPEND_USD")))
    df = df.withColumn(
        "SPEND_PER_LINE_ITEM",
        when(col("LINE_ITEM_COUNT") != 0, col("SUM_SPEND_USD") / col("LINE_ITEM_COUNT")).otherwise(None)
    )
    df = df.withColumn(
        "IS_LARGE_PO",
        when(col("SUM_SPEND_USD") > 1000000, 1).otherwise(0)
    )

    # Step 2: Add unique ID
    df = df.withColumn("unique_id", monotonically_increasing_id())

    # Step 3: Select relevant columns and convert to pandas
    selected_cols = ["unique_id"] + features
    df_pd = df.select(*selected_cols).toPandas()

    # Step 4: Clean and scale
    df_pd.dropna(subset=features, inplace=True)
    df_pd[features] = df_pd[features].apply(pd.to_numeric, errors='coerce')
    df_pd.dropna(subset=features, inplace=True)

    scaler = StandardScaler()
    df_scaled = pd.DataFrame(scaler.fit_transform(df_pd[features]), columns=features)

    # Step 5: Fit Isolation Forest
    iso_forest = IsolationForest(contamination=contamination, random_state=42)
    iso_forest.fit(df_scaled)

    # Step 6: Score and flag
    df_pd["IF_score_value"] = -iso_forest.score_samples(df_scaled)
    threshold = df_pd["IF_score_value"].quantile(1 - contamination)
    df_pd["is_anomaly_isoforest"] = (df_pd["IF_score_value"] > threshold).astype(bool)

    # Step 7: Merge back to Spark
    df_results = spark.createDataFrame(df_pd[["unique_id", "IF_score_value", "is_anomaly_isoforest"]])
    df = df.join(df_results, on="unique_id", how="left").drop("unique_id")

    print(f"✅ Isolation Forest completed using: {features}")
    return df

# Define features to use
features_to_use = [
    "LOG_SUM_SPEND_USD",
    "LINE_ITEM_COUNT",
    "SPEND_PER_LINE_ITEM",
    "IS_LARGE_PO"
]

# Set contamination threshold manually
iso_forest_threshold = 0.01  # Typically 1% or configurable

# Execution
po_agg_df = isolation_forest_anomaly_detection_multi(po_agg_df, features_to_use, iso_forest_threshold)


## Isolation Forest Anomaly Summary

This block summarises the results of Isolation Forest anomaly detection and visualises the distribution of anomaly scores.

- **Summary Function**:
  - Calculates total records, anomalies flagged, non-anomalies, and the anomaly rate.
  - Computes minimum, maximum, and mean Isolation Forest scores to assess the severity and spread of anomalies.
  - Returns a Spark DataFrame summarising the results for easy review.

- **Data Display**:
  - Prints the summary DataFrame using `.show()` to provide a clear, tabular summary of results.

- **Visualisation**:
  - Plots a histogram of Isolation Forest scores using Matplotlib, offering a quick look at score distribution and helping interpret thresholds and contamination rates.

This step complements other anomaly detection methods (e.g. Z-Score, IQR) by adding a multivariate machine learning perspective to the anomaly detection process.


In [None]:
# Function to summarise Isolation Forest anomaly detection results
def summarize_isolation_forest_anomalies(df, column):
    anomaly_col = "is_anomaly_isoforest"
    if_score_col = "IF_score_value"

    if anomaly_col in df.columns and if_score_col in df.columns: 
        total_count = df.count()
        if total_count == 0:
            print("No records found.")
            return spark.createDataFrame([], schema=[
                "Column", "Total Records", "Anomalies", "Non-Anomalies",
                "Anomaly Percentage (%)", "Min IF Score", "Max IF Score", "Mean IF Score"
            ])
        
        anomaly_count = df.filter(col(anomaly_col) == True).count()
        non_anomaly_count = total_count - anomaly_count
        anomaly_percentage = (anomaly_count / total_count) * 100

        # Aggregate statistics
        stats = df.agg(
            min(col(if_score_col)).alias("Min IF Score"),
            max(col(if_score_col)).alias("Max IF Score"),
            mean(col(if_score_col)).alias("Mean IF Score")
        ).collect()[0]

        summary_data = [{
            "Column": column,
            "Total Records": total_count,
            "Anomalies": anomaly_count,
            "Non-Anomalies": non_anomaly_count,
            "Anomaly Percentage (%)": anomaly_percentage,
            "Min IF Score": stats["Min IF Score"] if stats["Min IF Score"] is not None else "N/A",
            "Max IF Score": stats["Max IF Score"] if stats["Max IF Score"] is not None else "N/A",
            "Mean IF Score": stats["Mean IF Score"] if stats["Mean IF Score"] is not None else "N/A"
        }]

        summary_df = spark.createDataFrame(summary_data)
        summary_df.show(truncate=False)
        return summary_df
    else:
        print(f"Missing required columns in DataFrame: {anomaly_col}, {if_score_col}")
        return spark.createDataFrame([], schema=[
            "Column", "Total Records", "Anomalies", "Non-Anomalies",
            "Anomaly Percentage (%)", "Min IF Score", "Max IF Score", "Mean IF Score"
        ])

# Generate Isolation Forest anomaly summary
anomaly_summary_df = summarize_isolation_forest_anomalies(po_agg_df, "LOG_SUM_SPEND_USD")

# Visualise Isolation Forest scores
if_scores = po_agg_df.select("IF_score_value").dropna().toPandas()
plt.hist(if_scores["IF_score_value"], bins=50)
plt.title("Isolation Forest Score Distribution")
plt.xlabel("Score")
plt.ylabel("Frequency")
plt.show()

## Weighted Anomaly Scoring

This block calculates a weighted anomaly score by combining multiple detection methods and a spend-based component.

- **Raw Weights**:
  - Assigns initial weights to each anomaly detection method:
    - `is_anomaly_zscore`: 30%
    - `is_anomaly_iqr`: 20%
    - `is_anomaly_isoforest`: 30%
    - `spend_magnitude`: 20%
  - Normalises these weights to ensure they sum to 1.0.

- **Model Votes**:
  - Each anomaly detection flag (`is_anomaly_zscore`, `is_anomaly_iqr`, and `is_anomaly_isoforest`) is converted to a numeric indicator and weighted accordingly.

- **Spend Component**:
  - Incorporates the relative spend magnitude by dividing each record’s spend by the 95th percentile of spend and capping the ratio at 1.0.
  - This component captures unusually high spend behaviour that may not be flagged by individual models.

- **Score Calculation**:
  - Combines all weighted components into a single `weighted_score` column.

This composite score helps prioritise records for review by combining multiple detection methods with domain-relevant metrics like spend.
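
The arithmetic behind the score can be followed for a single record. The sketch below is a plain-Python illustration, not the Spark implementation: `weighted_score` is a hypothetical helper, and the spend and 95th-percentile values are made up.

```python
import math

RAW_WEIGHTS = {
    "is_anomaly_zscore": 30,
    "is_anomaly_iqr": 20,
    "is_anomaly_isoforest": 30,
    "spend_magnitude": 20,
}
# math.fsum is used because the notebook rebinds sum to the PySpark function.
TOTAL_WEIGHT = math.fsum(RAW_WEIGHTS.values())
WEIGHTS = {name: raw / TOTAL_WEIGHT for name, raw in RAW_WEIGHTS.items()}
DETECTORS = ("is_anomaly_zscore", "is_anomaly_iqr", "is_anomaly_isoforest")


def weighted_score(flags, spend, p95_spend):
    """Hypothetical per-record score; missing or None flags count as 0."""
    model_part = math.fsum(
        WEIGHTS[name] * (1.0 if flags.get(name) else 0.0) for name in DETECTORS
    )
    # Spend relative to the 95th percentile, capped at 1.0.
    ratio = spend / p95_spend
    spend_part = WEIGHTS["spend_magnitude"] * (ratio if ratio < 1.0 else 1.0)
    return model_part + spend_part


two_flags = {"is_anomaly_zscore": True, "is_anomaly_iqr": True, "is_anomaly_isoforest": False}
all_flags = {name: True for name in DETECTORS}

# 0.3 + 0.2 + 0.2 * 0.5
print(round(weighted_score(two_flags, spend=50_000, p95_spend=100_000), 2))   # 0.6
# 0.3 + 0.2 + 0.3 + 0.2 * 1.0 (ratio capped)
print(round(weighted_score(all_flags, spend=250_000, p95_spend=100_000), 2))  # 1.0
```

A record that no detector flags can still score up to 0.2 from the spend component alone.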

In [None]:
# Weighted Score w/ Spend
def apply_weighted_score(df):
    # Raw weights
    raw_weights = {
        "is_anomaly_zscore": 30,
        "is_anomaly_iqr": 20,
        "is_anomaly_isoforest": 30,
        "spend_magnitude": 20
    }

    # Normalize weights (explicit loop because `sum` now refers to the PySpark function)
    total_weight = 0
    for v in raw_weights.values():
        total_weight += v
    weights = {key: value / total_weight for key, value in raw_weights.items()}
    print("Total Weight:", total_weight)

    # Set column list
    anomaly_columns = ["is_anomaly_zscore", "is_anomaly_iqr", "is_anomaly_isoforest"]

    # Build the anomaly flag score expression (null flags count as 0)
    model_exprs = [
        f"{weights[name]} * CAST(COALESCE(CAST({name} AS INT), 0) AS DOUBLE)"
        for name in anomaly_columns
    ]

    # Get the 95th percentile value for spend
    p95_value = df.selectExpr("percentile_approx(SUM_SPEND_USD, 0.95)").first()[0]

    # Spend component; contributes 0 when the percentile is missing or not positive
    if p95_value is None or p95_value <= 0:
        spend_expr = "0.0"
    else:
        spend_expr = f"{weights['spend_magnitude']} * LEAST(SUM_SPEND_USD / {p95_value}, 1.0)"

    # Combine all into one string expression
    full_expr = " + ".join(model_exprs + [spend_expr])

    # Return DataFrame with weighted score
    return df.withColumn("weighted_score", expr(full_expr))

# Execution
po_agg_df = apply_weighted_score(po_agg_df)

## Model Agreement Bonus

This block enhances the anomaly scoring by introducing a bonus mechanism based on the agreement between different detection models.

- **Model Votes**:
  - Sums the binary flags from the three detection methods (`is_anomaly_zscore`, `is_anomaly_iqr`, and `is_anomaly_isoforest`).
  - This creates a `model_votes` column indicating how many models agreed on a given anomaly.

- **Agreement Bonus**:
  - If two models flag the same record as an anomaly, adds a bonus of 0.15 to the score.
  - If all three models agree, adds a bonus of 0.25.
  - This approach rewards consensus and highlights anomalies that multiple models identify.

- **Final Scoring**:
  - Combines the `weighted_score` with the `agreement_bonus` to produce a final `score_for_severity`.
  - This score better captures both individual model insights and cross-model agreement.

This strategy refines anomaly ranking by incorporating both model votes and individual detection weights, making it easier to prioritise anomalies for further investigation.
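
The bonus rule is small enough to check by hand. The sketch below restates it in plain Python with hypothetical helper names (`agreement_bonus`, `score_for_severity`) and made-up weighted scores.

```python
def agreement_bonus(model_votes):
    """Bonus for cross-model agreement: 0.15 for two votes, 0.25 for three."""
    if model_votes == 2:
        return 0.15
    if model_votes == 3:
        return 0.25
    return 0.0


def score_for_severity(weighted, flags):
    """Hypothetical helper: weighted score plus agreement bonus for one record."""
    # Count True flags; None or False contribute no vote.
    model_votes = len([flag for flag in flags if flag])
    return weighted + agreement_bonus(model_votes)


print(round(score_for_severity(0.6, [True, True, False]), 2))   # 0.75
print(round(score_for_severity(1.0, [True, True, True]), 2))    # 1.25
print(round(score_for_severity(0.3, [True, False, None]), 2))   # 0.3
```

Since the weighted score tops out at 1.0 and the largest bonus is 0.25, the final score ranges from 0 to 1.25 rather than 0 to 1.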


In [None]:
# Model Agreement Bonus

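# Add model_votes and bonus (null flags count as 0 votes, matching the weighted score)
po_agg_df = po_agg_df.withColumn(
    "model_votes",
    expr(
        "COALESCE(CAST(is_anomaly_zscore AS INT), 0) + "
        "COALESCE(CAST(is_anomaly_iqr AS INT), 0) + "
        "COALESCE(CAST(is_anomaly_isoforest AS INT), 0)"
    )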
).withColumn(
    "agreement_bonus",
    when(col("model_votes") == 2, 0.15)
    .when(col("model_votes") == 3, 0.25)
    .otherwise(0.0)
)

po_agg_df = po_agg_df.withColumn(
    "score_for_severity",
    col("weighted_score") + col("agreement_bonus")
)

## Severity Classification

This block classifies records into severity levels based on their anomaly scores.

- **Classification Function**:
  - Defines thresholds to classify each record into one of four severity categories:
    - `'None'`: Score less than 0.4, indicating normal behaviour.
    - `'Mild'`: Score between 0.4 and 0.65, indicating potential anomalies.
    - `'High'`: Score between 0.65 and 0.85, indicating likely anomalies.
    - `'Severe'`: Score of 0.85 or higher, indicating strong anomalies.

- **Implementation**:
  - Uses PySpark’s `expr()` function to define a CASE statement, creating a new `severity` column.
  - Applies the function to the anomaly DataFrame, enriching it with an easy-to-interpret severity label.

This classification step helps prioritise records by severity, enabling more focused investigation and reporting.
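
The thresholds translate directly into a chain of comparisons. The sketch below is a plain-Python restatement with a hypothetical `classify_severity` helper; like a `CASE` expression with no `ELSE` branch, it returns no label when the score is missing.

```python
def classify_severity(score):
    """Hypothetical helper: map a score to a severity label."""
    if score is None:
        return None        # no score, no label
    if score < 0.4:
        return "None"      # the string label, not a missing value
    if score < 0.65:
        return "Mild"
    if score < 0.85:
        return "High"
    return "Severe"


for s in (0.2, 0.4, 0.75, 1.25):
    print(s, classify_severity(s))
# 0.2 None
# 0.4 Mild
# 0.75 High
# 1.25 Severe
```

Each lower bound is inclusive, so a score of exactly 0.4 is `'Mild'` and exactly 0.85 is `'Severe'`.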


In [None]:
# Severity
def apply_severity_classification(df: DataFrame, score_col: str = "weighted_score") -> DataFrame:
    return df.withColumn("severity",
        expr(f"""
            CASE
                WHEN {score_col} < 0.4 THEN 'None'
                WHEN {score_col} >= 0.4 AND {score_col} < 0.65 THEN 'Mild'
                WHEN {score_col} >= 0.65 AND {score_col} < 0.85 THEN 'High'
                WHEN {score_col} >= 0.85 THEN 'Severe'
            END
        """)
    )


# Execution
po_agg_df = apply_severity_classification(po_agg_df, score_col="score_for_severity")

## Generating Anomaly Insights

This block generates textual insights for each record based on which anomaly detection methods flagged it as anomalous.

- **Insight Generation Function**:
  - For each record, extracts relevant fields like purchase order number (`PURCHASE_ORDER_NUM`), supplier name (`SUPPLIER_NAME`), and total spend.
  - Checks each anomaly detection flag (`is_anomaly_zscore`, `is_anomaly_iqr`, `is_anomaly_isoforest`).
  - For each flagged anomaly type:
    - Retrieves the corresponding score (Z-Score, IQR Score, Isolation Forest Score).
    - Appends a descriptive insight to a list.
  - Joins all applicable insights into a single string. If no anomalies are detected, returns `"No anomalies detected."`.

- **Application to DataFrame**:
  - Registers the insight function as a PySpark UDF.
  - Applies the UDF to the DataFrame, creating a new `insight` column that summarises the anomalies detected for each record.

This step makes the anomaly results more interpretable and provides context for stakeholders, enabling more effective decision-making and anomaly investigation.

In [None]:
# Generate insights
def generate_insight(row):
    insights = []

    po_num = getattr(row, "PURCHASE_ORDER_NUM", "UNKNOWN")
    supplier = getattr(row, "SUPPLIER_NAME", "UNKNOWN")

    # Only conversion errors are expected here, so catch those rather than everything
    try:
        spend = round(float(getattr(row, "SUM_SPEND_USD", 0) or 0))
    except (TypeError, ValueError):
        spend = 0

    if getattr(row, "is_anomaly_zscore", False):
        try:
            z = round(float(getattr(row, "Z_Score", 0) or 0), 2)
            insights.append(f"PO {po_num} from {supplier} was flagged by Z-score (Amount: ${spend}, Score: {z}).")
        except (TypeError, ValueError):
            pass

    if getattr(row, "is_anomaly_iqr", False):
        try:
            iqr = round(float(getattr(row, "IQR_Score", 0) or 0), 2)
            insights.append(f"PO {po_num} from {supplier} was flagged by IQR (Amount: ${spend}, Score: {iqr}).")
        except (TypeError, ValueError):
            pass

    if getattr(row, "is_anomaly_isoforest", False):
        try:
            iso = round(float(getattr(row, "IF_score_value", 0) or 0), 2)
            insights.append(f"PO {po_num} from {supplier} was flagged by Isolation Forest (Amount: ${spend}, Score: {iso}).")
        except (TypeError, ValueError):
            pass

    return " ".join(insights) if insights else "No anomalies detected."


# Apply insights function
def apply_insights(df: DataFrame) -> DataFrame:
    generate_insight_udf = udf(generate_insight, StringType())
    return df.withColumn("insight", generate_insight_udf(struct([df[x] for x in df.columns])))

# Execution
po_agg_df = apply_insights(po_agg_df)

## Outlier Summary

This block consolidates the results of anomaly detection and presents a summary of key metrics.

- **Data Preparation**:
  - Ensures numerical columns (e.g. `weighted_score`, `Z_Score`, `IQR_Score`, `IF_score_value`) are properly cast to double precision for consistency.
  
- **Counting Anomalies**:
  - Counts the number of flagged anomalies for each detection method (Z-Score, IQR, Isolation Forest) and by severity level (Severe, High, Mild).

- **Spend Analysis**:
  - Calculates the total spend per severity level, offering insights into the potential financial impact of anomalies.
  - Computes percentages of spend per severity relative to total spend to prioritise investigations.

- **Final Summary**:
  - Prints a comprehensive summary of anomaly counts, detection percentages, and spend impact.

This step ties the entire pipeline together by translating detection outputs into actionable insights for review and prioritisation.
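
The percentage calculations described above all follow the same pattern: divide a part by a total and guard against a zero denominator. A minimal sketch of that pattern is shown below; the `share_of_total` helper and the figures are hypothetical and are not taken from the pipeline.

```python
def share_of_total(part, total):
    """Return part as a percentage of total, or 0.0 when total is zero or missing."""
    if not total:
        return 0.0
    return (part or 0) / total * 100

# Illustrative figures only, not pipeline output
example_total_spend = 2_000_000.0
example_spend_by_severity = {"Severe": 150_000.0, "High": 90_000.0, "Mild": 40_000.0}

for level, spend in example_spend_by_severity.items():
    print(f"{level}: {share_of_total(spend, example_total_spend):.2f}% of total spend")
```

Using one helper for both the count-based and spend-based percentages keeps the zero-denominator handling consistent.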

In [None]:
# Outlier Summary

# --- Cast numeric columns ---
numeric_cols = ["weighted_score", "Z_Score", "IQR_Score", "IF_score_value"]
for col_name in numeric_cols:
    po_agg_df = po_agg_df.withColumn(col_name, col(col_name).cast(DoubleType()))

# --- Count anomaly flags ---
anomalies_z = po_agg_df.filter(col("is_anomaly_zscore")).count()
anomalies_iqr = po_agg_df.filter(col("is_anomaly_iqr")).count()
anomalies_isoforest = po_agg_df.filter(col("is_anomaly_isoforest")).count()
severe_anomalies = po_agg_df.filter(col("severity") == 'Severe').count()
high_anomalies = po_agg_df.filter(col("severity") == 'High').count()
mild_anomalies = po_agg_df.filter(col("severity") == 'Mild').count()

# --- Calculate detection percentages (relative to the distinct PO count) ---
def pct_of_pos(n):
    # Return 0 when there are no POs, mirroring the guard used for spend percentages
    return (n / distinct_po_count) * 100 if distinct_po_count else 0

percent_z = pct_of_pos(anomalies_z)
percent_iqr = pct_of_pos(anomalies_iqr)
percent_isoforest = pct_of_pos(anomalies_isoforest)
percent_severe = pct_of_pos(severe_anomalies)
percent_high = pct_of_pos(high_anomalies)
percent_mild = pct_of_pos(mild_anomalies)

# --- Total row and spend calculations ---
total_rows = main_df.count()
total_spend = main_df.agg(sum("TOTAL_SPEND_USD")).collect()[0][0] or 0
total_spend = round(total_spend, 2)

# --- Helper function ---
def get_anomaly_spend(df, severity_level):
    spend = (
        df
        .filter(col("severity") == severity_level)
        .dropDuplicates(["PURCHASE_ORDER_NUM"])
        .agg(sum("SUM_SPEND_USD"))
        .collect()[0][0]
    )
    return round(spend or 0, 2)

# --- Spend by severity level ---
severe_anomaly_spend = get_anomaly_spend(po_agg_df, "Severe")
high_anomaly_spend = get_anomaly_spend(po_agg_df, "High")
mild_anomaly_spend = get_anomaly_spend(po_agg_df, "Mild")

# --- Percentage of spend per severity level ---
severe_anomaly_pct = (severe_anomaly_spend / total_spend) * 100 if total_spend else 0
high_anomaly_pct = (high_anomaly_spend / total_spend) * 100 if total_spend else 0
mild_anomaly_pct = (mild_anomaly_spend / total_spend) * 100 if total_spend else 0

# --- Investigate anomalies with percent > 100 ---
percent_dict = {
    "percent_z": percent_z,
    "percent_iqr": percent_iqr,
    "percent_isoforest": percent_isoforest,
    "percent_severe": percent_severe,
    "percent_high": percent_high,
    "percent_mild": percent_mild,
    "severe_anomaly_pct": severe_anomaly_pct,
    "high_anomaly_pct": high_anomaly_pct,
    "mild_anomaly_pct": mild_anomaly_pct
}
# Report any percentage above 100 so it can be investigated
for name, value in percent_dict.items():
    if value > 100:
        print(f"Warning: {name} exceeds 100% ({value:.2f}%)")

# --- Final Summary ---
print("\n=== Anomaly Detection Summary ===")
print(f"Total Records: {total_rows}")
print(f"Total Spend: {total_spend}")
print(f"Z-Score Outliers: {anomalies_z} ({percent_z:.2f}%)")
print(f"IQR Outliers: {anomalies_iqr} ({percent_iqr:.2f}%)")
print(f"Isolation Forest Outliers: {anomalies_isoforest} ({percent_isoforest:.2f}%)")
print(f"Severe Outliers: {severe_anomalies} ({percent_severe:.2f}%)")
print(f"Severe Outlier Spend: {severe_anomaly_spend}")
print(f"High Outliers: {high_anomalies} ({percent_high:.2f}%)")
print(f"High Outlier Spend: {high_anomaly_spend}")
print(f"Mild Outliers: {mild_anomalies} ({percent_mild:.2f}%)")
print(f"Mild Outlier Spend: {mild_anomaly_spend}")
print("==========================================================")


## Automated Email Report

This block generates an HTML summary report that can be embedded in an email for stakeholders.

- **Function Overview**:
  - Accepts the file name and key summary metrics: total records, total spend, distinct PO count, and anomaly spend and percentages by severity.
  - Formats the results into an HTML table using in-line CSS for styling.

- **Highlights**:
  - Clear tabular layout showing anomaly spend by severity level: Mild, High, and Severe.
  - Includes helpful formatting (thousands separators, currency formatting, percentages).
  - Provides a consistent, professional presentation for sharing insights with non-technical stakeholders.

- **Execution**:
  - Calls the function with current summary statistics to generate the final HTML string.

This reporting step transforms numerical insights into an executive-friendly format, enabling quick review and decision-making.
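
The formatting mentioned above relies on Python's format specifiers inside f-strings. The sketch below shows the three specifiers used in the report (`:,`, `:,.2f`, and `:.3f`) together with a hypothetical `table_row` helper that builds one styled row; the helper and the sample values are assumptions for illustration and do not appear in the report function itself.

```python
CELL_STYLE = "border: 1px solid #999;"

def table_row(label, value, pct="-"):
    """Build one <tr> whose cells share the border styling used in the report."""
    cells = "".join(
        f'<td style="{CELL_STYLE}">{text}</td>' for text in (label, value, pct)
    )
    return f"<tr>{cells}</tr>"

# Illustrative values only
example_rows = 1234567
example_spend = 9876543.21
example_pct = 1.23456

print(table_row("Total records", f"{example_rows:,}"))             # thousands separators
print(table_row("Total spend", f"${example_spend:,.2f}", "100%"))  # currency, two decimals
print(table_row("Mild outliers", "$40,000.00", f"{example_pct:.3f}%"))  # three-decimal percentage
```

A helper like this could reduce the repeated inline styles in the HTML template, at the cost of moving the table layout out of a single readable string.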

In [None]:
# Automated email report
def generate_exec_html_email(file_pattern, total_rows, total_spend, distinct_po_count,
                              mild_anomaly_spend, mild_anomaly_pct,
                              high_anomaly_spend, high_anomaly_pct,
                              severe_anomaly_spend, severe_anomaly_pct):

    total_spend = total_spend or 0.0
    mild_anomaly_spend = mild_anomaly_spend or 0.0
    high_anomaly_spend = high_anomaly_spend or 0.0
    severe_anomaly_spend = severe_anomaly_spend or 0.0
    mild_anomaly_pct = mild_anomaly_pct or 0.0
    high_anomaly_pct = high_anomaly_pct or 0.0
    severe_anomaly_pct = severe_anomaly_pct or 0.0

    report = f"""
    <html>
    <body style="font-family: Arial, sans-serif;">
        <h2 style="color: #003366;">Outlier Detection Summary</h2>

        <table cellpadding="6" cellspacing="0" style="border-collapse: collapse; font-size: 14px; width: 60%;">
            <tr style="background-color: #d9e1f2; font-weight: bold;">
                <th style="border: 1px solid #999;">CATEGORY</th>
                <th style="border: 1px solid #999;">VALUE</th>
                <th style="border: 1px solid #999;">SPEND %</th>
            </tr>
            <tr><td style="border: 1px solid #999;">Extract</td><td colspan="2" style="border: 1px solid #999;">{file_pattern}</td></tr>
            <tr><td style="border: 1px solid #999;">Total records</td><td style="border: 1px solid #999;">{total_rows:,}</td><td style="border: 1px solid #999;">-</td></tr>
            <tr><td style="border: 1px solid #999;">Total spend</td><td style="border: 1px solid #999;">${total_spend:,.2f}</td><td style="border: 1px solid #999;">100%</td></tr>
            <tr><td style="border: 1px solid #999;">PO count</td><td style="border: 1px solid #999;">{distinct_po_count:,}</td><td style="border: 1px solid #999;">-</td></tr>
            <tr style="background-color: #fff8dc;"><td style="border: 1px solid #999;">Mild outliers</td><td style="border: 1px solid #999;">${mild_anomaly_spend:,.2f}</td><td style="border: 1px solid #999;">{mild_anomaly_pct:.3f}%</td></tr>
            <tr style="background-color: #ffe4e1;"><td style="border: 1px solid #999;">High outliers</td><td style="border: 1px solid #999;">${high_anomaly_spend:,.2f}</td><td style="border: 1px solid #999;">{high_anomaly_pct:.3f}%</td></tr>
            <tr style="background-color: #f08080;"><td style="border: 1px solid #999;">Severe outliers</td><td style="border: 1px solid #999;">${severe_anomaly_spend:,.2f}</td><td style="border: 1px solid #999;">{severe_anomaly_pct:.3f}%</td></tr>
        </table>

        <p style="font-size: 12px; color: #666; margin-top: 20px;">
            This report summarises flagged anomalies by spend category.
        </p>
    </body>
    </html>
    """
    return report

# Generate the report with the current summary statistics
report = generate_exec_html_email(
    file_pattern, total_rows, total_spend, distinct_po_count,
    mild_anomaly_spend, mild_anomaly_pct,
    high_anomaly_spend, high_anomaly_pct,
    severe_anomaly_spend, severe_anomaly_pct
)

## Automated Email Report with Microsoft Graph API

This block handles the final step of the pipeline: saving the anomaly report and sending it via email using the Microsoft Graph API.

- **Setup**:
  - Uses the `requests` library to interact with the Microsoft Graph API for authentication and email sending.
  - Encodes the CSV file in Base64 if an attachment is required.

- **Access Token**:
  - Retrieves an access token using the OAuth 2.0 client credentials flow.

- **Email Composition**:
  - Builds the email payload, including:
    - Subject line with the file name.
    - HTML body with the anomaly summary.
    - Optional attachment of the CSV report.

- **Sending**:
  - Makes a POST request to the Graph API `/sendMail` endpoint.
  - Prints success or error messages for visibility.

- **Data Export**:
  - Saves the anomaly DataFrame to a local CSV file, providing a record for future reference.

This step ensures that stakeholders can receive the anomaly detection results automatically, reducing manual intervention and streamlining communication.
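
The email composition step can be sketched without any network access, which makes the payload easy to inspect before sending. The example below assembles the same JSON structure described above, including a Base64-encoded attachment. The `build_mail_payload` helper and the sample CSV bytes are hypothetical; only the payload fields mirror the ones used in this notebook.

```python
import base64
import json

def build_mail_payload(subject, html_body, recipient,
                       attachment_name=None, attachment_bytes=None):
    """Assemble the JSON body for a sendMail request (no network call is made)."""
    message = {
        "subject": subject,
        "body": {"contentType": "HTML", "content": html_body},
        "toRecipients": [{"emailAddress": {"address": recipient}}],
    }
    # Attach the file only when both a name and content are supplied
    if attachment_name and attachment_bytes is not None:
        message["attachments"] = [{
            "@odata.type": "#microsoft.graph.fileAttachment",
            "name": attachment_name,
            "contentType": "text/csv",
            "contentBytes": base64.b64encode(attachment_bytes).decode("utf-8"),
        }]
    return {"message": message}

# Made-up CSV content standing in for the exported report
csv_bytes = b"PURCHASE_ORDER_NUM,severity\nPO-1001,Severe\n"
payload = build_mail_payload(
    subject="Outlier Report for File: sample_data.csv",
    html_body="<p>Summary goes here</p>",
    recipient="example@email.com",
    attachment_name="report.csv",
    attachment_bytes=csv_bytes,
)
print(json.dumps(payload, indent=2))
```

Separating payload construction from the HTTP call is one way to check the message contents in isolation before any credentials are involved.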

In [None]:
import requests
import os
import base64

# Set email recipient
email_recipient = "example@email.com"

# Function to get access token (OAuth 2.0 client credentials flow)
def get_access_token():
    # Replace the YOUR_* placeholders; load real secrets from a secure store rather than hard-coding them
    url = "https://login.microsoftonline.com/YOUR_TENANT_ID/oauth2/v2.0/token"
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    body = {
        'client_id': 'YOUR_CLIENT_ID',
        'client_secret': 'YOUR_CLIENT_SECRET',
        'grant_type': 'client_credentials',
        'scope': 'https://graph.microsoft.com/.default'
    }
    response = requests.post(url, data=body, headers=headers, timeout=30)
    response.raise_for_status()  # Fail early if authentication is rejected
    return response.json().get('access_token')

# Function to encode file
def encode_file_to_base64(file_path):
    with open(file_path, "rb") as file:
        return base64.b64encode(file.read()).decode("utf-8")

# Function to send email
def send_email(report, file_pattern, attachment_path=None):
    access_token = get_access_token()
    url = "https://graph.microsoft.com/v1.0/users/YOUR_EMAIL/sendMail"
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }

    email_data = {
        "message": {
            "subject": f"Outlier Report for File: {file_pattern}",
            "body": {"contentType": "HTML", "content": report},
            "toRecipients": [{"emailAddress": {"address": email_recipient}}]
        }
    }

    if attachment_path:
        encoded_attachment = encode_file_to_base64(attachment_path)
        email_data["message"]["attachments"] = [{
            "@odata.type": "#microsoft.graph.fileAttachment",
            "name": os.path.basename(attachment_path),
            "contentType": "text/csv",
            "contentBytes": encoded_attachment
        }]

    response = requests.post(url, json=email_data, headers=headers)
    if response.status_code == 202:
        print("✅ Email sent successfully!" + (" (With Attachment)" if attachment_path else ""))
    else:
        print(f"❌ Failed to send email: {response.text}")

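# Save DataFrame to local CSV (create the output directory if it does not exist)
output_csv_path = f"output/{file_pattern}_outlier_report.csv"
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)
po_agg_df.toPandas().to_csv(output_csv_path, index=False)
print(f"✅ CSV file saved at: {output_csv_path}")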

# Send email with report and attachment
send_email(report, file_pattern, output_csv_path)


## Anomaly Detection Logging

This block captures and logs key metrics from the anomaly detection process for record-keeping and analysis.

- **Logging Setup**:
  - Defines two table names: a shared log table (`high_level_anomaly_log`) and an anomaly data table whose name is parameterised by the business unit.
  - Uses Delta Lake for efficient, scalable storage and querying.

- **Log Function**:
  - Creates a DataFrame with summary metrics: file name, total rows, total spend, anomaly counts for each method, and the count of severe anomalies.
  - Adds a timestamp column to track when the run occurred.
  - Appends the log to the `high_level_anomaly_log` Delta table.

- **Anomaly Data Export**:
  - Appends the full anomaly detection results to a dedicated Delta table for the business unit.

- **Audit Check**:
  - Queries and displays the last 10 anomaly detection runs from the log table, providing quick access to recent results.

This step ensures traceability and accountability for each anomaly detection run, supporting audit and review processes.
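
Because the log DataFrame is built from plain Python values, Spark infers each column's type from the value it receives. A run in which `total_spend` falls back to the integer `0` could therefore infer a different column type from a run with a float total, and an append with a mismatched type may be rejected by the target table. The sketch below shows one way to keep the types stable by coercing each metric before the DataFrame is created; the `build_log_record` helper and the sample values are assumptions for illustration only.

```python
# Column names match the log DataFrame described above
LOG_COLUMNS = [
    "file_pattern", "total_rows", "total_spend",
    "anomalies_z", "anomalies_iqr", "anomalies_isoforest", "severe_anomalies",
]

def build_log_record(file_pattern, total_rows, total_spend,
                     anomalies_z, anomalies_iqr, anomalies_isoforest, severe_anomalies):
    """Coerce each metric to a fixed Python type so the inferred schema stays the same across runs."""
    return (
        str(file_pattern),
        int(total_rows),
        float(total_spend or 0),  # always a float, even when the total is 0 or None
        int(anomalies_z),
        int(anomalies_iqr),
        int(anomalies_isoforest),
        int(severe_anomalies),
    )

# Made-up metrics; note total_spend is passed as the integer 0
record = build_log_record("sample_data.csv", 1000, 0, 12, 9, 15, 3)
print(dict(zip(LOG_COLUMNS, record)))
```

The resulting tuple could be passed to `spark.createDataFrame([record], LOG_COLUMNS)` in place of the raw values.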


In [None]:
# Logging

# Set business unit
business_unit = "example_business_unit"

# Define logging table names
log_table_name = "anomaly_detection.high_level_anomaly_log"
export_table = f"anomaly_detection.{business_unit}_file_anomalies"

def log_anomaly_detection_run(file_pattern, total_rows, total_spend, anomalies_z, anomalies_iqr, anomalies_isoforest, severe_anomalies):
    """
    Logs anomaly detection metrics for a specific file into a Delta table.
    """
    log_df = spark.createDataFrame([(
        file_pattern, total_rows, total_spend, anomalies_z, anomalies_iqr, anomalies_isoforest, severe_anomalies
    )], [
        "file_pattern", "total_rows", "total_spend",
        "anomalies_z", "anomalies_iqr", "anomalies_isoforest", "severe_anomalies"
    ]).withColumn("timestamp", current_timestamp())

    # Append to the table
    log_df.write.format("delta").mode("append").saveAsTable(log_table_name)
    print(f"✅ Logged anomaly detection run for file: {file_pattern}")

# Save anomaly data to table
po_agg_df.write.format("delta").mode("append").saveAsTable(export_table)
print(f"✅ Anomaly data written to table: {export_table}")

# Log anomaly detection run
log_anomaly_detection_run(
    file_pattern=file_pattern,
    total_rows=total_rows,
    total_spend=total_spend,
    anomalies_z=anomalies_z,
    anomalies_iqr=anomalies_iqr,
    anomalies_isoforest=anomalies_isoforest,
    severe_anomalies=severe_anomalies
)

# Show last 10 anomaly detection runs
last_10_logs = spark.sql(f"SELECT * FROM {log_table_name} ORDER BY timestamp DESC LIMIT 10")
last_10_logs.show(truncate=False)
