In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import *

# Column layout of the incoming transaction CSV files; every column is nullable.
transaction_schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("customer_name", StringType(), True)
    .add("merchant_id", StringType(), True)
    .add("transaction_amount", DoubleType(), True)
    .add("transaction_type", StringType(), True)
    .add("gender", StringType(), True)
    .add("timestamp", TimestampType(), True)
)
print("✅ Schema defined.")

# Landing bucket that receives the raw transaction files.
S3_LANDING_BUCKET = "devdolphines-task-transaction-landing-krishna"
s3_path = f"s3a://{S3_LANDING_BUCKET}/"

# Streaming CSV source over the landing path, parsed with the explicit schema
# above. The first line of each file is treated as a header.
raw_transactions_df = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(transaction_schema)
    .load(s3_path)
)

print(f"✅ Reading stream from {s3_path}")

✅ Schema defined.
✅ Reading stream from s3a://devdolphines-task-transaction-landing-krishna/


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# --- Supplementary (static) data ---
# CustomerImportance.csv is read once as a batch DataFrame; the header row
# supplies the column names and the column types are inferred from the data.
importance_file_path = "/FileStore/tables/CustomerImportance.csv"
customer_importance_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(importance_file_path)
)

print("✅ CustomerImportance.csv loaded successfully.")

# ------------------------------------------------------------------------------
# PatId2 / CHILD: a customer whose average transaction value at a merchant is
# below 23 across at least 80 transactions.
# ------------------------------------------------------------------------------
patid2_df = (
    raw_transactions_df
    .groupBy("merchant_id", "customer_name", "customer_id")
    .agg(
        avg("transaction_amount").alias("avg_txn_value"),
        count("transaction_id").alias("txn_count"),
    )
    .where("avg_txn_value < 23 AND txn_count >= 80")
)

# Shape the matches into the common detection record layout.
patid2_detections = patid2_df.select(
    current_timestamp().alias("YStartTime(IST)"),
    current_timestamp().alias("detectionTime(IST)"),
    lit("PatId2").alias("patternId"),
    lit("CHILD").alias("ActionType"),
    col("customer_name"),
    col("merchant_id"),
)

# Publish the detections to the in-memory table "patid2_stream".
(
    patid2_detections.writeStream
    .format("memory")
    .queryName("patid2_stream")
    .outputMode("update")
    .start()
)

print("✅ PatId2 (CHILD) detection stream started.")

# ------------------------------------------------------------------------------
# PatId3 / DEI-NEEDED: a merchant with more than 100 distinct female customers
# that still has fewer female than male customers.
# ------------------------------------------------------------------------------
# One row per (merchant, customer, gender) so each customer is counted once.
unique_customers_per_merchant = (
    raw_transactions_df
    .select("merchant_id", "customer_id", "gender")
    .dropDuplicates()
)

patid3_df = (
    unique_customers_per_merchant
    .groupBy("merchant_id")
    .agg(
        sum(when(col("gender") == "Male", 1).otherwise(0)).alias("male_customers"),
        sum(when(col("gender") == "Female", 1).otherwise(0)).alias("female_customers"),
    )
    .where("female_customers > 100 AND female_customers < male_customers")
)

# Merchant-level pattern, so the customer name column is left blank.
patid3_detections = patid3_df.select(
    current_timestamp().alias("YStartTime(IST)"),
    current_timestamp().alias("detectionTime(IST)"),
    lit("PatId3").alias("patternId"),
    lit("DEI-NEEDED").alias("ActionType"),
    lit("").alias("customer_name"),
    col("merchant_id"),
)

# Publish the detections to the in-memory table "patid3_stream".
(
    patid3_detections.writeStream
    .format("memory")
    .queryName("patid3_stream")
    .outputMode("update")
    .start()
)

print("✅ PatId3 (DEI-NEEDED) detection stream started.")

✅ CustomerImportance.csv loaded successfully.
✅ PatId2 (CHILD) detection stream started.
✅ PatId3 (DEI-NEEDED) detection stream started.


In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# ==================================================================================
# UPGRADE DETECTION
# ==================================================================================

# Static lookup data: read CustomerImportance.csv (header row, inferred types)
# and mark it for caching, since every micro-batch joins against it.
importance_file_path = "/FileStore/tables/CustomerImportance.csv"
customer_importance_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(importance_file_path)
    .cache()
)
print("✅ CustomerImportance.csv loaded and cached.")


