---
title: "Generate credit card transactions data and send to kafka topic"
date: 2021-04-25
type: technical_note
draft: false
---

# Generate credit card transactions data and send to kafka topic.

![overview-2.png](./images/overview-2.png)

### Inspiration of this example was taken from [here](https://github.com/aws-samples/amazon-sagemaker-feature-store-streaming-aggregation).

#### Prerequisites 

Install Faker if you don't have it installed already. We use Faker to generate synthetic data simulating credit cards and financial transactions.

In [1]:
#!pip install Faker

#### Imports 

In [1]:
from collections import defaultdict
from faker import Faker
import pandas as pd
import numpy as np
import datetime
import hashlib
import random
import math
import os

from hops import hdfs
from hops import pandas_helper as pandas

In [2]:
# Seed for Reproducibility
faker = Faker()
faker.seed_locale('en_US', 0)

In [3]:
SEED = 123
random.seed(SEED)
np.random.seed(SEED)
faker.seed_instance(SEED)

#### Constants 

In [25]:
TOTAL_UNIQUE_TRANSACTIONS = 5400
TOTAL_UNIQUE_USERS = 100
START_DATE = '2021-07-03 00:00:00'
END_DATE = '2021-07-06 00:01:00'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

# Change this according to your settings
KAFKA_BROKER_ADDRES = "broker.kafka.service.consul:9091"
KAFKA_TOPIC_NAME = "credit_card_transactions"

### Generate Transactions

#### Generate Unique Credit Card Numbers 
<p> Credit card numbers are uniquely assigned to users. Since, there are 10K users, we would want to generate 10K unique card numbers.</p>

In [26]:
def generate_unique_credit_card_numbers(n: int) -> list:
    cc_ids = set()
    for _ in range(n):
        cc_id = faker.credit_card_number(card_type='visa')
        cc_ids.add(cc_id)
    return list(cc_ids) 

In [27]:
credit_card_numbers = generate_unique_credit_card_numbers(TOTAL_UNIQUE_USERS)

In [28]:
assert len(credit_card_numbers) == TOTAL_UNIQUE_USERS 
assert len(credit_card_numbers[0]) == 16 # validate if generated number is 16-digit

In [29]:
# inspect random sample of credit card numbers 
random.sample(credit_card_numbers, 5)

['4173712112382760',
 '4819277670755385',
 '4070748215795379',
 '4461432145752357',
 '4391394388523066']

#### Generate Time Series


In [30]:
def generate_timestamps(n: int) -> list:
    start = datetime.datetime.strptime(START_DATE, DATE_FORMAT)
    end = datetime.datetime.strptime(END_DATE, DATE_FORMAT)
    timestamps = list()
    for _ in range(n):
        timestamp = faker.date_time_between(start_date=start, end_date=end, tzinfo=None).strftime(DATE_FORMAT)
        timestamps.append(timestamp)
    timestamps = sorted(timestamps)
    return timestamps

In [31]:
timestamps = generate_timestamps(TOTAL_UNIQUE_TRANSACTIONS)

In [32]:
assert len(timestamps) == TOTAL_UNIQUE_TRANSACTIONS

In [33]:
# inspect random sample of timestamps
random.sample(timestamps, 5)

['2021-07-05 05:45:21',
 '2021-07-03 10:20:33',
 '2021-07-03 00:32:53',
 '2021-07-03 08:04:36',
 '2021-07-04 10:53:32']

#### Generate Random Transaction Amounts 
<p>The transaction amounts are presumed to follow Pareto distribution, as it is logical for consumers to make many more smaller purchases than large ones. The break down of the distribution is shown in the table below.</p>


| Percentage        | Range (Amount in $)     |
| :-------------: | :----------: |
|  5\% | 0.01 to 1    |
| 7.5\%   | 1 to 10 |
| 52.5\%   | 10 to 100 |
| 25\%   | 100 to 1000 |
| 10\%   | 1000 to 10000 |

In [34]:
def get_random_transaction_amount(start: float, end: float) -> float:
    amt = round(np.random.uniform(start, end), 2)
    return amt

