# =============================================================================
# INTERAC TRANSACTIONS CDC FEED PROJECT - ENHANCED MOCK DATA GENERATOR
# =============================================================================
# This notebook generates realistic mock data for Interac transactions with CDC operations
# Purpose: Creates test data for development, testing, and demonstration of CDC streaming
# Features: INSERT, UPDATE, DELETE operations with realistic Interac transaction patterns
# Output: Continuous CDC operations to test the real-time streaming pipeline


In [0]:
# =============================================================================
# LIBRARY IMPORTS AND CONFIGURATION
# =============================================================================
# Import required libraries for mock data generation and CDC operations
# Purpose: Set up environment for generating realistic interac transaction test data

import random
import uuid
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable
import time

# =============================================================================
# CONFIGURATION SETUP
# =============================================================================
# Where the mock rows are written:
#   catalog_name - Unity Catalog name (back-quoted so it is safe inside SQL text)
#   schema_name  - schema that holds the Interac transaction tables
#   raw_table    - fully qualified name of the raw transactions table

catalog_name = "`interac_transaction_cdc`"
schema_name = "default"
raw_table = ".".join((catalog_name, schema_name, "raw_interac_transactions_v1"))

print(f"Using catalog: {catalog_name}, schema: {schema_name}")
print(f"Target table: {raw_table}")


Using catalog: `gds_de_bootcamp_new`, schema: default
Target table: `gds_de_bootcamp_new`.default.raw_upi_transactions_v1


In [0]:
# -----------------------------------------------------------------------------
# Mock data constants
# -----------------------------------------------------------------------------

# Merchant catalogue: each record is expanded into a dict keyed by the field
# names below, in this order.
_MERCHANT_FIELDS = ("merchant_id", "merchant_name", "merchant_category")
MERCHANTS = [
    dict(zip(_MERCHANT_FIELDS, record))
    for record in (
        ("M001", "Amazon Canada", "E-commerce"),
        ("M002", "Uber Eats / SkipTheDishes", "Food Delivery"),
        ("M003", "Lyft / Uber", "Transportation"),
        ("M004", "Netflix / Disney+", "Entertainment"),
        ("M005", "Loblaws / Metro", "Grocery"),
        ("M006", "Canadian Tire", "Retail"),
        ("M007", "Tim Hortons", "Food & Beverage"),
        ("M008", "Indigo / Chapters", "Retail"),
    )
]

# Interac e-Transfer style e-mail handles (these replaced the earlier UPI IDs).
INTERAC_EMAILS = [
    "alex.smith@gmail.com",
    "j.doe_toronto@outlook.com",
    "vancouver_dev@shaw.ca",
    "brampton.user@rogers.com",
    "finance_pro@bell.net",
]

CUSTOMER_IDS = [
    "CUST_ON_001",
    "CUST_BC_002",
    "CUST_ON_003",
    "CUST_BC_004",
    "CUST_ON_005",
]

# Payment options reflecting the Canadian payment landscape.
PAYMENT_METHODS = [
    "Interac e-Transfer",
    "Credit Card (Visa/MC)",
    "Apple Pay",
    "Google Pay",
    "Presto Tap",
]

DEVICE_TYPES = ["Mobile", "Tablet", "Desktop"]
OPERATING_SYSTEMS = ["Android", "iOS", "Windows", "macOS"]

# Geography: major hubs in Ontario and British Columbia.
CITIES = [
    "Toronto",
    "Brampton",
    "Mississauga",
    "Vancouver",
    "Victoria",
    "Surrey",
    "Ottawa",
    "Hamilton",
]
PROVINCES = ["Ontario", "British Columbia"]

# Customer demographics.
AGE_GROUPS = ["18-24", "25-34", "35-44", "45-54", "55-64", "65+"]
GENDERS = ["Male", "Female", "Non-binary", "Prefer not to say"]

# Running sequence number embedded in generated transaction IDs.
transaction_counter = 1

# Cities grouped by province so a generated row never pairs, for example,
# Vancouver with Ontario. Keep in sync with the module-level CITIES list.
_CITIES_BY_PROVINCE = {
    "Ontario": ["Toronto", "Brampton", "Mississauga", "Ottawa", "Hamilton"],
    "British Columbia": ["Vancouver", "Victoria", "Surrey"],
}

# Coarse per-province bounding boxes: (lat_min, lat_max, lon_min, lon_max).
# They are not city-specific; the Ontario box covers the GTA only.
_PROVINCE_BOUNDS = {
    "Ontario": (43.6, 43.9, -79.9, -79.3),
    "British Columbia": (48.4, 49.3, -123.4, -122.7),
}

