### Prerequisites

In [0]:
%pip install mlflow pyspark prophet

### Step 1: Load Data from Databricks Catalog

In [0]:
%sql
-- Preview a handful of rows from the BKPF table (LIMIT requires an explicit row count)
SELECT * FROM test_workspace.kaggle_sap_replicated_data.bkpf_csv LIMIT 10

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp
import mlflow
import mlflow.spark

# Obtain the Spark session for this notebook (getOrCreate reuses an active one)
spark = SparkSession.builder.appName("SAP_CashManagement_AI").getOrCreate()

# Read both replicated SAP tables from the Databricks catalog
df_bseg = spark.sql("SELECT * FROM test_workspace.kaggle_sap_replicated_data.bseg_csv")
df_bkpf = spark.sql("SELECT * FROM test_workspace.kaggle_sap_replicated_data.bkpf_csv")

# Inner-join BSEG with BKPF on MANDT, Company Code (BUKRS), Document Number (BELNR)
# and Fiscal Year (GJAHR), then re-express the posting date as a timestamp.
# NOTE(review): unix_timestamp() without a format uses the default pattern
# 'yyyy-MM-dd HH:mm:ss' for string input -- confirm BUDAT matches it, otherwise
# the converted column will be NULL.
join_keys = ["MANDT", "BUKRS", "BELNR", "GJAHR"]
budat_as_timestamp = unix_timestamp(col("BUDAT")).cast("timestamp")
df = df_bseg.join(df_bkpf, on=join_keys, how="inner").withColumn(
    "BUDAT", budat_as_timestamp
)

# Print a five-row preview of the columns used further down
preview_columns = [
    "MANDT", "BUKRS", "GJAHR", "BELNR", "BUZEI", "BUDAT",
    "HKONT", "DMBTR", "SHKZG", "BLART", "BSCHL", "BKTXT",
    "SGTXT", "KOART", "KOSTL", "PRCTR", "LIFNR", "KUNNR",
]
df.select(*preview_columns).show(5)


### Step 2: Train Transaction Classification Model
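Goal: Classify transactions into categories (e.g., Payroll, Vendor Payments, Internal Transfers, Fraud). The first cell below uses the `BLART` column as the class label; the second cell derives a rule-based `Fraud_Flag` and trains a binary classifier on it.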

Algorithm: Logistic Regression (SAP-specific feature selection)

In [0]:
%python
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Replace NULLs in the model input columns so no feature vector contains missing values
df = df.fillna({
    "DMBTR": 0,
    "HKONT": 0,
    "KOSTL": 0,
    "PRCTR": 0
})

# Index the categorical inputs and the label (BLART).
# handleInvalid="keep" sends NULL / unseen values to an extra bucket instead of failing.
indexer_kostl = StringIndexer(inputCol="KOSTL", outputCol="KOSTL_INDEX", handleInvalid="keep")
indexer_prctr = StringIndexer(inputCol="PRCTR", outputCol="PRCTR_INDEX", handleInvalid="keep")
indexer_blart = StringIndexer(inputCol="BLART", outputCol="BLART_INDEX", handleInvalid="keep")

# Assemble the numeric and indexed columns into a single feature vector
assembler = VectorAssembler(
    inputCols=["DMBTR", "HKONT", "KOSTL_INDEX", "PRCTR_INDEX"],
    outputCol="features",
    handleInvalid="keep"  # Handle null values
)

# Logistic regression predicting the indexed BLART label
lr = LogisticRegression(featuresCol="features", labelCol="BLART_INDEX")

# Indexers -> assembler -> classifier, in that order
pipeline = Pipeline(stages=[indexer_kostl, indexer_prctr, indexer_blart, assembler, lr])

# Split the RAW frame first and fit the pipeline once, on the training split only.
# Fitting on the full frame and then re-fitting on its transformed output fails
# (the *_INDEX / features output columns already exist) and would also let the
# indexers and the classifier see the test rows.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)

# Transformed view of the full dataset, kept under its original name for inspection
df_transformed = model.transform(df)

# Verify that the engineered columns were produced
df_transformed.select("BLART_INDEX", "KOSTL_INDEX", "PRCTR_INDEX", "features").show(5)

# Score the held-out split
predictions = model.transform(test_data)
display(predictions.select("BUKRS", "BELNR", "GJAHR", "BLART", "prediction"))

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    when,
    unix_timestamp,
    count,
    avg,
    stddev,
    sum,
    lag,
    datediff,
    month,
    quarter,
    dayofweek,
    hour,
    countDistinct,
)
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Obtain the Spark session (getOrCreate reuses an already active session)
spark = SparkSession.builder.appName("SAP_CashManagement_FraudDetection").getOrCreate()

# Read the replicated SAP tables from the Databricks catalog
df_bseg = spark.sql("SELECT * FROM test_workspace.kaggle_sap_replicated_data.bseg_csv")
df_bkpf = spark.sql("SELECT * FROM test_workspace.kaggle_sap_replicated_data.bkpf_csv")

# Defaults applied to NULLs: amount -> 0, debit/credit indicator -> "S"
amount_and_indicator_defaults = {"DMBTR": 0, "SHKZG": "S"}

# Text-like columns get the placeholder 'UNKNOWN' when NULL
placeholder_columns = ["BKTXT", "SGTXT", "KOART", "KOSTL", "PRCTR", "LIFNR", "KUNNR"]
placeholder_defaults = {name: "UNKNOWN" for name in placeholder_columns}

# Join BSEG & BKPF on MANDT, Company Code (BUKRS), Document Number (BELNR) and
# Fiscal Year (GJAHR); convert the posting date to a timestamp; fill the NULLs.
df = (
    df_bseg.join(df_bkpf, on=["MANDT", "BUKRS", "BELNR", "GJAHR"], how="inner")
    .withColumn("BUDAT", unix_timestamp(col("BUDAT")).cast("timestamp"))
    .fillna(amount_and_indicator_defaults)
    .fillna(placeholder_defaults)
)

# Window over all rows sharing the same KUNNR value.
# Rows whose KUNNR was NULL were filled with "UNKNOWN" earlier in this cell, so
# they all land in one shared partition.
window_spec = Window.partitionBy("KUNNR")

### FRAUD DETECTION RULES ###
# Each rule adds a 0/1 indicator column to `df`; the indicators are summed into
# Fraud_Score further down.

# Rule 1: High-Value Transactions -- amount above 50,000
df = df.withColumn("High_Amount_Fraud", when(col("DMBTR") > 50000, 1).otherwise(0))

# Rule 2: Unusual Debit/Credit Ratio
# Sum the amounts with SHKZG == "S" (debits) and SHKZG == "H" (credits) per KUNNR,
# then flag every row of a KUNNR whose debit/credit ratio exceeds 5.
df = df.withColumn(
    "Total_Debits",
    sum(when(col("SHKZG") == "S", col("DMBTR")).otherwise(0)).over(window_spec),
)
df = df.withColumn(
    "Total_Credits",
    sum(when(col("SHKZG") == "H", col("DMBTR")).otherwise(0)).over(window_spec),
)
df = df.withColumn(
    "Unusual_Debit_Credit_Ratio",
    when(col("Total_Debits") / (col("Total_Credits") + 1) > 5, 1).otherwise(0),
)  # The "+ 1" in the denominator avoids division by zero

# Rule 3: Rapid Sequential Transactions
# Time_Diff is the number of seconds since the previous row (ordered by BUDAT)
# of the same KUNNR. The first row of each partition has no predecessor, so its
# Time_Diff is NULL and the rule evaluates to 0 for it.
# NOTE(review): if BUDAT carries no time-of-day, rows posted on the same day
# have Time_Diff == 0 and always satisfy the "< 600 seconds" test -- confirm.
window_spec_ordered = window_spec.orderBy("BUDAT")
df = df.withColumn(
    "Time_Diff",
    unix_timestamp(col("BUDAT"))
    - unix_timestamp(lag("BUDAT").over(window_spec_ordered)),
)
df = df.withColumn(
    "Rapid_Transactions",
    when((col("Time_Diff") < 600) & (col("DMBTR") > 10000), 1).otherwise(0),
)

# Rule 4: Duplicate Transactions
# Count rows per (KUNNR, DMBTR, BUDAT) and join the count back onto every row;
# a count above 1 marks a possible duplicate. Rows with a NULL join key get no
# match from the left join, leaving Duplicate_Transactions NULL and the flag 0.
df_duplicate_txns = (
    df.groupBy("KUNNR", "DMBTR", "BUDAT")
    .count()
    .withColumnRenamed("count", "Duplicate_Transactions")
)
df = df.join(df_duplicate_txns, ["KUNNR", "DMBTR", "BUDAT"], "left")
df = df.withColumn(
    "Possible_Duplicate", when(col("Duplicate_Transactions") > 1, 1).otherwise(0)
)

# Rule 5: Reversal Transactions
# Flags a row whose amount is the exact negation of the previous row's amount
# for the same KUNNR.
# NOTE(review): rule 2 reads the direction from SHKZG, which suggests DMBTR may
# be stored unsigned; if so this comparison only matches zero amounts -- confirm.
df = df.withColumn(
    "Potential_Reversal",
    when(lag("DMBTR").over(window_spec_ordered) == -col("DMBTR"), 1).otherwise(0),
)

# Rule 6: Transactions Outside Business Hours (before 06:00 or after 20:59)
# NOTE(review): if BUDAT holds a date without a time component, hour() returns 0
# and every row is flagged -- confirm the column actually carries a time.
df = df.withColumn("Transaction_Hour", hour("BUDAT"))
df = df.withColumn(
    "After_Hours_Transaction",
    when((col("Transaction_Hour") < 6) | (col("Transaction_Hour") > 20), 1).otherwise(
        0
    ),
)

# Rule 7: High-Risk Vendor/Customer -- KUNNR appears in a hard-coded watch list
high_risk_vendors = ["123456", "789101"]  # Example flagged vendors
df = df.withColumn(
    "High_Risk_Vendor", when(col("KUNNR").isin(high_risk_vendors), 1).otherwise(0)
)

# Rule 8: Mismatched Vendor and Account Details -- LIFNR differs from HKONT
# NOTE(review): LIFNR and HKONT look like different kinds of identifiers, so
# this inequality may hold for nearly every row -- confirm the intended check.
df = df.withColumn(
    "Mismatched_Vendor_Account", when(col("LIFNR") != col("HKONT"), 1).otherwise(0)
)

# Rule 9: Seasonal Anomaly -- a December row whose amount exceeds ten times the
# average amount of the same KUNNR
df = df.withColumn("Transaction_Month", month("BUDAT"))
df = df.withColumn(
    "Seasonal_Anomaly",
    when(
        (col("Transaction_Month") == 12)
        & (col("DMBTR") > 10 * avg("DMBTR").over(window_spec)),
        1,
    ).otherwise(0),
)

# Rule 10: Unusual Transaction Count for a Vendor
# Number of transactions recorded for the row's KUNNR, repeated on each of its rows.
df = df.withColumn("Vendor_Transaction_Count", count("DMBTR").over(window_spec))

# Baseline = mean transaction count across all KUNNR values (one row per KUNNR).
# Averaging Vendor_Transaction_Count over the KUNNR window itself just returns the
# same count again, so "count > 5 * avg" could never be true and the rule was dead.
partner_txn_counts = df.groupBy("KUNNR").agg(
    count("DMBTR").alias("_partner_txn_count")
)
baseline_txn_count = partner_txn_counts.agg(
    avg("_partner_txn_count").alias("_avg_partner_txn_count")
)

# The baseline is a single-row frame, so the cross join only attaches one scalar
# column to every transaction; the helper column is dropped once the flag is set.
df = (
    df.crossJoin(baseline_txn_count)
    .withColumn(
        "Unusual_Transaction_Count",
        when(
            col("Vendor_Transaction_Count") > 5 * col("_avg_partner_txn_count"),
            1,
        ).otherwise(0),
    )
    .drop("_avg_partner_txn_count")
)

### FRAUD SCORE ###
# Indicator columns (0/1) produced by the fraud rules above
fraud_features = [
    "High_Amount_Fraud",
    "Unusual_Debit_Credit_Ratio",
    "Rapid_Transactions",
    "Possible_Duplicate",
    "Potential_Reversal",
    "After_Hours_Transaction",
    "High_Risk_Vendor",
    "Mismatched_Vendor_Account",
    "Seasonal_Anomaly",
    "Unusual_Transaction_Count",
]

# `sum` in this cell is pyspark.sql.functions.sum (an aggregate over a single
# column), which rejects a Python list of columns. Add the indicator columns
# row-wise by folding them with `+` instead.
from functools import reduce
from operator import add

df = df.withColumn("Fraud_Score", reduce(add, [col(f) for f in fraud_features]))

# Flag transactions that trip at least three fraud indicators
df = df.withColumn(
    "Fraud_Flag", when(col("Fraud_Score") >= 3, 1).otherwise(0)
)  # Adjust threshold as needed

# Map every categorical column to a numeric "<column>_index" column.
# All indexers are fitted first, then applied one after another.
# handleInvalid="keep" puts NULL / unseen values into an extra bucket.
indexer_cols = ["BLART", "BSCHL", "KOART", "KOSTL", "PRCTR", "LIFNR", "KUNNR"]
indexed_cols = [name + "_index" for name in indexer_cols]
indexers = [
    StringIndexer(inputCol=source, outputCol=target, handleInvalid="keep").fit(df)
    for source, target in zip(indexer_cols, indexed_cols)
]

for indexer in indexers:
    df = indexer.transform(df)

# Numeric form of the debit/credit indicator: 1 for "S", 0 for anything else.
# (Computed here but not included in feature_cols below.)
is_debit = col("SHKZG") == "S"
df = df.withColumn("SHKZG_num", when(is_debit, 1).otherwise(0))

# Model inputs: the rule indicators, their sum, and the indexed categoricals
feature_cols = fraud_features + ["Fraud_Score"] + indexed_cols

# Assemble the model inputs into a single vector column
assembler = VectorAssembler(
    inputCols=feature_cols, outputCol="features", handleInvalid="keep"
)

# Build the modelling frame under its own name. `df` keeps every transaction
# column (and gains no "features" column), because later cells still read
# columns such as KOSTL, PRCTR, BLDAT and DMBTR from it.
df_model = assembler.transform(df).select("features", "Fraud_Flag")

# Split dataset into training and testing sets
train_data, test_data = df_model.randomSplit([0.8, 0.2], seed=42)

# Train logistic regression model.
# NOTE(review): Fraud_Flag is a fixed threshold on Fraud_Score, and Fraud_Score
# (plus the indicators it sums) is part of the features, so the model can
# reproduce the label directly -- the AUC below measures rule recovery, not
# the detection of independently confirmed fraud.
lr = LogisticRegression(featuresCol="features", labelCol="Fraud_Flag", maxIter=10)
model = lr.fit(train_data)

# Evaluate on the held-out split. Area under ROC needs the continuous
# rawPrediction scores; the hard 0/1 `prediction` column collapses the curve
# to a single operating point.
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(
    labelCol="Fraud_Flag",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)

print(f"Model AUC: {auc:.4f}")

# Show some predictions
predictions.select("features", "Fraud_Flag", "prediction", "probability").show(10)

### Step 3: Train Anomaly Detection Model
Goal: Detect fraudulent transactions (e.g., duplicate payments, unusual vendor transactions)

Algorithm: Bisecting K-Means clustering (k = 2) on standardized SAP features

In [0]:
%python
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml import Pipeline
import mlflow.spark

# Build the clustering input from the raw joined tables instead of `df`.
# By this point `df` has been reshaped by the classification cells and may no
# longer carry the raw KOSTL / PRCTR / HKONT columns; using a separate frame
# also keeps this cell from modifying `df` for the cells that follow.
anomaly_input = df_bseg.join(df_bkpf, ["MANDT", "BUKRS", "BELNR", "GJAHR"], "inner")

# Convert string columns to numeric (values that are not numeric become NULL)
anomaly_input = anomaly_input.withColumn("KOSTL", anomaly_input["KOSTL"].cast("double"))
anomaly_input = anomaly_input.withColumn("PRCTR", anomaly_input["PRCTR"].cast("double"))

# Assemble the inputs; handleInvalid="skip" drops every row with a NULL/NaN input.
# NOTE(review): if KOSTL or PRCTR is frequently empty, a large share of the rows
# is silently excluded from clustering -- check the row count after assembly.
assembler = VectorAssembler(
    inputCols=["DMBTR", "HKONT", "KOSTL", "PRCTR"],
    outputCol="features",
    handleInvalid="skip"
)
# Scale features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features"
)
# Two clusters, intended as "normal" vs "anomaly"; the cluster ids themselves do
# not say which is which. A fixed seed (the same one used for the data splits)
# keeps the cluster assignment reproducible across runs.
kmeans = BisectingKMeans(
    featuresCol="scaled_features",
    k=2,
    seed=42
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, kmeans])

# Train model
anomaly_model = pipeline.fit(anomaly_input)

# Assign every (non-skipped) row to a cluster
anomaly_predictions = anomaly_model.transform(anomaly_input)
display(anomaly_predictions.select(
    "BUKRS", "BELNR", "GJAHR", "DMBTR", "KOSTL", "PRCTR", "prediction"
).limit(10))

# Log model
mlflow.spark.log_model(anomaly_model, "sap_anomaly_detection_model")

### Step 4: Train Cash Flow Forecasting Model
Goal: Predict future cash inflows and outflows

Algorithm: Prophet (Time-Series Model for SAP)

In [0]:
import pandas as pd
from prophet import Prophet

import mlflow.prophet  # make the Prophet flavor explicit for the log_model call below

# Aggregate the amounts per document date from the raw joined tables.
# `df` has been reshaped by the earlier modelling cells and may no longer carry
# BLDAT / DMBTR, so the source frame is rebuilt here.
# NOTE(review): DMBTR is summed without regard to the debit/credit indicator
# (SHKZG), so `y` is total posted volume per day rather than net cash flow --
# confirm that this is the intended target.
cash_flow_source = df_bseg.join(df_bkpf, ["MANDT", "BUKRS", "BELNR", "GJAHR"], "inner")
cash_flow_data = (
    cash_flow_source.select("BLDAT", "DMBTR")
    .groupBy("BLDAT")
    .sum("DMBTR")
    .toPandas()
)

# Rename columns for Prophet ("ds" = date, "y" = value to forecast)
cash_flow_data = cash_flow_data.rename(columns={"BLDAT": "ds", "sum(DMBTR)": "y"})

# Prophet rejects missing dates: parse them, drop rows without one, and sort
cash_flow_data["ds"] = pd.to_datetime(cash_flow_data["ds"])
cash_flow_data = (
    cash_flow_data.dropna(subset=["ds"]).sort_values("ds").reset_index(drop=True)
)

# Train forecasting model. It gets its own name so the Spark ML `model` fitted
# earlier is not replaced by an object that has no .write() method.
forecast_model = Prophet()
forecast_model.fit(cash_flow_data)

# Generate future predictions
future = forecast_model.make_future_dataframe(periods=90)  # Predict for next 90 days
forecast = forecast_model.predict(future)

# Display forecast (a bare expression in the middle of a cell is not rendered)
display(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail())

# Save model
mlflow.prophet.log_model(forecast_model, "sap_cash_flow_forecasting_model")


### Step 5: Deploy & Monitor Models
Persist the trained Spark ML models to DBFS so they can be loaded later for inference and real-time insights.

In [0]:
# Save the fitted Spark ML models to DBFS for inference.
# `model` is re-assigned by several earlier cells, so confirm that both objects
# are Spark ML models (i.e. expose .write()) before touching storage. This fails
# fast with a clear message instead of an AttributeError part-way through.
models_to_save = {
    "dbfs:/models/sap_transaction_classification": model,
    "dbfs:/models/sap_anomaly_detection": anomaly_model,
}

for save_path, fitted_model in models_to_save.items():
    if not hasattr(fitted_model, "write"):
        raise TypeError(
            f"Cannot save {type(fitted_model).__name__} to {save_path}: "
            "expected a Spark ML model with a .write() method"
        )

# Existing models at the target paths are overwritten
for save_path, fitted_model in models_to_save.items():
    fitted_model.write().overwrite().save(save_path)
