In [0]:
def load_csv_to_spark(path: str):
    """Load CSV file into Spark DataFrame."""
    try:
        df = spark.read.csv(path, header=True, inferSchema=True)
        print(f"✅ Loaded CSV into Spark: {path}")
        return df
    except Exception as e:
        print(f"❌ Failed to load CSV {path}: {e}")
        return None

# === Define files ===
files_to_download = {
    "transactions": {
        "file_id": "1AGXVlDhbMbhoGXDJG0IThnqz86Qy3hqb",
        "path": "/Volumes/workspace/default/vol/transactions.csv"
    },
    "cust_imp": {
        "file_id": "1abe9EkM_uf2F2hjEkbhMBG9Mf2dFE4Wo",
        "path": "/Volumes/workspace/default/vol/CustomerImportance.csv"
    }
}

# === Download and Load ===
dataframes = {}
for name, info in files_to_download.items():
    dataframes[name] = load_csv_to_spark(info["path"])

transactions = dataframes["transactions"]
cust_imp = dataframes["cust_imp"]

transactions.show(3)
cust_imp.show(3)

✅ Loaded CSV into Spark: /Volumes/workspace/default/vol/transactions.csv
✅ Loaded CSV into Spark: /Volumes/workspace/default/vol/CustomerImportance.csv
+----+-------------+---+------+----------+-------------+-----------+-------------------+------+-----+
|step|     customer|age|gender|zipcodeOri|     merchant|zipMerchant|           category|amount|fraud|
+----+-------------+---+------+----------+-------------+-----------+-------------------+------+-----+
|   0|'C1093826151'|'4'|   'M'|   '28007'| 'M348934600'|    '28007'|'es_transportation'|  4.55|    0|
|   0| 'C352968107'|'2'|   'M'|   '28007'| 'M348934600'|    '28007'|'es_transportation'| 39.68|    0|
|   0|'C2054744914'|'4'|   'F'|   '28007'|'M1823072687'|    '28007'|'es_transportation'| 26.89|    0|
+----+-------------+---+------+----------+-------------+-----------+-------------------+------+-----+
only showing top 3 rows
+-------------+-------------+------+-------------------+-----+
|       Source|       Target|Weight|          t

In [0]:
from pyspark.sql import functions as F, Window
from datetime import datetime
import pytz

ist_time = F.from_utc_timestamp(F.current_timestamp(), "Asia/Kolkata")

def get_current_ist_time():
    return datetime.now(pytz.timezone('Asia/Kolkata')).strftime('%Y-%m-%d %H:%M:%S')

