In [1]:
#loading required python classes and packages
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, TopicPartition
from confluent_kafka import Producer
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score

In [20]:
# Read the raw banking-transaction CSV; ending the cell with the frame renders it as a table.
dataset = pd.read_csv(filepath_or_buffer="Dataset/BankingTransactionDataSet1.csv")
dataset

Unnamed: 0,transaction_id,sender_account_id,recipient_account_id,amount,payment_mode,timestamp,sender_location,recipient_location,device_fingerprint,transaction_frequency,is_fraud
0,T12348,AXIS00004,PNB00014,75.00,UPI,01-01-2025 08:15,Kerala,Rajasthan,DF004,8,0
1,T12349,ICICI00003,ICICI00011,22676.41,UPI,02-01-2025 18:20,Maharashtra,Karnataka,DF001,10,0
2,T12348,HDFC00002,HDFC00013,178991.58,NEFT,03-01-2025 07:10,Maharashtra,Maharashtra,DF002,12,0
3,T12345,ICICI00003,AXIS00012,88590.55,NEFT,03-01-2025 07:10,Gujarat,Tamil Nadu,DF005,8,1
4,T12346,PNB00005,PNB00014,38150.83,UPI,01-01-2025 08:15,Rajasthan,Karnataka,DF005,11,1
...,...,...,...,...,...,...,...,...,...,...,...
95,T12346,ICICI00003,ICICI00011,155545.15,RTGS,01-01-2025 14:30,Gujarat,Maharashtra,DF004,12,0
96,T12348,ICICI00003,PNB00014,166001.05,UPI,02-01-2025 18:20,Maharashtra,Tamil Nadu,DF001,7,1
97,T12349,SBIN00001,SBIN00010,75.00,NEFT,02-01-2025 09:45,Kerala,Uttar Pradesh,DF003,9,0
98,T12349,PNB00005,HDFC00013,75.00,NEFT,02-01-2025 18:20,Kerala,Uttar Pradesh,DF003,12,0


In [21]:
#dataset cleaning and processing by converting non-numeric values to numeric values
# One fitted LabelEncoder per categorical column, stored as [column_name, encoder] pairs so the
# streaming consumer can apply exactly the same mapping to incoming records.
# NOTE: not idempotent - re-running this cell on the already-encoded frame finds no categorical
# columns and resets label_encoder to []; re-run the load cell first.
label_encoder = []
for column in dataset.columns:
    # Treat every non-numeric column as categorical. is_numeric_dtype also catches pandas'
    # dedicated string dtype, which a plain `dtype == 'object'` comparison misses.
    if pd.api.types.is_numeric_dtype(dataset[column]):
        continue
    le = LabelEncoder()
    # assign the encoded ndarray directly (positional) so it cannot misalign with the frame's index
    dataset[column] = le.fit_transform(dataset[column].astype(str))
    label_encoder.append([column, le])
dataset.fillna(dataset.mean(), inplace = True)#replace missing values with the column mean, if any exist
dataset

Unnamed: 0,transaction_id,sender_account_id,recipient_account_id,amount,payment_mode,timestamp,sender_location,recipient_location,device_fingerprint,transaction_frequency,is_fraud
0,3,0,3,75.00,3,0,2,2,3,8,0
1,4,2,2,22676.41,3,3,3,0,0,10,0
2,3,1,1,178991.58,1,4,3,1,1,12,0
3,0,2,0,88590.55,1,4,1,3,4,8,1
4,1,3,3,38150.83,3,0,4,0,4,11,1
...,...,...,...,...,...,...,...,...,...,...,...
95,1,2,2,155545.15,2,1,1,1,3,12,0
96,3,2,3,166001.05,3,3,3,3,0,7,1
97,4,4,4,75.00,1,2,2,4,2,9,0
98,4,3,1,75.00,1,3,2,4,2,12,0


