# Kafi: Kafka Superpowers for Your Jupyter Notebook and Python
<img src="pix/kafka.jpg" style="width: 33%; height: 33%"/>
<img src="pix/jupyter.jpg" style="width: 33%; height: 33%"/>

### Ralph Debusmann
##### `ralph.debusmann@mgb.ch`

<img src="pix/migros.png" style="width: 20%; height: 20%"/>


# Agenda

* Part I: The Birth of Kafi

* Part II: Three Paradigms for Using Kafi
  * Shell/Python interpreter
  * Juypter Notebooks
  * Code (Microservices, FaaS, Agents...)

* Part III: Use Cases for Kafi
  * Kafka Administration
  * Schema Registry Administration
  * Kafka Backups incl. Kafka Emulation
  * Simple Stream Processing
  * Kafka via REST Proxy
  * Building a Bridge from Kafka to Pandas Dataframes and Files

# Part I: The Birth of Kafi


<img src="pix/birth.jpg" style="width: 35%; height: 35%"/>


What happens if you would just like to create a topic on Kafka, list topics, produce some messages, or consume some messages, or search for messages?

The answer is often:
* kafkacat/kcat
* standard Kafka commandline tools (kafka-console-producer, kafka-console-consumer...)


It works...for a long time indeed. But how?

## Still the State-of-the-Art Developer Experience

### List Topics

```
kcat -b localhost:9092 -L
```


```
kafka-topics --bootstrap-server localhost:9092 --list
```

### Create Topics

(not possible with kcat)


```
kafka-topics --bootstrap-server localhost:9092 --topic topic_json1 --create
```


### Produce Messages

```
kcat -b localhost:9092 -t topic_json1 -P -K ,

123,{"bla":123}
456,{"bla":456}
789,{"bla":789}
```


```
kafka-console-producer --bootstrap-server localhost:9092 --topic topic_json1 --property parse.key=true --property key.separator=','

123,{"bla":123}
456,{"bla":456}
789,{"bla":789}
```


### Produce Messages Using a Schema

(not even possible with kcat...)

```
kafka-avro-console-producer --bootstrap-server localhost:9092 --topic topic_avro1 --property schema.registry.url=http://localhost:8081 --property key.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"bla","type":"int"}]}' --property parse.key=true --property key.separator=','

123,{"bla": 123}
456,{"bla": 456}
789,{"bla": 789}
```


### Consume Messages

```
kcat -b localhost:9092 -t topic_json1 -C -o beginning -e -K ,
```


```
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_json1 --property print.key=true --from-beginning
```

### Search Messages

```
kcat -b localhost:9092 -t topic_json1 -C -o beginning -e | grep 456
```


```
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_json1 --from-beginning | grep 456
```

## Can't We Do Better?

I developed Kafi because I was frustrated with kcat and the standard Kafka commandline tools. Not by another commandline tool, but by building a Python module (=library) wrapped around Confluent's Python client for Kafka, confluent_kafka.

Regardless of whether you use Kafi in your shell or in a Jupyter notebook, you have a similar experience. And your life gets so much better. I promise.


Because Kafi is a Python module, you first need to import it. Then, you create a Cluster object `c` reading from a configuration file:

```
from kafi.kafi import *
c = Cluster("local")
```

Then...this is how you can list topics, create topics, produce messages, consume messages or search for messages with Kafi.


### List Topics

```
c.ls()
```

Many commands also support wildcards, so like in a shell, you can do e.g.:
```
c.ls("*off*")
```

### Create Topics

```
c.touch("topic_json2")
```

### Produce Messages

```
pr = c.producer("topic_json2")
pr.produce({"bla": 123}, key="123")
pr.produce({"bla": 456}, key="456")
pr.produce({"bla": 789}, key="789")
pr.close()
```

### Produce Messages Using a Schema

```
t = "topic_avro2"
vs = '{"type":"record","name":"myrecord","fields":[{"name":"bla","type":"int"}]}'

pr = c.producer(t, value_type="avro", value_schema=vs)
pr.produce({"bla": 123}, key="123")
pr.produce({"bla": 456}, key="456")
pr.produce({"bla": 789}, key="789")
pr.close()
```


### Consume Messages

```
c.cat("topic_json2")
```

or...

```
c.cat("topic_avro2")
```

ok, it's Avro...

```
c.cat("topic_avro2", value_type="avro")
```


### Configuration



Kafi supports the full range of configuration options of Confluent's Python client. This is, for example, the simple configuration file to connect to a local Kafka cluster that we used in our first steps with Kafi before:

```
kafka:
  bootstrap.servers: localhost:9092

schema_registry:
  schema.registry.url: http://localhost:8081
```


...and this is a configuration file for connecting to Confluent Cloud:

```
kafka:
  bootstrap.servers: ${KAFI_KAFKA_SERVER}
  security.protocol: SASL_SSL
  sasl.mechanisms: PLAIN
  sasl.username: ${KAFI_KAFKA_USERNAME}
  sasl.password: ${KAFI_KAFKA_PASSWORD}
  
schema_registry:
  schema.registry.url: ${KAFI_SCHEMA_REGISTRY_URL}
  basic.auth.credentials.source: USER_INFO
  basic.auth.user.info: ${KAFI_SCHEMA_REGISTRY_USER_INFO}
```

There are many other configuration options to fine-tune your cluster connection and to override Kafi's "common sense" defaults (e.g. setting the `auto.offset.reset` to `earliest`). These defaults are one of the building blocks responsible for making it so convenient to use.

# Part II: Three Paradigms for Using Kafi

<img src="pix/paradigms.jpg" style="width: 35%; height: 35%"/>


Wait, this talk is titled "Kafka Superpowers for Your Jupyter Notebook and Python". So where is Kafi in the Jupyter notebook? Ok, here, but that's not what you probably ask yourselves... so far, we just used in the Python interpreter in the shell...

There are actually three main paradigms for using Kafi.

## Shell/Python Interpreter

The first is in your shell using the Python interpreter, like we did in Part I above. That gives you a user/developer experience similar to bash/zsh + kcat or the standard Kafka commandline tools.

## Code (Microservices, FaaS, Agents...)

As Kafi is just a Python module, it is also super useful to use in your Python code. Either for smaller scripts, or even for building microservices, FaaS-functions, or agents (put in a pinch of llamaindex agents for example).

## Jupyter Notebooks

Now finally to them. You will see soon in Part III that Jupyter notebooks are a very convenient and powerful paradigm of using Kafi, especially for Python/Jupyter afficionados like Data Scientists etc.

But... using Kafi in a Jupyter notebook is actually also very convenient and powerful for Kafka administrators or developers! You'll see.

# Part III: Use Cases for Kafi


<img src="pix/use_cases.jpg" style="width: 35%; height: 35%"/>


## Kafka Administration

We covered a bit of that already when we compared Kafi to kcat/the standard Kafka commandline tools. So let's start again by importing Kafi and connecting to our local Kafka cluster, and a Confluent Cloud cluster for good measure.

In [1]:
from kafi.kafi import *
cl = Cluster("local")
cc = Cluster("ccloud")

### Brokers

A basic administration task is to show the brokers of your Kafka cluster. So let's view the brokers of our local Kafka cluster first.

In [2]:
cl.brokers()

{1: 'localhost:9092'}

Interesting, now for our Confluent Cloud Basic Cluster...

In [3]:
cc.brokers()

{0: 'b0-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 1: 'b1-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 2: 'b2-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 3: 'b3-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 4: 'b4-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 5: 'b5-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 6: 'b6-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 7: 'b7-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 8: 'b8-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 9: 'b9-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 10: 'b10-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092',
 11: 'b11-pkc-ewzgj.europe-west4.gcp.confluent.cloud:9092'}

How about the broker configs?

In [4]:
cl.broker_config()