def process_batch_for_patid1(micro_batch_df, batch_id):
    """Detect PatId1 (UPGRADE) candidates in one micro-batch and append them to a table.

    Steps, all scoped to the rows of ``micro_batch_df``:

    1. Count transactions per (merchant_id, customer_name, customer_id,
       transaction_type).
    2. Within each merchant, rank those counts in descending order with
       ``cume_dist`` and keep rows whose rank is <= 0.01.
    3. Inner-join the survivors to ``customer_importance_df`` on
       ``customer_id == Source`` and ``transaction_type == typeTrans``.
    4. Keep rows with ``Weight <= 0.01`` and append them, in the detection
       record layout, to the table ``patid1_all_detections``.

    Args:
        micro_batch_df: Batch DataFrame handed over by ``foreachBatch``.
        batch_id: Identifier of the micro-batch (used for logging only).

    Returns:
        None. Nothing is written when the batch or the detection set is empty.
    """
    if micro_batch_df.rdd.isEmpty():
        return

    print(f"--- PatId1: Processing micro-batch {batch_id} ---")

    # Transaction counts per customer/merchant/type, ranked inside each merchant.
    customer_merchant_counts = (
        micro_batch_df
        .groupBy("merchant_id", "customer_name", "customer_id", "transaction_type")
        .agg(F.count("transaction_id").alias("txn_count"))
    )
    window_spec = Window.partitionBy("merchant_id").orderBy(F.col("txn_count").desc())

    # Descending order means the busiest customers get the smallest cume_dist.
    top_customers_df = (
        customer_merchant_counts
        .withColumn("percentile_rank", F.cume_dist().over(window_spec))
        .filter(F.col("percentile_rank") <= 0.01)
    )

    join_conditions = [
        top_customers_df.customer_id == customer_importance_df.Source,
        top_customers_df.transaction_type == customer_importance_df.typeTrans,
    ]

    # NOTE(review): this compares the raw Weight value with 0.01. If the intent
    # is "bottom 1 percentile of Weight", a percentile over the importance data
    # is needed instead — confirm against the requirement.
    potential_upgrades = (
        top_customers_df
        .join(customer_importance_df, join_conditions, "inner")
        .filter(F.col("Weight") <= 0.01)
    )

    final_detections = potential_upgrades.select(
        F.current_timestamp().alias("YStartTime(IST)"),
        F.current_timestamp().alias("detectionTime(IST)"),
        F.lit("PatId1").alias("patternId"),
        F.lit("UPGRADE").alias("ActionType"),
        F.col("customer_name"),
        F.col("merchant_id"),
    )

    # Persist so the shuffle/window/join pipeline runs once: the count below
    # materialises the cache and the write then reads from it. Previously the
    # lineage was re-executed by each emptiness check, by the write, and again
    # by count() after the write.
    final_detections.persist()
    try:
        detection_count = final_detections.count()
        if detection_count == 0:
            return

        # NOTE(review): the "(IST)" column names contain parentheses; Delta
        # tables reject such names unless column mapping is enabled — confirm
        # the format of the target table.
        final_detections.write.mode("append").saveAsTable("patid1_all_detections")
        print(f"✅ PatId1: SUCCESS! Wrote {detection_count} detections to the table.")
    finally:
        final_detections.unpersist()

# --- Launch the PatId1 stream ---
# Every micro-batch of raw transactions is handed to process_batch_for_patid1.
query_patid1 = (
    raw_transactions_df.writeStream
    .outputMode("update")
    .foreachBatch(process_batch_for_patid1)
    .start()
)

print("✅ PatId1 (UPGRADE) detection stream started with corrected column names.")

✅ CustomerImportance.csv loaded and cached.
✅ PatId1 (UPGRADE) detection stream started with corrected column names.


In [0]:
%pip install psycopg2-binary

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import psycopg2

# Connection details for the PostgreSQL instance.
# The password is read from a Databricks secret scope rather than being stored
# in the notebook. Create the secret before running this cell, e.g. scope
# "devdolphines-task" with key "postgres-password" (adjust the names below to
# match your workspace). The password that was previously committed here should
# be rotated.
postgres_conn_details = {
    "host": "devdolphines-task-db.c0jy40iesd39.us-east-1.rds.amazonaws.com",
    "dbname": "postgres",
    "user": "postgres",
    "password": dbutils.secrets.get(scope="devdolphines-task", key="postgres-password"),
    "port": "5432"
}

