# XAI Fraud Agent

## Phase 1: The ML & Explainability Layer

### Import Libraries for XGBoost Model

In [3]:
import xgboost as xgb
import json
import pandas as pd
import numpy as np
import shap


  from .autonotebook import tqdm as notebook_tqdm


### Preprocessing

In [4]:
def preprocess_transaction(transaction_row, preprocessor=None):
    """
    Clean a transaction and optionally run it through a fitted preprocessor.

    Parameters:
        transaction_row: a dict or pandas Series describing one transaction,
            or a DataFrame (which is copied, never modified in place).
        preprocessor: optional object exposing ``transform(df)``. When it is
            None the cleaned DataFrame is returned as-is.

    Returns:
        - the cleaned DataFrame when no preprocessor is given;
        - whatever ``preprocessor.transform`` returns otherwise;
        - None if ``preprocessor.transform`` raises (the error is printed).
    """
    # Wrap a dict/Series into a one-row frame; copy a DataFrame so the
    # caller's data is left untouched.
    if isinstance(transaction_row, pd.DataFrame):
        df_input = transaction_row.copy()
    else:
        df_input = pd.DataFrame([transaction_row])

    # 1. Drop irrelevant columns (errors='ignore' skips the ones not present)
    df_input = df_input.drop(
        columns=['month', 'device_fraud_count', 'fraud_bool'],
        errors='ignore',
    )

    # 2. Convert types: everything except the categorical columns becomes
    #    numeric; values that cannot be parsed are coerced to NaN.
    categorical_cols = (
        'payment_type', 'employment_status', 'housing_status',
        'source', 'device_os',
    )
    for col in df_input.columns:
        if col not in categorical_cols:
            df_input[col] = pd.to_numeric(df_input[col], errors='coerce')

    # 3. Handle Missing Values: in these columns -1 is replaced by NaN
    missing_cols = [
        "prev_address_months_count", "current_address_months_count",
        "bank_months_count", "session_length_in_minutes"
    ]
    for col in missing_cols:
        if col in df_input.columns:
            df_input[col] = df_input[col].replace(-1, np.nan)

    # 4. Apply the preprocessor (e.g. one-hot encoding).
    #    Compare against None explicitly: a valid preprocessor object may
    #    define __len__/__bool__ and evaluate as falsy.
    if preprocessor is None:
        return df_input

    try:
        return preprocessor.transform(df_input)
    except Exception as e:
        print(f"Preprocessing error: {e}")
        return None

### Predict Function

In [5]:
def predict(transaction_row, preprocessor, model_params_path='XGBoostModelParameters.json', model_path='XGBoostModel.json'):
    """
    Preprocess a single transaction row and return its fraud probability.

    Parameters:
        transaction_row: the transaction, in any form accepted by
            preprocess_transaction (dict, Series or DataFrame).
        preprocessor: object forwarded to preprocess_transaction.
        model_params_path: JSON file whose contents are passed as keyword
            arguments to xgb.XGBClassifier.
        model_path: file loaded with XGBClassifier.load_model.

    Returns:
        The class-1 probability of the first row as a float, or None when the
        parameter file is missing, preprocessing fails, the model cannot be
        loaded, or inference raises (a message is printed in each case).

    Note: the parameters and the model are re-read from disk on every call.
    """
    # Load parameters
    try:
        with open(model_params_path, 'r') as file:
            loaded_params = json.load(file)
    except FileNotFoundError:
        print(f"Error: {model_params_path} not found.")
        return None

    X_transformed = preprocess_transaction(transaction_row, preprocessor)
    if X_transformed is None:
        return None

    # The preprocessor may hand back a DataFrame, a SciPy sparse matrix or a
    # plain ndarray; only the first has .to_numpy(), so convert each kind
    # explicitly instead of assuming a DataFrame.
    if hasattr(X_transformed, "to_numpy"):
        X_numpy = X_transformed.to_numpy()
    elif hasattr(X_transformed, "toarray"):
        X_numpy = X_transformed.toarray()
    else:
        X_numpy = np.asarray(X_transformed)

    # Load Model (Note: This assumes model file exists)
    try:
        model = xgb.XGBClassifier(**loaded_params)
        model.load_model(model_path)
    except Exception as e:
        print(f"Error loading model: {e}")
        return None

    # Inference: row 0, column 1 of predict_proba is the class-1 probability
    try:
        probability = model.predict_proba(X_numpy)[0, 1]
        return float(probability)
    except Exception as e:
        print(f"Prediction error: {e}")
        return None


### Create SHAP Explanation Values

