In [0]:
from confluent_kafka import Consumer, KafkaException
from pymongo import MongoClient
import json

# ✅ Kafka Configuration
KAFKA_BOOTSTRAP_SERVER = "Replace it with your bootstrap server."
KAFKA_API_KEY = "Replace with your actual API Key"
KAFKA_API_SECRET = "Replace with your actual API Secret"  
KAFKA_TOPIC = "txndata"
KAFKA_GROUP_ID = "fraud-detection-group"

# ✅ MongoDB Configuration
MONGO_URI = "Replace with your mongodb uri."
MONGO_DB = "txn_db"
MONGO_COLLECTION = "fraud_alerts"

client = MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]

def if_fraudulent(transaction):
      fraud_score = transaction['amount']/5000
      return fraud_score > 0.8
conf = {
    'bootstrap.servers': KAFKA_BOOTSTRAP_SERVER,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': KAFKA_API_KEY,
    'sasl.password': KAFKA_API_SECRET,
    'group.id':KAFKA_GROUP_ID,
    'auto.offset.reset':'earliest'
}

consumer = Consumer(conf)
consumer.subscribe([KAFKA_TOPIC])

print(f"listning for the msg on kafka topics :{KAFKA_TOPIC}")

try :
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"kafka erro not able to get the msg {msg.error()}")
            continue
        transaction = json.loads(msg.value().decode('utf-8'))
        print(f"received msg from kafka is {transaction}")

        if if_fraudulent(transaction):
            print("fraud detected ",transaction )
            collection.insert_one(transaction)
except Exception as e :
    print(e)
finally:
    consumer.close()

listning for the msg on kafka topics :txndata
received msg from kafka is {'transaction_id': '09fb1749-20b2-434a-a941-6d1ebe192ac8', 'timestamp': 1742582665, 'user_id': 14556, 'amount': 3312.72, 'transaction_type': 'withdrawal', 'location': 'Escobartown', 'mearchant': 'Jackson, Banks and Dickerson', 'card_number': '349732511596353'}
received msg from kafka is {'transaction_id': 'fe6bb1ee-5448-458b-9c97-a4adab008139', 'timestamp': 1742582671, 'user_id': 48961, 'amount': 3202.38, 'transaction_type': 'purchase', 'location': 'Carlaport', 'mearchant': 'Wilson LLC', 'card_number': '30096507207902'}
received msg from kafka is {'transaction_id': '7a2ada52-530d-46f7-9825-1fd8160f811d', 'timestamp': 1742582673, 'user_id': 88200, 'amount': 4276.53, 'transaction_type': 'transfer', 'location': 'Arnoldchester', 'mearchant': 'Burke-Jones', 'card_number': '30106194575360'}
fraud detected  {'transaction_id': '7a2ada52-530d-46f7-9825-1fd8160f811d', 'timestamp': 1742582673, 'user_id': 88200, 'amount': 427

In [0]:
pip install confluent_kafka

Python interpreter will be restarted.
Collecting confluent_kafka
  Using cached confluent_kafka-2.8.2-cp39-cp39-manylinux_2_28_x86_64.whl (3.8 MB)
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-2.8.2
Python interpreter will be restarted.


In [0]:
pip install pymongo

Python interpreter will be restarted.
Collecting pymongo
  Downloading pymongo-4.11.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (921 kB)
Collecting dnspython<3.0.0,>=1.16.0
  Downloading dnspython-2.7.0-py3-none-any.whl (313 kB)
Installing collected packages: dnspython, pymongo
Successfully installed dnspython-2.7.0 pymongo-4.11.3
Python interpreter will be restarted.