# Transaction amount range in CAD per merchant category: (low, high).
_AMOUNT_RANGES_CAD = {
    "E-commerce": (50, 2500),
    "Food Delivery": (15, 120),
    "Transportation": (10, 80),
    "Entertainment": (20, 500),
    "Grocery": (30, 450),
    "Retail": (25, 1000),
}
_DEFAULT_AMOUNT_RANGE_CAD = (5, 500)

# Relative likelihood of each status on a newly inserted transaction.
_STATUS_WEIGHTS = {
    "completed": 0.82,
    "failed": 0.10,
    "initiated": 0.04,
    "refunded": 0.03,
    "cancelled": 0.01,
}

# Column order of the tuples built in insert_new_transactions.
_INSERT_COLUMNS = [
    "transaction_id", "interac_id", "merchant_id", "merchant_name", "merchant_category",
    "transaction_amount", "transaction_currency", "transaction_timestamp", "transaction_status",
    "payment_method", "device_type", "device_os", "app_version", "latitude", "longitude",
    "city", "state", "country", "customer_id", "age_group", "gender",
    "processing_fee", "commission", "created_at", "updated_at",
]


def insert_new_transactions(num_transactions=5):
    """Append freshly generated mock transactions to the raw Delta table.

    Each row gets a random merchant, payer e-mail, customer, amount (range
    chosen by merchant category), status, device details and a province with
    a city that belongs to that province. ``created_at`` and ``updated_at``
    carry the same timestamp for a new row.

    Args:
        num_transactions: Number of rows to generate; must be positive.

    Returns:
        True when the rows were appended, False when nothing was requested or
        any step failed (the error is printed, not raised).

    Side effects:
        Advances the module-level ``transaction_counter`` once per generated
        row, even if the final write fails.
    """
    global transaction_counter
    try:
        # An empty row list cannot be turned into a DataFrame without an
        # explicit schema, so report it plainly instead of failing in Spark.
        if num_transactions <= 0:
            print("INSERT: Nothing to insert (num_transactions must be positive)")
            return False

        print(f"INSERT: Adding {num_transactions} new Canadian transactions...")

        status_names = list(_STATUS_WEIGHTS)
        status_weights = list(_STATUS_WEIGHTS.values())
        province_names = list(_PROVINCE_BOUNDS)

        transaction_data = []
        for _ in range(num_transactions):
            # One clock reading per row keeps the ID date, created_at and
            # updated_at mutually consistent.
            now = datetime.now()

            merchant = random.choice(MERCHANTS)
            interac_email = random.choice(INTERAC_EMAILS)
            customer_id = random.choice(CUSTOMER_IDS)

            low, high = _AMOUNT_RANGES_CAD.get(
                merchant["merchant_category"], _DEFAULT_AMOUNT_RANGE_CAD
            )
            amount = round(random.uniform(low, high), 2)

            # Back-date the business timestamp by up to an hour.
            transaction_time = now - timedelta(minutes=random.randint(1, 60))
            status = random.choices(status_names, weights=status_weights)[0]

            # 0.5% processing fee and 1% commission on the amount.
            processing_fee = round(amount * 0.005, 2)
            commission = round(amount * 0.01, 2)

            transaction_id = f"TXN_CAN_{now.strftime('%Y%m%d')}_{transaction_counter:06d}"
            transaction_counter += 1

            # Province drives both the coordinate box and the city choice.
            province = random.choice(province_names)
            lat_min, lat_max, lon_min, lon_max = _PROVINCE_BOUNDS[province]
            lat = round(random.uniform(lat_min, lat_max), 6)
            lon = round(random.uniform(lon_min, lon_max), 6)
            city = random.choice(_CITIES_BY_PROVINCE[province])

            app_version = (
                f"v{random.randint(1, 5)}.{random.randint(0, 9)}.{random.randint(0, 9)}"
            )

            transaction_data.append((
                transaction_id,
                interac_email,  # stored in the interac_id column
                merchant["merchant_id"],
                merchant["merchant_name"],
                merchant["merchant_category"],
                float(amount),
                "CAD",
                transaction_time,
                status,
                random.choice(PAYMENT_METHODS),
                random.choice(DEVICE_TYPES),
                random.choice(OPERATING_SYSTEMS),
                app_version,
                lat,
                lon,
                city,
                province,  # stored in the state column
                "Canada",
                customer_id,
                random.choice(AGE_GROUPS),
                random.choice(GENDERS),
                float(processing_fee),
                float(commission),
                now,
                now,
            ))

        transaction_df = spark.createDataFrame(transaction_data, _INSERT_COLUMNS)
        transaction_df.write.format("delta").mode("append").saveAsTable(raw_table)

        print(f"INSERT: Successfully added {num_transactions} Canadian transactions to {raw_table}")
        return True

    except Exception as e:
        # Best-effort by design: callers only inspect the boolean result.
        print(f"INSERT failed: {str(e)}")
        return False

