# Kafka handler notebook (setup + producer)

In [None]:
! pip install confluent-kafka


## Getting started with Kafka
- Use the provided link or go to https://kafka.apache.org/
- Download Kafka 3.9.0 (the `kafka_2.13-3.9.0` build used below). The setup cells start ZooKeeper, which was removed in Kafka 4.0, so a 4.x download will not work with them.
- Unzip the .tgz file into your project root directory (at the same level as this notebook).
- If your directory name differs, update the KAFKA_FOLDER variable below.
- The next two cells only print the ZooKeeper and broker start commands. Run those commands in a terminal, because they run indefinitely.

In [None]:
# Unpacked Kafka directory, relative to this notebook. Other cells build paths as
# "{KAFKA_FOLDER}bin/...", so the value must end with exactly one "/". The second
# line normalises it in case the name is edited without the trailing slash.
KAFKA_FOLDER = "kafka_2.13-3.9.0/"
KAFKA_FOLDER = KAFKA_FOLDER.rstrip("/") + "/"

# launch-kafka-cluster
# Prints the ZooKeeper start command. Run it in a terminal, not in Jupyter:
# it runs indefinitely and would block the notebook.
print(f"{KAFKA_FOLDER}bin/zookeeper-server-start.sh {KAFKA_FOLDER}config/zookeeper.properties")


In [None]:
# Start the Kafka broker.
# This only prints the command. Paste it into a new terminal window, because the
# broker runs indefinitely and cannot run inside this notebook.
print(f"{KAFKA_FOLDER}bin/kafka-server-start.sh {KAFKA_FOLDER}config/server.properties")

In [None]:
# create topic "ecommerce_topic"
! {KAFKA_FOLDER}bin/kafka-topics.sh --create --topic ecommerce_topic \
        --bootstrap-server localhost:9092 \
        --replication-factor 1 --partitions 1  
    


In [None]:
# launch-kafka-producer
# kafka-console-producer.sh reads messages typed on stdin and runs until stopped.
# A notebook "!" cell cannot feed it interactive input, so, like the ZooKeeper
# and broker commands, this prints the command to run in a terminal.
print(f"{KAFKA_FOLDER}bin/kafka-console-producer.sh --topic ecommerce_topic --bootstrap-server localhost:9092")

# Example message to paste (one JSON object per line) at the producer's prompt:
# {"transaction_id": "T12345", "user_id": "U56789", "amount": 250.0, "timestamp": "2024-12-05T15:30:00Z"}


In [None]:
#launch-kafka-consumer
! bin/kafka-console-consumer.sh --topic ecommerce_topic --from-beginning --bootstrap-server localhost:9092


# Kafka producer for the main project

In [None]:
import json
import random
import time
from datetime import datetime

from confluent_kafka import Producer

# Kafka producer configuration: talk to the broker listening on localhost:9092.
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Function to generate fake transaction data
def generate_fake_transaction():
    """Build one random e-commerce transaction for the main project.

    Returns:
        dict with keys, in order:
          - "transaction_id": "T" followed by a number in 10000-99999
          - "user_id": "U" followed by a number in 1000-9999
          - "amount": float between 10.0 and 1000.0, rounded to 2 decimals
          - "timestamp": current UTC time formatted "YYYY-MM-DDTHH:MM:SSZ"
    """
    # timezone is not in this cell's imports. The timezone-aware now() replaces
    # datetime.utcnow(), which is deprecated since Python 3.12.
    from datetime import timezone

    return {
        "transaction_id": f"T{random.randint(10000, 99999)}",
        "user_id": f"U{random.randint(1000, 9999)}",
        "amount": round(random.uniform(10.0, 1000.0), 2),
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }

try:
    while True:
        # Generate a fake transaction
        fake_transaction = generate_fake_transaction()

        # Send the transaction to the Kafka topic as JSON text
        producer.produce('ecommerce_topic', value=json.dumps(fake_transaction))
        print(f"Sent: {fake_transaction}")

        # Wait for this message's delivery before pausing
        producer.flush()

        # Wait 5 seconds before sending the next event
        time.sleep(5)

except KeyboardInterrupt:
    print("\nStopping the producer...")
finally:
    # If the interrupt landed between produce() and flush(), a message is still
    # queued: deliver it, but give up after 10 s so a dead broker cannot hang
    # the cell. flush() returns how many messages remain undelivered.
    undelivered = producer.flush(10)
    if undelivered:
        print(f"{undelivered} message(s) were not delivered")


# Use this fake data generator for the practice task


In [None]:
import json
import random
import time
from datetime import datetime

from confluent_kafka import Producer

# Kafka producer configuration: talk to the broker listening on localhost:9092.
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Function to generate fake transaction data
def generate_fake_transaction():
    """Build one random transaction for the practice task.

    User IDs come from the small set U1000-U1005, so the same users recur.
    Amounts are uniform over 100.0-1200.0, so most (about 82%) exceed $300.

    Returns:
        dict with keys "transaction_id" (str, "T" + number in 10000-99999),
        "user_id" (str, one of U1000-U1005), "amount" (float, 2 decimals) and
        "timestamp" (current UTC time formatted "YYYY-MM-DD HH:MM:SS").
    """
    # timezone is not in this cell's imports. The timezone-aware now() replaces
    # datetime.utcnow(), which is deprecated since Python 3.12.
    from datetime import timezone

    transaction_id = f"T{random.randint(10000, 99999)}"
    # One user ID per transaction, drawn from a limited set. A single string
    # (not a list) lets consumers group and filter on user_id.
    user_id = f"U{random.randint(1000, 1005)}"
    # Generate a higher proportion of transactions above $300
    amount = round(random.uniform(100.0, 1200.0), 2)
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return {
        "transaction_id": transaction_id,
        "user_id": user_id,
        "amount": amount,
        "timestamp": timestamp
    }