print("Attempting to connect to PostgreSQL...")

# Connectivity smoke test: open a connection with a 10 second timeout, report
# the outcome, and always release the connection afterwards.
conn = None
try:
    conn = psycopg2.connect(connect_timeout=10, **postgres_conn_details)
    print("✅✅✅ SUCCESS: Connection to PostgreSQL database was successful!")

except Exception as e:
    print("❌❌❌ FAILED: Could not connect to the database.")
    print(f"Error details: {e}")

finally:
    if conn is not None:
        conn.close()

Attempting to connect to PostgreSQL...
✅✅✅ SUCCESS: Connection to PostgreSQL database was successful!


In [0]:
import psycopg2
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Connection details for the PostgreSQL instance.
# The password is read from a Databricks secret scope rather than being stored
# in the notebook. Create the secret before running this cell, e.g. scope
# "devdolphines-task" with key "postgres-password" (adjust the names below to
# match your workspace). The password that was previously committed here should
# be rotated.
postgres_conn_details = {
    "host": "devdolphines-task-db.c0jy40iesd39.us-east-1.rds.amazonaws.com",
    "dbname": "postgres",
    "user": "postgres",
    "password": dbutils.secrets.get(scope="devdolphines-task", key="postgres-password"),
    "port": "5432"
}
# Make sure the importance DataFrame is loaded and cached
importance_file_path = "/FileStore/tables/CustomerImportance.csv"
customer_importance_df = spark.read.csv(importance_file_path, header=True, inferSchema=True).cache()


