<div>
<img src=https://www.institutedata.com/wp-content/uploads/2019/10/iod_h_tp_primary_c.svg width="300">
</div>

# Lab 10.4 - Analysing Streaming Data with Kafka

## Introduction

**Note**: this notebook is to be run in Google Colab on your Google Drive. It will not work locally on your computer.

The purpose of this lab is to gain experience in working with streaming data, using Apache Kafka installed on Google Colaboratory (Colab). You will see how a simple streaming dashboard is created using the Dash library, and you will simulate live model scoring to monitor a machine learning model.

Apache Kafka is an open-source distributed publish-subscribe messaging system that maintains streams of messages in topics. It has become a highly popular streaming platform for real-time applications.

In Google Colab, a virtual machine is automatically set up to execute your code. The maximum lifetime of such a machine is 12 hours. Note that notebooks will be disconnected from virtual machines if left idle. If this happens, simply click on the Connect button to reconnect. If the kernel needs to be restarted (via the Runtime menu), variables may be lost but packages will not need to be reinstalled unless a new machine is assigned.

https://research.google.com/colaboratory/faq.html

Sign in to colab.research.google.com, choose the Upload tab and upload this notebook. This will automatically create a folder called "Colab Notebooks" in your Google Drive (if it does not already exist). Next, go to drive.google.com and upload the dataset **"fraud_data.csv"** into this "Colab Notebooks" folder.

## Setup

The following code connects your Google Drive to this notebook. A new window will open to prompt you to allow the connection to occur.

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


Upon running the following cell you should see the name of this notebook and fraud_data.csv.

In [None]:
!ls "/content/gdrive/My Drive/Colab Notebooks/"

fraud_data.csv


Next, download and extract Kafka from the Apache archive:

In [None]:
!curl -sSOL https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
!tar -xzf kafka_2.13-3.7.0.tgz


The kafka-python library provides a Python client for the Kafka platform:

In [None]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


Finally, Dash will be used for interactive plotting in this notebook:

In [None]:
!pip install dash

In [None]:
import time
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns

from kafka import KafkaProducer
from kafka import KafkaConsumer

from dash import Dash
from dash import dcc
from dash import html
from dash.dependencies import (Input, Output)
import plotly.graph_objs as go
from plotly.subplots import make_subplots

import re
import warnings
warnings.filterwarnings('ignore')

We use the following shell commands to start ZooKeeper and Kafka.

Apache ZooKeeper is a coordination service for distributed systems. Here it tracks the status of the nodes in the Kafka cluster and stores cluster metadata such as the list of topics; the messages themselves are stored by the Kafka brokers.

In [None]:
!./kafka_2.13-3.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.7.0/config/zookeeper.properties
!./kafka_2.13-3.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.7.0/config/server.properties
!echo "Waiting until zookeeper and kafka services are ready"
!sleep 10
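A fixed ten-second sleep may be too short or needlessly long, depending on the machine. As a hedged alternative, the sketch below polls a TCP port until something accepts connections. `wait_for_port` is a hypothetical helper, not part of Kafka or kafka-python, and an open port only shows that the process is listening, not that the broker has finished initialising.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

In this notebook one could call `wait_for_port("127.0.0.1", 9092)` for the broker address used below, and `wait_for_port("127.0.0.1", 2181)` for ZooKeeper, assuming the default `clientPort` in the bundled zookeeper.properties has not been changed.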

## Loading the dataset

The dataset fraud_data.csv represents credit card transactions which have been labelled as fraudulent or non-fraudulent. It is synthetically generated and is based on the dataset at https://www.kaggle.com/kartik2112/fraud-detection. Some of the features, based on transactions in the past 24 hours or 60 days, have been engineered from the raw data.

- trans_datetime: date and time of the transaction
- cc_num: credit card number of the customer
- merchant: name of the merchant to which the customer is paying
- category: category of the merchant
- amount: amount of the transaction in $
- first: first name of the customer
- last: last name of the customer
- gender: gender of the customer
- street, city, state: address of the customer
- zip: zip code of the transaction
- lat: latitude of the customer
- lon: longitude of the customer
- city_pop: population of the city where the customer is living
- job, age: job and age of the customer
- num_trans_60d, num_trans_24h: number of transactions by this credit card in the past 60 days, 24 hours respectively
- num_fraud_trans_24h: number of fraudulent transactions by this credit card in the past 24 hours
- avg_trans_amt_60d: the average transaction amount for this credit card in the past 60 days
- is_fraud: whether the transaction is fraud or not (1 - fraud, 0 - not fraud)

In [None]:
df = pd.read_csv('/content/gdrive/My Drive/Colab Notebooks/fraud_data.csv')
df.head()

## Perform EDA

**Exercise**: Perform some exploratory data analysis on the df dataframe.

**Question**: How balanced is this dataset?

In [None]:
# ANSWER


Correlation matrix:

In [None]:
df_numerical = df[['amount', 'lat', 'lon', 'city_pop', 'age', 'num_trans_60d',
                   'num_trans_24h', 'num_fraud_trans_24h', 'avg_trans_amt_60d',
                   'is_fraud']]

In [None]:
colormap = plt.cm.coolwarm
plt.figure(figsize = (10, 10))
plt.title('Pearson Correlation of Features', size = 15)
sns.heatmap(df_numerical.astype(float).corr(),
            linewidths = 0.1,
            vmax = 1.0,
            square = True,
            cmap = colormap,
            linecolor = 'white',
            annot = True)
plt.show()

We see that amount, num_trans_60d, num_fraud_trans_24h, and avg_trans_amt_60d have the strongest correlation with the target variable is_fraud.

Since the dataset is large, we sample 1000 rows to obtain a pairplot.

In [None]:
sns.pairplot(df_numerical.sample(1000, random_state=0))
plt.show()

## Create topics

Kafka records are stored in *topics* which can be thought of as data feeds that one can subscribe to.
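To make the idea concrete, here is a toy in-memory model of a topic: an append-only log in which each record receives an offset and each consumer group tracks its own read position. It is purely illustrative. `ToyTopic` is a hypothetical class, not part of Kafka or kafka-python, the records are made up, and partitions, persistence and networking are all ignored.

```python
class ToyTopic:
    """Append-only log; each consumer group keeps its own read offset."""

    def __init__(self, name):
        self.name = name
        self.log = []        # records in arrival order
        self.offsets = {}    # group_id -> next offset to read

    def publish(self, key, value):
        self.log.append((key, value))
        return len(self.log) - 1  # offset of the new record

    def poll(self, group_id, max_records=1):
        start = self.offsets.get(group_id, 0)
        batch = self.log[start:start + max_records]
        self.offsets[group_id] = start + len(batch)
        return batch


topic = ToyTopic("features")
topic.publish("2020-01-01 00:00:05", "12.5,3,0,40.2,0")
topic.publish("2020-01-01 00:00:09", "99.0,7,1,35.0,1")

first_for_a = topic.poll("group_a")   # group_a reads offset 0
first_for_b = topic.poll("group_b")   # group_b independently reads offset 0
second_for_a = topic.poll("group_a")  # group_a moves on to offset 1
```

Reading a record does not remove it from the log, which is why two groups can each receive the same record.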

We shall create two topics on the Kafka platform.

- One with customer information (called *customerinfo*)
- One with predictors that can be used for a fraud prediction algorithm (called *features*)

In [None]:
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic customerinfo
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic features


View their details:

In [None]:
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic customerinfo
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic features

Next we create the records for these topics. The records need to be a list of key-value pairs. Here we will make the key the datetime (timestamp) of the transaction.

In [None]:
#to_csv output ends with a newline, so strip it to avoid a trailing empty record
timestamps = df['trans_datetime'][90000:].to_csv(index=False).strip("\n").split("\n")
X_kafka = df.drop(['trans_datetime'],axis=1)[90000:].to_csv(index=False).strip("\n").split("\n")


In [None]:
predictors = ['amount', 'num_trans_60d', 'num_fraud_trans_24h', 'avg_trans_amt_60d']
featuredata = df[predictors + ['is_fraud']][90000:].to_csv(index=False).strip("\n").split("\n")

In [None]:
X_kafka

In [None]:
featuredata

The following helper functions will help write messages into each topic.

In [None]:
def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for key, message in items:
    producer.send(topic_name, key=key.encode('utf-8'),
                  value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

To start with we write 5 messages to each topic.

In [None]:
write_to_kafka("customerinfo", zip(timestamps[1:6], X_kafka[1:6]))
write_to_kafka("features", zip(timestamps[1:6], featuredata[1:6]))

The zip function pairs up the timestamps with data into tuples.
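As a small illustration with made-up values (not rows from fraud_data.csv), the sketch below shows how zip turns two parallel lists into the (key, message) tuples that write_to_kafka iterates over, and how each string is encoded to bytes before it is sent.

```python
# Made-up sample values, for illustration only.
sample_timestamps = ["2020-06-21 12:14:25", "2020-06-21 12:14:33"]
sample_rows = ["2.86,25,0,61.3,0", "29.84,31,0,48.9,0"]

# zip is lazy; wrapping it in list() makes the pairs visible and reusable.
pairs = list(zip(sample_timestamps, sample_rows))

# Kafka keys and values are bytes, hence the encode step.
encoded = [(key.encode("utf-8"), message.encode("utf-8")) for key, message in pairs]
```

Because zip returns a one-shot iterator, passing `zip(...)` directly to a function (as in the cell above) is fine as long as the function loops over it only once.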

Run the following shell command to view one of the topics. You will need to interrupt execution of the cell after the five messages have appeared.

In [None]:
! /content/kafka_2.13-3.7.0/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic features \
--from-beginning

## Subscribing to topics
To start with we subscribe to both topics with the same consumer.

In [None]:
kafka_bootstrap_servers = 'localhost:9092'
topics = ['customerinfo', 'features']

In [None]:
consumer = KafkaConsumer(
    *topics,
    group_id = '104a',
    bootstrap_servers=kafka_bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=True)

In [None]:
consumer.config

The following cell shows the message, key and value of records obtained by the consumer. Stop the cell once ten messages have been displayed.

In [None]:
for message in consumer:
  print("message: ", message)
  print("key: ", message.key)
  print("value: ", message.value.decode('utf-8'))
  time.sleep(2)

**Question**: In which order have the records come into the consumer and are the records sorted within each topic?

**Answer**:

For the next section we delete and recreate our topics, this time each with one partition.

In [None]:
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic customerinfo
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --delete --bootstrap-server 127.0.0.1:9092 --topic features


In [None]:
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic customerinfo
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic features


In [None]:
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic customerinfo
!./kafka_2.13-3.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic features

## Creating a map showing the incoming stream

This time we write more messages to each topic.

In [None]:
write_to_kafka("customerinfo", zip(timestamps[1:], X_kafka[1:]))

**Exercise**: write featuredata[1:] to the topic "features"

In [None]:
# ANSWER:


In [None]:
consumer = KafkaConsumer(
    'customerinfo',
    group_id = '104a',
    bootstrap_servers=kafka_bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=True)

Run the following cell to see if the first 20 latitude and longitude values are accessible.

In [None]:
df_received = pd.DataFrame(columns = ['lat', 'lon'])
count = 0
for message in consumer:
  count = count + 1
  x = re.split(r",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", message.value.decode('utf-8'))
  #source: https://stackoverflow.com/questions/18893390/splitting-on-comma-outside-quotes
  #this regular expression splits the record only on commas occurring outside quote
  #characters
  df_received = pd.concat([df_received, pd.DataFrame({'lat': [x[11]], 'lon': [x[12]]})], ignore_index=True)
  print(f"lat = {x[11]}, lon = {x[12]}")
  time.sleep(1)
  if count >= 20: #stop after 20 records
    break
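To see why the regular expression is needed, the sketch below compares three ways of splitting a made-up record whose merchant name contains a comma inside quotes. The record is invented for illustration; the `csv` module is shown only as a standard-library alternative, not as what this notebook uses.

```python
import csv
import re

PATTERN = r",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"

# Made-up record: the second field is quoted because it contains a comma.
record = '1234,"Kutch, Hermiston and Farrell",grocery_pos,107.23'

naive = record.split(",")             # also splits inside the quoted field
by_regex = re.split(PATTERN, record)  # splits only on commas outside quotes
by_csv = next(csv.reader([record]))   # same fields, with the quotes removed
```

The lookahead accepts a comma only if the remainder of the line contains an even number of quote characters, which is true exactly when the comma lies outside a quoted field. Note that the regex keeps the surrounding quotes in the field, whereas `csv.reader` strips them.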

Another way of getting records from the consumer is through the poll method:

In [None]:
latestrecord = consumer.poll(max_records = 1)

In [None]:
latestrecord # one record should appear, if not rerun the above cell

In [None]:
records = consumer.poll(max_records = 3)
for tp, consumer_records in records.items():
    for consumer_record in consumer_records:
        print("offset: ", consumer_record.offset, "record value: ",
              consumer_record.value)

The following function obtains the latest record from *consumer*.

In [None]:
def get_latest_record():
    global consumer

    latestrecord = consumer.poll(max_records = 1)
    for tp, consumer_record in latestrecord.items():
      for message in consumer_record:
        x = re.split(r",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", message.value.decode('utf-8'))

        y = message.key.decode('utf-8')

    return [y] + x
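One caveat: poll returns an empty dictionary when no record arrives within the timeout, in which case `x` and `y` in the function above are never assigned and the `return` fails. A hedged sketch of a more defensive extraction follows. `first_record` is a hypothetical helper, and `FakeRecord` merely stands in for the consumer record objects so that the snippet runs without a broker.

```python
from collections import namedtuple

# Stand-in for a consumer record (only the fields used here).
FakeRecord = namedtuple("FakeRecord", ["key", "value"])


def first_record(poll_result):
    """Return [key, raw_value] for the first record in a poll() result, or None."""
    for _topic_partition, records in poll_result.items():
        for record in records:
            return [record.key.decode("utf-8"), record.value.decode("utf-8")]
    return None  # nothing arrived during this poll


empty = first_record({})
one = first_record(
    {("customerinfo", 0): [FakeRecord(b"2020-06-21 12:14:25", b"1234,shop")]}
)
```

A caller can then check for `None` and skip the update for that interval instead of raising an error.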


**Exercise**: run get_latest_record() a couple of times to verify that different records are obtained from the consumer each time.

Reinitialising the customerinfo *consumer*:

In [None]:
consumer = KafkaConsumer(
    'customerinfo',
    group_id = '104a',
    bootstrap_servers=kafka_bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=True)

The following cell creates a dashboard app to display a map showing the location of each customer. Some additional data appears when hovering over a dot on the map.

In [None]:
try:
  del(trans_info)
except:
  pass

trans_info = pd.DataFrame(columns = ['time', 'merchant', 'category', 'amount',
                                      'city', 'lat', 'lon', 'text'])

app = Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='demo-live'),
    # every two seconds the layout updates:
    dcc.Interval(id='output-update', interval=2*1000)
])

@app.callback(
    Output(component_id='demo-live', component_property='figure'),
    [Input(component_id='output-update', component_property='n_intervals')]
)
def get_live_updates(n_intervals):
    global trans_info
    newrow = get_latest_record() #newrow is a list

    trans_info = pd.concat([trans_info, pd.DataFrame({
        'time': [newrow[0]],
        'merchant': [newrow[2]],
        'category': [newrow[3]],
        'amount': [newrow[4]],
        'city': [newrow[9]],
        'lat': [newrow[12]],
        'lon': [newrow[13]],
        'text': ['Time: ' + newrow[0] + '<br>Merchant: ' + newrow[2] \
        + '<br>Category: ' + newrow[3]\
        + '<br>Amount: ' + newrow[4] + '' + '<br>City: '\
        + newrow[9]]
        })], ignore_index=True)

    time.sleep(1)
    df2 = trans_info.copy()

    data=go.Scattergeo(
          lon = df2['lon'],
          lat = df2['lat'],
          text = df2['text'],
          locationmode = 'USA-states',
          mode = 'markers',
        )

    layout = go.Layout(
          autosize=False,
          width=780,
          height=500,
          margin=dict(l=20, r=20, t=20, b=20),
          paper_bgcolor='lightblue',
          geo_scope='usa',
          title_text='Locations of Transactions'
        )
    fig = {'data' : [data], 'layout' : layout}

    return fig



Upon running the following code it may take a few moments for a map to appear. Hover your mouse over any of the dots to reveal more information. Aim to understand the workings of the previous cell.

In [None]:
app.run() # run after a few seconds

While the app is running, run the following cell several times to verify that data is streaming into the consumer.

In [None]:
trans_info.tail()

If there's a need to restart the consumer (e.g. a heartbeat failed warning appears because it is rebalancing), one can run consumer.close() and then recreate the consumer.
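The close-then-recreate pattern can be wrapped in a small helper, sketched below. `restart_consumer` is a hypothetical function, and `DummyConsumer` is a stand-in used only to demonstrate the call pattern without a running broker.

```python
def restart_consumer(old_consumer, make_consumer):
    """Close the old consumer (if any) and return a freshly built one."""
    if old_consumer is not None:
        old_consumer.close()
    return make_consumer()


class DummyConsumer:
    """Stand-in with the same close() method as a real consumer."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


old = DummyConsumer()
new = restart_consumer(old, DummyConsumer)
```

In this notebook, `make_consumer` would be a function or lambda that returns `KafkaConsumer('customerinfo', group_id='104a', bootstrap_servers=kafka_bootstrap_servers, auto_offset_reset='earliest', enable_auto_commit=True)`, and the result would be assigned back to `consumer`.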

## Live predictions

Finally we use the features topic to make fraud predictions on the fly with a trained classification model.

We use the first 90000 records of df as the training set.

In [None]:
X_train = df[predictors][:90000]
y_train = df['is_fraud'][:90000]

**Exercise**: Fit a classification model to X_train, y_train and score it. (For this lab it does not matter how well or poorly the model performs.)

In [None]:
# ANSWER


Subscribe to the topic containing the predictors and perform live predictions.

In [None]:
feature_consumer = KafkaConsumer(
    'features',
    group_id = '104b',
    bootstrap_servers=kafka_bootstrap_servers,
    auto_offset_reset='earliest',
    enable_auto_commit=True)

In [None]:
def get_latest_customer_features():
    global feature_consumer

    latestrecord = feature_consumer.poll(max_records = 1)
    for tp, consumer_record in latestrecord.items():
      for message in consumer_record:
        x = re.split(r",", message.value.decode('utf-8'))
        y = message.key.decode('utf-8')

    return [y] + x

In [None]:
get_latest_customer_features() #may need to wait a few moments before the consumer is ready

The following is a function to be used in our next dashboard to output the type of predicted result (True/False Positives, True/False Negatives):

In [None]:
def prediction_result(predicted_outcome, actual_outcome):
  # predicted   actual    output
  #    0           0        TN
  #    0           1        FN
  #    1           0        FP
  #    1           1        TP

  key = predicted_outcome*2 + actual_outcome
  mappingdict = {0: 'TN', 1: 'FN', 2: 'FP', 3: 'TP'}
  return mappingdict[key]
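The encoding treats the pair (predicted, actual) as a two-digit binary number, with the prediction as the high bit. The sketch below restates the function so that it is self-contained, applies it to a few made-up pairs, and tallies the labels with `collections.Counter`, similar in spirit to the false positive and false negative counts shown in the dashboard.

```python
from collections import Counter


def prediction_result(predicted_outcome, actual_outcome):
    # predicted is the high bit, actual the low bit, giving a key in 0..3
    key = predicted_outcome * 2 + actual_outcome
    return {0: "TN", 1: "FN", 2: "FP", 3: "TP"}[key]


# Made-up (predicted, actual) pairs, for illustration only.
pairs = [(0, 0), (0, 1), (1, 0), (1, 1), (0, 0), (1, 0)]
labels = [prediction_result(p, a) for p, a in pairs]
tally = Counter(labels)
```

Both arguments must be the integers 0 or 1; this is why the dashboard code converts the streamed string values with `int(...)` before calling the function.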

In [None]:
try:
  del(df_preds)
except:
  pass
df_preds = pd.DataFrame(columns = ['datetime'] + predictors +
                        ['predicted_output', 'actual_output', 'result'])

**Exercise**: The next cell creates a dashboard showing predictions made live. Fill in the missing code to generate predicted output based on your trained model.

In [None]:
app = Dash(__name__)

app.layout = html.Div([
    dcc.Graph(id='demo-live'),
    ## for every 2 secs the layout updates
    dcc.Interval(id='output-update', interval=2*1000)
])

@app.callback(
    Output(component_id='demo-live', component_property='figure'),
    [Input(component_id='output-update', component_property='n_intervals')]
)
def get_live_updates(n_intervals):
    global df_preds
    newrow = get_latest_customer_features()
    predicted_output = int(#--complete code--#)
    df_preds = pd.concat([df_preds, pd.DataFrame({'datetime': [newrow[0]],
                               'amount': [newrow[1]],
                               'num_trans_60d': [newrow[2]],
                               'num_fraud_trans_24h': [newrow[3]],
                               'avg_trans_amt_60d': [newrow[4]],
                               'predicted_output': [predicted_output],
                               'actual_output': [newrow[5]],
                               'result': [prediction_result(int(predicted_output),
                                                           int(newrow[5]))]})],
                               ignore_index=True)

    time.sleep(1)
    df2 = df_preds.copy()
    last20 = df2.tail(20)

    fig = make_subplots(
      rows=2, cols=2,
      row_heights=[0.1, 0.9],
      column_widths=[0.5, 0.5],
      specs=[[{"type": "Table", "colspan": 2}, None],
             [{"type": "Histogram"}, {"type": "Table"}]]
             )

    fig.add_trace(
        go.Table(
            header=dict(
                values=["False Positive Count", "False Negative Count"],
                font=dict(size=14),
                align="center"
            ),
            cells=dict(
                values=[sum(df_preds['result'] == 'FP'),
                        sum(df_preds['result'] == 'FN')],
            align = "center"),
            columnwidth = [.5, .5]
        ),
        row=1, col=1
    )

    fig.add_trace(
      go.Histogram(x=df2['amount'].astype(float),
                 xbins = dict(start = 0, end=500, size=20)
                 ),
      row=2, col=1
    )
    fig.update_xaxes(title_text="Amount Spent", row=2, col=1)
    fig.update_yaxes(title_text="Count", row=2, col=1)

    fig.add_trace(
      go.Table(
        header=dict(
            values=["Datetime", "Predicted Outcome", "Actual Outcome",
                    "Result"],
            font=dict(size=14),
            align="center"
        ),
        cells=dict(
            values=[last20['datetime'], last20['predicted_output'],
                              last20['actual_output'], last20['result']],
            align = "center"),
        columnwidth = [.27, .27, .27, .19]
      ),
    row=2, col=2
    )
    fig.update_layout(
    height=800,
    showlegend=False,
    title_text="Live Results",
    )
    return fig

In [None]:
app.run()

**Exercise**: While the app is running verify that df_preds is being modified.

## Conclusion
We have implemented a simple Kafka streaming platform to which we published two topics: one with customer information, the other a subset of that information used to make fraud predictions. With the first topic we generated a live map showing the locations of customers; with the second we created a live dashboard that scores a predictive model.



---

© 2024 Institute of Data



