In [None]:
# CONSUMER RECEIVING DATA FROM KAFKA PRODUCER THEN STORE IT TO MONGO DB

In [1]:
import six
import sys
if sys.version_info >= (3, 12, 0):
    sys.modules['kafka.vendor.six.moves'] = six.moves
import json
from kafka import KafkaConsumer
from pymongo import MongoClient

def insert_data_to_mongodb(data, collection):
    # Insert data into MongoDB collection
    collection.insert_one(data)
    print(f"Inserted record: {data}")

if __name__ == "__main__":
    # Initialize MongoDB connection
    client = MongoClient(("mongodb://admin:admin@localhost:27017/"))
    db = client["kafka_consumer_data_01"]  # database name to store consumer data
    collection = db["kafka_consumer_collection_01"]  # collection name to store consumer data
    consumer = KafkaConsumer("ftde01-project4", bootstrap_servers="localhost")
    print("Starting the consumer")

    for msg in consumer:
        data = json.loads(msg.value)
        if isinstance(data, list):
            for record in data:
                insert_data_to_mongodb(record, collection)
        else:
            insert_data_to_mongodb(data, collection)

client.close()

Starting the consumer
Inserted record: {'step': 1, 'type': 'PAYMENT', 'amount': 9839.64, 'nameOrig': 'C1231006815', 'newbalanceOrig': 160296.36, 'nameDest': 'M1979787155', 'newbalanceDest': 0.0, '_id': ObjectId('66910e0ae44630bf9f92c6bd')}
Inserted record: {'step': 1, 'type': 'PAYMENT', 'amount': 1864.28, 'nameOrig': 'C1666544295', 'newbalanceOrig': 19384.72, 'nameDest': 'M2044282225', 'newbalanceDest': 0.0, '_id': ObjectId('66910e0fe44630bf9f92c6be')}
Inserted record: {'step': 1, 'type': 'TRANSFER', 'amount': 181.0, 'nameOrig': 'C1305486145', 'newbalanceOrig': 0.0, 'nameDest': 'C553264065', 'newbalanceDest': 0.0, '_id': ObjectId('66910e14e44630bf9f92c6bf')}
Inserted record: {'step': 1, 'type': 'CASH_OUT', 'amount': 181.0, 'nameOrig': 'C840083671', 'newbalanceOrig': 0.0, 'nameDest': 'C38997010', 'newbalanceDest': 0.0, '_id': ObjectId('66910e19e44630bf9f92c6c0')}
Inserted record: {'step': 1, 'type': 'PAYMENT', 'amount': 11668.14, 'nameOrig': 'C2048537720', 'newbalanceOrig': 29885.86, 'n

KeyboardInterrupt: 

In [None]:
# MERGING DATA FROM MONGO DB THAT STORES DATA CATHCED FROM KAFKA CONSUMER WITH DATA FROM POSTGRESQL

In [2]:
!pip install pandas sqlalchemy pymongo psycopg2-binary




[notice] A new release of pip is available: 24.0 -> 24.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
# Import library needed

import pandas as pd
from sqlalchemy import create_engine
from pymongo import MongoClient

In [3]:
# MongoDB connection
client = MongoClient(("mongodb://admin:admin@localhost:27017/"))
mongo_db = client["kafka_consumer_data_01"]
mongo_collection = mongo_db['kafka_consumer_collection_01']

# PostgreSQL connection
pg_engine = create_engine('postgresql://postgres:admin@localhost:5432/project_stream_processing')

# Fetch data from MongoDB
mongo_data = pd.DataFrame(list(mongo_collection.find()))

# Fetch data from PostgreSQL
pg_data = pd.read_sql_table('old_information', pg_engine)

# Merge data (data will be merged on 'nameOrig' and'nameDest')
merged_data = pd.merge(mongo_data, pg_data, on=['nameOrig','nameDest'])

# test script
print(merged_data.head(3))

                        _id  step      type   amount     nameOrig  \
0  66910e0ae44630bf9f92c6bd     1   PAYMENT  9839.64  C1231006815   
1  66910e0fe44630bf9f92c6be     1   PAYMENT  1864.28  C1666544295   
2  66910e14e44630bf9f92c6bf     1  TRANSFER   181.00  C1305486145   

   newbalanceOrig     nameDest  newbalanceDest  oldbalanceOrg  oldbalanceDest  
0       160296.36  M1979787155             0.0       170136.0             0.0  
1        19384.72  M2044282225             0.0        21249.0             0.0  
2            0.00   C553264065             0.0          181.0             0.0  


In [None]:
# RUNNING THE MODEL ON THE DATA MERGED FROM BOTH DATABASE

In [4]:
pip show scikit-learn

Name: scikit-learn
Version: 1.5.1
Summary: A set of python modules for machine learning and data mining
Home-page: https://scikit-learn.org
Author: 
Author-email: 
License: new BSD
Location: C:\Users\LENOVO\AppData\Local\Programs\Python\Python312\Lib\site-packages
Requires: joblib, numpy, scipy, threadpoolctl
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [5]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='sklearn')


