# Lab 3: Real-time Streaming Pipeline

**Objective:** Build a continuous data streaming pipeline that processes credit card transactions in real time to detect fraudulent activity.

**What you'll build:** A fraud detection system that:
- Consumes live transaction data from Kafka
- Calculates transaction velocity (distance between consecutive transactions divided by the time between them)
- Flags suspicious transactions in real time
- Writes each result to two destinations: Elasticsearch for monitoring and SingleStore for storage

**Key Concept:** Streaming flows process data continuously, unlike batch flows that run on a schedule. This enables real-time decision making and immediate responses to events.

## Understanding Streaming vs. Batch

**Batch Processing (Labs 1-2):**
- Processes data in scheduled chunks
- Example: Nightly data warehouse loads
- Use when: Historical analysis, bulk transformations, periodic updates

**Streaming Processing (This Lab):**
- Processes data continuously as it arrives
- Example: Real-time fraud detection, live dashboards
- Use when: Immediate action required, event-driven architectures
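
The contrast can be sketched in plain Python. This is only an illustration of the two processing styles, not SDK code; `process`, `run_batch`, and `run_stream` are hypothetical names, and the "large amount" rule is made up for the example.

```python
from typing import Dict, Iterable, Iterator, List


def process(txn: Dict) -> Dict:
    """Toy transformation: mark transactions above an arbitrary amount."""
    return {**txn, "large": txn["amount"] > 1000}


def run_batch(transactions: List[Dict]) -> List[Dict]:
    """Batch style: the whole chunk is available before processing starts."""
    return [process(t) for t in transactions]


def run_stream(source: Iterable[Dict]) -> Iterator[Dict]:
    """Streaming style: each record is handled as soon as it arrives."""
    for txn in source:
        yield process(txn)


sample = [{"id": 1, "amount": 50}, {"id": 2, "amount": 5000}]

batch_result = run_batch(sample)

# The generator yields the first result without waiting for the second record.
stream = run_stream(iter(sample))
first_result = next(stream)
stream_result = [first_result] + list(stream)
```

Both styles produce the same records; the difference is when each result becomes available.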

**This lab demonstrates streaming with a fraud detection use case: detecting impossible travel patterns (transactions from locations too far apart given the time elapsed).**
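
As a rough, standalone sketch of the impossible-travel idea: compute the great-circle distance between two transactions, divide by the elapsed time, and compare against a speed threshold. The lab itself delegates the distance calculation to the database; the haversine formula, the `lat`/`lon`/`ts` field names, and the handling of non-positive elapsed time below are assumptions made for illustration.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in metres


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def is_impossible_travel(prev: dict, curr: dict, threshold_mps: float = 268.224) -> bool:
    """True when the implied speed between two transactions exceeds the threshold."""
    distance_m = haversine_m(prev["lat"], prev["lon"], curr["lat"], curr["lon"])
    elapsed_s = curr["ts"] - prev["ts"]
    if elapsed_s <= 0:
        # Assumption: without a positive time gap, no velocity is computed.
        return False
    return distance_m / elapsed_s > threshold_mps


# Hypothetical transactions: Toronto, then London (timestamps in seconds).
toronto = {"lat": 43.6532, "lon": -79.3832, "ts": 0}
london_1h = {"lat": 51.5074, "lon": -0.1278, "ts": 3_600}
london_10h = {"lat": 51.5074, "lon": -0.1278, "ts": 36_000}

flag_1h = is_impossible_travel(toronto, london_1h)
flag_10h = is_impossible_travel(toronto, london_10h)
```

With these inputs, the one-hour gap implies a speed well above the threshold, while the ten-hour gap does not.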

## Part 1: Setup and Configuration

### Cell 1: (Optional) SDK Reinstall

**Skip this if you just completed Labs 1-2.** Uncomment the lines only if you are starting a fresh session.

In [None]:
# Uncomment the lines below ONLY if you need to reinstall the SDK
# !pip uninstall ibm_watsonx_data_integration -y
# !pip install ibm_watsonx_data_integration --force-reinstall

### Cell 2: Import Required Libraries

In [None]:
from ibm_watsonx_data_integration import Platform
from ibm_watsonx_data_integration.common.auth import IAMAuthenticator

print("Libraries imported successfully")

### Cell 3: Set Your Credentials

**Use the same API key and Project ID from previous labs.**
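
If you would rather not paste secrets into the notebook, one option is to read them from environment variables. This is a minimal sketch: the variable names `WXDI_API_KEY` and `WXDI_PROJECT_ID` and the helper `load_credential` are hypothetical, not something the SDK defines.

```python
import os


def load_credential(name: str, placeholder: str) -> str:
    """Return the environment variable `name`, or `placeholder` if it is unset."""
    return os.environ.get(name, placeholder)


# Hypothetical variable names; choose whatever fits your environment.
API_KEY = load_credential("WXDI_API_KEY", "YOUR_IBM_CLOUD_API_KEY")
PROJECT_ID = load_credential("WXDI_PROJECT_ID", "YOUR_PROJECT_ID")

if API_KEY.startswith("YOUR_") or PROJECT_ID.startswith("YOUR_"):
    print("Credentials not found in the environment; set them before continuing")
```

If you adopt this approach, use it in place of the literal assignments in the cell below rather than running both.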

In [None]:
API_KEY = "YOUR_IBM_CLOUD_API_KEY"
PROJECT_ID = "YOUR_PROJECT_ID"

print("Credentials set (not displayed for security)")

### Cell 4: Connect to Platform and Project

In [None]:
# Authenticate
auth = IAMAuthenticator(API_KEY)
print("Authenticator created")

# Connect to platform
platform = Platform(auth, base_api_url="https://api.ca-tor.dai.cloud.ibm.com")
print("Platform connection initialized")

# Get project
project = platform.projects.get(guid=PROJECT_ID)
print(f"Connected to project: {project.name}")
print(f"Project ID: {PROJECT_ID}")

### Cell 5: Define Flow Parameters

**Key Parameters:**
- **Flow Name:** Unique identifier for this streaming flow
- **Kafka Topic:** Source of credit card transaction data
- **Velocity Threshold:** Speed in m/s above which a transaction is flagged as suspicious (268.224 m/s = 600 mph, roughly the cruising speed of a commercial jet)
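
The value 268.224 m/s corresponds to 600 miles per hour. The short conversion below shows the arithmetic; `mph_to_mps` is a helper written for this example only.

```python
METERS_PER_MILE = 1609.344  # international mile, exact by definition
SECONDS_PER_HOUR = 3600


def mph_to_mps(mph: float) -> float:
    """Convert miles per hour to metres per second."""
    return mph * METERS_PER_MILE / SECONDS_PER_HOUR


threshold_mps = mph_to_mps(600)
print(f"600 mph = {threshold_mps:.3f} m/s")
```

To experiment with a different threshold, convert your preferred speed the same way and assign the result to `VELOCITY_THRESHOLD` in the cell below.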

In [None]:
# Streaming flow configuration
FLOW_NAME = "StreamingFraudDetection-SDK"
KAFKA_TOPIC = "credit_card_transactions"
KAFKA_BROKER = "kafka:9092"
VELOCITY_THRESHOLD = 268.224

print(f"Flow Name: {FLOW_NAME}")
print(f"Kafka Topic: {KAFKA_TOPIC}")
print(f"Kafka Broker: {KAFKA_BROKER}")
print(f"Velocity Threshold: {VELOCITY_THRESHOLD} m/s")

### Cell 6: (Optional) Delete Existing Flow

**Use this if you need to re-run the lab.** Uncomment to delete the flow before recreating.

In [None]:
# Uncomment the code below to delete existing flow before re-running

# try:
#     existing_flow = project.flows.get(name=FLOW_NAME)
#     project.delete_flow(existing_flow)
#     print(f"Deleted existing flow: {FLOW_NAME}")
# except Exception as e:
#     print(f"Could not delete flow '{FLOW_NAME}' (it may not exist): {e}")

print("Cell 6 finished (the flow is deleted only if the code above was uncommented)")

## Part 2: Build Streaming Flow

**Streaming Flow Architecture:**

```
Kafka Consumer
    ↓
JDBC Lookup (fetch distance and time since the previous transaction)
    ↓
Expression Evaluator (flag if distance / time > threshold)
    ↓         ↓
Elasticsearch  SingleStore
(monitoring)   (storage)
```
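
To make the data path concrete, here is a small in-memory simulation of the same shape: enrich, evaluate, then fan out to two sinks. It does not use Kafka, a database, or the SDK. The function names, the `deltas_by_account` dictionary, and the rule that a non-positive `delta_t` is never flagged are assumptions made for this sketch.

```python
VELOCITY_THRESHOLD = 268.224  # m/s, the value used in this lab


def lookup(record: dict, deltas_by_account: dict) -> dict:
    """Stand-in for the JDBC Lookup: add delta_x and delta_t when history exists."""
    enriched = dict(record)
    deltas = deltas_by_account.get(record["account_id"])
    if deltas is not None:
        enriched["delta_x"], enriched["delta_t"] = deltas
    return enriched


def evaluate(record: dict, threshold: float = VELOCITY_THRESHOLD) -> dict:
    """Stand-in for the Expression Evaluator: set the suspicious flag."""
    has_deltas = "delta_x" in record and "delta_t" in record
    # Assumption: a zero or negative delta_t means no velocity can be computed.
    suspicious = bool(
        has_deltas
        and record["delta_t"] > 0
        and record["delta_x"] / record["delta_t"] > threshold
    )
    return {**record, "suspicious": suspicious}


def run(records: list, deltas_by_account: dict) -> tuple:
    """Push every record through both stages and copy the result to two sinks."""
    monitoring, storage = [], []  # stand-ins for Elasticsearch and SingleStore
    for record in records:
        result = evaluate(lookup(record, deltas_by_account))
        monitoring.append(result)
        storage.append(result)
    return monitoring, storage


# Hypothetical inputs: (distance in metres, elapsed seconds) per account.
deltas = {"A": (1_000_000, 600), "B": (5_000, 3_600)}
incoming = [{"account_id": "A"}, {"account_id": "B"}, {"account_id": "C"}]

monitoring, storage = run(incoming, deltas)
flags = [r["suspicious"] for r in storage]
```

Account `C` has no earlier transaction, so it receives no deltas and is not flagged.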

### Cell 7: Create Streaming Flow with All Stages

In [None]:
print("Creating streaming flow...")

# Create flow container
flow = project.create_flow(name=FLOW_NAME, environment=None, description="Real-time fraud detection")
print(f"Flow created: {FLOW_NAME}")

# Stage 1: Kafka Multitopic Consumer
kafka_multitopic_consumer_1 = flow.add_stage("Kafka Multitopic Consumer")
kafka_multitopic_consumer_1.max_batch_size_in_records = 1
kafka_multitopic_consumer_1.field_path_to_regex_group_mapping = [{'fieldPath': '/', 'group': 1}]
kafka_multitopic_consumer_1.broker_uri = KAFKA_BROKER
kafka_multitopic_consumer_1.topic_list = [KAFKA_TOPIC]
kafka_multitopic_consumer_1.import_sheets = ['']
kafka_multitopic_consumer_1.data_format = 'JSON'
kafka_multitopic_consumer_1.schema_registry_urls = ['']
print("  Stage 1: Kafka Consumer configured")

# Stage 2: JDBC Lookup (fetch distance and time since the previous transaction)
jdbc_lookup_1 = flow.add_stage("JDBC Lookup")
jdbc_lookup_1.password = "***"
jdbc_lookup_1.sql_query = """WITH prev AS (
    SELECT location, timestamp  
    FROM transactions 
    WHERE account_id = ${record:value('/account_id')} AND suspicious = FALSE
    ORDER BY transaction_id DESC 
    LIMIT 1
)
SELECT  
    GEOGRAPHY_DISTANCE(prev.location, \"${record:value('/location')}\") as delta_x,  
    TIMESTAMPDIFF(SECOND, prev.timestamp, \"${record:value('/timestamp')}\") as delta_t 
FROM prev;"""
jdbc_lookup_1.username = "***"
jdbc_lookup_1.jdbc_connection_string = "jdbc:singlestore://10.89.0.2:3306/finance"
print("  Stage 2: JDBC Lookup configured")

# Stage 3: Expression Evaluator (flag suspicious transactions)
expression_evaluator_1 = flow.add_stage("Expression Evaluator")
expression_evaluator_1.field_attribute_expressions = [{'fieldToSet': '/'}]
expression_evaluator_1.field_expressions = [{'fieldToSet': '/suspicious', 'expression': f"${{record:exists('/delta_x') and record:exists('/delta_t') and record:value('/delta_x') / record:value('/delta_t') > {VELOCITY_THRESHOLD}}}"}]
expression_evaluator_1.header_attribute_expressions = [{}]
print("  Stage 3: Expression Evaluator configured")

# Stage 4: Elasticsearch (for monitoring/dashboards)
elasticsearch_1 = flow.add_stage("Elasticsearch")
elasticsearch_1.index = "transactions"
elasticsearch_1.http_urls = "http://10.89.0.196:9200"
print("  Stage 4: Elasticsearch configured")

# Stage 5: SingleStore (for persistent storage)
singlestore_1 = flow.add_stage("SingleStore")
singlestore_1.password = "***"
singlestore_1.schema_name = "finance"
singlestore_1.data_sqlstate_codes = ['']
singlestore_1.username = "***"
singlestore_1.table_name = "transactions"
singlestore_1.jdbc_connection_string = "jdbc:singlestore://10.89.0.2:3306/finance"
print("  Stage 5: SingleStore configured")

print("\nAll stages configured successfully")
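
A note on the Expression Evaluator line in Cell 7: it builds the expression with a Python f-string, so literal braces are doubled (`${{` and `}}`) while `{VELOCITY_THRESHOLD}` is substituted. The snippet below reproduces only that string formatting so you can see the text that results; it makes no claim about how the platform evaluates it.

```python
VELOCITY_THRESHOLD = 268.224

# Doubled braces become single literal braces; single braces are substituted.
expression = (
    f"${{record:exists('/delta_x') and record:exists('/delta_t') "
    f"and record:value('/delta_x') / record:value('/delta_t') > {VELOCITY_THRESHOLD}}}"
)

print(expression)
```

Printing the expression before saving the flow is a quick way to confirm the threshold was interpolated as intended.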

### Cell 8: Connect Stages and Save Flow

In [None]:
print("Connecting stages...")

kafka_multitopic_consumer_1.connect_output_to(jdbc_lookup_1)
print("  Kafka Consumer -> JDBC Lookup")

jdbc_lookup_1.connect_output_to(expression_evaluator_1)
print("  JDBC Lookup -> Expression Evaluator")

expression_evaluator_1.connect_output_to(elasticsearch_1)
print("  Expression Evaluator -> Elasticsearch")

expression_evaluator_1.connect_output_to(singlestore_1)
print("  Expression Evaluator -> SingleStore")

print("\nAll stages connected")

project.update_flow(flow)
print("\nFlow saved to watsonx.data Integration")
print("\n" + "="*70)
print("SUCCESS! Streaming flow created")
print("="*70)
print(f"\nFlow '{FLOW_NAME}' is now available in your project.")
print("Go to the UI to view, configure, or execute the flow.")
print("\nhttps://ca-tor.dai.cloud.ibm.com/df/home?context=df")