In [1]:
import redis
import json

# Define connection variables
host = 'localhost'
port =  6379
password = None #'password'

# Connect to Redis
r = redis.Redis(host=host, port=port, password=password, decode_responses=True)
print('Connected to Redis')

r.flushdb()

Connected to Redis


True

# Real-time Risk Scoring System

## Description of the use case
This use case involves calculating Risk scoring for the transactions in real-time by using Redis Bloom Filters and Streams. By analyzing transaction data, businesses can quickly calculate the risk score for the transaction.

## Redis Data Structures Used
- **Bloom Filters**: To check for known fraudulent entities.
- **Streams**: To capture and process real-time transaction data.
- **Hashes**: To store transaction details.


In [8]:
r.flushdb()
# Mock data for the use case
transactions = {
    'tx_1001': {'user_id': 'user_1', 'amount': 500, 'status': 'pending'},
    'tx_1002': {'user_id': 'user_2', 'amount': 4500, 'status': 'pending'},
    'tx_1003': {'user_id': 'user_3', 'amount': 700, 'status': 'pending'},
    'tx_1004': {'user_id': 'user_2', 'amount': 70000, 'status': 'pending'}
}

# Use Hashes to store transaction details
for tx_id, tx_info in transactions.items():
    r.hset(tx_id, mapping=tx_info)

usr_txn_data_hist = {
    'user_1': [500, 520, 530, 540, 560,560,560,520,200,520],
    'user_2': [5000, 5200, 5300, 5400, 5600,5600,5600,5200,2000,5200],
    'user_3': [700, 720, 730, 740, 760,760,760,720,700,720],
 }
# Use T-Digest to detect trends in sales data
for user_id, data in usr_txn_data_hist.items():
    r.tdigest().create(key=f'tdigest:{user_id}')
    for value in data:
        r.execute_command('TDIGEST.ADD', f'tdigest:{user_id}', value)


# Initialize Bloom Filter for known fraudulent ips
r.execute_command('BF.RESERVE', 'fraudulent_ips', 0.01, 1000)
fraudulent_ips = ['193.176.23.7','212.18.104.18','213.55.85.202','27.254.235.3','34.66.72.251','45.148.10.240','45.148.10.79',
                  '51.254.101.166','59.12.160.91','80.242.208.68','88.214.48.16','88.214.48.17','88.214.48.18','91.205.219.185']
for ip in fraudulent_ips:
    r.bf().add('fraudulent_ips', ip)

# Use Streams to capture real-time transaction data
r.xadd('transactions', {'tx_id': 'tx_1001', 'user_id': 'user_1', 'amount': 500, 'ip': '10.2.4.255'})
r.xadd('transactions', {'tx_id': 'tx_1002', 'user_id': 'user_2', 'amount': 3000,'ip': '88.214.48.17'})
r.xadd('transactions', {'tx_id': 'tx_1003', 'user_id': 'user_3', 'amount': 700, 'ip': '178.3.27.192'})
r.xadd('transactions', {'tx_id': 'tx_1004', 'user_id': 'user_2', 'amount': 70000, 'ip': '178.3.27.192'})


def get_ip_risk(ip):
        if not r.bf().exists('fraudulent_ips', ip):
            return 0
        else:
            return 1

def get_amt_risk(user_id, amt):
        # get 80th %ile value
        val = r.tdigest().quantile("tdigest:"+user_id, 0.8)
        if amt < val[0]*2:
            return 0
        else:
            print('Mean txn Val:'+str(val) + ' txn amt:'+ str(amt))
            return 1


# Function to process transactions and based on ip
def process_transactions():
    stream_entries = r.xrange('transactions')
    for entry in stream_entries:
        tx_id = entry[1]['tx_id']
        user_id = entry[1]['user_id']
        amount = int(entry[1]['amount'])
        ip = entry[1]['ip']
        risk_score_ip = get_ip_risk(ip)
        risk_score_amt = get_amt_risk(user_id, amount)
        total_risk_score = risk_score_ip + risk_score_amt;
        if total_risk_score > 0:
           r.hset(tx_id, 'risk_score', total_risk_score)
        else:
            r.hset(tx_id, 'status', 'Approved')

# Process the transactions
process_transactions()

# Print the updated transaction details
for tx_id in transactions.keys():
    print(f"{tx_id}: {r.hgetall(tx_id)}")


Mean txn Val:[5600] txn amt:70000
tx_1001: {'user_id': 'user_1', 'amount': '500', 'status': 'Approved'}
tx_1002: {'user_id': 'user_2', 'amount': '4500', 'status': 'pending', 'risk_score': '1'}
tx_1003: {'user_id': 'user_3', 'amount': '700', 'status': 'Approved'}
tx_1004: {'user_id': 'user_2', 'amount': '70000', 'status': 'pending', 'risk_score': '1'}
