# FIL and Streamz
This is a guide on how [RAPIDS FIL (Forest Inference Library)](https://medium.com/rapids-ai/accelerating-random-forests-up-to-45x-using-cuml-dfb782a31bea) and [Streamz](https://streamz.readthedocs.io/en/latest/) can be used to build a streaming pipeline. In this example we use [IoT network traffic](https://www.stratosphereips.org/datasets-iot23).

Streamz can read from Kafka directly into [Dask](https://dask.org/), which allows computation to run in a multi-core or cluster environment. This approach is best suited to cases where you want to increase the processing speed of streaming data.

Here we share an example in which we demonstrate how to read connection log data from Kafka, run predictions using FIL and publish result data back to Kafka. To execute this notebook you will need to connect to an instance of Kafka. You can visit the [Apache Kafka Quick Start Guide](https://docs.confluent.io/current/quickstart/index.html?utm_medium=sem&utm_source=google&utm_campaign=ch.sem_br.nonbrand_tp.prs_tgt.kafka_mt.xct_rgn.namer_lng.eng_dv.all&utm_term=quickstart%20kafka&creative=&device=c&placement=&gclid=EAIaIQobChMInKfy66zL7AIVxRx9Ch2wKAmdEAAYASAAEgKEf_D_BwE#) to learn how to set up Kafka in your environment.

## Data Download

In [None]:
# Download sample data - this may take some time to download
!wget https://mcfp.felk.cvut.cz/publicDatasets/IoT-23-Dataset/iot_23_datasets_small.tar.gz

In [None]:
# Extract data
!tar -xzf iot_23_datasets_small.tar.gz

With our Kafka broker already running at `localhost:9092` and the input topic created, we next ingest the sample data into the topic named `input`.

In [None]:
# To load the data into kafka use the command line tool kafka-console-producer provided by your kafka installation. In this example kafka is installed at /opt/kafka.
# Update the broker-list and topic parameters as needed
!/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input < opt/Malware-Project/BigDataset/IoTScenarios/CTU-IoT-Malware-Capture-1-1/bro/conn.log.labeled

# Imports

In [1]:
import random
import confluent_kafka as ck
import cudf
import dask
from dask_cuda import LocalCUDACluster
from distributed import Client
from streamz import Stream
import time

# Parameters

The average log size is used later in the notebook to estimate the throughput and average batch size benchmarks for streaming.

In [2]:
# Benchmark
avg_log_size=0.147 # in kilobytes
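A value like the one above could be obtained by measuring the mean encoded length of a sample of log lines. The following is a minimal sketch, assuming newline-delimited UTF-8 records and 1 KB = 1024 bytes (to match the benchmark arithmetic later in the notebook). The helper name `average_record_size_kb` and the two sample records are hypothetical and not taken from the dataset.

```python
def average_record_size_kb(lines):
    """Return the mean UTF-8 encoded size of the given records, in kilobytes."""
    sizes = [len(line.encode("utf-8")) for line in lines]
    if not sizes:
        return 0.0
    return sum(sizes) / len(sizes) / 1024.0

# Two made-up, tab-separated records standing in for conn.log lines.
sample = [
    "1525879831.015811\tCUmrqr4svHuSXJy5z7\t192.168.100.103\t51524",
    "1525879831.025055\tCH98aB3s1kJeq6SFOc\t192.168.100.103\t56305",
]
avg_kb = average_record_size_kb(sample)
print(f"average record size: {avg_kb:.3f} KB")
```

In practice the sample would be read from the log file itself, for example the first few thousand lines.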

Provide the filepath of your FIL model

In [3]:
# FIL
model_file="/path/to/model"

Kafka parameters

In [4]:
# Kafka
broker="localhost:9092"
input_topic="input"
output_topic="output"


producer_conf = {
    "bootstrap.servers": broker,
    "session.timeout.ms": 10000,
}

# Dask

Next, create your Dask CUDA cluster and initialize each Dask worker with the FIL model referenced above.

In [5]:
#Start dask
cluster = LocalCUDACluster()
client = Client(cluster)

In [6]:
def worker_init():
    # Initialization for each dask worker
    from cuml import ForestInference
    worker = dask.distributed.get_worker()
    worker.data["fil_model"] = ForestInference.load(filename=model_file,
                          algo='BATCH_TREE_REORG',
                          output_class=True,
                          threshold=0.50,
                          model_type='xgboost')
    worker.data["data_columns"] = ["ts","uid","id.orig_h","id.orig_p","id.resp_h","id.resp_p","proto","service","duration",
               "orig_bytes","resp_bytes","conn_state","local_orig","local_resp","missed_bytes","history",
                  "orig_pkts","orig_ip_bytes","resp_pkts","resp_ip_bytes","label"]
    worker.data["data_types"] = {"ts":"float64",
            "uid":"object",
            "id.orig_h":"object",
            "id.orig_p":"int64",
            "id.resp_h":"object",
            "id.resp_p":"int64",
            "proto":"object",
            "service":"object",
            "duration":"object",
            "orig_bytes":"object",
            "resp_bytes":"object",
            "conn_state":"object",
            "local_orig":"object",
            "local_resp":"object",
            "missed_bytes":"int64",
            "history":"object",
            "orig_pkts":"int64",
            "orig_ip_bytes":"int64",
            "resp_pkts":"int64",
            "resp_ip_bytes":"int64",
            "label":"object"}

In [7]:
client.run(worker_init)

{'tcp://127.0.0.1:37781': None,
 'tcp://127.0.0.1:38499': None,
 'tcp://127.0.0.1:38895': None,
 'tcp://127.0.0.1:39389': None,
 'tcp://127.0.0.1:39429': None,
 'tcp://127.0.0.1:41401': None,
 'tcp://127.0.0.1:43393': None,
 'tcp://127.0.0.1:44817': None}

In [8]:
print(client)

<Client: 'tcp://127.0.0.1:46295' processes=8 threads=8, memory=540.94 GB>


# Streamz Pipeline

Update the `max_batch_size` and `poll_interval` parameters as needed to tune your Streamz workload to suit your environment.

In [9]:
max_batch_size=1000000
poll_interval="2s"
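As a rough guide when choosing `max_batch_size`, the largest raw payload of a single batch can be estimated from the average log size defined earlier. The sketch below is plain arithmetic under that assumption: it treats `max_batch_size` as a message count, takes 1 MB = 1024 KB, and ignores any dataframe or model overhead on the GPU. The helper name `max_batch_payload_mb` is hypothetical.

```python
def max_batch_payload_mb(max_batch_size, avg_log_size_kb):
    """Estimate the largest raw payload of one batch in megabytes (1 MB = 1024 KB)."""
    return max_batch_size * avg_log_size_kb / 1024.0

# Values used in this notebook: 1,000,000 messages at roughly 0.147 KB each.
estimate = max_batch_payload_mb(1_000_000, 0.147)
print(f"up to ~{estimate:.1f} MB of raw log text per batch")
```

If that estimate is large relative to the memory available on each GPU, a smaller `max_batch_size` is the safer starting point.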

In [10]:
# Generate a unique group_id to be able to re-run this demo notebook on the same data loaded to your kafka topic.
j = random.randint(0,10000)
group_id="fil-group-%d" % j

# Kafka consumer configuration
consumer_conf = {
    "bootstrap.servers": broker,
    "group.id": group_id,
    "session.timeout.ms": "60000",
    "enable.partition.eof": "true",
    "auto.offset.reset": "earliest",
}

# Stream source
source = Stream.from_kafka_batched(
        input_topic,
        consumer_conf,
        poll_interval=poll_interval,
        npartitions=1,
        asynchronous=True,
        dask=True,
        max_batch_size=max_batch_size

)

Next, we define the `predict` function to be used in the Streamz pipeline. It constructs a GPU dataframe from the raw log messages read from Kafka, formats the data, and then runs a prediction using the FIL model we previously loaded onto each Dask worker.

In [11]:
def predict(messages):
    batch_start_time = int(round(time.time()))
    worker = dask.distributed.get_worker()
    df = cudf.DataFrame()
    # Kafka delivers raw bytes: either a single message or a list of messages
    if isinstance(messages, bytes):
        df["stream"] = [messages.decode("utf-8")]
    elif isinstance(messages, list) and len(messages) > 0:
        df["stream"] = [msg.decode("utf-8") for msg in messages]
    else:
        raise TypeError("Unknown message type encountered in inference: %s" % type(messages))
    # Split each tab-separated log line into one column per field
    df_conn = df["stream"].str.split("\t", expand=True)
    df_conn.columns = worker.data["data_columns"]
    df_conn = df_conn.astype(worker.data["data_types"])
    # Run inference on the four numeric packet/byte count features
    fil_preds = worker.data["fil_model"].predict(df_conn[["orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes"]])
    size = len(fil_preds)
    return (fil_preds, batch_start_time, size)
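To make the parsing step concrete, here is a CPU-only sketch of what `predict` does to each message before inference: decode the bytes, split on tabs, name the fields, and cast the four feature columns to integers. It uses only the standard library rather than cuDF, the helper name `extract_features` is hypothetical, and the sample record is made up for illustration.

```python
COLUMNS = ["ts", "uid", "id.orig_h", "id.orig_p", "id.resp_h", "id.resp_p",
           "proto", "service", "duration", "orig_bytes", "resp_bytes",
           "conn_state", "local_orig", "local_resp", "missed_bytes", "history",
           "orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes", "label"]
FEATURES = ["orig_pkts", "orig_ip_bytes", "resp_pkts", "resp_ip_bytes"]

def extract_features(message):
    """Decode one raw Kafka message and return its model features as ints."""
    fields = message.decode("utf-8").rstrip("\n").split("\t")
    if len(fields) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} fields, got {len(fields)}")
    record = dict(zip(COLUMNS, fields))
    return [int(record[name]) for name in FEATURES]

# Made-up record with 21 tab-separated fields, mirroring the column list.
raw = "\t".join([
    "1525879831.015811", "CUmrqr4svHuSXJy5z7", "192.168.100.103", "51524",
    "65.127.233.163", "23", "tcp", "-", "2.999051", "0", "0", "S0", "-", "-",
    "0", "S", "3", "180", "0", "0", "Malicious",
]).encode("utf-8")
print(extract_features(raw))  # → [3, 180, 0, 0]
```

The GPU version performs the same steps on a whole batch at once, which is where the speedup comes from.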

The `sink_to_kafka` function writes the output data (the FIL predictions) to the previously defined Kafka output topic.

In [12]:
def sink_to_kafka(processed_data):
    producer = ck.Producer(producer_conf)
    json_str = processed_data[0].to_json(orient="records", lines=True)
    json_recs = json_str.split("\n")
    num_recs = len(json_recs)
    for idx,rec in enumerate(json_recs):
        if idx % 50000 == 0:
            producer.flush()
        producer.produce(output_topic, rec)
    producer.flush()
    return processed_data
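The periodic `flush()` in `sink_to_kafka` presumably keeps the producer's local queue from growing too large while a big batch is being sent. The sketch below isolates that pattern with a stand-in producer so that it runs without a broker. `FakeProducer` and `produce_in_chunks` are hypothetical names and are not part of `confluent_kafka`.

```python
class FakeProducer:
    """Stand-in for a Kafka producer: records messages and counts flushes."""

    def __init__(self):
        self.pending = []
        self.delivered = []
        self.flush_calls = 0

    def produce(self, topic, value):
        self.pending.append((topic, value))

    def flush(self):
        self.delivered.extend(self.pending)
        self.pending.clear()
        self.flush_calls += 1

def produce_in_chunks(producer, topic, records, flush_every=50000):
    """Send records one by one, flushing after every `flush_every` messages."""
    for idx, rec in enumerate(records, start=1):
        producer.produce(topic, rec)
        if idx % flush_every == 0:
            producer.flush()
    producer.flush()  # deliver whatever is left in the final partial chunk

producer = FakeProducer()
produce_in_chunks(producer, "output", [f"rec-{i}" for i in range(7)], flush_every=3)
print(len(producer.delivered), producer.flush_calls)  # → 7 3
```

With seven records and a chunk size of three, the producer is flushed after the third and sixth records and once more at the end.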

Below we define our streamz pipeline. This pipeline is also designed to capture benchmark data for reading and processing FIL predictions. 

In [13]:
output = (
    source.map(predict)
    .map(lambda x: (x[0], x[1], int(round(time.time())), x[2]))
    .map(sink_to_kafka)
    .gather()
    .sink_to_list()
)
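Each stage of the pipeline passes a tuple to the next, and the lambda inserts an end timestamp between the start time and the batch size. The sketch below traces that shape with plain function calls and stub stages, without Streamz, Dask, or Kafka. `fake_predict`, `add_end_time`, and `fake_sink` are hypothetical stand-ins for the real stages.

```python
import time

def fake_predict(messages):
    """Stub for predict: returns (predictions, batch_start_time, size)."""
    preds = [0 for _ in messages]
    return (preds, int(round(time.time())), len(preds))

def add_end_time(result):
    """Same reshaping as the lambda in the pipeline: insert the end timestamp."""
    return (result[0], result[1], int(round(time.time())), result[2])

def fake_sink(processed):
    """Stub for sink_to_kafka: pass the tuple through unchanged."""
    return processed

batch = [b"line-1", b"line-2", b"line-3"]
preds, start, end, size = fake_sink(add_end_time(fake_predict(batch)))
print(len(preds), end >= start, size)
```

Because the end timestamp is taken before `sink_to_kafka` runs, each batch's recorded interval covers parsing and inference but not the write back to Kafka. The resulting `(predictions, start, end, size)` layout is what `calc_benchmark` reads at indexes 1, 2, and 3.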

Next, we start the Streamz pipeline. View the progress on your Dask dashboard at http://localhost:8787

In [14]:
source.start()

This function calculates the benchmark. With each batch of data processed we have recorded the start and stop times that we can then use to calculate the total time difference. Throughput and avg batch size are estimates based on the average log size previously defined.

In [15]:
def calc_benchmark(results, size_per_log):
    t1 = int(round(time.time()))  # current time in seconds, the same unit as the batch timestamps
    t2 = 0
    size = 0.0
    batch_count = 0
    cnt = 0
    # Find min and max time while keeping track of batch count and size
    for result in results:
        (ts1, ts2, result_size) = (result[1], result[2], result[3])
        cnt += result_size
        if ts1 == 0 or ts2 == 0:
            continue
        batch_count = batch_count + 1
        t1 = min(t1, ts1)
        t2 = max(t2, ts2)
        size += result_size * size_per_log
    time_diff = t2 - t1
    throughput_mbps = size / (1024.0 * time_diff) if time_diff > 0 else 0
    avg_batch_size = size / (1024.0 * batch_count) if batch_count > 0 else 0
    return (time_diff, throughput_mbps, avg_batch_size, cnt)

Please wait a few moments for all logs to be processed before calculating benchmarks  
View the progress on the dask dashboard http://localhost:8787

In [17]:
benchmark = calc_benchmark(output, avg_log_size)
print("max batch size:", max_batch_size)
print("poll interval:", poll_interval)
print("time (s):", benchmark[0])
print("throughput (MB/s):", benchmark[1])
print("avg batch size (MB):", benchmark[2])
print("num records:", benchmark[3])

max batch size: 1000000
poll interval: 2s
time (s): 48
throughput (MB/s): 72.405251953125
avg batch size (MB): 139.01808375000002
num records: 24209952
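The reported figures can be cross-checked from the record count, the elapsed time, and the assumed average log size. The sketch below simply redoes that arithmetic with the values printed above; it is not part of the original benchmark code.

```python
num_records = 24209952      # "num records" from the output above
elapsed_s = 48              # "time (s)" from the output above
avg_log_size_kb = 0.147     # average log size assumed earlier in the notebook

total_mb = num_records * avg_log_size_kb / 1024.0
throughput_mb_s = total_mb / elapsed_s
records_per_s = num_records / elapsed_s

print(f"total data: {total_mb:.1f} MB")
print(f"throughput: {throughput_mb_s:.1f} MB/s")
print(f"records per second: {records_per_s:,.0f}")
```

This gives roughly 72.4 MB/s and a little over 504,000 records per second, which agrees with the printed throughput.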


This end-to-end demonstration shows how FIL can be used to accelerate a streaming data processing pipeline on the GPU. In this example, we were able to process over 500,000 logs/s on 8 GPUs (Tesla V100). We hope to expand on this in the future by integrating [cuStreamz](https://medium.com/rapids-ai/gpu-accelerated-stream-processing-with-rapids-f2b725696a61).