# Generate and send transactions
print("Starting to send fake transactions to Kafka...")
try:
    while True:
        # Generate a fake transaction
        fake_transaction = generate_fake_transaction()

        # Send the transaction to the Kafka topic as JSON text
        producer.produce('ecommerce_topic', value=json.dumps(fake_transaction))
        print(f"Sent: {fake_transaction}")

        # Wait for this message's delivery before pausing
        producer.flush()

        time.sleep(random.uniform(0.5, 2))  # Random delay to simulate streaming

except KeyboardInterrupt:
    print("\nStopping the producer...")
finally:
    # confluent_kafka's Producer provides flush(), not close() (close() is a
    # Consumer method). Deliver anything still queued, but give up after 10 s
    # so a dead broker cannot hang the cell.
    undelivered = producer.flush(10)
    if undelivered:
        print(f"{undelivered} message(s) were not delivered")


# Use this fake data generator for task 4


In [None]:
import json
import random
import time
from datetime import datetime

from confluent_kafka import Producer

# Kafka producer configuration: talk to the broker listening on localhost:9092.
producer = Producer({'bootstrap.servers': 'localhost:9092'})

# Function to generate fake transaction data
def generate_fake_transaction():
    """Build one random transaction for task 4.

    User IDs come from U1000-U1020, so grouping by user_id yields repeat users.
    Amounts are uniform over 5000.0-15000.0, so roughly half exceed 10,000.

    Returns:
        dict with keys "transaction_id" (str, "T" + number in 10000-99999),
        "user_id" (str, U1000-U1020), "amount" (float, 2 decimals) and
        "timestamp" (current UTC time formatted "YYYY-MM-DD HH:MM:SS").
    """
    # timezone is not in this cell's imports. The timezone-aware now() replaces
    # datetime.utcnow(), which is deprecated since Python 3.12.
    from datetime import timezone

    transaction_id = f"T{random.randint(10000, 99999)}"
    user_id = f"U{random.randint(1000, 1020)}"  # Small range to allow grouping by user_id
    amount = round(random.uniform(5000.0, 15000.0), 2)  # Ensure some transactions are >10,000
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return {
        "transaction_id": transaction_id,
        "user_id": user_id,
        "amount": amount,
        "timestamp": timestamp
    }

# Generate and send transactions
print("Starting to send fake transactions to Kafka...")
try:
    while True:
        # Generate a fake transaction
        fake_transaction = generate_fake_transaction()

        # Send the transaction to the Kafka topic as JSON text
        producer.produce('ecommerce_topic', value=json.dumps(fake_transaction))
        print(f"Sent: {fake_transaction}")

        # Wait for this message's delivery before pausing
        producer.flush()

        time.sleep(random.uniform(0.5, 2))  # Random delay to simulate streaming

except KeyboardInterrupt:
    print("\nStopping the producer...")
finally:
    # confluent_kafka's Producer provides flush(), not close() (close() is a
    # Consumer method). Deliver anything still queued, but give up after 10 s
    # so a dead broker cannot hang the cell.
    undelivered = producer.flush(10)
    if undelivered:
        print(f"{undelivered} message(s) were not delivered")


# Use this fake data generator for the cumulative task


In [None]:
# This cell's own imports: timedelta is needed for the time shift, and random /
# datetime must not depend on an earlier cell having been run.
import random
from datetime import datetime, timedelta, timezone


def generate_fake_transaction():
    """Build one random transaction for the cumulative task.

    - transaction_id: "T" followed by a number in 10000-99999
    - user_id: "U" followed by a number in 1000-9999
    - amount: uniform between 10.0 and 15000.0, rounded to 2 decimals
    - timestamp: current UTC time shifted back by 0, 5 or 10 minutes (chosen at
      random), formatted "YYYY-MM-DDTHH:MM:SSZ", to simulate late or
      out-of-order events

    This cell only defines the generator; it does not send anything to Kafka.

    Returns:
        dict with keys "transaction_id", "user_id", "amount", "timestamp".
    """
    transaction_id = f"T{random.randint(10000, 99999)}"
    user_id = f"U{random.randint(1000, 9999)}"

    # Wide amount range: mostly ordinary values, occasionally very large ones
    # (such as the >10,000 amounts the task-4 generator targets).
    amount = round(random.uniform(10.0, 15000.0), 2)

    # Introduce occasional late or out-of-order timestamps
    delay = random.choice([0, -5, -10])  # minutes: real-time, 5 min late, 10 min late
    # Timezone-aware now() replaces datetime.utcnow(), deprecated since Python 3.12.
    event_time = datetime.now(timezone.utc) + timedelta(minutes=delay)
    timestamp = event_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    return {
        "transaction_id": transaction_id,
        "user_id": user_id,
        "amount": amount,
        "timestamp": timestamp
    }