In [1]:
pip install kafka-python

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install pymongo

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
import json
from pymongo import MongoClient
import math
import pandas as pd
import numpy as np
import datetime

In [4]:
# Upper bound on the implied travel speed between a card's previous and current
# transaction; verify_postcode_data passes a transaction only below this value.
speed_threshold = 0.25  # km/sec (0.25 km/sec * 3600 = 900 km/hr, the average flight speed)

In [5]:
class GEO_Map():
    """
    Singleton holding the map from ZIP code to latitude and longitude.

    The table is read once from a header-less CSV into ``self.map`` with the
    columns named 'A'..'E'. This class uses 'A' (cast to str) as the lookup
    key, 'B' as the latitude and 'C' as the longitude; 'D' and 'E' are loaded
    but not used here.
    """
    __instance = None

    # CSV that is loaded when the constructor is not given an explicit path.
    DEFAULT_CSV_PATH = "s3://aws-emr-resources-560518250500-us-east-1/Creditcard_frauddeduction/uszipsv.csv"

    @staticmethod
    def get_instance():
        """Return the shared instance, creating it on first use."""
        if GEO_Map.__instance is None:
            GEO_Map()
        return GEO_Map.__instance

    def __init__(self, csv_path=None):
        """
        Virtually private constructor; use ``get_instance()`` instead.

        :param csv_path: optional path/URL of the ZIP-code CSV; defaults to
            ``DEFAULT_CSV_PATH``.
        :raises Exception: if an instance has already been created.
        """
        if GEO_Map.__instance is not None:
            raise Exception("This class is a singleton!")
        if csv_path is None:
            csv_path = GEO_Map.DEFAULT_CSV_PATH
        zip_table = pd.read_csv(csv_path, header=None, names=['A', "B", 'C', 'D', 'E'])
        zip_table['A'] = zip_table['A'].astype(str)
        self.map = zip_table
        # Register the singleton only after the table loaded successfully, so a
        # failed read does not leave a half-initialised instance behind.
        GEO_Map.__instance = self

    def get_lat(self, pos_id):
        """Return the 'B' (latitude) values of all rows whose key equals ``pos_id``."""
        return self.map[self.map.A == pos_id].B

    def get_long(self, pos_id):
        """Return the 'C' (longitude) values of all rows whose key equals ``pos_id``."""
        return self.map[self.map.A == pos_id].C

    def distance(self, lat1, long1, lat2, long2):
        """
        Great-circle distance between two points given in decimal degrees,
        computed with the spherical law of cosines.

        :return: distance in kilometres (degrees of arc * 60 * 1.1515 gives
            statute miles, * 1.609344 converts miles to km).
        """
        theta = long1 - long2
        cos_angle = (
            math.sin(self.deg2rad(lat1)) * math.sin(self.deg2rad(lat2))
            + math.cos(self.deg2rad(lat1)) * math.cos(self.deg2rad(lat2)) * math.cos(self.deg2rad(theta))
        )
        # Rounding can push the cosine marginally outside [-1, 1] (typically
        # for identical points), which would make math.acos raise ValueError.
        # Explicit comparisons are used so that NaN inputs still yield NaN.
        if cos_angle > 1.0:
            cos_angle = 1.0
        elif cos_angle < -1.0:
            cos_angle = -1.0
        dist = self.rad2deg(math.acos(cos_angle))
        return dist * 60 * 1.1515 * 1.609344

    def rad2deg(self, rad):
        """Convert radians to degrees."""
        return rad * 180.0 / math.pi

    def deg2rad(self, deg):
        """Convert degrees to radians."""
        return deg * math.pi / 180.0

In [6]:
# Function to check rules for UCL and Credit Score
def verify_ucl_data(card_id, amount):
    """
    Check a transaction amount against the card's "ucl" value stored in the
    ``lookup_table`` collection of ``CREDIT_CARD_DB``.

    :param card_id: card id used to find the lookup record
    :param amount: transaction amount
    :return: (Boolean) True when ``amount`` is below the stored ucl and the
        ucl itself is above 200, otherwise False
    :raises LookupError: if no lookup record exists for ``card_id``
    """
    # The context manager closes the client's connections when the lookup is
    # done; a new client is created on every call.
    with MongoClient() as client:
        lookup_value = client["CREDIT_CARD_DB"]["lookup_table"].find_one({'card_id': card_id})

    if lookup_value is None:
        raise LookupError("No lookup_table record found for card_id {!r}".format(card_id))

    ucl = float(lookup_value["ucl"])
    # NOTE(review): the comment above this function mentions a credit-score
    # rule, but only "ucl" is read here; confirm whether the "> 200" test was
    # meant for a score field instead.
    return bool(amount < ucl and ucl > 200)

