In [None]:
import os
import sys
import json
import time
import argparse
import requests
import datetime

from simulator.shared import read_from_files


In [None]:
# Replay window (inclusive start, date strings passed straight to read_from_files).
START = '2020-04-01'
END = '2020-04-02'

# Base URL of the Kafka HTTP bridge. The default points at a specific sandbox
# cluster; set the KAFKA_BRIDGE_URL environment variable to target another one
# without editing the notebook.
BRIDGE = os.environ.get(
    'KAFKA_BRIDGE_URL',
    'https://transaction-bridge-http-fsi-fraud-detection.apps.cluster-hxjfg.hxjfg.sandbox899.opentlc.com',
)

In [None]:
def _post_with_retry(endpoint, headers, payload, max_retries=5, timeout=30):
    """POST one payload to the Kafka bridge, backing off on non-200 responses.

    Makes at most ``max_retries + 1`` attempts, sleeping ``2 * retry`` seconds
    before retry number ``retry``. Aborts the run with ``sys.exit(1)`` when the
    retries are exhausted or when ``requests`` raises (connection error,
    timeout, ...), so scripted notebook runs see a non-zero status.

    Returns the successful ``requests.Response``.
    """
    retry = 0
    while True:
        try:
            response = requests.post(
                endpoint, headers=headers, json=payload, timeout=timeout)
        except requests.exceptions.RequestException as exc:
            # Only network-level failures land here; the SystemExit below is
            # deliberately NOT caught (the old bare `except:` swallowed it).
            print(f'exception/aborting... ({exc})')
            sys.exit(1)

        if response.status_code == 200:
            return response

        retry += 1
        if retry > max_retries:
            print(f'aborting... (last status {response.status_code})')
            sys.exit(1)
        # Report before sleeping so the user sees why the run is pausing.
        print(f"backing-off/retry {retry}/{max_retries}")
        time.sleep(retry * 2)


def upload_transactions(bridge, topic='tx-inbox', start='2020-04-01', end='2020-04-02',
                        loc='./data/simulated/pkl/', batch_size=100, max_retries=5, timeout=30):
    """Replay stored simulated transactions into a Kafka topic via the HTTP bridge.

    The fraud labels are dropped so each record looks like a 'new' transaction.
    Rows are posted in batches of ``batch_size``; the final, possibly smaller,
    batch is posted too. Each record's ``value`` is the row serialized with
    ``Series.to_json()`` (i.e. a JSON string).

    Parameters
    ----------
    bridge : str
        Base URL of the Kafka HTTP bridge.
    topic : str
        Target topic; records are POSTed to ``{bridge}/topics/{topic}``.
    start, end : str
        Date range forwarded to ``read_from_files``.
    loc : str
        Directory forwarded to ``read_from_files``.
    batch_size : int
        Maximum number of records per POST; must be >= 1.
    max_retries : int
        Retries per batch on non-200 responses before aborting.
    timeout : float or None
        Per-request timeout in seconds passed to ``requests.post``.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1.
    SystemExit
        (status 1) if a batch cannot be delivered.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    kafka_endpoint = f"{bridge}/topics/{topic}"
    kafka_headers = {'content-type': 'application/vnd.kafka.json.v2+json'}

    # read the raw transaction data
    transactions_df = read_from_files(loc, start, end)

    # remove the TX_FRAUD,TX_FRAUD_SCENARIO columns in order to simulate a 'new' transaction
    transactions_df = transactions_df.drop(
        ['TX_FRAUD', 'TX_FRAUD_SCENARIO'], axis=1)

    num_tx = len(transactions_df)

    # Slice by position so the progress count is correct even when the frame's
    # index is not 0..n-1, and so the trailing partial batch is not dropped.
    for offset in range(0, num_tx, batch_size):
        chunk = transactions_df.iloc[offset:offset + batch_size]
        payload = {"records": [{'value': row.to_json()} for _, row in chunk.iterrows()]}

        _post_with_retry(kafka_endpoint, kafka_headers, payload,
                         max_retries=max_retries, timeout=timeout)

        print(f" --> uploaded {offset + len(chunk)}/{num_tx}")


In [None]:
# Replay every simulated transaction in the configured window into the bridge.
print(f" --> Replaying transactions from {START} to {END}")
upload_transactions(bridge=BRIDGE, start=START, end=END)
print(" --> DONE.")