def update_existing_transactions(num_updates=3):
    """Re-price and re-status the most recently created transactions.

    Reads up to ``num_updates`` rows (newest ``created_at`` first), gives each
    a new random status and an amount scaled by a random factor between 0.8
    and 1.2, recomputes fee and commission, stamps ``updated_at`` with the
    current time, and merges the result back on ``transaction_id``.

    Args:
        num_updates: Maximum number of rows to update.

    Returns:
        True when the merge ran, False when the table had no rows to update
        or any step failed (the error is printed, not raised).
    """
    # Column order expected by the merge source DataFrame.
    columns = [
        "transaction_id", "interac_id", "merchant_id", "merchant_name", "merchant_category",
        "transaction_amount", "transaction_currency", "transaction_timestamp", "transaction_status",
        "payment_method", "device_type", "device_os", "app_version", "latitude", "longitude",
        "city", "state", "country", "customer_id", "age_group", "gender",
        "processing_fee", "commission", "created_at", "updated_at"
    ]

    try:
        print(f"UPDATE: Updating {num_updates} existing transactions...")

        # Newest rows first; updated_at is not read because it is replaced below.
        recent_rows = spark.sql(f"""
            SELECT transaction_id, interac_id, merchant_id, merchant_name, merchant_category,
                   transaction_amount, transaction_currency, transaction_timestamp,
                   transaction_status, payment_method, device_type, device_os, app_version,
                   latitude, longitude, city, state, country, customer_id, age_group, gender,
                   processing_fee, commission, created_at
            FROM {raw_table}
            ORDER BY created_at DESC
            LIMIT {num_updates}
        """).collect()

        if not recent_rows:
            print("No existing transactions to update")
            return False

        update_data = []
        for current in recent_rows:
            # Draw the new status first, then the amount factor.
            status = random.choice(["completed", "failed", "refunded"])
            amount = round(current["transaction_amount"] * random.uniform(0.8, 1.2), 2)
            fee = round(amount * 0.005, 2)
            cut = round(amount * 0.01, 2)

            # Only these columns change; every other value is carried over.
            changed = {
                "transaction_amount": float(amount),
                "transaction_status": status,
                "processing_fee": float(fee),
                "commission": float(cut),
                "updated_at": datetime.now(),
            }
            update_data.append(tuple(
                changed[name] if name in changed else current[name]
                for name in columns
            ))

        update_df = spark.createDataFrame(update_data, columns)

        # Overwrite every column of the matching target rows with the source values.
        merge_builder = DeltaTable.forName(spark, raw_table).alias("target").merge(
            update_df.alias("source"),
            "target.transaction_id = source.transaction_id"
        )
        merge_builder.whenMatchedUpdateAll().execute()

        print(f"UPDATE: Successfully updated {len(recent_rows)} transactions")
        return True

    except Exception as e:
        print(f"UPDATE failed: {str(e)}")
        return False

def delete_transactions(num_deletes=2):
    """Delete the most recently created transactions from the raw Delta table.

    Looks up the ``transaction_id`` of up to ``num_deletes`` rows (newest
    ``created_at`` first) and removes them with a single Delta delete, so the
    whole batch lands in one commit and cannot be left half-applied.

    Args:
        num_deletes: Maximum number of transactions to delete.

    Returns:
        True when the delete ran, False when there was nothing to delete or
        any step failed (the error is printed, not raised).
    """
    try:
        print(f"DELETE: Deleting {num_deletes} transactions...")

        rows = spark.sql(f"""
            SELECT transaction_id
            FROM {raw_table}
            ORDER BY created_at DESC
            LIMIT {num_deletes}
        """).collect()

        # A NULL id can never match an equality/IN predicate, so drop it here.
        transaction_ids = [
            row["transaction_id"] for row in rows if row["transaction_id"] is not None
        ]

        if not transaction_ids:
            print("No transactions to delete")
            return False

        # A Column predicate avoids building SQL text from the id values, and
        # one delete call replaces one commit per id.
        target_table = DeltaTable.forName(spark, raw_table)
        target_table.delete(F.col("transaction_id").isin(transaction_ids))

        print(f"DELETE: Successfully deleted {len(transaction_ids)} transactions")
        return True

    except Exception as e:
        # Best-effort by design: callers only inspect the boolean result.
        print(f"DELETE failed: {str(e)}")
        return False

# Cell-level confirmation messages, printed once the definitions above have run.
print("UPDATE and DELETE functions defined with fixed column names")
print("Transaction data generation functions defined")


UPDATE and DELETE functions defined with fixed column names
Transaction data generation functions defined