In [7]:
def _zip_coordinates(geo_map, postcode):
    """Return (latitude, longitude) of ``postcode`` from ``geo_map``; the last matching row wins."""
    key = str(postcode)
    lat_matches = list(geo_map.get_lat(key))
    long_matches = list(geo_map.get_long(key))
    if not lat_matches or not long_matches:
        raise LookupError("Postcode {!r} not found in the ZIP-code map".format(key))
    return lat_matches[-1], long_matches[-1]


def verify_postcode_data(card_id, postcode, transaction_dt):
    """
    Function to verify the following zipcode rules
    ZIP code distance

    The distance between the postcode of this transaction and the postcode
    stored in the card's lookup record is divided by the time between the two
    transactions; the transaction passes when that speed is below
    ``speed_threshold``.

    :param card_id: (Long) Card id of the card customer
    :param postcode: (Integer) Post code of the card transaction
    :param transaction_dt: (String) Timestamp
    :return: (Boolean)
    :raises LookupError: if the card has no lookup record or a postcode is
        missing from the ZIP-code map
    """
    # The context manager closes the client's connections after the lookup.
    with MongoClient() as client:
        lookup_value = client["CREDIT_CARD_DB"]["lookup_table"].find_one({'card_id': card_id})

    if lookup_value is None:
        raise LookupError("No lookup_table record found for card_id {!r}".format(card_id))

    geo_map = GEO_Map.get_instance()
    current_lat, current_long = _zip_coordinates(geo_map, postcode)
    previous_lat, previous_long = _zip_coordinates(geo_map, lookup_value["postcode"])

    dist = geo_map.distance(lat1=current_lat, long1=current_long, lat2=previous_lat, long2=previous_long)
    speed = calculate_speed(dist, transaction_dt, lookup_value["transaction_dt2"])

    return bool(speed < speed_threshold)


In [8]:
#A function to calculate the speed from distance and transaction timestamp differentials

def calculate_speed(dist, transaction_dt1, transaction_dt2):
    """
    Speed implied by covering ``dist`` in the time between two transactions.

    :param dist: distance between the two transaction locations
    :param transaction_dt1: (String) timestamp in 'DD-MM-YYYY HH:MM:SS' format
    :param transaction_dt2: (String) timestamp in 'YYYY-MM-DD HH:MM:SS' format
    :return: ``dist`` divided by the elapsed seconds, or 299792.458 (speed of
        light) when both timestamps are identical
    :raises ValueError: if a timestamp does not match its expected format
    """
    first_ts = datetime.datetime.strptime(transaction_dt1, '%d-%m-%Y %H:%M:%S')
    second_ts = datetime.datetime.strptime(transaction_dt2, '%Y-%m-%d %H:%M:%S')

    # Use the magnitude of the gap: with out-of-order timestamps a signed value
    # would give a negative speed, which trivially passes any "< threshold" test.
    elapsed_seconds = abs((first_ts - second_ts).total_seconds())

    # Explicit check instead of catching ZeroDivisionError, because a NumPy
    # float divided by zero returns inf rather than raising.
    if elapsed_seconds == 0:
        return 299792.458  # (Speed of light)
    return dist / elapsed_seconds

In [9]:
#A function to update genuine and fraud transactions into card_transactions collections

def updateCardTransactions(kafkajsonObj, Status):
    """
    Insert one transaction, labelled genuine or fraud, into the
    ``card_transactions`` collection of ``CREDIT_CARD_DB``.

    :param kafkajsonObj: (dict) transaction message; must contain card_id,
        member_id, amount, postcode, pos_id and transaction_dt
    :param Status: (Boolean) True stores status "GENUINE", anything else "Fraud"
    :raises KeyError: if a required field is missing from ``kafkajsonObj``
    """
    # Kept as an equality test so non-boolean inputs are labelled exactly as before.
    new_status = "GENUINE" if Status == True else "Fraud"

    copied_fields = ("card_id", "member_id", "amount", "postcode", "pos_id", "transaction_dt")
    record = {field: kafkajsonObj[field] for field in copied_fields}
    record["status"] = new_status

    # The context manager closes the client's connections after the insert;
    # without it every processed message leaves a client open.
    with MongoClient() as client:
        client["CREDIT_CARD_DB"]["card_transactions"].insert_one(record)
    

