# The Multi-Stage Fraud Detection Funnel: A Playbook

This notebook implements the definitive fraud detection playbook. It is designed as a multi-stage funnel to systematically process and score purchase order data, starting with broad, efficient filters and progressing to more sophisticated, computationally intensive techniques.

**Objective:** To create a fully scored output dataset that identifies and quantifies fraudulent activity based on a series of predefined rules.

## Part 0: Environment Setup

Before running this notebook, you must configure your Google Cloud environment.

### 1. Configure Project Variables
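In the next cell, replace the placeholder values with your Google Cloud `PROJECT_ID`, the BigQuery `DATASET_ID` to use, and the `REGION` for your resources. `GOOGLE_MAPS_API_KEY` is used for reverse geocoding in Stage 3b, and `FUNCTION_ENDPOINT` is the URL of the Cloud Function deployed in Stage 0B (you can fill it in after deployment).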

In [None]:
# Colab only: authenticate your user account (skip this cell when running elsewhere)
from google.colab import auth
auth.authenticate_user()

In [None]:
import os

# --- CONFIGURATION ---
# Replace with your project details
PROJECT_ID = "" # @param {type:"string"}
DATASET_ID = "" # @param {type:"string"}
REGION = "us-central1" # @param {type:"string"}
GOOGLE_MAPS_API_KEY = "" # @param {type:"string"}
FUNCTION_ENDPOINT = "" # @param {type:"string"}

os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
print(f"Project set to: {PROJECT_ID}")

# --- BigQuery Client ---
from google.cloud import bigquery
bq_client = bigquery.Client(project=PROJECT_ID)

# --- Helper function to run queries ---
def run_bq_query(sql: str):
    """Run a BigQuery statement after substituting the `project_id.dataset` placeholder.

    Returns the finished QueryJob and re-raises on failure, so later stages
    do not run against missing or stale tables.
    """
    print("Executing query...")
    # Replace the placeholder with the configured project and dataset
    sql = sql.replace("project_id.dataset", f"{PROJECT_ID}.{DATASET_ID}")
    job = bq_client.query(sql)
    try:
        job.result()  # Wait for the job to complete
        print(f"Successfully finished job: {job.job_id}")
        return job
    except Exception as e:
        print(f"Job failed: {job.job_id}")
        print(f"Error details: {e}")
        raise

Project set to: liuchristie-111-20250627151202


### 2. Enable APIs
Make sure the following APIs are enabled in your Google Cloud project:
- BigQuery API
- Cloud Functions API
- Cloud Build API
- Address Validation API
- Vertex AI API
- Cloud Run Admin API (aka Cloud Run API)
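- BigQuery Connection API
- Geocoding API (used for reverse geocoding in Stage 3b)
- Secret Manager API (if you store the Maps API key as a secret, as recommended in Stage 0B)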

### 3. Create BigQuery Dataset
The following command will create the BigQuery dataset where all your tables will be stored.

In [None]:
dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
dataset.location = REGION
bq_client.create_dataset(dataset, exists_ok=True)
print(f"Dataset '{DATASET_ID}' created or already exists.")

Dataset 'orders2k' created or already exists.


## Part 1: The Fraud Detection Playbook

Now we begin the multi-stage analysis funnel as defined in the playbook.

### Stage 0A: Data Ingestion and Normalization
**Goal:** Create a clean, standardized base table. This is the most critical step for the accuracy of all subsequent analysis.

**Action:** Create a materialized table `orders_normalized` with cleaned fields and a UDF for Levenshtein distance.
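To experiment with normalization rules before editing the SQL, here is a small Python sketch of the same idea: lowercase and trim, strip a bot-style prefix, then map street-type variants to one canonical spelling (for example ave → avenue, as the SQL comments describe). The variant map and the digit-plus-two-letters prefix pattern are assumptions drawn from the examples in this stage; adjust them to your data. A broader `[a-z0-9]{3}` prefix pattern would also strip three-digit house numbers such as `123 `.

```python
import re

# Assumed street-type variants, mapped to canonical forms (extend for your data)
STREET_TYPE_VARIANTS = {
    "avenue": ["ave", "av"],
    "lane": ["ln", "layn", "le"],
    "way": ["waj", "wey"],
    "street": ["stret"],
}

# Assumed prefix pattern: one digit followed by two letters (e.g. '5eb ', '2tk '),
# which leaves purely numeric house numbers such as '123 ' untouched.
PREFIX_RE = re.compile(r"^[0-9][a-z]{2}\s")


def normalize_address1(address1: str) -> str:
    """Lowercase, trim, strip a bot-style prefix, and standardize street types."""
    text = address1.strip().lower()
    text = PREFIX_RE.sub("", text)
    for canonical, variants in STREET_TYPE_VARIANTS.items():
        pattern = r"\b(" + "|".join(variants) + r")\b"
        text = re.sub(pattern, canonical, text)
    return text.strip()


print(normalize_address1("5EB 12 Oak Ave"))   # 12 oak avenue
print(normalize_address1("123 Main Stret"))   # 123 main street
```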

In [None]:
# NOTE: Cast the 'zip' column to a STRING before applying TRIM.
# NOTE: The prefix-stripping regex is only an example; consider expanding it, or using ML/LLM-based normalization, for broader coverage (TODO)
sql_stage_0_normalize = r"""
CREATE OR REPLACE TABLE `project_id.dataset`.orders_normalized AS (
  SELECT
    CAST(order_id AS STRING) AS order_id, -- Cast to string for consistency
    created_at, -- The source column is already a TIMESTAMP, no parsing needed.
    sku,
    ordered_quantity,
    -- Normalize names for consistent matching
    LOWER(TRIM(first_name)) AS first_name,
    LOWER(TRIM(last_name)) AS last_name,
    -- Store original address fields for context and fuzzy matching later
    address1 AS original_address1,
    address2 AS original_address2,
    city AS original_city,
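    -- RULE #3 (Address Obfuscation) & #12 (Formatting Anomalies): Normalize the primary address line.
    -- This chain cleans common bot-generated variations before analysis.
    TRIM(
      REGEXP_REPLACE(
        REGEXP_REPLACE(
          REGEXP_REPLACE(
            REGEXP_REPLACE( -- Standardize street-type variants for better grouping
              -- Remove random 3-char prefixes (e.g., '5EB ', '2TK ') seen in data. Lowercase first so the
              -- lowercase pattern also matches uppercase input; requiring a digit followed by two letters
              -- keeps numeric house numbers such as '123 ' intact.
              REGEXP_REPLACE(LOWER(TRIM(address1)), r'^[0-9][a-z]{2}\s', ''),
              r'\b(ave|av)\b', 'avenue'
            ), r'\b(ln|layn|le)\b', 'lane'
          ), r'\b(waj|wey)\b', 'way'
        ), r'\bstret\b', 'street'
      )
    ) AS normalized_address1,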
    LOWER(TRIM(address2)) AS normalized_address2,
    -- Normalize location fields
    LOWER(TRIM(city)) AS city,
    LOWER(TRIM(country_code)) AS country_code,
    LOWER(TRIM(province_code)) AS province_code,
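    -- The 'zip' column is a number (INT64), so it must be CAST to a STRING before TRIM.
    -- LPAD restores leading zeros lost in INT64 storage (e.g. 2134 -> '02134'); assumes 5-digit US ZIP codes.
    LPAD(TRIM(CAST(zip AS STRING)), 5, '0') AS zip,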
    -- RULE #5 (Geospatial Clustering): Create a GEOGRAPHY point for optimized geospatial functions.
    ST_GEOGPOINT(longitude, latitude) AS geo_point,
    latitude,
    longitude,
    fraud_rules
  FROM
    `project_id.dataset`.raw_orders
  WHERE
    -- Basic data quality checks to prevent errors in subsequent stages
    latitude IS NOT NULL AND longitude IS NOT NULL AND latitude BETWEEN -90 AND 90 AND longitude BETWEEN -180 AND 180
    AND first_name IS NOT NULL
    AND address1 IS NOT NULL
);
"""

sql_stage_0_udf = """
CREATE OR REPLACE FUNCTION `project_id.dataset`.levenshtein(a STRING, b STRING)
RETURNS INT64
LANGUAGE js AS r'''
  // Propagate SQL NULLs instead of throwing on a.length
  if (a === null || b === null) return null;
  if (a.length === 0) return b.length;
  if (b.length === 0) return a.length;
  const matrix = Array(b.length + 1).fill(null).map(() => Array(a.length + 1).fill(null));
  for (let i = 0; i <= a.length; i++) { matrix[0][i] = i; }
  for (let j = 0; j <= b.length; j++) { matrix[j][0] = j; }
  for (let j = 1; j <= b.length; j++) {
    for (let i = 1; i <= a.length; i++) {
      const cost = a[i - 1] === b[j - 1] ? 0 : 1;
      matrix[j][i] = Math.min(matrix[j - 1][i] + 1, matrix[j][i - 1] + 1, matrix[j - 1][i - 1] + cost);
    }
  }
  return matrix[b.length][a.length];
''';
"""

run_bq_query(sql_stage_0_normalize)
run_bq_query(sql_stage_0_udf)

Executing query...
Successfully finished job: 70be4b58-a5f8-4c1f-82bc-42501a8e7074
Executing query...
Successfully finished job: 4b13b2c0-8405-4a24-bac5-c67ed10a0c6a


### Stage 0B: Set Up Remote Function for Address Validation

**Goal:** Use the Google Maps Address Validation API to check the validity of shipping addresses.

**Action:** This is a multi-step process:
1. **Set up Google Maps Platform**: [Instructions here](https://developers.google.com/maps/get-started)
2. **Enable the API and generate an API key:** Enable the [Address Validation API](https://developers.google.com/maps/documentation/address-validation/get-api-key) and generate an API key.
3. **(Recommended) Store the key in Secret Manager:** [Create a secret](https://cloud.google.com/secret-manager/docs/create-secret-quickstart) named `MAPS_API_KEY` that holds the API key, and grant your Cloud Functions service account the `Secret Manager Secret Accessor` role.
4. **Create a Cloud Function:** The Python code below is deployed as a Cloud Function. (Recommended) Expose the secret to the function as an environment variable (*Secrets exposed as environment variables*), and allocate enough CPU and memory for batched requests.
5. **Create a BigQuery Connection:** This allows BigQuery to securely invoke the Cloud Function. Grant the connection's service account the `Cloud Run Invoker` and `Cloud Functions Invoker` roles.
6.  **Create a BigQuery Remote Function:** This maps a SQL function name to the Cloud Function endpoint.
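BigQuery batches rows when it calls a remote function: the request body carries a `calls` array with one argument list per row, and the reply must be a `replies` array with exactly one entry per call, in the same order (or an `errorMessage` to fail the batch). A JSON `null` reply comes back to SQL as `NULL`, which is how the function reports a valid address. The sketch below isolates that contract from the Maps API so it can be tested locally; `handle_remote_function_batch` and `fake_validator` are hypothetical helpers, not part of the deployed function.

```python
import json
from typing import Callable, Optional


def handle_remote_function_batch(body: dict, validate_one: Callable[..., Optional[str]]) -> dict:
    """Apply validate_one to every row of a BigQuery remote-function request.

    BigQuery sends {"calls": [[arg1, arg2, ...], ...]} (one list per row) and expects
    {"replies": [...]} with exactly one reply per call, in the same order, or
    {"errorMessage": "..."} to fail the whole batch.
    """
    try:
        replies = [validate_one(*call) for call in body["calls"]]
    except Exception as exc:  # one bad row fails the batch, as in the deployed function
        return {"errorMessage": str(exc)}
    return {"replies": replies}


def fake_validator(address1, address2, city, zip_code, province_code):
    """Hypothetical stand-in for the Address Validation API call (no network access)."""
    return None if zip_code else "Address not recognized by USPS"


request_body = {
    "calls": [
        ["12 oak avenue", None, "springfield", "62701", "il"],
        ["99 nowhere road", "apt 4", "springfield", None, "il"],
    ]
}
print(json.dumps(handle_remote_function_batch(request_body, fake_validator)))
# {"replies": [null, "Address not recognized by USPS"]}
```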

**NOTE: We tested 2,000 records on a Cloud Run instance with 2 CPUs and 1 GB of RAM.**



#### Step 1: Define the Cloud Function

Create a new directory `address_validator` containing the function code (`main.py`) and its dependencies (`requirements.txt`, listing `functions-framework` and `requests`). The cells below create the directory and write both files.

In [None]:
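# Create the function's source directory (%%writefile does not create missing directories)
!mkdir -p address_validator
# Optional: only needed to run the function locally for testing
!pip install functions-framework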

In [None]:
%%writefile address_validator/main.py
import os
import requests
import functions_framework
from flask import jsonify
import time
import random


API_KEY = os.environ.get('MAPS_API_KEY')
API_URL = f"https://addressvalidation.googleapis.com/v1:validateAddress?key={API_KEY}"

def make_request_with_backoff(payload):
    """
    Makes a request to the Address Validation API with exponential backoff.
    Retries transient errors (network failures, 429 rate limiting, 5xx server errors);
    other 4xx client errors are raised immediately.
    """
    max_retries = 5
    base_delay = 1.0  # seconds
    last_error = None

    for i in range(max_retries):
        try:
            response = requests.post(API_URL, json=payload, timeout=30)
        except requests.exceptions.RequestException as e:
            # Network-level failure (timeout, connection reset): retry
            last_error = e
            print(f"Request failed with exception: {e}.")
        else:
            if response.status_code == 200:
                return response.json()

            if 400 <= response.status_code < 500 and response.status_code != 429:
                # Non-retryable; raised from the else clause, so the except above does not swallow it
                response.raise_for_status()

            # Rate limit or server error: remember it, then back off and retry
            last_error = requests.exceptions.HTTPError(
                f"HTTP {response.status_code}", response=response
            )
            print(f"Request failed with status {response.status_code}.")

        if i < max_retries - 1:
            # Exponential delay with jitter (randomness) to avoid a thundering herd
            delay = (base_delay * (2 ** i)) + random.uniform(0, 1)
            print(f"Retrying in {delay:.2f}s...")
            time.sleep(delay)

    # All attempts failed: surface the last error
    raise last_error

@functions_framework.http
def validate_address(request):
    try:
        request_json = request.get_json()
        calls = request_json['calls']
        replies = []

        for call in calls:
            address1, address2, city, zip_code, province_code = call

            address_lines = [address1]
            if address2:
                address_lines.append(address2)

            address_payload = {
                "address": {
                    "regionCode": "US",
                    "locality": city,
                    "administrativeArea": province_code,
                    "postalCode": str(zip_code),
                    "addressLines": address_lines
                },
                "enableUspsCass": True
            }

            # backoff logic
            response_data = make_request_with_backoff(address_payload)
            result = response_data.get('result', {})
            validation_status = process_validation_result(result)

            replies.append(validation_status)

        return jsonify({'replies': replies})

    except Exception as e:
        # Return error details in the expected JSON format for BigQuery
        return jsonify({'errorMessage': str(e)}), 400

def process_validation_result(result):
    """
    Interprets the API result based on the specified business rules.
    Returns a string for the 'review' table or None if the address is OK.
    """
    verdict = result.get('verdict', {})
    usps_data = result.get('uspsData', {})
    dpv_confirmation = usps_data.get('dpvConfirmation')

    # Check for unconfirmed components first, as it's a strong signal
    if verdict.get('hasUnconfirmedComponents'):
        return 'Unconfirmed components'

    if dpv_confirmation == 'Y': # Confirmed by USPS
        return None # Address is considered valid
    elif dpv_confirmation == 'S': # Confirmed, but secondary number is wrong
        return 'Sub-premise does not exist according to USPS'
    elif dpv_confirmation == 'D': # Confirmed, but secondary number is missing
        return 'Multi-tenant building, missing subpremise'
    else: # 'N' or missing. Address is not recognized at all.
        return 'Address not recognized by USPS'



Writing address_validator/main.py
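The retry helper waits `base_delay * 2**i` seconds plus up to 1 second of jitter between attempts. This sketch (a hypothetical `backoff_delays` helper using the same constants) computes that schedule: with 5 attempts there are at most 4 waits, so a single persistently failing address can add roughly 15 to 19 seconds of sleep to a batch. That is worth keeping in mind when sizing the function timeout, since BigQuery sends many rows per request.

```python
import random


def backoff_delays(max_retries=5, base_delay=1.0, seed=None):
    """Return the sleep before each retry: base_delay * 2**i plus 0-1 s of jitter.

    With max_retries attempts there are max_retries - 1 sleeps, because the
    helper gives up (raises) after the final attempt instead of sleeping.
    """
    rng = random.Random(seed)
    return [base_delay * (2 ** i) + rng.uniform(0, 1) for i in range(max_retries - 1)]


delays = backoff_delays(seed=42)
print([round(d, 2) for d in delays])

# Bounds of the total sleep time for one persistently failing request
min_total = sum(1.0 * 2 ** i for i in range(4))
print(f"Total sleep: {min_total:.0f} to {min_total + 4:.0f} seconds")  # 15 to 19 seconds
```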


In [None]:
%%writefile address_validator/requirements.txt
functions-framework==3.*
requests

Writing address_validator/requirements.txt


#### Step 2: Deploy the Cloud Function

Run the following `gcloud` command (from the notebook, or in Cloud Shell without the leading `!`). It reads the API key from the `MAPS_API_KEY` secret created in Stage 0B. If you are not using Secret Manager, replace the `--set-secrets` line with `--set-env-vars=MAPS_API_KEY=your-maps-api-key`, but Secret Manager is strongly recommended.

In [None]:
# Requires an authenticated gcloud CLI. --no-allow-unauthenticated keeps the function private;
# BigQuery calls it through the connection's service account (see Step 3).
FUNCTION_NAME = "validate_address_bq"

!gcloud functions deploy {FUNCTION_NAME} \
 --gen2 \
 --region={REGION} \
 --runtime=python311 \
 --source=./address_validator \
 --entry-point=validate_address \
 --trigger-http \
 --no-allow-unauthenticated \
 --set-secrets=MAPS_API_KEY=MAPS_API_KEY:latest

#### Step 3: Create the BigQuery Connection and Remote Function **(In progress: these steps may need to be performed manually in the Cloud Console)**

This sets up the connection that allows BigQuery to call the function you just deployed.

In [None]:
# Create a Cloud Resource connection in the same location as the dataset
!bq mk --connection --location={REGION} --project_id={PROJECT_ID} --connection_type=CLOUD_RESOURCE bq-remote-function-connection

In [None]:
# Show the connection details; the service account ID is in cloudResource.serviceAccountId
!bq show --connection {PROJECT_ID}.{REGION}.bq-remote-function-connection

# Then grant this service account the 'Cloud Run Invoker' role on the function (for 2nd gen
# functions, on its underlying Cloud Run service): in the Cloud Console, open the function,
# go to Permissions, and add the service account ID as a new principal.

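Set `FUNCTION_ENDPOINT` in the configuration cell to the function URL printed by the deployment step, then run the SQL below.

In [None]:
# FUNCTION_ENDPOINT comes from the configuration cell,
# e.g. "https://validate-address-bq-<hash>-uc.a.run.app"
assert FUNCTION_ENDPOINT, "Set FUNCTION_ENDPOINT to the deployed function's URL first."

sql_stage_2a_remote_function = f"""
CREATE OR REPLACE FUNCTION `project_id.dataset`.validate_address (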
  normalized_address1 STRING,
  normalized_address2 STRING,
  city STRING,
  zip STRING,
  province_code STRING
) RETURNS STRING
REMOTE WITH CONNECTION `{PROJECT_ID}.{REGION}.bq-remote-function-connection`
OPTIONS (
  endpoint = '{FUNCTION_ENDPOINT}'
);
"""

run_bq_query(sql_stage_2a_remote_function)

### Stage 1: High-Velocity Detection, Address Validation, and Broad Flagging

**Goal:** Efficiently identify orders that are part of a rapid-fire purchasing burst, or whose shipping address fails USPS validation.

**Action:** Create a table `orders_flagged` containing orders that have at least one initial fraud signal.

#### Step 1: Run the Velocity Check and Address Validation, Then Merge the Results

Now invoke the remote function on all unique addresses and merge any identified issues into the `orders_flagged` table. Only distinct addresses are sent, to minimize API calls. An order is flagged `HIGH_VELOCITY` when it arrives within 10 seconds of the previous order in the same precision-5 geohash cell.

In [None]:
sql_stage_1 = """
CREATE OR REPLACE TABLE `project_id.dataset.orders_flagged` AS (
  WITH AddressResults AS (
      SELECT
        normalized_address1, normalized_address2, city, zip, province_code,
        `project_id.dataset`.validate_address(
          normalized_address1, normalized_address2, city, zip, province_code
        ) AS address_validation_issue
      FROM (
        SELECT DISTINCT normalized_address1, normalized_address2, city, zip, province_code
        FROM `project_id.dataset.orders_normalized`
        WHERE normalized_address1 IS NOT NULL AND city IS NOT NULL AND zip IS NOT NULL AND province_code IS NOT NULL
      )
    ), PerOrderSignals AS (
      SELECT
        *,
        TIMESTAMP_DIFF(
          created_at,
          LAG(created_at, 1) OVER (PARTITION BY ST_GEOHASH(geo_point, 5) ORDER BY created_at),
          SECOND
        ) AS seconds_since_proximal_order
      FROM `project_id.dataset.orders_normalized`
      WHERE geo_point IS NOT NULL

      UNION ALL

      SELECT
        *,
        CAST(NULL AS INT64) AS seconds_since_proximal_order
      FROM `project_id.dataset.orders_normalized`
      WHERE geo_point IS NULL
  ), FinalFlags AS(
      SELECT
        s.*,
        r.address_validation_issue,
        ARRAY_CONCAT(
          IF(s.seconds_since_proximal_order <= 10, ['HIGH_VELOCITY'], []),
          IF(r.address_validation_issue IS NOT NULL, ['ADDRESS_VALIDATION_ISSUE'], [])
        ) AS fraud_flags
      FROM PerOrderSignals AS s
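      LEFT JOIN AddressResults AS r
        ON s.normalized_address1 = r.normalized_address1
        -- address2 is often NULL, and NULL = NULL is not TRUE, so compare NULL-safely
        AND IFNULL(s.normalized_address2, '') = IFNULL(r.normalized_address2, '')
        AND s.city = r.city
        AND s.zip = r.zip
        AND s.province_code = r.province_code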
    )
SELECT *
FROM FinalFlags
WHERE ARRAY_LENGTH(fraud_flags) > 0
);
"""

run_bq_query(sql_stage_1)

Executing query...
Successfully finished job: e647f2f8-7ec5-4257-8a1a-befc3aca5947
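The velocity rule compares each order with the previous order in the same `ST_GEOHASH(geo_point, 5)` cell (a few kilometers across) and flags gaps of 10 seconds or less. The pure-Python sketch below reproduces that logic for local testing, with a standard geohash encoder standing in for `ST_GEOHASH`; the sample orders are hypothetical. One design consequence: two orders a few meters apart but on opposite sides of a cell boundary are never compared.

```python
from collections import defaultdict
from datetime import datetime, timedelta

_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet


def geohash(lat, lon, precision=5):
    """Standard geohash: interleave longitude/latitude bisection bits, 5 bits per char."""
    lat_rng, lon_rng = [-90.0, 90.0], [-180.0, 180.0]
    chars, bits, n_bits, use_lon = [], 0, 0, True
    while len(chars) < precision:
        rng, value = (lon_rng, lon) if use_lon else (lat_rng, lat)
        mid = (rng[0] + rng[1]) / 2
        if value >= mid:
            bits = (bits << 1) | 1
            rng[0] = mid
        else:
            bits <<= 1
            rng[1] = mid
        use_lon = not use_lon
        n_bits += 1
        if n_bits == 5:
            chars.append(_BASE32[bits])
            bits, n_bits = 0, 0
    return "".join(chars)


def flag_high_velocity(orders, precision=5, max_gap_seconds=10):
    """Return IDs of orders placed <= max_gap_seconds after the previous order in their cell."""
    by_cell = defaultdict(list)
    for order in orders:
        by_cell[geohash(order["lat"], order["lon"], precision)].append(order)
    flagged = set()
    for cell_orders in by_cell.values():
        cell_orders.sort(key=lambda o: o["created_at"])
        # Like LAG(created_at) OVER (PARTITION BY cell ORDER BY created_at)
        for prev, cur in zip(cell_orders, cell_orders[1:]):
            if (cur["created_at"] - prev["created_at"]).total_seconds() <= max_gap_seconds:
                flagged.add(cur["order_id"])
    return flagged


# Hypothetical orders: A, B, C share an address; D is in another city
t0 = datetime(2024, 1, 1, 12, 0, 0)
orders = [
    {"order_id": "A", "lat": 40.7128, "lon": -74.0060, "created_at": t0},
    {"order_id": "B", "lat": 40.7128, "lon": -74.0060, "created_at": t0 + timedelta(seconds=5)},
    {"order_id": "C", "lat": 40.7128, "lon": -74.0060, "created_at": t0 + timedelta(minutes=5)},
    {"order_id": "D", "lat": 34.0522, "lon": -118.2437, "created_at": t0 + timedelta(seconds=3)},
]
print(geohash(42.6, -5.6))          # ezs42
print(flag_high_velocity(orders))   # {'B'}
```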


### Stage 2B: Suspicious Cluster Identification

**Goal:** Group the flagged orders into discrete "incident clusters" based on physical proximity (Rule #5).

**Action:** Use `ST_CLUSTERDBSCAN` to assign a `cluster_id` to each suspicious order.

In [None]:
sql_stage_2b = """
CREATE OR REPLACE TABLE `project_id.dataset`.suspicious_clusters_stage2 AS (
  SELECT
    *,
    -- epsilon = 75 meters, minimum_geographies = 5: dense groups of orders within 75 m form a cluster;
    -- orders that fall in no cluster get a NULL cluster_id. Tune these values as needed.
    ST_CLUSTERDBSCAN(geo_point, 75, 5) OVER() AS cluster_id
  FROM
    `project_id.dataset`.orders_flagged
  WHERE ARRAY_LENGTH(fraud_flags) > 0 -- only process orders that have at least one flag
);
"""
run_bq_query(sql_stage_2b)

Executing query...
Successfully finished job: 9e334d4b-0efe-4673-8bbe-6c4d64f33cc5
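For intuition about the `ST_CLUSTERDBSCAN(geo_point, 75, 5)` parameters, here is a minimal pure-Python DBSCAN over (lat, lon) pairs using haversine distance. It is a sketch, not BigQuery's implementation: it counts a point as its own neighbor (as scikit-learn does), which may differ from BigQuery's exact convention, and it marks noise as `None` where BigQuery returns a NULL `cluster_id`. The sample coordinates are hypothetical.

```python
import math

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius in meters


def haversine_m(p, q):
    """Great-circle distance in meters between two (lat, lon) points in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*p, *q))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def dbscan(points, eps_m=75.0, min_points=5):
    """Return a cluster id per point, or None for noise (O(n^2), fine for samples)."""
    n = len(points)
    # Neighborhoods include the point itself
    neighbors = [[j for j in range(n) if haversine_m(points[i], points[j]) <= eps_m]
                 for i in range(n)]
    labels = [None] * n
    next_id = 0
    for i in range(n):
        if labels[i] is not None or len(neighbors[i]) < min_points:
            continue  # already clustered, or not a core point
        labels[i] = next_id
        queue = list(neighbors[i])
        while queue:
            j = queue.pop()
            if labels[j] is None:
                labels[j] = next_id
                if len(neighbors[j]) >= min_points:
                    queue.extend(neighbors[j])  # core point: keep expanding
        next_id += 1
    return labels


# Hypothetical coordinates: five orders within ~15 m, plus one about 1.1 km away
points = [(40.0, -75.0), (40.0001, -75.0), (40.0, -75.0001),
          (40.0001, -75.0001), (40.00005, -75.00005), (40.01, -75.0)]
print(dbscan(points))  # [0, 0, 0, 0, 0, None]
```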


### Stage 3: Deep Analysis & Grounding (Hybrid Model)

Now we perform more complex checks on the much smaller, clustered dataset.

#### Stage 3a: Intra-Cluster SQL Analysis

**Goal:** Identify fuzzy matches (Rule #7) and sequential addresses (Rule #10) within each cluster.

In [None]:
sql_stage_3a = """
CREATE OR REPLACE TABLE `project_id.dataset`.cluster_analysis_flags_stage3 AS (
  WITH ClusterPairs AS (
    -- Self-join clusters to compare every order with every other order in the same incident.
    SELECT
      a.cluster_id,
      a.order_id AS order_id_a,
      b.order_id AS order_id_b,
      -- RULE #7 (Fuzzy Address Similarity): Calculate edit distance on original address.
      `project_id.dataset`.levenshtein(a.original_address1, b.original_address1) AS address_edit_distance,
      -- RULE #10 (Sequential Address Generation): Check for sequential numbers in street address.
      ABS(
        SAFE_CAST(REGEXP_EXTRACT(a.normalized_address1, r'^\\d+') AS INT64) -
        SAFE_CAST(REGEXP_EXTRACT(b.normalized_address1, r'^\\d+') AS INT64)
      ) AS street_number_diff
    FROM
      `project_id.dataset`.suspicious_clusters_stage2 a
    JOIN
      `project_id.dataset`.suspicious_clusters_stage2 b ON a.cluster_id = b.cluster_id AND a.order_id < b.order_id
    WHERE a.cluster_id IS NOT NULL AND b.cluster_id IS NOT NULL
  ), PairFlags AS (
    SELECT order_id_a, order_id_b, 'FUZZY_ADDRESS_MATCH' AS flag FROM ClusterPairs WHERE address_edit_distance BETWEEN 1 AND 3
    UNION ALL
    SELECT order_id_a, order_id_b, 'SEQUENTIAL_ADDRESS' AS flag FROM ClusterPairs WHERE street_number_diff = 1
  )
  -- Emit both orders of every matching pair so each one carries the flag
  SELECT
    order_id,
    ARRAY_AGG(DISTINCT flag) AS detailed_flags
  FROM PairFlags, UNNEST([order_id_a, order_id_b]) AS order_id
  GROUP BY order_id
);
"""
run_bq_query(sql_stage_3a)
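The two intra-cluster rules can be checked locally with the sketch below: Levenshtein distance (the same dynamic-programming recurrence as the JavaScript UDF, kept to two rolling rows) for Rule #7, and a difference of exactly 1 between leading street numbers for Rule #10. It flags both orders in every matching pair. For simplicity it compares one address string per order, whereas the SQL uses `original_address1` for edit distance and `normalized_address1` for street numbers; the sample addresses are hypothetical.

```python
import re
from itertools import combinations


def levenshtein(a, b):
    """Edit distance (insert/delete/substitute at cost 1), keeping two rows of the DP table."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        cur = [i]
        for j, cb in enumerate(b, start=1):
            cur.append(min(prev[j] + 1,                # deletion
                           cur[j - 1] + 1,             # insertion
                           prev[j - 1] + (ca != cb)))  # substitution (free if equal)
        prev = cur
    return prev[-1]


def leading_number(address):
    """Leading street number as an int, or None if the address does not start with digits."""
    match = re.match(r"\d+", address)
    return int(match.group()) if match else None


def pair_flags(cluster_orders, max_edit_distance=3):
    """Map order_id -> flags for one cluster, flagging both orders of each matching pair."""
    flags = {}
    for (id_a, addr_a), (id_b, addr_b) in combinations(cluster_orders, 2):
        found = set()
        if 1 <= levenshtein(addr_a, addr_b) <= max_edit_distance:
            found.add("FUZZY_ADDRESS_MATCH")  # Rule #7
        num_a, num_b = leading_number(addr_a), leading_number(addr_b)
        if num_a is not None and num_b is not None and abs(num_a - num_b) == 1:
            found.add("SEQUENTIAL_ADDRESS")  # Rule #10
        if found:
            for order_id in (id_a, id_b):
                flags.setdefault(order_id, set()).update(found)
    return flags


# Hypothetical cluster of three orders
cluster = [("o1", "12 oak st"), ("o2", "13 oak st"), ("o3", "500 pine road")]
print(pair_flags(cluster))
```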

#### Stage 3b: Geospatial Grounding (Bulk API for Gross Errors)

**Goal:** Catch blatant location mismatches (Rule #6) by comparing each order's coordinates with its stated state and country.

**Action:**
1. Export unique coordinates from the suspicious clusters.
2. Use a Python script to call the **Maps Reverse Geocoding API**.
3. Load the results into a new BigQuery table `geo_validation_results`.
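Pulling the comparison out of the loop makes it testable without API calls. The sketch below applies the same precedence as the notebook loop (a country mismatch wins over a state mismatch). Its input follows the `address_components` shape (`long_name`, `short_name`, `types`) of the Geocoding API JSON response; `classify_location` is a hypothetical helper and the sample values are made up. Unlike the loop, it treats a missing stated code as an empty string, so it yields a mismatch instead of raising.

```python
def classify_location(address_components, country_code, province_code):
    """Compare reverse-geocoded components with an order's stated location.

    Returns 'MATCH', 'MISMATCH_COUNTRY', or 'MISMATCH_STATE'; a country
    mismatch takes precedence over a state mismatch.
    """
    api_state = api_country = None
    for component in address_components:
        if "administrative_area_level_1" in component["types"]:
            api_state = component["short_name"]
        if "country" in component["types"]:
            api_country = component["short_name"]
    if api_country and (country_code or "").upper() != api_country:
        return "MISMATCH_COUNTRY"
    if api_state and (province_code or "").upper() != api_state:
        return "MISMATCH_STATE"
    return "MATCH"


# Hypothetical components in the Geocoding API response shape
sample_components = [
    {"long_name": "Austin", "short_name": "Austin", "types": ["locality", "political"]},
    {"long_name": "Texas", "short_name": "TX", "types": ["administrative_area_level_1", "political"]},
    {"long_name": "United States", "short_name": "US", "types": ["country", "political"]},
]
print(classify_location(sample_components, "us", "tx"))  # MATCH
print(classify_location(sample_components, "us", "ca"))  # MISMATCH_STATE
```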

In [None]:
# Step 1: Export unique coordinates to a Pandas DataFrame
sql_export_coords = """
SELECT DISTINCT cluster_id, latitude, longitude, city, province_code, country_code
FROM `project_id.dataset`.suspicious_clusters_stage2
WHERE cluster_id IS NOT NULL;
"""
coords_to_validate_df = bq_client.query(sql_export_coords.replace("project_id.dataset", f"{PROJECT_ID}.{DATASET_ID}")).to_dataframe()

print(f"Found {len(coords_to_validate_df)} unique coordinates to validate.")

Found 102 unique coordinates to validate.


In [None]:
# Step 2: Python script to call the Reverse Geocoding API
import requests
import pandas as pd
from tqdm.auto import tqdm

# Uses GOOGLE_MAPS_API_KEY from the configuration cell
GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"

results = []
for _, row in tqdm(coords_to_validate_df.iterrows(), total=coords_to_validate_df.shape[0]):
    lat, lon = row['latitude'], row['longitude']

    try:
        response = requests.get(
            GEOCODE_URL,
            params={"latlng": f"{lat},{lon}", "key": GOOGLE_MAPS_API_KEY},
            timeout=10,
        ).json()
        if response['status'] == 'OK':
            # Extract state and country from the first result
            api_state, api_country = None, None
            for component in response['results'][0]['address_components']:
                if 'administrative_area_level_1' in component['types']:
                    api_state = component['short_name']
                if 'country' in component['types']:
                    api_country = component['short_name']

            # A country mismatch takes precedence over a state mismatch
            status = 'MATCH'
            if api_country and row['country_code'].upper() != api_country:
                status = 'MISMATCH_COUNTRY'
            elif api_state and row['province_code'].upper() != api_state:
                status = 'MISMATCH_STATE'

            results.append({'latitude': lat, 'longitude': lon, 'validation_status': status})
        else:
            # e.g. ZERO_RESULTS, OVER_QUERY_LIMIT, REQUEST_DENIED
            results.append({'latitude': lat, 'longitude': lon, 'validation_status': 'API_ERROR'})
    except Exception as e:
        print(f"Reverse geocoding failed for ({lat}, {lon}): {e}")
        results.append({'latitude': lat, 'longitude': lon, 'validation_status': 'SCRIPT_ERROR'})

geo_results_df = pd.DataFrame(results)

# Step 3: Load results back to BigQuery
table_id = f"{PROJECT_ID}.{DATASET_ID}.geo_validation_results"
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("latitude", "FLOAT64"),
        bigquery.SchemaField("longitude", "FLOAT64"),
        bigquery.SchemaField("validation_status", "STRING"),
    ],
    write_disposition="WRITE_TRUNCATE",
)
load_job = bq_client.load_table_from_dataframe(geo_results_df, table_id, job_config=job_config)
load_job.result() # Wait for completion
print(f"Loaded {len(geo_results_df)} geo validation results to {table_id}")


Loaded 102 geo validation results to liuchristie-111-20250627151202.orders2k.geo_validation_results


### Stage 4: Unified Scoring and Final Rule Implementation

**Goal:** Consolidate all evidence, implement the remaining business logic rules, and calculate a definitive fraud score for each cluster.

**Action:** Create the final, fully-scored cluster summary table `final_scored_clusters_stage4`.

In [None]:
sql_stage_4 = """
CREATE OR REPLACE TABLE `project_id.dataset`.final_scored_clusters_stage4 AS (
  WITH
    AllFlagsPerOrder AS (
      -- One row per (order_id, flag) from Stage 1, Stage 3a, and Stage 3b geo mismatches
      SELECT order_id, flag FROM `project_id.dataset`.orders_flagged, UNNEST(fraud_flags) AS flag
      UNION ALL
      SELECT order_id, flag FROM `project_id.dataset`.cluster_analysis_flags_stage3, UNNEST(detailed_flags) AS flag
      UNION ALL
      SELECT o.order_id, g.validation_status AS flag
      FROM `project_id.dataset`.suspicious_clusters_stage2 o
      JOIN `project_id.dataset`.geo_validation_results g ON o.latitude = g.latitude AND o.longitude = g.longitude WHERE g.validation_status != 'MATCH'
    ),
    ClusterFlags AS (
      -- Aggregate flags per cluster here, so the one-row-per-flag fan-out does not
      -- inflate the per-order statistics computed in ClusterStats
      SELECT s.cluster_id, ARRAY_AGG(DISTINCT f.flag IGNORE NULLS) AS all_cluster_flags
      FROM `project_id.dataset`.suspicious_clusters_stage2 s
      JOIN AllFlagsPerOrder f ON s.order_id = f.order_id
      WHERE s.cluster_id IS NOT NULL
      GROUP BY s.cluster_id
    ),
    TopSkuPerCluster AS (
      -- Most frequent SKU in each cluster, with its order count
      SELECT cluster_id, sku AS top_sku, sku_count AS top_sku_count
      FROM (
        SELECT cluster_id, sku, COUNT(*) AS sku_count,
               ROW_NUMBER() OVER(PARTITION BY cluster_id ORDER BY COUNT(*) DESC) AS rn
        FROM `project_id.dataset`.suspicious_clusters_stage2
        WHERE cluster_id IS NOT NULL
        GROUP BY cluster_id, sku
      )
      WHERE rn = 1
    ),
    ClusterStats AS (
      SELECT
        s.cluster_id,
        COUNT(DISTINCT s.order_id) AS total_orders,
        COUNT(DISTINCT s.normalized_address2) AS distinct_units,
        COUNT(DISTINCT CONCAT(s.first_name, ' ', s.last_name)) AS distinct_names,
        SAFE_DIVIDE(ANY_VALUE(t.top_sku_count), COUNT(DISTINCT s.order_id)) AS top_sku_concentration,
        STDDEV_SAMP(s.ordered_quantity) AS quantity_stddev,
        -- NOTE: `city` is LOWER(TRIM(original_city)), so any capitalization or whitespace
        -- difference counts as an anomaly; tighten this if it flags every cluster.
        LOGICAL_OR(s.original_city != s.city) AS has_formatting_anomalies,
        ANY_VALUE(cf.all_cluster_flags) AS all_cluster_flags,
        FARM_FINGERPRINT(
          CONCAT(
            CAST(COUNT(DISTINCT CONCAT(s.first_name, ' ', s.last_name)) AS STRING), '-',
            CAST(COUNT(DISTINCT s.normalized_address2) AS STRING), '-',
            CAST(t.top_sku AS STRING)
          )
        ) AS cluster_fingerprint
      FROM `project_id.dataset`.suspicious_clusters_stage2 s
      -- Each of these joins matches at most one row per cluster, so order rows are not duplicated
      LEFT JOIN TopSkuPerCluster t ON s.cluster_id = t.cluster_id
      LEFT JOIN ClusterFlags cf ON s.cluster_id = cf.cluster_id
      WHERE s.cluster_id IS NOT NULL
      GROUP BY s.cluster_id, t.top_sku
  )
  SELECT
    *,
    (
      (IF('HIGH_VELOCITY' IN UNNEST(all_cluster_flags), 15, 0)) +
      (IF(distinct_units > 20, 25, 0)) +
      (IF(distinct_names > 20, 20, 0)) +
      (IF('FUZZY_ADDRESS_MATCH' IN UNNEST(all_cluster_flags), 40, 0)) +
      (IF('SEQUENTIAL_ADDRESS' IN UNNEST(all_cluster_flags), 35, 0)) +
      (IF(ARRAY_TO_STRING(all_cluster_flags, ',') LIKE '%MISMATCH%', 100, 0)) +
      (IF(top_sku_concentration > 0.9, 15, 0)) +
      (IF(quantity_stddev = 0 AND total_orders > 5, 10, 0)) +
      (IF(has_formatting_anomalies, 5, 0)) +
      (IF('ADDRESS_VALIDATION_ISSUE' IN UNNEST(all_cluster_flags), 50, 0))
    ) AS fraud_score
  FROM ClusterStats
);"""
run_bq_query(sql_stage_4)
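The Stage 4 score is a plain weighted sum of indicator rules. This Python transcription (a hypothetical helper, with weights copied from the SQL) is handy for checking a single cluster by hand or for experimenting with weights before changing the query; the input values in the example are made up. Because a geo mismatch alone adds 100 points, any cluster scoring below 100 cannot contain one, which is what the Stage 5 filter `fraud_score BETWEEN 40 AND 99` relies on.

```python
# Rule weights copied from the Stage 4 SQL; the helper itself is illustrative.
FLAG_WEIGHTS = {
    "HIGH_VELOCITY": 15,
    "FUZZY_ADDRESS_MATCH": 40,
    "SEQUENTIAL_ADDRESS": 35,
    "ADDRESS_VALIDATION_ISSUE": 50,
}


def fraud_score(flags, *, total_orders, distinct_units, distinct_names,
                top_sku_concentration, quantity_stddev, has_formatting_anomalies):
    """Additive cluster score mirroring the Stage 4 IF() terms."""
    score = sum(weight for flag, weight in FLAG_WEIGHTS.items() if flag in flags)
    if any("MISMATCH" in flag for flag in flags):  # MISMATCH_COUNTRY / MISMATCH_STATE
        score += 100
    if distinct_units > 20:
        score += 25
    if distinct_names > 20:
        score += 20
    if (top_sku_concentration or 0) > 0.9:
        score += 15
    if quantity_stddev == 0 and total_orders > 5:  # NULL stddev (single order) never matches
        score += 10
    if has_formatting_anomalies:
        score += 5
    return score


example = fraud_score(
    {"HIGH_VELOCITY", "FUZZY_ADDRESS_MATCH", "ADDRESS_VALIDATION_ISSUE"},
    total_orders=40, distinct_units=39, distinct_names=40,
    top_sku_concentration=0.35, quantity_stddev=0.0, has_formatting_anomalies=True,
)
print(example)  # 165
```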

### OPTIONAL: Stage 5: LLM Judgement & Geospatial Grounding (Vertex AI)

**Goal:** Use a Vertex AI model with Google Maps grounding to perform a final, nuanced review of ambiguous, high-risk clusters.

**Action:** Generate a table `llm_grounding_review_batch` where each row is a detailed prompt for a cluster that did not have a blatant geo-mismatch but is still highly suspicious.

In [None]:
sql_stage_5 = '''
CREATE OR REPLACE TABLE `project_id.dataset`.llm_grounding_review_batch AS (
  WITH ClusterDetails AS (
    -- Collect all necessary details for the prompt
    SELECT
      s.cluster_id,
      ANY_VALUE(s.latitude) as latitude, ANY_VALUE(s.longitude) as longitude,
      ANY_VALUE(s.city) as city, ANY_VALUE(s.province_code) as province_code,
      STRING_AGG( DISTINCT FORMAT('"%s"', s.original_address1), ', ' LIMIT 5) as sample_addresses,
      STRING_AGG( DISTINCT FORMAT('"%s"', s.original_address2), ', ' LIMIT 5) as sample_units,
      STRING_AGG( FORMAT('  - Name: %s %s, Address: "%s %s"', s.first_name, s.last_name, s.original_address1, s.original_address2), '\\n' ORDER BY s.created_at LIMIT 10 ) AS order_details
    FROM `project_id.dataset`.suspicious_clusters_stage2 s
    GROUP BY s.cluster_id
  )
  SELECT
    cs.cluster_id,
    cs.fraud_score,
    -- This prompt is engineered to force the LLM to use its grounding tools for verification.
    FORMAT(""" You are a senior fraud analyst. You MUST use your integrated Google Maps tool to verify real-world geographic information to make your final assessment.

    **Cluster ID:** %t
    **Calculated Fraud Score:** %d
    **Cluster Fingerprint (for replay detection):** %t

    **Cluster-Wide Statistics:**
    - Total Orders: %d
    - Distinct Shipping Units/Apts: %d
    - Distinct Customer Names Used: %d
    - Detected Patterns: %s

    **Information for Grounding:**
    - Coordinates: (%f, %f)
    - Stated Location: %s, %s
    - Sample Addresses Used: %s
    - Sample Units/Apts Used: %s

    **TASK 1: Geospatial Grounding & Verification**
    Use your Google Maps tool to answer the following:
    1.  **Location Type:** What type of building or location is at these coordinates? (e.g., 'Large Apartment Building', 'Single-Family Home', 'Commercial Mail Forwarder', 'Office Building', 'Vacant Lot').
    2.  **Plausibility Check:** Based on the location type, is it plausible for it to receive %d separate packages to %d different unit numbers? For example, is it reasonable for a single-family home to have 50 different 'Apt' numbers?

    **TASK 2: Final Assessment (Respond in JSON format only)**
    Based on your verified findings and the cluster data, provide your final judgment in the following JSON structure:
    {
      "location_type_from_maps": "[Your result from Task 1.1]",
      "plausibility_verdict": "['PLAUSIBLE' or 'IMPLAUSIBLE']",
      "final_verdict": "['FRAUD' or 'NOT_FRAUD']",
      "confidence_score": [A score from 1 (Low Confidence) to 5 (High Confidence)],
      "justification": "Briefly explain your reasoning, combining the on-site evidence with the data patterns. Example: 'Verdict is FRAUD. Maps shows a small single-family home at the coordinates, making the 75 distinct unit numbers used in the orders physically impossible. This indicates a deliberate address obfuscation scheme.'"
    }

    **Sample Orders for Context:**
    %s
    """,
      cs.cluster_id,
      cs.fraud_score,
      cs.cluster_fingerprint,
      cs.total_orders,
      cs.distinct_units,
      cs.distinct_names,
      ARRAY_TO_STRING(cs.all_cluster_flags, ', '),
      cd.latitude, cd.longitude,
      cd.city, cd.province_code,
      cd.sample_addresses,
      cd.sample_units,
      cs.total_orders,
      cs.distinct_units,
      cd.order_details
    ) AS llm_grounding_prompt
  FROM `project_id.dataset`.final_scored_clusters_stage4 cs
  JOIN ClusterDetails cd ON cs.cluster_id = cd.cluster_id
  -- Select clusters that are suspicious but didn't have a blatant geo-mismatch from Stage 3b.
  WHERE
    cs.fraud_score BETWEEN 40 AND 99
  ORDER BY cs.fraud_score DESC
  LIMIT 1000
);
'''
run_bq_query(sql_stage_5)

Executing query...
Successfully finished job: 421d4f04-09cc-42a6-8f17-34217e1b090e
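This stage stops at building prompts; the model call itself is not shown (for example, BigQuery ML's `ML.GENERATE_TEXT` over a remote Vertex AI model, or the Vertex AI SDK). Whichever route you use, the responses need to be parsed and checked against the JSON structure the prompt asks for. The sketch below does that with a hypothetical `parse_llm_verdict` helper; the sample response is made up. Malformed JSON surfaces as `json.JSONDecodeError`, a subclass of `ValueError`.

```python
import json

# Keys requested by the Stage 5 prompt's JSON template
REQUIRED_KEYS = {
    "location_type_from_maps", "plausibility_verdict",
    "final_verdict", "confidence_score", "justification",
}


def parse_llm_verdict(text):
    """Extract and validate the JSON verdict from a model response.

    Models often wrap JSON in Markdown code fences or add prose around it, so
    this parses the span from the first '{' to the last '}'.
    """
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("No JSON object found in model response")
    verdict = json.loads(text[start:end + 1])
    missing = REQUIRED_KEYS - set(verdict)
    if missing:
        raise ValueError(f"Missing keys: {sorted(missing)}")
    if verdict["final_verdict"] not in ("FRAUD", "NOT_FRAUD"):
        raise ValueError(f"Unexpected final_verdict: {verdict['final_verdict']!r}")
    if not 1 <= int(verdict["confidence_score"]) <= 5:
        raise ValueError("confidence_score must be between 1 and 5")
    return verdict


# Hypothetical model output, wrapped in a Markdown fence as models often do
sample_payload = {
    "location_type_from_maps": "Single-Family Home",
    "plausibility_verdict": "IMPLAUSIBLE",
    "final_verdict": "FRAUD",
    "confidence_score": 4,
    "justification": "Hypothetical example.",
}
sample_response = "Assessment:\n```json\n" + json.dumps(sample_payload) + "\n```"
print(parse_llm_verdict(sample_response)["final_verdict"])  # FRAUD
```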


### Conclusion and Final Output

The playbook has been fully executed. The final results are stored in the `final_scored_clusters_stage4` table.

You can now query this table to review the highest-scoring fraud clusters.

In [None]:
final_results_df = bq_client.query(f"SELECT * FROM `{PROJECT_ID}.{DATASET_ID}.final_scored_clusters_stage4` ORDER BY fraud_score DESC LIMIT 20").to_dataframe()

display(final_results_df)

|   | cluster_id | total_orders | distinct_units | distinct_names | top_sku_concentration | quantity_stddev | has_formatting_anomalies | all_cluster_flags | cluster_fingerprint | fraud_score |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5 | 76 | 1 | 76 | 0.009009 | 0.834618 | True | [FUZZY_ADDRESS_MATCH, ADDRESS_VALIDATION_ISSUE... | -1957696040669920617 | 250 |
| 1 | 15 | 29 | 15 | 15 | 0.269231 | 1.345476 | True | [FUZZY_ADDRESS_MATCH, ADDRESS_VALIDATION_ISSUE... | -2426288627388268182 | 210 |
| 2 | 19 | 60 | 57 | 60 | 0.183908 | 0.0 | True | [SEQUENTIAL_ADDRESS, ADDRESS_VALIDATION_ISSUE,... | -4826314633709669189 | 200 |
| 3 | 13 | 26 | 26 | 26 | 0.371429 | 0.0 | True | [ADDRESS_VALIDATION_ISSUE, HIGH_VELOCITY, SEQU... | 2867206730150835883 | 200 |
| 4 | 10 | 40 | 39 | 40 | 0.347826 | 0.0 | True | [ADDRESS_VALIDATION_ISSUE, HIGH_VELOCITY, FUZZ... | 2756045280499792354 | 165 |
| 5 | 18 | 30 | 29 | 30 | 0.349398 | 0.0 | True | [FUZZY_ADDRESS_MATCH, HIGH_VELOCITY, ADDRESS_V... | -1326618608499931165 | 165 |
| 6 | 25 | 40 | 39 | 39 | 0.353982 | 0.0 | True | [HIGH_VELOCITY, ADDRESS_VALIDATION_ISSUE, FUZZ... | 2155117644859060255 | 165 |
| 7 | 17 | 35 | 34 | 35 | 0.347368 | 0.639849 | True | [ADDRESS_VALIDATION_ISSUE, HIGH_VELOCITY, FUZZ... | -7580286579208743975 | 155 |
| 8 | 22 | 27 | 26 | 27 | 0.337838 | 0.163269 | True | [FUZZY_ADDRESS_MATCH, HIGH_VELOCITY, ADDRESS_V... | -2352998642758766776 | 155 |
| 9 | 1 | 16 | 8 | 8 | 0.390244 | 0.0 | True | [ADDRESS_VALIDATION_ISSUE, HIGH_VELOCITY, FUZZ... | 73927182204468805 | 120 |


### Summarize by Tagging the Original Order Records

Join the suspicious clusters back to their original `order_id`s for a first round of triage.

You can adjust the risk threshold in the `WHERE` filter to widen or narrow the scope of the risk assessment.

In [None]:
# NOTE:  Update SQL JOIN to use `project_id.dataset`.llm_grounding_review_batch from previous step if necessary

sql_stage_6 = """
-- This query retrieves all original order details for clusters identified as fraudulent.

SELECT
    -- Select all columns from the suspicious_clusters table, which contains the normalized order data plus the cluster_id.
    s.*,

    -- Also include the final score and the list of all flags for context.
    f.fraud_score,
    f.all_cluster_flags,
    f.cluster_fingerprint
FROM
    -- This table contains the individual orders that were grouped into suspicious clusters.
    `project_id.dataset.suspicious_clusters_stage2` AS s
JOIN
    -- This is the final summary table that contains the score for each cluster.
    `project_id.dataset.final_scored_clusters_stage4` AS f
    ON s.cluster_id = f.cluster_id
WHERE
    -- Set your threshold for what constitutes a fraudulent cluster.
    -- A score > 50 is a reasonable starting point for high-confidence fraud.
    f.fraud_score > 50
ORDER BY
    -- Order the results by cluster and then by time to see the sequence of events.
    f.cluster_id, s.created_at;
    """
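# Fetch the rows into a DataFrame for review (run_bq_query does not return rows)
flagged_orders_df = bq_client.query(
    sql_stage_6.replace("project_id.dataset", f"{PROJECT_ID}.{DATASET_ID}")
).to_dataframe()
print(f"Retrieved {len(flagged_orders_df)} orders from clusters above the score threshold.")
display(flagged_orders_df.head(20))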