In [0]:
def continuous_cdc_data_generation(duration_minutes=20, interval_seconds=120):
    """Run randomly chosen CDC batches at a fixed interval for a fixed duration.

    Each batch is one of INSERT (weight 0.7, 3-8 rows), UPDATE (weight 0.2,
    1-4 rows) or DELETE (weight 0.1, 1-2 rows). After every batch the loop
    waits ``interval_seconds``, shortened on the last round so the run does
    not overshoot ``duration_minutes``.

    Args:
        duration_minutes: Total run time in minutes; zero or less runs no batch.
        interval_seconds: Pause between batches in seconds (default 120).

    Raises:
        ValueError: If ``interval_seconds`` is negative.
        Exception: Any unexpected error from the loop is printed and re-raised.
            A KeyboardInterrupt is reported and swallowed.
    """
    if interval_seconds < 0:
        raise ValueError("interval_seconds must be non-negative")

    operation_weights = {"INSERT": 0.7, "UPDATE": 0.2, "DELETE": 0.1}
    # Lambdas defer both the name lookup and the row-count draw to call time.
    run_operation = {
        "INSERT": lambda: insert_new_transactions(random.randint(3, 8)),
        "UPDATE": lambda: update_existing_transactions(random.randint(1, 4)),
        "DELETE": lambda: delete_transactions(random.randint(1, 2)),
    }

    try:
        print(f"Starting continuous CDC data generation for {duration_minutes} minutes")
        print(f"Operations will run every {interval_seconds / 60:g} minutes")
        print("Operations: INSERT (70%), UPDATE (20%), DELETE (10%)")

        # A monotonic clock keeps the run length immune to wall-clock changes.
        deadline = time.monotonic() + duration_minutes * 60
        batch_count = 0

        while time.monotonic() < deadline:
            batch_count += 1
            print(f"Batch {batch_count} - {datetime.now().strftime('%H:%M:%S')}")

            operation = random.choices(
                list(operation_weights), weights=list(operation_weights.values())
            )[0]
            success = run_operation[operation]()

            if success:
                print(f"Batch {batch_count} completed successfully")
            else:
                print(f"Batch {batch_count} had issues")

            # Sleep only for the time that is left, never past the deadline.
            remaining = deadline - time.monotonic()
            if remaining > 0:
                wait_seconds = min(interval_seconds, remaining)
                print(f"Waiting {wait_seconds / 60:.2g} minutes for next batch...")
                time.sleep(wait_seconds)

        print("Continuous CDC data generation completed!")
        print(f"Total batches processed: {batch_count}")

    except KeyboardInterrupt:
        print("Continuous data generation stopped by user")
    except Exception as e:
        print(f"Error in continuous data generation: {str(e)}")
        raise

# Cell-level confirmation message, printed once the definition above has run.
print("Continuous CDC data generation function defined")


Continuous CDC data generation function defined


In [0]:
# Seed the table with 20 rows, then start the continuous generator right away.
# NOTE(review): the hint printed below mentions duration_minutes=30, but the
# call that follows runs for 20 minutes.
print("Setting up initial data...")
success = insert_new_transactions(20)
if not success:
    print("Initial data setup failed")
else:
    print("Initial data setup completed successfully!")
    print("You can now run: continuous_cdc_data_generation(duration_minutes=30)")
    continuous_cdc_data_generation(duration_minutes=20)


Setting up initial data...
INSERT: Adding 20 new transactions...
INSERT: Successfully added 20 transactions
Initial data setup completed successfully!
You can now run: continuous_cdc_data_generation(duration_minutes=30)
Starting continuous CDC data generation for 20 minutes
Operations will run every 2 minutes
Operations: INSERT (70%), UPDATE (20%), DELETE (10%)
Batch 1 - 19:54:08
DELETE: Deleting 1 transactions...
DELETE: Successfully deleted 1 transactions
Batch 1 completed successfully
Waiting 2 minutes for next batch...
Batch 2 - 19:56:14
UPDATE: Updating 1 existing transactions...
UPDATE: Successfully updated 1 transactions
Batch 2 completed successfully
Waiting 2 minutes for next batch...
Batch 3 - 19:58:26
INSERT: Adding 8 new transactions...
INSERT: Successfully added 8 transactions
Batch 3 completed successfully
Waiting 2 minutes for next batch...
Batch 4 - 20:00:31
DELETE: Deleting 1 transactions...
DELETE: Successfully deleted 1 transactions
Batch 4 completed successfully
Wai

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Uncomment the call below to run an update batch manually

# update_existing_transactions(3)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:134)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:473)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:750)
	at com.data

In [0]:
# Uncomment the call below to run a delete batch manually

# delete_transactions(2)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:134)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:473)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:750)
	at com.data