In [6]:
def get_shap_explanation(transaction_data, model, preprocessor):
    """
    Generate a SHAP explanation for a single transaction.

    Parameters:
        transaction_data: the transaction, in any form accepted by
            preprocess_transaction.
        model: fitted tree model; it is passed to shap.TreeExplainer and its
            predict_proba is used for the score.
        preprocessor: object forwarded to preprocess_transaction; if it has
            get_feature_names_out, those names label the features.

    Returns:
        {"score": <class-1 probability>, "top_reasons": [...]} where
        top_reasons lists, as "name = value" strings, up to three features
        with the largest POSITIVE SHAP contribution (i.e. pushing the score
        towards fraud). The list is shorter, possibly empty, when fewer than
        three features push the score up.
        {"error": "Preprocessing failed"} if preprocessing returns None.
    """
    # Preprocess
    X_transformed = preprocess_transaction(transaction_data, preprocessor)
    if X_transformed is None:
        return {"error": "Preprocessing failed"}

    # A sparse matrix cannot be wrapped in a DataFrame directly; densify it
    # (this is a single row, so the cost is negligible).
    if hasattr(X_transformed, "toarray"):
        X_transformed = X_transformed.toarray()

    # Ensure we use a DataFrame so SHAP output can be matched to column names
    if hasattr(preprocessor, 'get_feature_names_out'):
        feature_names = preprocessor.get_feature_names_out()
    elif hasattr(X_transformed, 'columns'):
        feature_names = X_transformed.columns
    else:
        # No names available: pandas falls back to positional labels
        feature_names = None
    X_df = pd.DataFrame(X_transformed, columns=feature_names)

    # Calculate SHAP values
    explainer = shap.TreeExplainer(model)
    shap_values = explainer(X_df)

    # Values for the first (and only) row.
    # Binary classification: some shap versions output values for both
    # classes, giving shape (n_features, 2) here; keep class 1 in that case.
    row_values = shap_values.values[0]
    if len(row_values.shape) > 1:
        row_values = row_values[:, 1]
    data_values = X_df.iloc[0]

    # Calculate probability
    prob = model.predict_proba(X_df)[0, 1]

    # Keep only features that push the score HIGHER; a zero or negative SHAP
    # value is not a reason for a fraud flag and must not be reported as one.
    contributions = [
        (name, val, feat_val)
        for name, val, feat_val in zip(X_df.columns, row_values, data_values)
        if val > 0
    ]

    # Sort by SHAP value descending (highest positive impact first)
    contributions.sort(key=lambda item: item[1], reverse=True)

    top_reasons = []
    for name, _, feat_val in contributions[:3]:
        # Clean up feature name (remove 'cat__' etc if present)
        clean_name = str(name).replace('cat__', '').replace('remainder__', '')

        # Numeric values (Python or NumPy scalars) get two decimals
        if isinstance(feat_val, (int, float, np.integer, np.floating)):
            reason = f"{clean_name} = {feat_val:.2f}"
        else:
            reason = f"{clean_name} = {feat_val}"

        top_reasons.append(reason)

    return {
        "score": float(prob),
        "top_reasons": top_reasons
    }

## Phase 2: The Data Environment

In [8]:
import sqlite3
import kagglehub

# Download latest version of the dataset. This happens before the SQLite
# connection is opened so no database handle is held during the download.
path = kagglehub.dataset_download("sgpjesus/bank-account-fraud-dataset-neurips-2022")

#print("Path to dataset files:", path)
# ensure we point to a .csv file (dataset_download may return a path without extension)
csv_path = str(path) + "/Base.csv"

# read the CSV into a DataFrame and setup the final test data:
# a random 50% sample of the month-7 rows, with the 'month' column removed.
# NOTE: no random_state is set, so every run draws a different sample.
df = pd.read_csv(csv_path)
mask = df["month"] == 7
full_test_data = df[mask].sample(frac=0.5).reset_index(drop=True).drop('month', axis=1)

print(full_test_data)

table_name = "transaction_history"
query = f"SELECT * FROM {table_name}"

# Setup SQLite connection; try/finally guarantees it is closed even if the
# write or the read-back raises.
connection = sqlite3.connect("Fraud_Agent.db")
try:
    # Setup a Table in SQL (an existing table of the same name is replaced)
    full_test_data.to_sql(table_name, connection, if_exists='replace', index=False)

    # Verify the data was written by reading it back into a new DataFrame
    result_df = pd.read_sql_query(query, connection)
    print("\nData read from SQLite table:")
    #print(result_df)
finally:
    # Close the database connection
    connection.close()


       fraud_bool  income  name_email_similarity  prev_address_months_count  \
0               0     0.9               0.335133                         -1   
1               0     0.7               0.304756                         -1   
2               0     0.9               0.341796                        280   
3               0     0.4               0.998801                         -1   
4               0     0.9               0.712391                         -1   
...           ...     ...                    ...                        ...   
48417           1     0.8               0.117968                         -1   
48418           0     0.8               0.350479                         -1   
48419           0     0.5               0.590204                         12   
48420           0     0.1               0.068729                         -1   
48421           0     0.5               0.734618                         -1   

       current_address_months_count  customer_age  