# ---------------------
#  Pattern 1: UPGRADE
# ---------------------
def detect_pat1_upgrade_customers(transactions_df, cust_imp_df, y_start_time, detection_time):
    transactions_clean = transactions.select(
        F.col("customer").alias("customerId"),
        F.col("merchant").alias("merchantId"),
        F.col("category").alias("transactionType")
    )

    cust_imp_clean = cust_imp.select(
        F.col("Source").alias("customerId"),
        F.col("Target").alias("merchantId"),
        F.col("Weight").cast("double").alias("weight"),
        F.col("typeTrans").alias("transactionType")
    )

    # Step 1:
    # Total transactions per merchant
    merchant_txn_counts = transactions_clean.groupBy("merchantId").agg(
        F.count("*").alias("total_txns")
    )

    # filter merchants with ≥ 50k transactions
    eligible_merchants = merchant_txn_counts.filter("total_txns >= 50000")

    # Join to transactions to keep only eligible merchants
    filtered_txns = transactions_clean.join(
        eligible_merchants.select("merchantId"), on="merchantId", how="inner"
    )

    # Step 2:
    # Count of transactions per customer per merchant
    cust_txn_counts = filtered_txns.groupBy("merchantId", "customerId").agg(
        F.count("*").alias("txn_count")
    )

    # Window for percentile calculation per merchant
    txn_window = Window.partitionBy("merchantId").orderBy(F.desc("txn_count"))

    # Add raw percentile rank
    cust_txn_counts = cust_txn_counts.withColumn(
        "txn_percentile_rank_raw",
        F.percent_rank().over(txn_window)
    )

    # Round to 4 decimal places to remove scientific notation
    cust_txn_counts = cust_txn_counts.withColumn(
        "txn_percentile_rank", F.round("txn_percentile_rank_raw", 4)
    ).drop("txn_percentile_rank_raw")

    # Top 10% customers by txn count
    top_txn_customers = cust_txn_counts.filter(F.col("txn_percentile_rank") <= 0.1)
    # Step 3:
    # Join with cust_imp to get average weight
    joined = top_txn_customers.join(
        cust_imp_clean,
        on=["merchantId", "customerId"],
        how="inner"
    )

    avg_weight_df = joined.groupBy("merchantId", "customerId").agg(
        F.avg("weight").alias("avg_weight")
    )

    # Window to rank avg_weight per merchant
    weight_window = Window.partitionBy("merchantId").orderBy("avg_weight")

    avg_weight_df = avg_weight_df.withColumn(
        "weight_percentile_rank",
        F.percent_rank().over(weight_window)
    )

    # Bottom 10% by weight
    final_pat1 = avg_weight_df.filter("weight_percentile_rank <= 0.1")
    result_pat1 = final_pat1.withColumn("patternId", F.lit("PatId1")) \
        .withColumn("actionType", F.lit("UPGRADE")) \
        .withColumn("YStartTime", ist_time) \
        .withColumn("detectionTime", ist_time) \
        .withColumn("customerName", F.col("customerId")) \
        .select(
            "YStartTime", "detectionTime", "patternId", "actionType",
            "customerName", "merchantId"
        )

    return result_pat1




# ---------------------
#  Pattern 2: CHILD
# ---------------------
def detect_pat2_child_customers(transactions_df, y_start_time, detection_time):
    pat2_df = transactions_df.groupBy("customer", "merchant") \
                .agg(
                    F.avg("amount").alias("avg_amount"),
                    F.count("*").alias("txn_count")
                ).filter("avg_amount < 23 AND txn_count >= 80")

    result_df = pat2_df.selectExpr(
        f"'{y_start_time}' as YStartTime",
        f"'{detection_time}' as detectionTime",
        "'PatId2' as patternId",
        "'CHILD' as actionType",
        "customer as customerName",
        "merchant as merchantId"
    )

    return result_df

# ---------------------
# Pattern 3: DEI-NEEDED
# ---------------------
def detect_pat3_dei_needed_merchants(transactions_df, y_start_time, detection_time):
    gender_df = transactions.select(
        F.expr("substring(merchant, 2, length(merchant) - 2)").alias("merchantId"),  # remove quotes
        F.expr("substring(customer, 2, length(customer) - 2)").alias("customerId"),  # remove quotes
        F.expr("substring(gender, 2, 1)").alias("gender")  # 'F' → F, 'M' → M
    )

    # Count distinct genders per customer per merchant
    gender_clean_check = gender_df.groupBy("merchantId", "customerId") \
        .agg(F.countDistinct("gender").alias("gender_type_count"))

    # Keep only clean records (1 gender only per customer per merchant)
    clean_customers = gender_clean_check.filter("gender_type_count = 1") \
        .select("merchantId", "customerId")

    # Join back to filter ambiguous entries
    gender_cleaned = gender_df.join(clean_customers, on=["merchantId", "customerId"], how="inner")
    # Remove duplicate customer-merchant-gender combos
    unique_pairs = gender_cleaned.select("merchantId", "customerId", "gender").distinct()

    # Pivot to get gender counts
    gender_counts = unique_pairs.groupBy("merchantId") \
        .pivot("gender", ["F", "M"]).count().fillna(0)
    dei_merchants = gender_counts.filter(
        (F.col("F") > 100) & (F.col("F") < F.col("M"))
    )
    pat3_df = dei_merchants \
        .withColumn("patternId", F.lit("PatId3")) \
        .withColumn("actionType", F.lit("DEI-NEEDED")) \
        .withColumn("YStartTime", F.lit(y_start_time)) \
        .withColumn("detectionTime", F.lit(detection_time)) \
        .withColumn("customerName", F.lit("")) \
        .select(
            "YStartTime", "detectionTime", "patternId", "actionType",
            "customerName", "merchantId"
        )

    return pat3_df