In [35]:
distribution_percentages = {0.05: (0.01, 1.01), 
                            0.075: (1, 11.01),
                            0.525: (10, 100.01),
                            0.25: (100, 1000.01),
                            0.10: (1000, 10000.01)}

In [36]:
amounts = []

for percentage, span in distribution_percentages.items():
    n = int(TOTAL_UNIQUE_TRANSACTIONS * percentage)
    start, end = span
    for _ in range(n):
        amounts.append(get_random_transaction_amount(start, end+1))
        
random.shuffle(amounts)

In [37]:
assert len(amounts) == TOTAL_UNIQUE_TRANSACTIONS

In [38]:
# inspect random sample of transaction amounts
random.sample(amounts, 5)

[0.97, 43.44, 3409.1, 221.29, 68.57]

#### Generate Credit Card Transactions
<br>
<div style="text-align: justify">
Using the random credit card numbers, timestamps and transaction amounts generated in the above steps, 
we can generate random credit card transactions by combining them. The transaction id for the transaction is the md5
hash of the above mentioned entities.
</div>

In [39]:
def generate_transaction_id(timestamp: str, credit_card_number: str, transaction_amount: float) -> str:
    hashable = f'{timestamp}{credit_card_number}{transaction_amount}'
    hexdigest = hashlib.md5(hashable.encode('utf-8')).hexdigest()
    return hexdigest

In [40]:
transactions = []
for timestamp, amount in zip(timestamps, amounts):
    credit_card_number = random.choice(credit_card_numbers)
    transaction_id = generate_transaction_id(timestamp, credit_card_number, amount)
    transactions.append({'tid': transaction_id, 
                         'datetime': timestamp, 
                         'cc_num': credit_card_number, 
                         'amount': amount, 
                         'fraud_label': 0})

In [41]:
assert len(transactions) == TOTAL_UNIQUE_TRANSACTIONS

In [42]:
# inspect random sample of credit card transactions
random.sample(transactions, 1)

[{'tid': 'd51b2a0600c0fd0e0dcf73f52a732f38',
  'datetime': '2021-07-03 08:55:46',
  'cc_num': '4534099886119592',
  'amount': 79.48,
  'fraud_label': 0}]

## Log financial transaction into Kafka

In [43]:
from hops import kafka
from hops import tls
from hops import hdfs
import json
from confluent_kafka import Producer

In [23]:
config = {
    "bootstrap.servers": KAFKA_BROKER_ADDRES,
    "security.protocol": kafka.get_security_protocol(),
    "ssl.ca.location": tls.get_ca_chain_location(),
    "ssl.certificate.location": tls.get_client_certificate_location(),
    "ssl.key.location": tls.get_client_key_location(),
    "group.id": "1"
}

producer = Producer(config)

In [44]:
i = 0
for transaction in transactions:
    if "fraud_label" in transaction:
        transaction.pop("fraud_label")
    if i % 1000 == 0:
        print(json.dumps(transaction))
    producer.produce(KAFKA_TOPIC_NAME, json.dumps(transaction))
    producer.flush()    
    i += 1

{"tid": "40543f2f6258bbe60237c8dab7293f40", "datetime": "2021-07-03 00:01:17", "cc_num": "4954603686205039", "amount": 67.79}
{"tid": "04072834d13a99811d35666b2781207d", "datetime": "2021-07-03 12:49:54", "cc_num": "4691298379888791", "amount": 32.51}
{"tid": "75b1097130bad9c6a5442108bcd165cf", "datetime": "2021-07-04 02:26:19", "cc_num": "4590824025018285", "amount": 1.54}
{"tid": "8c1eb5cb01c27f9f096ab76b2c82a0a1", "datetime": "2021-07-04 15:16:54", "cc_num": "4700385810202986", "amount": 2995.11}
{"tid": "455e036fbbf1dc14f702239d06e0546b", "datetime": "2021-07-05 05:35:30", "cc_num": "4222541624502388", "amount": 42.87}
{"tid": "d5f9910a1ff55b7c647a05209b082a2d", "datetime": "2021-07-05 19:08:38", "cc_num": "4313567388270789", "amount": 36.84}


# Simulate fraudulent transactions