In [10]:
# A function to update genuine transactions into lookup_table collections
def updateLookUpTransactions(kafkajsonObj):
    """
    Update the card's record in the ``lookup_table`` collection of
    ``CREDIT_CARD_DB`` with the details of a genuine transaction.

    :param kafkajsonObj: (dict) transaction message; must contain card_id,
        member_id, amount, postcode, pos_id and transaction_dt
    :raises KeyError: if a required field is missing from ``kafkajsonObj``
    """
    copied_fields = ("card_id", "member_id", "amount", "postcode", "pos_id", "transaction_dt")
    new_values = {field: kafkajsonObj[field] for field in copied_fields}
    new_values["status"] = "GENUINE"

    # verify_postcode_data reads the previous timestamp from "transaction_dt2"
    # and calculate_speed parses it as 'YYYY-MM-DD HH:MM:SS', while incoming
    # messages are parsed as 'DD-MM-YYYY HH:MM:SS'. Refresh that field together
    # with the postcode so the next speed check does not pair the new location
    # with a stale timestamp.
    # NOTE(review): confirm "transaction_dt2" is the intended last-transaction field.
    try:
        parsed_dt = datetime.datetime.strptime(kafkajsonObj["transaction_dt"], '%d-%m-%Y %H:%M:%S')
    except (TypeError, ValueError):
        # Unexpected timestamp format: leave transaction_dt2 untouched rather
        # than store a value the speed check cannot parse.
        print("transaction_dt not in DD-MM-YYYY HH:MM:SS format; transaction_dt2 left unchanged")
    else:
        new_values["transaction_dt2"] = parsed_dt.strftime('%Y-%m-%d %H:%M:%S')

    # The context manager closes the client's connections after the update.
    with MongoClient() as client:
        client["CREDIT_CARD_DB"]["lookup_table"].update_one(
            {"card_id": kafkajsonObj["card_id"]},
            {"$set": new_values},
        )

In [11]:
# wrapper function to validate all 3 rules
def validateFraud(amount, card_id, postcode, txndate):
    """
    Run the UCL check and the postcode/speed check for one transaction.

    Both checks are always executed, in that order.

    :param amount: transaction amount
    :param card_id: card id of the transaction
    :param postcode: post code of the transaction
    :param txndate: (String) transaction timestamp
    :return: (Boolean) True only when both checks return True
    """
    ucl_ok = verify_ucl_data(card_id, amount)
    distance_ok = verify_postcode_data(card_id, postcode, txndate)

    if not (ucl_ok == True and distance_ok == True):
        return False

    print("3 Rules check passed!!!!!! ; congratulations! ")
    return True

In [12]:
import datetime
import sys

from kafka import KafkaConsumer

### Setting up the Python consumer
bootstrap_servers = ['18.211.252.152:9092']
topicName = 'transactions-topic-verified'
consumer = KafkaConsumer(topicName, bootstrap_servers=bootstrap_servers,
                         auto_offset_reset='earliest')   ## You can also set it as latest group_id = 'None'

### Reading the message from consumer
# Each message is validated, stored in the transactions collection, and, when
# genuine, also written back to the lookup table.
try:
    for message in consumer:
        kafkajsonObj = json.loads(message.value)
        status = validateFraud(kafkajsonObj["amount"], kafkajsonObj["card_id"], kafkajsonObj["postcode"], kafkajsonObj["transaction_dt"])
        updateCardTransactions(kafkajsonObj, status)
        print("Record inserted in transactions table")
        if (status == True):
            updateLookUpTransactions(kafkajsonObj)
            print("Record updated in lookup table")

except KeyboardInterrupt:
    sys.exit()
finally:
    # Release the broker connection whether the loop was interrupted or
    # stopped by an error.
    consumer.close()

now = datetime.datetime.now()
print("Current date and time: ")
print(str(now))


Exception: localhost:27017: [Errno 111] Connection refused, Timeout: 30s, Topology Description: <TopologyDescription id: 63ea94a4553dc2465131e3ce, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused')>]>