def process_patid1_with_postgres(micro_batch_df, batch_id):
    """Detect PatId1 (UPGRADE) candidates using merchant totals kept in PostgreSQL.

    Steps:

    1. Add this batch's per-merchant transaction counts to the
       ``merchant_transaction_counts`` table (insert or increment).
    2. Read the table back over JDBC and keep merchants whose
       ``total_transactions`` exceeds 50000.
    3. Restrict the batch to those merchants, count transactions per
       (merchant_id, customer_name, customer_id, transaction_type), rank the
       counts per merchant in descending order with ``cume_dist`` and keep
       rows whose rank is <= 0.01.
    4. Inner-join to ``customer_importance_df`` on ``customer_id == Source``
       and ``transaction_type == typeTrans``, keep rows with
       ``Weight <= 0.01`` and append them to ``patid1_all_detections``.

    Args:
        micro_batch_df: Batch DataFrame handed over by ``foreachBatch``.
        batch_id: Identifier of the micro-batch (used for logging only).

    Returns:
        None. The function returns early, after logging, when the batch is
        empty, when the PostgreSQL update fails, or when any stage has no rows.

    Note:
        The counter update is an increment, so if the same batch is delivered
        to this function more than once its transactions are counted again.
    """
    if micro_batch_df.rdd.isEmpty():
        return

    print(f"--- PatId1: Processing batch {batch_id} ---")

    # 1. Update total transaction counts in PostgreSQL
    batch_counts = micro_batch_df.groupBy("merchant_id").count().withColumnRenamed("count", "new_transactions")
    upsert_sql = """
        INSERT INTO merchant_transaction_counts (merchant_id, total_transactions)
        VALUES (%s, %s) ON CONFLICT (merchant_id) DO UPDATE
        SET total_transactions = merchant_transaction_counts.total_transactions + EXCLUDED.total_transactions;
    """
    conn = None
    try:
        # Bounded connect time so an unreachable database cannot stall the stream.
        conn = psycopg2.connect(connect_timeout=10, **postgres_conn_details)
        with conn.cursor() as cursor:
            for row in batch_counts.collect():
                cursor.execute(upsert_sql, (row["merchant_id"], row["new_transactions"]))
        # Single commit: either every merchant's increment lands or none does.
        conn.commit()
    except Exception as e:
        # Best effort: log and skip the rest of this batch.
        print(f"❌ Error updating PostgreSQL: {e}")
        return
    finally:
        # Always release the connection, including on the error path; closing
        # without a commit discards any partial work.
        if conn is not None:
            conn.close()

    # 2. Read updated counts and filter for merchants with > 50k transactions
    pg_url = f"jdbc:postgresql://{postgres_conn_details['host']}:{postgres_conn_details['port']}/{postgres_conn_details['dbname']}"
    pg_properties = {"user": postgres_conn_details["user"], "password": postgres_conn_details["password"], "driver": "org.postgresql.Driver"}
    updated_counts_df = spark.read.jdbc(url=pg_url, table="merchant_transaction_counts", properties=pg_properties)
    merchants_to_upgrade = updated_counts_df.filter(F.col("total_transactions") > 50000)

    if merchants_to_upgrade.rdd.isEmpty():
        print(f"--- PatId1: No merchants have >50k transactions yet. ---")
        return

    # 3. Keep only this batch's transactions for the eligible merchants
    eligible_transactions_df = micro_batch_df.join(merchants_to_upgrade.select("merchant_id"), "merchant_id", "inner")

    customer_merchant_counts = (
        eligible_transactions_df
        .groupBy("merchant_id", "customer_name", "customer_id", "transaction_type")
        .agg(F.count("transaction_id").alias("txn_count"))
    )

    # Descending order means the busiest customers get the smallest cume_dist.
    window_spec = Window.partitionBy("merchant_id").orderBy(F.col("txn_count").desc())
    top_customers_df = (
        customer_merchant_counts
        .withColumn("percentile_rank", F.cume_dist().over(window_spec))
        .filter(F.col("percentile_rank") <= 0.01)
    )

    if top_customers_df.rdd.isEmpty():
        print(f"--- PatId1: No customers in top 1% for eligible merchants in this batch. ---")
        return

    # 4. Attach the importance weight and keep the low-weight customers
    join_conditions = [
        top_customers_df.customer_id == customer_importance_df.Source,
        top_customers_df.transaction_type == customer_importance_df.typeTrans,
    ]

    # NOTE(review): this compares the raw Weight value with 0.01. If the intent
    # is "bottom 1 percentile of Weight", a percentile over the importance data
    # is needed instead — confirm against the requirement.
    potential_upgrades = (
        top_customers_df
        .join(customer_importance_df, join_conditions, "inner")
        .filter(F.col("Weight") <= 0.01)
    )

    final_detections = potential_upgrades.select(
        F.current_timestamp().alias("YStartTime(IST)"),
        F.current_timestamp().alias("detectionTime(IST)"),
        F.lit("PatId1").alias("patternId"),
        F.lit("UPGRADE").alias("ActionType"),
        F.col("customer_name"),
        F.col("merchant_id"),
    )

    # Persist so the join/window pipeline is evaluated once for both the count
    # and the write, instead of once per action.
    final_detections.persist()
    try:
        detection_count = final_detections.count()
        if detection_count == 0:
            return

        # For testing, save to a Databricks table.
        final_detections.write.mode("append").saveAsTable("patid1_all_detections")
        print(f"✅ PatId1: SUCCESS! Wrote {detection_count} UPGRADE detections to the table.")
    finally:
        final_detections.unpersist()

In [0]:

# Launch the PostgreSQL-backed PatId1 stream: each micro-batch of raw
# transactions is handed to process_patid1_with_postgres.
query_patid1 = (
    raw_transactions_df.writeStream
    .outputMode("update")
    .foreachBatch(process_patid1_with_postgres)
    .start()
)

print("✅ PatId1 stream with full PostgreSQL and pattern logic has started.")

✅ PatId1 stream with full PostgreSQL and pattern logic has started.


In [0]:
# =======================================================================
#
# FINAL SCRIPT: Processing All Patterns and Writing to S3
#
# =======================================================================

# Destination prefix for detection files; one sub-folder is written per batch.
S3_OUTPUT_PATH = "s3a://devdolphines-task-patterns-output-krishna/"


