In [1]:
import hopsworks
import json
import pandas as pd

from confluent_kafka import Producer

In [2]:
project = hopsworks.login()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://snurran.hops.works/p/15479


In [3]:
# create kafka topic
KAFKA_TOPIC_NAME = "real_time_live_transactions"
SCHEMA_NAME = "live_transactions_schema"

kafka_api = project.get_kafka_api()

schema = {
    "type": "record",
    "name": SCHEMA_NAME,
    "namespace": "ai.hopsworks.examples.bytewax.fraud",
    "fields": [
        {
            "name": "tid",
            "type": [
                "null",
                "string"
            ]
        },
        {
            "name": "timestamp",
            "type": [
                "null",
                {
                    "type": "long",
                    "logicalType": "timestamp-micros"
                }
            ]
        },
        {
            "name": "cc_num",
            "type": [
                "null",
                "long"
            ]
        },
        {
            "name": "amount",
            "type": [
                "null",
                "double"
            ]
        },
    ]
}

kafka_api.create_schema(SCHEMA_NAME, schema)
kafka_api.create_topic(KAFKA_TOPIC_NAME, SCHEMA_NAME, 1, replicas=1, partitions=1)

KafkaTopic('real_time_live_transactions')

In [4]:
# setup kafka producer
kafka_config = kafka_api.get_default_config()

print(kafka_config)
producer = Producer(kafka_config)

{'security.protocol': 'SSL', 'ssl.ca.location': '/tmp/ca_chain.pem', 'ssl.certificate.location': '/tmp/client_cert.pem', 'ssl.key.location': '/tmp/client_key.pem', 'client.id': 'jupyter-cc-fraud--jim00000-6dc96c9d79-c74vp', 'group.id': 'my-group-id', 'ssl.endpoint.identification.algorithm': 'none', 'bootstrap.servers': '192.168.1.25:9091'}


In [6]:
transactions_pdf = pd.read_csv("../data_generation/historical_transactions.csv")

In [7]:
transactions_pdf

Unnamed: 0.1,Unnamed: 0,tid,datetime,cc_num,category,amount,latitude,longitude,city,country,fraud_label
0,0,4aa7ba7a5bcc3493152a33aa73a2f17f,2023-12-28 23:14:45,1766230b4f7602f856ffc82ef0a0ffd1,Grocery,78.31,36.025060,-86.779170,Brentwood Estates,US,0
1,1,1d6efad95e92a86e9671c196365277a0,2023-12-28 23:16:45,c30018ba5ca059426ca40184e3b4d833,Grocery,77.14,33.410120,-91.061770,Greenville,US,0
2,2,676cbe21703f5e05b2291369c559f71e,2023-12-28 23:21:02,818951f6e43105a228bddc51a19d8e28,Grocery,13.92,33.036990,-117.291980,Encinitas,US,0
3,3,3453b68fb35e454fcf792c4f5fddb093,2023-12-28 23:25:16,4fd1d53696a07c807e56c6af5c195430,Grocery,33.05,29.845760,-90.106740,Estelle,US,0
4,4,847e5d19ab8bcb8e91c2d174223a9ca5,2023-12-28 23:35:08,3d66cf2dc257be18dbed42327f64f753,Clothing,65.18,40.605380,-73.755130,Far Rockaway,US,0
...,...,...,...,...,...,...,...,...,...,...,...
71317,71317,a08ade3a4a50207271118c89345cd036,2024-04-26 22:36:17,39ccf08351228fbc6073640bdfa58016,Cash Withdrawal,704.83,40.718469,-73.926426,Greenpoint,US,0
71318,71318,12a6cdb78aa929b28b034087ecfcba26,2024-04-22 22:36:17,39ccf08351228fbc6073640bdfa58016,Cash Withdrawal,50.53,40.723734,-73.925561,Greenpoint,US,0
71319,71319,d17a2f8dcab315bd778bd3996ff069fb,2024-04-18 22:36:17,39ccf08351228fbc6073640bdfa58016,Cash Withdrawal,61.86,40.722977,-73.926932,Greenpoint,US,0
71320,71320,95e1e62e4a13ae964a98ebbe8409f98f,2024-04-14 22:36:17,39ccf08351228fbc6073640bdfa58016,Cash Withdrawal,412.33,40.713166,-73.921838,Greenpoint,US,0


In [8]:
data = json.loads(transactions_pdf.to_json(orient="records"))

In [9]:
data[1]

{'Unnamed: 0': 1,
 'tid': '1d6efad95e92a86e9671c196365277a0',
 'datetime': '2023-12-28 23:16:45',
 'cc_num': 'c30018ba5ca059426ca40184e3b4d833',
 'category': 'Grocery',
 'amount': 77.14,
 'latitude': 33.41012,
 'longitude': -91.06177,
 'city': 'Greenville',
 'country': 'US',
 'fraud_label': 0}

In [10]:
batch_size = 0
for t in data:
    producer.produce(KAFKA_TOPIC_NAME, json.dumps(t))
    batch_size += 1
    
    if batch_size == 10:
        producer.flush()
        batch_size = 0