# MinIO Kafka Connector


In the previous [Notebook](setup-kafka.ipynb) we saw how to set up Kafka in a Kubernetes cluster using the Strimzi Operator. In this notebook we will use a Kafka connector to stream topics directly to MinIO. First, let's look at what connectors are and how to set one up. Here is a high-level overview of the different Kafka components and how they interact:

![kafka_components](./img/kafka_components.png)

## Kafka Connectors

Kafka Connect is an integration toolkit for streaming data between Kafka brokers and other systems. The other system is typically an external data source or target, such as MinIO.

Kafka Connect utilizes a plugin architecture to provide implementation artifacts for connectors, which are used for connecting to external systems and manipulating data. Plugins consist of connectors, data converters, and transforms. Connectors are designed to work with specific external systems and define a schema for their configuration. When configuring Kafka Connect, you supply the necessary configuration to create a connector instance within Kafka Connect, and connector instances then define a set of tasks for data movement between systems.

In the distributed mode of operation, Strimzi operates Kafka Connect by distributing data streaming tasks across one or more worker pods. A Kafka Connect cluster consists of a group of worker pods, with each connector instantiated on a single worker. Each connector can have one or more tasks that are distributed across the group of workers, enabling highly scalable data pipelines.
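
To make the relationship between connectors, tasks, and workers concrete, here is a toy sketch that spreads a connector's tasks over worker pods. It uses plain round-robin, which is an assumption made purely for illustration; Kafka Connect's actual rebalancing protocol is more involved. The function `assign_tasks` and the names `my-sink`, `connect-0`, and `connect-1` are hypothetical.

```python
from collections import defaultdict


def assign_tasks(connectors: dict[str, int], workers: list[str]) -> dict[str, list[str]]:
    """Spread connector tasks over workers round-robin (toy model only)."""
    if not workers:
        raise ValueError("at least one worker is required")
    assignment: dict[str, list[str]] = defaultdict(list)
    slot = 0
    for name, task_count in connectors.items():
        for task_id in range(task_count):
            # Each task goes to the next worker in turn.
            worker = workers[slot % len(workers)]
            assignment[worker].append(f"{name}-{task_id}")
            slot += 1
    return dict(assignment)


if __name__ == "__main__":
    # One hypothetical connector with three tasks, shared by two worker pods.
    print(assign_tasks({"my-sink": 3}, ["connect-0", "connect-1"]))
```

With a single replica, as used later in this notebook, every task lands on the same worker.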

Workers in Kafka Connect are responsible for converting data from one format to another, making it suitable for the source or target system. Depending on the configuration of the connector instance, workers may also apply transforms, also known as Single Message Transforms (SMTs), which can adjust messages, such as filtering certain data, before they are converted. Kafka Connect comes with some built-in transforms, but additional transformations can be provided by plugins as needed.


Kafka Connect uses the following components while streaming data:

* Connectors - create tasks
* Tasks - move data
* Workers - run tasks
* Transforms - manipulate data
* Converters - convert data

There are two types of connectors:

1. Source Connectors - push data from an external system into Kafka
2. Sink Connectors - extract data from Kafka and write it to an external target such as MinIO

In this notebook we will focus on a sink connector that extracts data from Kafka and stores it in MinIO, as shown below.

![kafka_sink_connector](./img/sink_connector_streaming_minio.png)


A sink connector streams data from Kafka by going through the following steps:

1. A plugin provides the implementation artifacts for the sink connector: In Kafka Connect, a sink connector is used to stream data from Kafka to an external data system. The implementation artifacts for the sink connector, such as the code and configuration, are provided by a plugin. Plugins are used to extend the functionality of Kafka Connect and enable connections to different external data systems.
2. A single worker initiates the sink connector instance: In a distributed mode of operation, Kafka Connect runs as a cluster of worker pods. Each worker pod can initiate a sink connector instance, which is responsible for streaming data from Kafka to the external data system. The worker manages the lifecycle of the sink connector instance, including its initialization and configuration.
3. The sink connector creates tasks to stream data: Once the sink connector instance is initiated, it creates one or more tasks to stream data from Kafka to the external data system. Each task is responsible for processing a portion of the data and can run in parallel with other tasks for efficient data processing.
4. Tasks run in parallel to poll Kafka and return records: The tasks created by the sink connector run in parallel to poll Kafka for new records. They retrieve records from Kafka topics and prepare them for forwarding to the external data system. The parallel processing of tasks enables high throughput and efficient data streaming.
5. Converters put the records into a format suitable for the external data system: Before forwarding the records to the external data system, converters are used to put the records into a format that is suitable for the specific requirements of the external data system. Converters handle data format conversion, such as from Kafka's binary format to a format supported by the external data system.
6. Transforms adjust the records, such as filtering or relabeling them: Depending on the configuration of the sink connector, transformations, also known as Single Message Transforms (SMTs), can be applied to adjust the records before they are forwarded to the external data system. Transformations can be used for tasks such as filtering, relabeling, or enriching the data to be sent to the external system.
7. The sink connector is managed using `KafkaConnector` resources or the Kafka Connect API: The sink connector, along with its tasks, is managed either declaratively through Strimzi `KafkaConnector` resources or through the Kafka Connect REST API, which provides programmatic access for managing Kafka Connect. Both allow you to configure, monitor, and manage sink connectors and their tasks in a Kafka Connect deployment.
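
The convert, transform, and forward steps above can be sketched in plain Python. This is a toy model, not the Kafka Connect API: the function names, the `fare` field, and the choice to emit a final partial batch are all assumptions made for illustration.

```python
import json
from typing import Callable, Iterable, Iterator, Optional

Record = dict
Transform = Callable[[Record], Optional[Record]]


def json_converter(raw: bytes) -> Record:
    """Converter step: turn the bytes stored in Kafka into a structured record."""
    return json.loads(raw.decode("utf-8"))


def drop_zero_fare(record: Record) -> Optional[Record]:
    """Transform step: returning None filters the record out."""
    return record if record.get("fare", 0) > 0 else None


def run_sink_task(
    raw_messages: Iterable[bytes],
    transforms: list[Transform],
    flush_size: int,
) -> Iterator[list[Record]]:
    """Convert, transform, and batch records the way a sink task conceptually does."""
    batch: list[Record] = []
    for raw in raw_messages:
        record: Optional[Record] = json_converter(raw)
        for transform in transforms:
            record = transform(record)
            if record is None:
                break  # a transform dropped the record
        if record is None:
            continue
        batch.append(record)
        if len(batch) == flush_size:
            yield batch  # one batch corresponds to one object written to the target
            batch = []
    if batch:
        yield batch  # toy behaviour: emit the final partial batch as well
```

Each yielded batch stands in for one write to the external system, which is why a flush size of 1000 later in this notebook results in one object per 1000 records.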

### Setup

We will create a simple example which will perform the following steps

1. Create a Producer that will stream data from MinIO and produce events for a topic in JSON format
2. Build a Kafka Connect Image that has S3 dependencies
3. Deploy the Kafka Connect based on the above image
4. Deploy a Kafka sink connector that consumes the Kafka topic and stores the data in a MinIO bucket

#### Getting Demo Data into MinIO
We will be using the NYC Taxi dataset stored in MinIO. If you don't have the dataset, follow the instructions [here](../spark/spark-with-minio.ipynb#Getting-Demo-Data-into-MinIO).

### Producer

Below is a simple Python script that reads data from MinIO and produces events for the topic `my-topic`.

In [12]:
%%writefile sample-code/producer/src/producer.py
import logging
import os

import fsspec
import pandas as pd
import s3fs

from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers="my-kafka-cluster-kafka-bootstrap:9092")

fsspec.config.conf = {
    "s3":
        {
            "key": os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"),
            "secret": os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"),
            "client_kwargs": {
                "endpoint_url": "https://play.min.io:50000"
            }
        }
}
s3 = s3fs.S3FileSystem()
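total_processed = 0
# Read the CSV in chunks of 1,000 rows so the whole file never sits in memory
for df in pd.read_csv('s3a://openlake/spark/sample-data/taxi-data.csv', chunksize=1000):
    count = 0
    for index, row in df.iterrows():
        # Each row is sent as a UTF-8 encoded JSON document
        producer.send("my-topic", bytes(row.to_json(), 'utf-8'))
        count += 1
    # Block until every message of this chunk has been delivered
    producer.flush()
    total_processed += count
    # Log progress every 10,000 records
    if total_processed % 10000 == 0:
        logging.info(f"total processed till now {total_processed}")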


Overwriting sample-code/src/producer.py
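
The chunk-then-flush pattern used by the producer can be tried without a broker by injecting the send and flush callables. This is a minimal sketch: `produce_in_chunks` and its parameters are hypothetical names, not part of the producer above. With kafka-python you would pass something like `lambda payload: producer.send("my-topic", payload)` and `producer.flush`.

```python
import json
from typing import Callable, Iterable


def produce_in_chunks(
    chunks: Iterable[list[dict]],
    send: Callable[[bytes], object],
    flush: Callable[[], object],
    log_every: int = 10_000,
) -> list[int]:
    """Send rows as JSON bytes, flush after each chunk, and return the
    running totals at which progress would be reported."""
    total = 0
    milestones: list[int] = []
    next_milestone = log_every
    for chunk in chunks:
        for row in chunk:
            send(json.dumps(row).encode("utf-8"))
        flush()  # wait for the whole chunk before moving on
        total += len(chunk)
        if total >= next_milestone:
            milestones.append(total)
            # Advance to the next multiple of log_every above the current total.
            next_milestone = (total // log_every + 1) * log_every
    return milestones
```

Tracking the next milestone explicitly keeps progress reporting working even when the chunk size does not evenly divide the logging interval.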


Add the requirements file and the Dockerfile from which we will build the Docker image.

In [13]:
%%writefile sample-code/producer/requirements.txt
pandas
s3fs
pyarrow
kafka-python

Overwriting sample-code/requirements.txt


In [14]:
%%writefile sample-code/producer/Dockerfile
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1

COPY requirements.txt .
RUN pip3 install -r requirements.txt

COPY src/producer.py .
CMD ["python3", "-u", "./producer.py"]

Overwriting sample-code/Dockerfile


Build the Docker image for the producer from the above Dockerfile and push it to your Docker registry, or use the one available in openlake: [openlake/kafka-demo-producer](https://hub.docker.com/r/openlake/kafka-demo-producer/tags).

Let's create a YAML file that deploys our producer in the Kubernetes cluster as a Job.

In [83]:
%%writefile deployment/producer.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: producer-job
  namespace: kafka
spec:
  template:
    metadata:
      name: producer-job
    spec:
      containers:
      - name: producer-job
        image: openlake/kafka-demo-producer:latest
      restartPolicy: Never

Writing deployment/producer.yaml


Deploy the `producer.yaml` file

In [84]:
!kubectl apply -f deployment/producer.yaml

job.batch/producer-job created


You can check the logs by using the below command

In [24]:
!kubectl logs -f job.batch/producer-job -n kafka # stop this shell once you are done

<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=my-kafka-cluster-kafka-bootstrap:9092 <connecting> [IPv4 ('10.96.4.95', 9092)]>: connecting to my-kafka-cluster-kafka-bootstrap:9092 [('10.96.4.95', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=my-kafka-cluster-kafka-bootstrap:9092 <connecting> [IPv4 ('10.96.4.95', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 host=my-kafka-cluster-kafka-0.my-kafka-cluster-kafka-brokers.kafka.svc:9092 <connecting> [IPv4 ('10.244.1.4', 9092)]>: connecting to my-kafka-cluster-kafka-0.my-kafka-cluster-kafka-brokers.kafka.svc:9092 [('10.244.1.4', 

Now that we have a basic producer sending JSON events to `my-topic`, let's deploy Kafka Connect and the corresponding connector that stores these events in MinIO.

### Build Kafka Connect Image

Let's build a Kafka Connect image that has the S3 dependencies.

In [27]:
%%writefile sample-code/connect/Dockerfile
FROM confluentinc/cp-kafka-connect:7.0.9 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.4.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.3.3
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
# Add S3 dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/ /opt/kafka/plugins/kafka-connect-s3/
# Add Avro dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/ /opt/kafka/plugins/avro/

Overwriting sample-code/connect/Dockerfile


Build the Docker image for Kafka Connect from the above Dockerfile and push it to your Docker registry, or use the one available in openlake: [openlake/kafka-connect:0.34.0](https://hub.docker.com/r/openlake/kafka-connect/tags).

Before we deploy `KafkaConnect`, we first need to create its storage topics, if they are not already present, for Kafka Connect to work as expected.

### Create Storage Topics

Let's create the `connect-status`, `connect-configs` and `connect-offsets` topics and deploy them as shown below.

In [72]:
%%writefile deployment/connect-status-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-status
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact

Writing deployment/connect-status-topic.yaml


In [73]:
%%writefile deployment/connect-configs-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-configs
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact

Writing deployment/connect-configs-topic.yaml


In [74]:
%%writefile deployment/connect-offsets-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-offsets
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact

Writing deployment/connect-offsets-topic.yaml


Deploy the above topics.

In [None]:
!kubectl apply -f deployment/connect-status-topic.yaml
!kubectl apply -f deployment/connect-configs-topic.yaml
!kubectl apply -f deployment/connect-offsets-topic.yaml
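
The three topic manifests differ only in their name, so they could also be generated. The sketch below builds the same `KafkaTopic` resources as Python dictionaries and prints them as a JSON `List`, which `kubectl apply -f` accepts just like YAML. The helper names are hypothetical; the field values are copied from the manifests above.

```python
import json

STORAGE_TOPICS = ("connect-status", "connect-configs", "connect-offsets")


def storage_topic_manifest(
    name: str,
    cluster: str = "my-kafka-cluster",
    namespace: str = "kafka",
) -> dict:
    """Build one compacted KafkaTopic manifest for Kafka Connect's internal state."""
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaTopic",
        "metadata": {
            "name": name,
            "namespace": namespace,
            "labels": {"strimzi.io/cluster": cluster},
        },
        "spec": {
            "partitions": 1,
            "replicas": 3,
            "config": {"cleanup.policy": "compact"},
        },
    }


def storage_topics_list() -> dict:
    """Wrap all three topics in a List so a single file can be applied at once."""
    return {
        "apiVersion": "v1",
        "kind": "List",
        "items": [storage_topic_manifest(name) for name in STORAGE_TOPICS],
    }


if __name__ == "__main__":
    print(json.dumps(storage_topics_list(), indent=2))
```

Redirecting the output to a file and applying that file would replace the three separate `kubectl apply` calls.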

### Deploy Kafka Connect

Create a YAML file for Kafka Connect that uses the above image and deploy it in Kubernetes. The `KafkaConnect` resource will have 1 replica and make use of the storage topics that we created above.

NOTE: `spec.template.connectContainer.env` defines the credentials that Kafka Connect needs in order to store data in the MinIO cluster. Other details, such as the endpoint URL and the bucket name, are part of the `KafkaConnector`.

In [75]:
%%writefile deployment/connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: openlake/kafka-connect:0.34.0
  version: 3.4.0
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets
  template:
    connectContainer:
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "openlakeuser"
        - name: AWS_SECRET_ACCESS_KEY
          value: "openlakeuser"

Writing deployment/connect.yaml


In [87]:
!kubectl apply -f deployment/connect.yaml

kafkaconnect.kafka.strimzi.io/connect-cluster created


### Deploy Kafka Sink Connector

Now that we have Kafka Connect up and running, the next step is to deploy the sink connector that will poll `my-topic` and store the data in the MinIO bucket `openlake-tmp`.


* `connector.class` - the connector implementation to use, in our case `io.confluent.connect.s3.S3SinkConnector`
* `store.url` - the MinIO endpoint URL where the data from Kafka Connect will be stored
* `storage.class` - the storage implementation to use; since we are storing in MinIO, `io.confluent.connect.s3.storage.S3Storage` is used
* `format.class` - the format in which the data is stored in MinIO; since we want JSON, we use the `io.confluent.connect.s3.format.json.JsonFormat` implementation

In [90]:
%%writefile deployment/connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "minio-connector"
  namespace: "kafka"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  # tasksMax is the KafkaConnector field for the maximum number of tasks
  tasksMax: 1
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    topics: my-topic
    s3.region: us-east-1
    s3.bucket.name: openlake-tmp
    s3.part.size: '5242880'
    flush.size: '1000'
    store.url: https://play.min.io:50000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    behavior.on.null.values: ignore

Overwriting deployment/connector.yaml


In [89]:
!kubectl apply -f deployment/connector.yaml

kafkaconnector.kafka.strimzi.io/minio-connector created
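
Besides the `KafkaConnector` resource, the connector state can be read from the Kafka Connect REST API at `/connectors/<name>/status`. The sketch below is hedged: it assumes the API is reachable from inside the cluster on a service named `connect-cluster-connect-api` at port 8083, and the helper names are hypothetical. Adjust the URL to your environment.

```python
import json
from urllib.request import urlopen

# Assumption: Strimzi exposes the Connect REST API on a service named
# "<KafkaConnect name>-connect-api" at port 8083 inside the cluster.
CONNECT_URL = "http://connect-cluster-connect-api.kafka.svc:8083"


def status_url(connector: str, base_url: str = CONNECT_URL) -> str:
    """Build the status endpoint URL for a connector."""
    return f"{base_url}/connectors/{connector}/status"


def summarize_status(status: dict) -> dict:
    """Reduce a status response to the connector state and any tasks that are not running."""
    tasks = status.get("tasks", [])
    return {
        "connector_state": status.get("connector", {}).get("state"),
        "task_count": len(tasks),
        "not_running_tasks": [t["id"] for t in tasks if t.get("state") != "RUNNING"],
    }


def fetch_status(connector: str, base_url: str = CONNECT_URL) -> dict:
    """Query the REST API and summarize the result (requires network access to Connect)."""
    with urlopen(status_url(connector, base_url), timeout=10) as response:
        return summarize_status(json.load(response))
```

Calling `fetch_status("minio-connector")` from a pod in the cluster should report a `RUNNING` connector with no entries in `not_running_tasks` once the deployment has settled.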


If all goes well, we can see files being added to the MinIO `openlake-tmp` bucket by executing the below command.

In [79]:
!mc ls --summarize --recursive play/openlake-tmp/topics/my-topic

]11;?\[6n[m[32m[2023-04-11 19:53:29 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000000000.json[0m
[0m[m[32m[2023-04-11 19:53:30 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000001000.json[0m
[0m[m[32m[2023-04-11 19:53:31 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000002000.json[0m
[0m[m[32m[2023-04-11 19:53:31 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000003000.json[0m
[0m[m[32m[2023-04-11 19:53:31 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000004000.json[0m
[0m[m[32m[2023-04-11 19:53:32 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000005000.json[0m
[0m[m[32m[2023-04-11 19:53:32 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000006000.json[0m
[0m[m[32m[2023-04-11 19:53:33 PDT][0m[33m 368KiB[0m [34mSTANDARD[0m[1m partition=0/my-topic+0+0000007000.json[0m
[0m[m

We now have an end-to-end implementation that produces events to a Kafka topic and stores them directly in MinIO using Kafka connectors. There is a more efficient and performant way to stream and store data into MinIO from Kafka, which we will cover in the next notebook.