> NOTE: Before polluting the `credit_card_transactions` topic with fraudulent transactions, you can run the notebooks `3_stream-ingestion`, `4_create_training_dataset` and `5_model_training` to create a Training Dataset with the original transactions and train an autoencoder that learns these patterns. Once the model is served, you can create fraudulent transactions by running the code below.

#### Create Attack Transaction Chains 

In [45]:
FRAUD_RATIO = 0.0025 # percentage of transactions that are fraudulent
NUMBER_OF_FRAUDULENT_TRANSACTIONS = int(FRAUD_RATIO * TOTAL_UNIQUE_TRANSACTIONS)
ATTACK_CHAIN_LENGTHS = [3, 4, 5, 6, 7, 8, 9, 10]

In [46]:
visited = set()
chains = defaultdict(list)

In [47]:
def size(chains: dict) -> int:
    counts = {key: len(values)+1 for (key, values) in chains.items()}
    return sum(counts.values())

In [48]:
def create_attack_chain(i: int):
    chain_length = random.choice(ATTACK_CHAIN_LENGTHS)
    for j in range(1, chain_length):
        if i+j not in visited:
            if size(chains) == NUMBER_OF_FRAUDULENT_TRANSACTIONS:
                break
            chains[i].append(i+j)
            visited.add(i+j)

In [49]:
while size(chains) < NUMBER_OF_FRAUDULENT_TRANSACTIONS:
    i = random.choice(range(TOTAL_UNIQUE_TRANSACTIONS))
    if i not in visited:
        create_attack_chain(i)
        visited.add(i)

In [50]:
assert size(chains) == NUMBER_OF_FRAUDULENT_TRANSACTIONS

#### Modify Transactions with Fraud Chain Attacks 

In [51]:
def generate_timestamps_for_fraud_attacks(timestamp: str, chain_length: int) -> list:
    timestamps = []
    timestamp = datetime.datetime.strptime(timestamp, DATE_FORMAT)
    for _ in range(chain_length):
        # interval in seconds between fraudulent attacks
        delta = random.randint(30, 120)
        current = timestamp + datetime.timedelta(seconds=delta)
        timestamps.append(current.strftime(DATE_FORMAT))
        timestamp = current
    return timestamps 

In [52]:
def generate_amounts_for_fraud_attacks(chain_length: int) -> list:
    amounts = []
    for percentage, span in distribution_percentages.items():
        n = math.ceil(chain_length * percentage)
        start, end = span
        for _ in range(n):
            amounts.append(get_random_transaction_amount(start, end+1))
    return amounts[:chain_length]

In [53]:
for key, chain in chains.items():
    transaction = transactions[key]
    timestamp = transaction['datetime']
    cc_num = transaction['cc_num']
    amount = transaction['amount']
    transaction['fraud_label'] = 1
    inject_timestamps = generate_timestamps_for_fraud_attacks(timestamp, len(chain))
    inject_amounts = generate_amounts_for_fraud_attacks(len(chain))
    random.shuffle(inject_amounts)
    for i, idx in enumerate(chain):
            original_transaction = transactions[idx]
            inject_timestamp = inject_timestamps[i]
            original_transaction['datetime'] = inject_timestamp
            original_transaction['fraud_label'] = 1
            original_transaction['cc_num'] = cc_num
            original_transaction['amount'] = inject_amounts[i]
            original_transaction['tid'] = generate_transaction_id(inject_timestamp, cc_num, amount)
            transactions[idx] = original_transaction

## Log fraudulent transactions into Kafka

In [54]:
i = 0
for transaction in transactions:
    if i % 2000 == 0:
        print(json.dumps(transaction))
    producer.produce(KAFKA_TOPIC_NAME, json.dumps(transaction))
    producer.flush()    
    i += 1

{"tid": "40543f2f6258bbe60237c8dab7293f40", "datetime": "2021-07-03 00:01:17", "cc_num": "4954603686205039", "amount": 67.79}
{"tid": "75b1097130bad9c6a5442108bcd165cf", "datetime": "2021-07-04 02:26:19", "cc_num": "4590824025018285", "amount": 1.54}
{"tid": "455e036fbbf1dc14f702239d06e0546b", "datetime": "2021-07-05 05:35:30", "cc_num": "4222541624502388", "amount": 42.87}