{1: {'log.cleaner.min.compaction.lag.ms': '0',
  'offsets.topic.num.partitions': '50',
  'sasl.oauthbearer.jwks.endpoint.refresh.ms': '3600000',
  'remote.log.metadata.manager.listener.name': None,
  'log.flush.interval.messages': '9223372036854775807',
  'controller.socket.timeout.ms': '30000',
  'principal.builder.class': 'org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder',
  'log.flush.interval.ms': None,
  'controller.quorum.request.timeout.ms': '2000',
  'sasl.oauthbearer.expected.audience': None,
  'min.insync.replicas': '1',
  'remote.log.manager.thread.pool.size': '10',
  'group.consumer.max.session.timeout.ms': '60000',
  'num.recovery.threads.per.data.dir': '1',
  'ssl.keystore.type': 'JKS',
  'zookeeper.ssl.protocol': 'TLSv1.2',
  'controller.quorum.bootstrap.servers': '',
  'sasl.mechanism.inter.broker.protocol': 'GSSAPI',
  'group.share.record.lock.duration.ms': '30000',
  'metadata.log.segment.bytes': '1073741824',
  'fetch.purgatory.purge.interv

For our Confluent Cloud cluster, we only want to see the config of one broker, broker 11:

In [5]:
cc.broker_config(11)

{11: {'log.message.timestamp.type': 'CreateTime',
  'ssl.cipher.suites': 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_128_GCM_SHA256',
  'log.message.timestamp.difference.max.ms': '9223372036854775807',
  'log.cleanup.policy': 'delete',
  'log.cleaner.min.compaction.lag.ms': '0',
  'message.max.bytes': '2097164',
  'log.segment.bytes': '104857600',
  'log.message.timestamp.before.max.ms': '9223372036854775807',
  'log.retention.bytes': '-1',
  'log.cleaner.delete.retention.ms': '86400000',
  'auto.create.topics.enable': 'false',
  'log.cleaner.max.compaction.lag.ms': '9223372036854775807',
  'ssl.enabled.protocols': 'TLSv1.2',
  'log.message.timestamp.after.max.ms': '9223372036854775807',
  'default.replication.factor': '3',
  'log.retention.ms': None,
  'num.partitions': '6'}}

What if we'd just like to see one configuration item, e.g. `message.max.bytes`?

In [6]:
_[11]["message.max.bytes"]

'2097164'

We can just as well change this configuration item, or at least try to do it.

In [7]:
cc.broker_config(config={"message.max.bytes": 1048582})

KafkaException: KafkaError{code=POLICY_VIOLATION,val=44,str="Altering resources of type BROKER is not permitted"}

Well there we are. But it should work on our local cluster:

In [8]:
cl.broker_config(config={"message.max.size": 1048582})[1]["message.max.bytes"]

'1048588'

### Consumer Groups

Let's first see the consumer groups that we have.

In [9]:
cl.gls()

['1742384316499',
 '_confluent-controlcenter-7-9-0-1-command',
 '1742384288901',
 '_confluent-controlcenter-7-9-0-1',
 '1742384306781']

That's the automatically created groups that our `cat` command created before. What are the offsets of one of them?

In [11]:
g = "1742384288901"

cl.group_offsets(g)


{'1742384288901': {'topic_json2': {0: 3}}}

What if we need the same consumer group offsets for another consumer group... on Confluent Cloud? For that, let's first create a topic on Confluent Cloud and populate it.

In [12]:
t = "topic_json2"

cc.touch(t)
pr = cc.producer(t)
pr.produce({"bla": 123}, key="123")
pr.produce({"bla": 456}, key="456")
pr.produce({"bla": 789}, key="789")
pr.close()

'topic_json2'

Go. Copy the offsets of our consumer group on our local cluster to a new one on Confluent Cloud.

In [13]:
cl.cp_group_offsets(t, g, cc, g)

{'1742384288901': {'topic_json2': {0: 3}}}

Check it...

In [14]:
cc.group_offsets(g)

{'1742384288901': {'topic_json2': {0: 3}}}

Let's close by going back to our local cluster and deleting the source consumer groups.

In [15]:
cl.grm(g)

['1742384288901']

In [16]:
cl.gls()

['1742384316499',
 '_confluent-controlcenter-7-9-0-1-command',
 '_confluent-controlcenter-7-9-0-1',
 '1742384306781']

### Topics

In the same way as for the brokers, we can have a look at the configuration of a topic...

In [17]:
t = "topic_json2"

cl.config(t)

{'topic_json2': {'compression.type': 'producer',
  'remote.log.delete.on.disable': 'false',
  'leader.replication.throttled.replicas': '',
  'remote.storage.enable': 'false',
  'message.downconversion.enable': 'true',
  'min.insync.replicas': '1',
  'segment.jitter.ms': '0',
  'remote.log.copy.disable': 'false',
  'local.retention.ms': '-2',
  'cleanup.policy': 'delete',
  'flush.ms': '9223372036854775807',
  'follower.replication.throttled.replicas': '',
  'compression.lz4.level': '9',
  'segment.bytes': '1073741824',
  'retention.ms': '604800000',
  'compression.gzip.level': '-1',
  'flush.messages': '9223372036854775807',
  'compression.zstd.level': '3',
  'message.format.version': '3.0-IV1',
  'max.compaction.lag.ms': '9223372036854775807',
  'file.delete.delay.ms': '60000',
  'max.message.bytes': '1048588',
  'min.compaction.lag.ms': '0',
  'message.timestamp.type': 'CreateTime',
  'local.retention.bytes': '-2',
  'preallocate': 'false',
  'min.cleanable.dirty.ratio': '0.5',
  'in

...we can change the configuation just as well:

In [18]:
cl.config(t, {"retention.ms": -1})

{'topic_json2': {'compression.type': 'producer',
  'remote.log.delete.on.disable': 'false',
  'leader.replication.throttled.replicas': '',
  'remote.storage.enable': 'false',
  'message.downconversion.enable': 'true',
  'min.insync.replicas': '1',
  'segment.jitter.ms': '0',
  'remote.log.copy.disable': 'false',
  'local.retention.ms': '-2',
  'cleanup.policy': 'delete',
  'flush.ms': '9223372036854775807',
  'follower.replication.throttled.replicas': '',
  'compression.lz4.level': '9',
  'segment.bytes': '1073741824',
  'retention.ms': '-1',
  'compression.gzip.level': '-1',
  'flush.messages': '9223372036854775807',
  'compression.zstd.level': '3',
  'message.format.version': '3.0-IV1',
  'max.compaction.lag.ms': '9223372036854775807',
  'file.delete.delay.ms': '60000',
  'max.message.bytes': '1048588',
  'min.compaction.lag.ms': '0',
  'message.timestamp.type': 'CreateTime',
  'local.retention.bytes': '-2',
  'preallocate': 'false',
  'min.cleanable.dirty.ratio': '0.5',
  'index.int

We can create or delete topics...

In [19]:
cl.touch("abc")


'abc'

In [20]:
cl.rm("a*")

['abc']

We can list topics with their total sizes...

In [21]:
cl.l()

{'__consumer_offsets': 204,
 '__transaction_state': 4,
 '_confluent-command': 1,
 '_confluent-controlcenter-7-9-0-1-AlertHistoryStore-changelog': 0,
 '_confluent-controlcenter-7-9-0-1-AlertHistoryStore-repartition': 0,
 '_confluent-controlcenter-7-9-0-1-Group-ONE_MINUTE-changelog': 0,
 '_confluent-controlcenter-7-9-0-1-Group-ONE_MINUTE-repartition': 0,
 '_confluent-controlcenter-7-9-0-1-Group-THREE_HOURS-changelog': 0,
 '_confluent-controlcenter-7-9-0-1-Group-THREE_HOURS-repartition': 0,
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTEROTHER-0000000106-store-changelog': 70,
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTEROTHER-0000000106-store-repartition': 0,
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTERTHIS-0000000105-store-changelog': 70,
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTERTHIS-0000000105-store-repartition': 0,
 '_confluent-controlcenter-7-9-0-1-MetricsAggregateStore-changelog': 0,
 '_confluent-controlcenter-7-9-0-1-MetricsAggregateStore-repartition': 0,
 '_confluent-

...see their partitions:

In [22]:
cl.partitions(t)

{'topic_json2': 1}

...and their watermarks:

In [23]:
cl.watermarks(t)

{'topic_json2': {0: (0, 3)}}

Now let's create a new test topic and write some messages to it.

In [None]:
import time

t = "topic_offsets"

pr = cl.producer(t)
pr.produce({"bla": 123}, key="123")
time.sleep(0.1)
pr.produce({"bla": 456}, key="456")
time.sleep(0.1)
pr.produce({"bla": 789}, key="789")
pr.close()

cl.cat(t)

Let's pick the timestamp of the second message and search for it:

In [None]:
cl.offsets_for_times(t, {0: ... })

Perfect. Next idea: Let's delete some records from the beginning of the topic. E.g. the first two:

In [None]:
print(cl.watermarks(t))

cl.delete_records({t: {0: 2}})

cl.watermarks(t)

Now we are left with only the third message:

In [None]:
cl.cat(t)

Now, please stay with me... we have some more functionality that goes beyond just having a "cat" command... all shell-inspired...

Read from the beginning of the topic:

In [24]:
cl.head(t, 1)

[{'topic': 'topic_json2',
  'headers': None,
  'partition': 0,
  'offset': 0,
  'timestamp': (1, 1742384198782),
  'key': '123',
  'value': {'bla': 123}}]

Or from the end:

In [25]:
cl.tail(t, 1)

[{'topic': 'topic_json2',
  'headers': None,
  'partition': 0,
  'offset': 2,
  'timestamp': (1, 1742384198782),
  'key': '789',
  'value': {'bla': 789}}]

Now it increasingly gets wilder. How about copying a topic from our local cluster to Confluent Cloud?

In [26]:
t2 = t + "_from_local_cluster"

cc.touch(t2)

cl.cp(t, cc, t2)

cc.l(t2)

{'topic_json2_from_local_cluster': 3}

Let's see this on Confluent Cloud...

And now, some more shelly stuff.

Word count:

In [27]:
cl.wc(t)

(3, 9, 45)

Do a grep on the topic to find the message with value `456`:

In [28]:
cl.grep(t, ".*456.*")

Found matching message on partition 0, offset 1.


([{'topic': 'topic_json2',
   'headers': None,
   'partition': 0,
   'offset': 1,
   'timestamp': (1, 1742384198782),
   'key': '456',
   'value': {'bla': 456}}],
 1,
 3)

## Schema Registry Administration

With Kafi, you also have the entire array of the Schema Registry API at your disposal.

List subjects:

In [29]:
cl.sls()

['topic_avro2-value']

Or maybe just those matching a pattern:

In [30]:
cl.sls("*2-value")

['topic_avro2-value']

Get all the versions of the subject:

In [31]:
s = "topic_avro2-value"

cl.get_versions(s)

[1]

Get the latest version...

In [32]:
cl.get_latest_version(s)

{'schema_id': 1,
 'schema': {'schema_str': '{"type":"record","name":"myrecord","fields":[{"name":"bla","type":"int"}]}',
  'schema_type': 'AVRO'},
 'subject': 'topic_avro2-value',
 'version': 1}

Next. We list the subjects, delete our subject, and list the subjects again:

In [33]:
print(cl.sls())

cl.srm(s)

cl.sls()

['topic_avro2-value']


[]

Now let's see if it is only soft-deleted...

In [34]:
cl.sls(deleted=True)

['topic_avro2-value']

Aha, so let's kill it off completely.

In [35]:
cl.srm(s, permanent=True)

{'topic_avro2-value': [1]}

Now it should really be gone.

In [36]:
cl.sls(deleted=True)

[]

The rest of the Schema Registry API is also supported:

* get_schema
* register_schema
* lookup_schema
* get_schema_versions
* get_versions
* delete_version
* get_compatibility
* set_compatibility
* test_compatibility

That's it for the first use case - for doing Kafka administration with Kafi :-)

Time to close your eyes and breathe now...

## Simple Stream Processing

Kafi also offers some functionality for simple stream processing. It's nowhere as expressive and powerful as e.g. Kafka Streams or Flink, or other Python libraries like Quix, Bytewax, Pathway etc. - but for many day-to-day tasks and microservices, this could even be enough.

Oh, and shameless plug. If you wish to read up on stream processing and streaming databases, and the ongoing convergence of streaming and databases/data warehouses/data lakes (e.g. Tableflow) - there is a book that I can recommend ;-)

<img src="pix/sdb.jpg" style="width: 30%; height: 30%"/>


Back to the topic. Kafi and stream processing. All the functionality for stream processing (and actually, even simpler commands like `cat` or `head`) are based on a functional backbone. As a functional programmer, or a Kafka Streams DSL or Flink DataStream API user, you'll feel at home immediately.


We start with `foreach`. Here, we simply read the topic message-by-message and print out its key. We could do anything in the lambda expression, API calls for example.

In [37]:
t = "topic_json2"

cl.foreach(t, lambda x: print(x["key"]))

123
456
789


Next, we go a bit further and use a `map` function that reads individual messages from a topic, does a "single message transform", and returns the result of the transformation.

In [38]:
def add(x):
    x["value"]["bla"] += 1000
    return x

cl.map(t, add)


([{'topic': 'topic_json2',
   'headers': None,
   'partition': 0,
   'offset': 0,
   'timestamp': (1, 1742384198782),
   'key': '123',
   'value': {'bla': 1123}},
  {'topic': 'topic_json2',
   'headers': None,
   'partition': 0,
   'offset': 1,
   'timestamp': (1, 1742384198782),
   'key': '456',
   'value': {'bla': 1456}},
  {'topic': 'topic_json2',
   'headers': None,
   'partition': 0,
   'offset': 2,
   'timestamp': (1, 1742384198782),
   'key': '789',
   'value': {'bla': 1789}}],
 3)

Of course we can also write out the result of the transformation to another topic, even on another cluster. So let's do the same transformation as above and write the result out to our Confluent Cloud cluster...

In [39]:
t2 = "topic_json_map"

cc.touch(t2)

cl.map_to(t, cc, t2, add)


(3, 3)

Let's see if that has worked...

The next command, `filter`, is just a special case of `flatmap`. Here, we just want to keep the message with value `456`:

In [None]:
cl.filter(t, lambda x: x["value"]["bla"] == 456)

Of course, `filter_to` is also there, and again we write out the result to Confluent Cloud:

In [40]:
t4 = "topic_filter"

cc.touch(t4)

cl.filter_to(t, cc, t4, lambda x: x["value"]["bla"] == 456)

(3, 1)

One more check on Confluent Cloud...

...and continue. `foldl` stands for "fold left" in functional programming, and is often also called `reduce` (e.g. in Kafka Streams). It is useful for simple stateful stream processing.

In the example below, we do a very simple aggregation: We sum up the values.

In [41]:
def sum(acc, x):
    acc += x["value"]["bla"]
    return acc

cl.foldl(t, sum, 0)

(1368, 3)

And again, Kafi allows you to write out the result of your processing into another topic. On any cluster. It's a bit more involved though. What we do below is to get the value of each message in the source topic on our local cluster, do the aggregation as before, and then remove the `bla` field, and add another field `sum` with the current sum:

In [42]:
t5 = "topic_foldl"

cc.touch(t5)

def sum_to(acc, x):
    acc += x["value"]["bla"]
    #
    y = dict(x)
    del y["value"]["bla"]
    y["value"]["sum"] = acc
    #
    return acc, [y]

cl.foldl_to(t, cc, t5, sum_to, 0)

(1368, 3, 3)

Let's it check out on Confluent Cloud again...

...and now for the last simple stream processing function.

We join the source topic with the `bla` field from our local Kafka cluster with the new topic with only the `sum` field on Confluent Cloud, and write out the result to our local Kafka cluster to have a topic that has both fields.

We use the key of the messages to join them, as e.g. in Kafka Streams (of course, you can override this and also e.g. use a field in the value payload).

BTW the join code is inspired by DBSP/Feldera, if you don't know it, have a look at e.g. this super cool blog on their web page:
https://www.feldera.com/blog/gpu-stream-dbsp


In [43]:
t6 = "topic_join"

cl.join_to(t, cc, t5, cl, t6)

(({'123': {'topic': 'topic_json2',
    'headers': None,
    'partition': 0,
    'offset': 0,
    'timestamp': (1, 1742384198782),
    'key': '123',
    'value': {'bla': 123}},
   '456': {'topic': 'topic_json2',
    'headers': None,
    'partition': 0,
    'offset': 1,
    'timestamp': (1, 1742384198782),
    'key': '456',
    'value': {'bla': 456}},
   '789': {'topic': 'topic_json2',
    'headers': None,
    'partition': 0,
    'offset': 2,
    'timestamp': (1, 1742384198782),
    'key': '789',
    'value': {'bla': 789}}},
  {}),
 3,
 3,
 3)

Let's check the result:

In [44]:
cl.cat(t6)

[{'topic': 'topic_join',
  'headers': None,
  'partition': 0,
  'offset': 0,
  'timestamp': (1, 1742385445885),
  'key': '123',
  'value': {'bla': 123, 'sum': 123}},
 {'topic': 'topic_join',
  'headers': None,
  'partition': 0,
  'offset': 1,
  'timestamp': (1, 1742385445885),
  'key': '456',
  'value': {'bla': 456, 'sum': 579}},
 {'topic': 'topic_join',
  'headers': None,
  'partition': 0,
  'offset': 2,
  'timestamp': (1, 1742385445885),
  'key': '789',
  'value': {'bla': 789, 'sum': 1368}}]

## Kafka via REST Proxy

The entire functionality of Kafi cannot only be used via the direct Kafka protocol, but also via a REST Proxy. This might sometimes be necessary if you have a firewall blocking the Kafka port, or a Private Cluster that you can only access via IP whitelisting.

How does it work? You just create a `RestProxy` object instead of `Cluster`, and then e.g. do a `ls`:

In [47]:
rl = RestProxy("local")
rl.ls()

['_confluent-command',
 '_confluent-controlcenter-7-9-0-1-AlertHistoryStore-changelog',
 '_confluent-controlcenter-7-9-0-1-AlertHistoryStore-repartition',
 '_confluent-controlcenter-7-9-0-1-Group-ONE_MINUTE-changelog',
 '_confluent-controlcenter-7-9-0-1-Group-ONE_MINUTE-repartition',
 '_confluent-controlcenter-7-9-0-1-Group-THREE_HOURS-changelog',
 '_confluent-controlcenter-7-9-0-1-Group-THREE_HOURS-repartition',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTEROTHER-0000000106-store-changelog',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTEROTHER-0000000106-store-repartition',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTERTHIS-0000000105-store-changelog',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTERTHIS-0000000105-store-repartition',
 '_confluent-controlcenter-7-9-0-1-MetricsAggregateStore-changelog',
 '_confluent-controlcenter-7-9-0-1-MetricsAggregateStore-repartition',
 '_confluent-controlcenter-7-9-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog',
 '_confluent-con

Now is this really going over HTTP? Have a look...

In [48]:
rl.verbose(2)
rl.ls()

GET Request
-
URL: http://localhost:8082/v3/clusters/MkU3OEVBNTcwNTJENDM2Qk/topics
Headers: {'Content-Type': 'application/json'}

GET Response
-
{"kind":"KafkaTopicList","metadata":{"self":"http://rest-proxy:8082/v3/clusters/MkU3OEVBNTcwNTJENDM2Qk/topics","next":null},"data":[{"kind":"KafkaTopic","metadata":{"self":"http://rest-proxy:8082/v3/clusters/MkU3OEVBNTcwNTJENDM2Qk/topics/_confluent-command","resource_name":"crn:///kafka=MkU3OEVBNTcwNTJENDM2Qk/topic=_confluent-command"},"cluster_id":"MkU3OEVBNTcwNTJENDM2Qk","topic_name":"_confluent-command","is_internal":false,"replication_factor":1,"partitions_count":1,"partitions":{"related":"http://rest-proxy:8082/v3/clusters/MkU3OEVBNTcwNTJENDM2Qk/topics/_confluent-command/partitions"},"configs":{"related":"http://rest-proxy:8082/v3/clusters/MkU3OEVBNTcwNTJENDM2Qk/topics/_confluent-command/configs"},"partition_reassignments":{"related":"http://rest-proxy:8082/v3/clusters/MkU3OEVBNTcwNTJENDM2Qk/topics/_confluent-command/partitions/-/reassign

['_confluent-command',
 '_confluent-controlcenter-7-9-0-1-AlertHistoryStore-changelog',
 '_confluent-controlcenter-7-9-0-1-AlertHistoryStore-repartition',
 '_confluent-controlcenter-7-9-0-1-Group-ONE_MINUTE-changelog',
 '_confluent-controlcenter-7-9-0-1-Group-ONE_MINUTE-repartition',
 '_confluent-controlcenter-7-9-0-1-Group-THREE_HOURS-changelog',
 '_confluent-controlcenter-7-9-0-1-Group-THREE_HOURS-repartition',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTEROTHER-0000000106-store-changelog',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTEROTHER-0000000106-store-repartition',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTERTHIS-0000000105-store-changelog',
 '_confluent-controlcenter-7-9-0-1-KSTREAM-OUTERTHIS-0000000105-store-repartition',
 '_confluent-controlcenter-7-9-0-1-MetricsAggregateStore-changelog',
 '_confluent-controlcenter-7-9-0-1-MetricsAggregateStore-repartition',
 '_confluent-controlcenter-7-9-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog',
 '_confluent-con

Really all the commands that you have seen above also work via REST now. E.g. you can produce to a topic as before...

In [None]:
t = "topic_rest_proxy"

pr = rl.producer(t)
pr.produce({"bla": 123}, key="123")
pr.produce({"bla": 456}, key="456")
pr.produce({"bla": 789}, key="789")
pr.close()

And do a `cat`:

In [None]:
rl.cat(t, key_type="json")

## Kafka Backups incl. Kafka Emulation

Kafi has built-in "Kafka Emulation". That is useful e.g. for backing up topics to local disk, and replaying them back 1:1 back to Kafka.

Here, we create a `Local` object that points to "Kafka emulation" our local hard disk, and backup a topic from Confluent Cloud to it.


In [2]:
from kafi.kafi import *
cc = Cluster("ccloud")
l = Local("local")

t = "topic_foldl"

cc.cp(t, l, t)

(3, 3)

Let's check out whether the topic has landed on our "Kafka Emulation":

In [3]:
l.l()

{'topic_foldl': 3}

Cool. Let's see it:

In [4]:
l.cat(t)

[{'topic': 'topic_foldl',
  'value': {'sum': 123},
  'key': '123',
  'timestamp': (1, 1742385778688),
  'headers': None,
  'partition': 0,
  'offset': 0},
 {'topic': 'topic_foldl',
  'value': {'sum': 579},
  'key': '456',
  'timestamp': (1, 1742385778688),
  'headers': None,
  'partition': 0,
  'offset': 1},
 {'topic': 'topic_foldl',
  'value': {'sum': 1368},
  'key': '789',
  'timestamp': (1, 1742385778688),
  'headers': None,
  'partition': 0,
  'offset': 2}]

And now, since we have read it, there should be an "emulated" consumer group as well, no?

In [5]:
l.gls()

['1742385801143']

In [6]:
l.describe_groups()


{'1742385801143': {'group_id': '1742385801143',
  'is_simple_consumer_group': False,
  'partition_assignor': 'range',
  'state': 'empty'}}

How does this look like under the covers? Let's see.

But Kafi doesn't only support local disk here. You can just as well use Kafi's direct Azure Blob Storage support, or, as we will show, S3.

In [7]:
s = S3("local")
s.ls()


[]

Still empty. So let's copy a topic from somewhere (e.g. our local Kafka cluster) to S3 (local MinIO).

In [9]:
cl = Cluster("local")
cc = Cluster("ccloud")

t = "topic_json2"

cl.cp(t, s, t)

(3, 3)

...and?

In [10]:
s.l()

{'topic_json2': 3}

One more `cat`, now reading from S3:

In [11]:
s.cat(t)

[{'topic': 'topic_json2',
  'value': {'bla': 123},
  'key': '123',
  'timestamp': (1, 1742385971738),
  'headers': None,
  'partition': 0,
  'offset': 0},
 {'topic': 'topic_json2',
  'value': {'bla': 456},
  'key': '456',
  'timestamp': (1, 1742385971738),
  'headers': None,
  'partition': 0,
  'offset': 1},
 {'topic': 'topic_json2',
  'value': {'bla': 789},
  'key': '789',
  'timestamp': (1, 1742385971738),
  'headers': None,
  'partition': 0,
  'offset': 2}]

Let's check this out in the MinIO UI as well...

...and lastly, let's copy back the topic from local S3 "Kafka Emulation" to Confluent Cloud.

In [None]:
t7 = "topic_json_from_s3"

cc.touch(t7)

s.cp(t, cc, t7)

## Building a Bridge from Kafka to Pandas Dataframes and Files

We are at the end, oh no, one more thing.

Kafi's name doesn't only mean "coffee" in Swiss German, but it actually means "*Ka*fka" and "*Fi*les".

In this sense, you can not only use Kafi for doing backups and play them back to Kafka.


What Kafi can do, is e.g. copy a topic into a Pandas dataframe.

In [12]:
df = cl.topic_to_df(t)
df

Unnamed: 0,bla
0,123
1,456
2,789


And back to Kafka...

In [13]:
tdf = "topic_df"

cl.df_to_topic(df, tdf)

3

Let's check that out... the keys should be missing as these commands yet only use the value of the messages:

In [14]:
cl.cat(tdf)

[{'topic': 'topic_df',
  'headers': None,
  'partition': 0,
  'offset': 0,
  'timestamp': (1, 1742386072368),
  'key': None,
  'value': {'bla': 123}},
 {'topic': 'topic_df',
  'headers': None,
  'partition': 0,
  'offset': 1,
  'timestamp': (1, 1742386072369),
  'key': None,
  'value': {'bla': 456}},
 {'topic': 'topic_df',
  'headers': None,
  'partition': 0,
  'offset': 2,
  'timestamp': (1, 1742386072369),
  'key': None,
  'value': {'bla': 789}}]

Once you have `topic_to_df` and `df_to_topic`, it is not a far step to use the support of all kinds of file formats supported of Pandas:
* csv
* feather
* json
* orc
* parquet
* xlsx
* xml

Hence Kafi also supports direct dumping of a topic to a Parquet file for instance, in this example, from the local Kafka cluster to S3:

In [15]:
s = S3("local")

cl.topic_to_file(t, s, "topic.parquet")


1570

Let's check this out in the MinIO UI...

Now, we can also copy the topic from e.g. from our local Kafka to an Excel file:

In [16]:
cl.topic_to_file(t, s, "topic.xlsx")

4954

Let's download and have a look...

And, cool thing is, we can go the other way round, too. We can bring back the Excel file on S3 and write it out to a topic on Confluent Cloud...

In [17]:
texcel = "topic_from_excel"

cc.touch(texcel)

s.file_to_topic("topic.xlsx", cc, texcel)

3

A final look into the Confluent Cloud UI hopefully shows us that it has worked...

So - why don't you just head over to GitHub for the Kafi project and its documentation:
https://github.com/xdgrulez/kafi

This Jupyter notebook can also be found there, in case you'd like to go through it yourself:
https://github.com/xdgrulez/cur25blr

<img src="pix/thank_you.jpg" style="width: 40%; height: 40%"/>