In [22]:
#extracting training features and target label
# to_numpy() replaces Series.ravel(), which is deprecated in pandas 2.2 and removed in 3.0
Y = dataset['is_fraud'].to_numpy()
# drop without inplace so re-running this cell does not raise KeyError on an already-removed column;
# the feature column order is unchanged and must match the consumer's `cols` list
X = dataset.drop(columns=['is_fraud']).values
# NOTE(review): the scaler is fit on every row before the train/test split, so test rows influence
# the scaling statistics; fitting on the training rows only would avoid this leakage.
scaler = StandardScaler()
X = scaler.fit_transform(X)
print("Features Normalization Completed : "+str(X))

Features Normalization Completed : [[ 0.5114659  -1.66050831  0.6848137  -0.81046964  0.99068013 -1.42806078
   0.01393601 -0.10693845  0.55440852 -0.07941253]
 [ 1.21210412 -0.19103193 -0.00691731 -0.50250168  0.99068013  0.62178723
   0.71073668 -1.53278446 -1.47391533  0.41691581]
 [ 0.5114659  -0.92577012 -0.69864832  1.62745674 -0.73224184  1.3050699
   0.71073668 -0.81986146 -0.79780738  0.91324415]
 [-1.59044876 -0.19103193 -1.39037934  0.39564773 -0.73224184  1.3050699
  -0.68286465  0.60598455  1.23051647 -0.07941253]
 [-0.88981054  0.54370626  0.6848137  -0.29164645  0.99068013 -1.42806078
   1.40753735 -1.53278446  1.23051647  0.66507998]
 [ 0.5114659   1.27844445 -1.39037934 -0.28777352 -0.73224184 -1.42806078
   0.71073668  0.60598455  1.23051647  0.16875164]
 [ 1.21210412 -0.19103193 -0.00691731 -0.61902758  0.99068013 -0.06149544
  -0.68286465 -0.10693845  0.55440852 -0.32757671]
 [ 1.21210412 -0.19103193 -1.39037934 -0.81046964  0.99068013 -0.74477811
   1.40753735 -0.8

In [35]:
#split dataset into train and test
TEST_SIZE = 0.3
# random_state makes the split (and therefore the reported metrics) reproducible across runs;
# stratify keeps the fraud / non-fraud ratio identical in both partitions
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=TEST_SIZE, random_state=42, stratify=Y)
# derive the printed percentages from TEST_SIZE so the labels cannot drift from the real split
print(f"{round((1 - TEST_SIZE) * 100)}% Training Size = {X_train.shape[0]}")
print(f"{round(TEST_SIZE * 100)}% Testing Size = {X_test.shape[0]}")

80% Training Size = 70
20% Testing Size = 30


In [36]:
#function to calculate all metrics
def calculateMetrics(algorithm, y_test, predict):
    """Print accuracy and macro-averaged precision, recall and F1, as percentages rounded to 3 d.p.

    algorithm: label used as the prefix of every printed line.
    y_test:    ground-truth class labels.
    predict:   predicted class labels, same length as y_test.
    Returns None; results are only printed.
    """
    # macro averaging weights the fraud and non-fraud classes equally; zero_division=0 yields the
    # same value sklearn uses by default for an undefined score, without emitting a warning
    scores = [
        ("Accuracy  : ", accuracy_score(y_test, predict)),
        ("Precision : ", precision_score(y_test, predict, average='macro', zero_division=0)),
        ("Recall    : ", recall_score(y_test, predict, average='macro', zero_division=0)),
        ("FSCORE    : ", f1_score(y_test, predict, average='macro', zero_division=0)),
    ]
    for label, value in scores:
        print(algorithm + " " + label + str(round(value * 100, 3)))

In [37]:
#training Random Forest
# Forest construction (bootstrap samples, per-split feature subsets) is randomized; a fixed
# random_state keeps the model and the reported metrics identical between runs.
rf = RandomForestClassifier(random_state=42)
rf.fit(X_train, y_train)
#performing prediction on test data
predict = rf.predict(X_test)
#call this function to calculate accuracy and other metrics
calculateMetrics("Random Forest", y_test, predict)

Random Forest Accuracy  : 56.667
Random Forest Precision : 57.24
Random Forest Recall    : 57.143
Random Forest FSCORE    : 56.618


In [12]:
#kafka producer to send stream: delivery callback invoked once per produced message
def delivery_report(err, msg):
    """Report the outcome of one produce() call.

    err: the delivery error, or None when the broker acknowledged the message.
    msg: the delivered message; its topic() and partition() are printed on success.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')
#creating producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
#loading dataset
dataset = pd.read_csv("Dataset/BankingTransactionDataSet1.csv")
dataset = dataset.values
# Publish each CSV row as one '#'-joined message. Every column is sent, including the trailing
# is_fraud label; the consumer only reads the first ten fields.
for row in dataset:
    p.produce('BankTransaction', "#".join(str(value) for value in row), callback=delivery_report)
    # serve pending delivery callbacks without blocking; flushing after every message would wait
    # for a broker round-trip per row and defeat the producer's batching
    p.poll(0)
# Drain every data message before sending the sentinel, so "exit" cannot overtake a data message
# that is still being retried and stop the consumer early.
p.flush()
p.produce('BankTransaction', "exit", callback=delivery_report)
p.flush()

Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivered to BankTransaction [0]
Message delivere

0

In [9]:
def _predict_record(fields, cols):
    """Encode, scale and classify one transaction record received from Kafka.

    fields: the message split on '#'. Positions 0-9 are read in `cols` order (position 3 as
            float, position 9 as int); extra trailing fields (the producer also sends is_fraud)
            are ignored.
    cols:   feature column names, in the order the scaler and model were fitted on.
    Returns a one-row DataFrame with the raw, unencoded values plus a 'Predicted' column.
    Raises ValueError for a non-numeric amount/frequency or a category the fitted LabelEncoder
    has never seen, and IndexError when fewer than ten fields are present.
    """
    record = pd.DataFrame(
        [[fields[0], fields[1], fields[2], float(fields[3]), fields[4], fields[5],
          fields[6], fields[7], fields[8], int(fields[9])]],
        columns=cols,
    )
    # encode a copy so `record` keeps the human-readable values for display
    encoded = record.copy()
    for column, le in label_encoder:
        encoded[column] = le.transform(encoded[column].astype(str))
    features = scaler.transform(encoded.values)
    record['Predicted'] = rf.predict(features)[0]
    return record


def consume():
    """Classify BankTransaction messages as they arrive, until the "exit" sentinel is received.

    Each message is echoed, then passed through the notebook's fitted label_encoder, scaler and
    rf, and printed together with its prediction. Records that cannot be parsed or encoded are
    reported and skipped. A Kafka error raises KafkaException. The consumer is always closed.
    """
    cols = ['transaction_id','sender_account_id','recipient_account_id','amount','payment_mode','timestamp','sender_location',
            'recipient_location','device_fingerprint','transaction_frequency']
    # NOTE(review): the group id name does not match this use case; renaming it would change
    # which committed offsets the consumer resumes from, so it is left unchanged.
    conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'sensor_stream_processor', 'auto.offset.reset': 'earliest'}
    consumer = Consumer(conf)
    # Subscribe to the BankTransaction topic
    consumer.subscribe(['BankTransaction'])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # no message arrived within the 1 s poll timeout
                continue
            if msg.error():
                raise KafkaException(msg.error())
            payload = msg.value()
            if payload is None:
                # a message without a value carries nothing to classify
                continue
            msgs = payload.decode('utf-8')
            print(msgs)
            if msgs == "exit":
                break
            try:
                result = _predict_record(msgs.split("#"), cols)
            except (ValueError, IndexError) as err:
                # one malformed record should not kill the whole stream
                print(f"Skipping record that could not be processed: {err}")
                continue
            # a bare DataFrame expression inside a function displays nothing, so print explicitly
            print(result.to_string(index=False))
    finally:
        # close() leaves the consumer group and, with auto-commit enabled, commits final offsets
        consumer.close()

In [None]:
# Run the streaming consumer on a worker thread, then block this cell until it returns,
# i.e. until the "exit" sentinel message has been consumed.
from threading import Thread
thread = Thread(target=consume, args=())
thread.start()
thread.join(timeout=None)