In [26]:
def get_db_size_pragma(db_path):
    """
    Gets the size of a SQLite database in bytes using PRAGMA statements.

    Parameters:
        db_path: path of the database file, passed to sqlite3.connect.

    Returns:
        page_count * page_size, as reported by the two PRAGMAs.

    Raises:
        sqlite3.Error if the database cannot be opened or queried; the
        connection is closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        # Number of pages in the database, then the size of one page in bytes
        page_count = conn.execute("PRAGMA page_count;").fetchone()[0]
        page_size = conn.execute("PRAGMA page_size;").fetchone()[0]
    finally:
        # Release the handle even when a PRAGMA fails
        conn.close()

    # Calculate total size in bytes
    return page_count * page_size

# Report how large the fraud database is, measured through SQLite's PRAGMAs.
db_file_path = "Fraud_Agent.db"
size = get_db_size_pragma(db_path=db_file_path)

print(
    f"The size of the database is: {size} bytes (via PRAGMA)"
)

The size of the database is: 13254656 bytes (via PRAGMA)


In [15]:
import os
#os.remove("Fraud_Agent.db")

### The Vector Store
Read the fraud-detection policy PDF (`Fraud_Detection_Policy.pdf`), split it into chunks using LangChain's `RecursiveCharacterTextSplitter`, and save the chunks into a local ChromaDB vector database.

In [18]:

import os
import sqlite3
import sys

# SQLite workaround for ChromaDB on Mac: if the optional `pysqlite3` package is
# installed, register it under the name `sqlite3`. This must run BEFORE
# chromadb is imported -- a module that has already imported sqlite3 keeps the
# one it bound, so swapping sys.modules afterwards has no effect on it.
if sys.platform.startswith('darwin'):
    try:
        __import__('pysqlite3')
        sys.modules['sqlite3'] = sys.modules['pysqlite3']
    except ImportError:
        # pysqlite3 is optional; keep the standard-library sqlite3
        pass

import chromadb
from langchain_chroma import Chroma
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings.sentence_transformer import (
    SentenceTransformerEmbeddings,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Configuration
# Directory given to chromadb.PersistentClient(path=...) by ingest_pdf().
CHROMA_PATH = "./test_chroma"
# Collection name given to Chroma.from_documents() by ingest_pdf().
CHROMA_COLLECTION_NAME = "reports"
# PDF that ingest_pdf() loads with PyPDFLoader; ingestion is skipped with an
# error message if this file does not exist.
PDF_PATH = "Fraud_Detection_Policy.pdf"

def ingest_pdf(pdf_path=None, chroma_path=None, collection_name=None):
    """
    Load a PDF, split it into chunks and store the chunks in ChromaDB.

    Parameters:
        pdf_path: PDF to ingest; defaults to PDF_PATH.
        chroma_path: directory of the persistent Chroma client; defaults to
            CHROMA_PATH.
        collection_name: target collection; defaults to
            CHROMA_COLLECTION_NAME.

    Returns:
        None. Progress is reported with print(). Nothing is written when the
        PDF is missing or yields no chunks.
    """
    # Resolve defaults at call time so a later change to the module-level
    # constants is picked up without redefining the function.
    pdf_path = PDF_PATH if pdf_path is None else pdf_path
    chroma_path = CHROMA_PATH if chroma_path is None else chroma_path
    collection_name = CHROMA_COLLECTION_NAME if collection_name is None else collection_name

    # 1. Load PDF
    if not os.path.exists(pdf_path):
        print(f"Error: {pdf_path} not found.")
        return

    print(f"Loading {pdf_path}...")
    loader = PyPDFLoader(pdf_path)
    documents = loader.load()

    # 2. Split Text into chunks of up to 1000 characters with 100 of overlap
    print("Splitting text...")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    chunked_documents = text_splitter.split_documents(documents)

    print(f"Created {len(chunked_documents)} chunks.")

    # Nothing to embed or store (e.g. a PDF with no extractable text):
    # stop before loading the embedding model or touching the database.
    if not chunked_documents:
        print("No chunks to add; ChromaDB was not modified.")
        return

    # 3. Initialize Embeddings
    print("Initializing embeddings...")
    embedding_function = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2")

    # 4. Initialize Chroma Client
    print("Initializing ChromaDB client...")
    chroma_client = chromadb.PersistentClient(path=chroma_path)

    # 5. Add to Chroma
    print(f"Adding documents to collection '{collection_name}'...")
    Chroma.from_documents(
        documents=chunked_documents,
        embedding=embedding_function,
        collection_name=collection_name,
        client=chroma_client,
    )

    print(f"Successfully added {len(chunked_documents)} chunks to ChromaDB at {chroma_path}")

if __name__ == "__main__":
    ingest_pdf()


Loading Fraud_Detection_Policy.pdf...
Splitting text...
Created 3 chunks.
Initializing embeddings...


Loading weights: 100%|██████████| 103/103 [00:00<00:00, 415.15it/s, Materializing param=pooler.dense.weight]                             
[1mBertModel LOAD REPORT[0m from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

[3mNotes:
- UNEXPECTED[3m	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.[0m


Initializing ChromaDB client...
Adding documents to collection 'reports'...
Successfully added 3 chunks to ChromaDB at ./test_chroma