In [6]:
from modelling import FraudModel

In [7]:
import os

path = os.getcwd()
path = path + "\\modelling\\"
path

'D:\\project_4_FTDE_fraud_detection\\modelling\\'

In [8]:
# Initialize an empty list to collect results
results = []

# Loop through each row in merged_data
for idx, row in merged_data.iterrows():
    # Convert the row to a dictionary
    predict = row.drop(['nameOrig', 'nameDest']).to_dict()
    # Run the model
    result = FraudModel.runModel(predict, path)
    # Append the result to the list
    results.append(result)

In [9]:
print(results)

['White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'White List', 'Whit

In [10]:
# Convert results to DataFrame
result_df = pd.DataFrame(results, index=merged_data.index)

# Rename the result column to 'predictionResult'
result_df.columns = ['predictionResult']

# Combine result with merged_data
final_data = merged_data.join(result_df, how='left')

In [11]:
# Print final data to verify

print(final_data.head(5))
final_data.columns

                        _id  step      type    amount     nameOrig  \
0  66910e0ae44630bf9f92c6bd     1   PAYMENT   9839.64  C1231006815   
1  66910e0fe44630bf9f92c6be     1   PAYMENT   1864.28  C1666544295   
2  66910e14e44630bf9f92c6bf     1  TRANSFER    181.00  C1305486145   
3  66910e19e44630bf9f92c6c0     1  CASH_OUT    181.00   C840083671   
4  66910e1ee44630bf9f92c6c1     1   PAYMENT  11668.14  C2048537720   

   newbalanceOrig     nameDest  newbalanceDest  oldbalanceOrg  oldbalanceDest  \
0       160296.36  M1979787155             0.0       170136.0             0.0   
1        19384.72  M2044282225             0.0        21249.0             0.0   
2            0.00   C553264065             0.0          181.0             0.0   
3            0.00    C38997010             0.0          181.0         21182.0   
4        29885.86  M1230701703             0.0        41554.0             0.0   

  predictionResult  
0       White List  
1       White List  
2       White List  
3       

Index(['_id', 'step', 'type', 'amount', 'nameOrig', 'newbalanceOrig',
       'nameDest', 'newbalanceDest', 'oldbalanceOrg', 'oldbalanceDest',
       'predictionResult'],
      dtype='object')

In [None]:
# CONVERTING DATA OF PREDICTION RESULT FROM DATA FRAME TO DICTIONARY AND INGEST TO MONGO DB AGAIN

In [12]:
from pymongo import MongoClient
mongo_client = MongoClient("mongodb://admin:admin@localhost:27017/")
db = mongo_client["project4_model_result_01"]
collection = db["project4_model_result_coll_01"]

# Convert the entire DataFrame to a list of dictionaries
data_dict = final_data.to_dict('records')

# Insert the data into MongoDB
collection.insert_many(data_dict)
print("Data stored to MongoDB")

Data stored to MongoDB