def process_all_patterns_and_save_to_s3(micro_batch_df, batch_id):
    """Detect PatId2 and PatId3 in one micro-batch and write the result to S3.

    Only PatId2 (CHILD) and PatId3 (DEI-NEEDED) are computed here; PatId1 is
    not part of this function. Both patterns are evaluated on the rows of
    ``micro_batch_df`` alone.

    Output columns, in order: ``YStartTime(IST)``, ``detectionTime(IST)``,
    ``patternId``, ``ActionType``, ``customerName``, ``MerchantId``. The rows
    are written as a single-part CSV with a header under
    ``S3_OUTPUT_PATH + "detections_batch_<batch_id>_<epoch millis>"``.

    Args:
        micro_batch_df: Batch DataFrame handed over by ``foreachBatch``.
        batch_id: Identifier of the micro-batch; part of the output folder name.

    Returns:
        None. Nothing is written when the batch or the detection set is empty.
    """
    import time

    # The batch feeds two aggregations, each evaluated by more than one action,
    # so cache it once instead of re-reading the source files every time.
    micro_batch_df.persist()
    try:
        record_count = micro_batch_df.count()
        if record_count == 0:
            return

        print(f"--- Final Processing for batch {batch_id} with {record_count} records ---")

        # --- PatId2: average transaction value < 23 over >= 80 transactions ---
        # customer_id is part of the key (as in the other PatId2/PatId1
        # aggregations in this notebook) so that two customers who share a
        # name are not merged into one.
        patid2_results = (
            micro_batch_df
            .groupBy("merchant_id", "customer_name", "customer_id")
            .agg(
                F.avg("transaction_amount").alias("avg_txn_value"),
                F.count("transaction_id").alias("txn_count"),
            )
            .where("avg_txn_value < 23 AND txn_count >= 80")
            .select(
                F.lit("PatId2").alias("patternId"),
                F.lit("CHILD").alias("ActionType"),
                F.col("customer_name"),
                F.col("merchant_id"),
            )
        )

        # --- PatId3: > 100 distinct female customers, yet fewer than male ---
        unique_customers = micro_batch_df.select("merchant_id", "customer_id", "gender").dropDuplicates()
        patid3_results = (
            unique_customers
            .groupBy("merchant_id")
            .agg(
                F.sum(F.when(F.col("gender") == 'Male', 1).otherwise(0)).alias("male_customers"),
                F.sum(F.when(F.col("gender") == 'Female', 1).otherwise(0)).alias("female_customers"),
            )
            .where("female_customers > 100 AND female_customers < male_customers")
            .select(
                F.lit("PatId3").alias("patternId"),
                F.lit("DEI-NEEDED").alias("ActionType"),
                F.lit("").alias("customer_name"),
                F.col("merchant_id"),
            )
        )

        # --- Combine all detections from this batch ---
        all_detections_df = patid2_results.unionByName(patid3_results, allowMissingColumns=True)

        # Add the timestamps and rename to the output column names. The
        # intermediate frames use snake_case (customer_name / merchant_id), so
        # the output names have to be produced by aliasing; selecting
        # "customerName" / "MerchantId" directly fails because no such columns
        # exist.
        final_detections_df = all_detections_df.select(
            F.current_timestamp().alias("YStartTime(IST)"),
            F.current_timestamp().alias("detectionTime(IST)"),
            F.col("patternId"),
            F.col("ActionType"),
            F.col("customer_name").alias("customerName"),
            F.col("merchant_id").alias("MerchantId"),
        )

        # --- WRITE THE FINAL OUTPUT TO S3 ---
        detections_count = final_detections_df.count()
        if detections_count == 0:
            return

        print(f"✅ Found {detections_count} total detections in batch {batch_id}. Writing to S3...")

        # Unique folder name from the batch id plus the wall-clock time in ms.
        timestamp_ms = int(time.time() * 1000)
        output_file_name = f"detections_batch_{batch_id}_{timestamp_ms}"
        full_output_path = S3_OUTPUT_PATH + output_file_name

        # coalesce(1) yields a single CSV part file inside the output folder.
        final_detections_df.coalesce(1).write.mode("append").option("header", "true").csv(full_output_path)
        print(f"Successfully wrote detections to {full_output_path}")
    finally:
        micro_batch_df.unpersist()

# --- Launch the final stream ---
# Each micro-batch of raw transactions goes to process_all_patterns_and_save_to_s3.
query = (
    raw_transactions_df.writeStream
    .foreachBatch(process_all_patterns_and_save_to_s3)
    .start()
)

print("✅ Final production stream started. Writing all detections to S3.")

✅ Final production stream started. Writing all detections to S3.