# --------------------------
#  Master detection runner
# --------------------------
def run_pattern_detections(transactions_df, cust_imp_df, include_pattern2=True):
    y_start_time = get_current_ist_time()
    detection_time = y_start_time

    pat1_df = detect_pat1_upgrade_customers(transactions_df, cust_imp_df, y_start_time, detection_time)
    pat3_df = detect_pat3_dei_needed_merchants(transactions_df, y_start_time, detection_time)

    if include_pattern2:
        pat2_df = detect_pat2_child_customers(transactions_df, y_start_time, detection_time)
        return pat1_df.unionByName(pat2_df).unionByName(pat3_df)
    else:
        return pat1_df.unionByName(pat3_df)



In [0]:
import boto3
import pandas as pd
import time
from io import StringIO

main_df = pd.DataFrame()         # Global detection store
full_txn_df = pd.DataFrame()     # To collect all chunk data for CHILD pattern
detection_buffer_df = pd.DataFrame()  # Buffer for batched detection writes    # File naming counter

bucket = "pattern-detection-kanakb"
chunk_prefix = "transaction_chunks"
detection_prefix = 'output_detections'  


s3 = boto3.client("s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key)

chunk_size = 10000

def read_csv_from_s3(bucket, key):
    """Read CSV file from S3 and return as pandas DataFrame."""
    obj = s3.get_object(Bucket=bucket, Key=key)
    return pd.read_csv(StringIO(obj['Body'].read().decode('utf-8')))


def write_csv_to_s3(df, bucket, key):
    """Write a DataFrame as CSV to S3."""
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())
    print(f"📤 Written to s3://{bucket}/{key}")


In [0]:
import pandas as pd
import time
chunk_size = 10000
from threading import Lock


chunk_index = 1
detection_file_counter = 1   
chunk_index_lock = Lock()


def producer():
    print("🚀 Stream Producer started...")

    global chunk_index
    full_df = transactions.toPandas()

    for i in range(0, len(full_df), chunk_size):
        chunk = full_df[i:i+chunk_size]

        with chunk_index_lock:
            current_index = chunk_index
            chunk_index += 1

        filename = f"/tmp/{current_index}.csv"

        # Save locally
        chunk.to_csv(filename, index=False)

        # Upload to S3
        s3_key = f"{chunk_prefix}/chunk_{current_index}.csv"
        s3.upload_file(filename, bucket, s3_key)
        print(f"✅ Uploaded: s3://{bucket}/{s3_key}")

        time.sleep(1)

In [0]:
producer()


🚀 Stream Producer started...
✅ Uploaded: s3://pattern-detection-kanakb/transaction_chunks/chunk_1.csv
✅ Uploaded: s3://pattern-detection-kanakb/transaction_chunks/chunk_2.csv


In [0]:


def consumer():
    global main_df, full_txn_df, detection_buffer_df, detection_file_counter
    print("👂 Consumer started...")

    expected_chunk = 1
    total_rows = 0

    while expected_chunk < 61:
        s3_key = f"{chunk_prefix}/chunk_{expected_chunk}.csv"

        try:
            print(f"🔍 Looking for: s3://{bucket}/{s3_key}")
            time.sleep(1)
            chunk_df = read_csv_from_s3(bucket, s3_key)
            print(f"📥 Processed: chunk_{expected_chunk}.csv, rows = {len(chunk_df)}")

            total_rows += len(chunk_df)
            full_txn_df = pd.concat([full_txn_df, chunk_df], ignore_index=True)

            spark_df = spark.createDataFrame(chunk_df)

            # Run detection (excluding CHILD)
            detection_df = run_pattern_detections(spark_df, cust_imp, include_pattern2=False)
            detection_pd = detection_df.toPandas()
            print(f"🔎 Detected rows: {len(detection_pd)}")

            # Remove duplicates vs main_df and buffer
            combined_existing = pd.concat([main_df, detection_buffer_df], ignore_index=True)
            new_unique = pd.concat([combined_existing, detection_pd], ignore_index=True)
            new_unique = new_unique.drop_duplicates(
                subset=["patternId", "actionType", "customerName", "merchantId"],
                keep=False
            )

            # Add new unique detections to buffer
            detection_buffer_df = pd.concat([detection_buffer_df, new_unique], ignore_index=True)

            # Write detection files in batches of 50
            while len(detection_buffer_df) >= 50:
                write_df = detection_buffer_df.iloc[:50]
                s3_output_key = f"{detection_prefix}/detection_{detection_file_counter}.csv"
                write_csv_to_s3(write_df, bucket, s3_output_key)

                print(f"📤 Detection file written: {s3_output_key}")

                detection_file_counter += 1
                detection_buffer_df = detection_buffer_df.iloc[50:]  # remove written rows

            # Merge into main_df
            main_df = pd.concat([main_df, new_unique], ignore_index=True)
            main_df = main_df.drop_duplicates(
                subset=["patternId", "actionType", "customerName", "merchantId"],
                keep="first"
            )

            print(f"📊 Detections so far: {len(main_df)}")
            expected_chunk += 1

        except FileNotFoundError:
            time.sleep(1)
        except Exception as e:
            print(f"❌ Error processing {s3_key}: {e}")

    # Run CHILD pattern after all chunks are processed
    print(f"📦 Running CHILD detection on full {len(full_txn_df)} rows...")
    full_spark_df = spark.createDataFrame(full_txn_df)
    y_start_time = get_current_ist_time()
    detection_time = y_start_time
    pat2_df = detect_pat2_child_customers(full_spark_df, y_start_time, detection_time)
    pat2_pd = pat2_df.toPandas()
    print(f"Found CHILD Detections: {len(pat2_pd)}")

    detection_buffer_df = pd.concat([detection_buffer_df, pat2_pd], ignore_index=True)

    # Write remaining detections in batches of 50
    while len(detection_buffer_df) >= 50:
        write_df = detection_buffer_df.iloc[:50]
        s3_output_key = f"{detection_prefix}/detection_{detection_file_counter}.csv"
        write_csv_to_s3(write_df, bucket, s3_output_key)

        print(f"📤 Detection file written: {s3_output_key}")

        detection_file_counter += 1
        detection_buffer_df = detection_buffer_df.iloc[50:]

    # Write any remaining detections (even if <50) to a final file
    if not detection_buffer_df.empty:

        s3_output_key = f"{detection_prefix}/detection_{detection_file_counter}.csv"
        write_csv_to_s3(detection_buffer_df, bucket, s3_output_key)

        print(f"📤 Final detection file (less than 50 rows): {s3_output_key}")
        detection_file_counter += 1




In [0]:
from threading import Thread

producer_thread = Thread(target=producer)
consumer_thread = Thread(target=consumer)

# Start threads
producer_thread.start()
consumer_thread.start()

# Wait for both to finish
producer_thread.join()
consumer_thread.join()

🚀 Stream Producer started...
👂 Consumer started...
🔍 Looking for: s3://pattern-detection-kanakb/transaction_chunks/chunk_1.csv
❌ Error processing transaction_chunks/chunk_1.csv: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.
🔍 Looking for: s3://pattern-detection-kanakb/transaction_chunks/chunk_1.csv
❌ Error processing transaction_chunks/chunk_1.csv: An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.
🔍 Looking for: s3://pattern-detection-kanakb/transaction_chunks/chunk_1.csv
✅ Uploaded: s3://pattern-detection-kanakb/transaction_chunks/chunk_1.csv
📥 Processed: chunk_1.csv, rows = 10000
✅ Uploaded: s3://pattern-detection-kanakb/transaction_chunks/chunk_2.csv
🔎 Detected rows: 81
📤 Written to s3://pattern-detection-kanakb/output_detections/detection_1.csv
📤 Detection file written: output_detections/detection_1.csv
📊 Detections so far: 81
🔍 Looking for: s3://pattern-detection-kanakb/tran