### Welcome to a typical Kafka administrator nightmare...

### We have a JSONSchema-serialized topic "products" and a qualification test is looming for an application consuming from that topic.

### Of course, the application crashes...
### (run consumer.py)
### Error: confluent_kafka.schema_registry.error.SchemaRegistryError: Schema 577659245 not found (HTTP status code 404, SR code 40403)

### The application cannot be fixed as it has been developed by externals (sigh), so we, the Kafka administrators, have to jump in and save the world.

### First, we have to find out what is wrong with the topic.
### The error message indicates that there are messages which have not been JSONSchema-serialized but are pure JSON.
### Let's validate that assumption and read the first message.

In [None]:
from kafi.kafi import *
c = Cluster("local")
x = c.head("products", type="bytes", n=1)
print(x)
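
### Side note: the "missing" schema ID itself supports the plain-JSON hypothesis.
### A minimal sketch, assuming the message value starts with `{"nam` (for example `{"name": ...`) and that the consumer reads bytes 1 to 4 as the schema ID. The sample payload is hypothetical, not taken from the topic.

```python
# Hypothetical plain-JSON payload; the real message content is an assumption.
raw_value = b'{"name": "example"}'

# A Schema Registry framed value would start with the magic byte 0x00.
# Here the first byte is "{" instead.
first_byte = raw_value[0]

# Reading the next four bytes ('"nam') as a big-endian integer
# yields the "schema ID" a deserializer would look up.
misread_schema_id = int.from_bytes(raw_value[1:5], "big")

print(first_byte, misread_schema_id)  # 123 577659245
```

### The result matches the ID in the error message above, so the registry was most likely asked for a schema that never existed.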

### Ok, so how many messages are pure JSON instead of JSONSchema-serialized (value does not start with the magic zero byte)?

In [None]:
y = c.filter("products", type="bytes", filter_function=lambda x: x["value"][0] != 0)
print(len(y[0]))
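
### The lambda above raises an error on empty or missing values (e.g. tombstones).
### A slightly more defensive predicate could look like the sketch below. The record shape (a dict with a "value" key holding bytes) is taken from the lambda; the sample records are made up.

```python
MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def is_not_framed(record: dict) -> bool:
    """Return True if the value lacks the 5-byte header (magic byte + schema ID)."""
    value = record.get("value")
    # Empty, missing, or too-short values cannot carry the header.
    if not value or len(value) < 5:
        return True
    return value[0] != MAGIC_BYTE


# Hypothetical sample records, only for illustration.
records = [
    {"value": b'{"name": "plain"}'},                             # plain JSON
    {"value": b"\x00\x00\x00\x00\x07" + b'{"name": "framed"}'},  # framed, schema ID 7
    {"value": None},                                             # tombstone
]

unframed = [r for r in records if is_not_framed(r)]
print(len(unframed))  # 2
```

### Note that this sketch counts tombstones as "not framed"; whether that is desired depends on the topic.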

### Ok, it's the first 100. Let's do a backup of them to a topic backed by local Kafi Kafka emulation...

In [None]:
l = Local("local")
l.retouch("products_backup")
c.cp("products", l, "products_backup", source_type="json", target_type="json", offsets={0: 0}, n=100)

### And then delete the first 100 messages on the real Kafka topic.

In [4]:
c.delete_records({"products": {0: 100}})

### Let's check the watermarks to see whether this has really worked.

In [None]:
c.watermarks("products")

### Ok, run the consumer application again...
### (run consumer.py)
### Works now.

### But we do have to bring back the first 100 messages, because the producers cannot do that (again, externals...).
### Let's see what schema ID we have to use...

In [None]:
z = c.head("products", type="bytes", n=1)
sid = int.from_bytes(z[0]["value"][1:5], "big")
print(sid)
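
### For reference, the header we just sliced by hand: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the JSON payload.
### A minimal sketch of that layout using only the standard library. The schema ID 42 and the payload are made up, and `frame`/`parse_header` are hypothetical helpers, not part of Kafi or the Confluent client.

```python
import json
import struct


def frame(schema_id: int, payload: dict) -> bytes:
    """Prefix a JSON payload with the magic byte and the 4-byte schema ID."""
    return struct.pack(">bI", 0, schema_id) + json.dumps(payload).encode("utf-8")


def parse_header(value: bytes) -> tuple[int, int]:
    """Return (magic_byte, schema_id) from the first five bytes of a value."""
    return struct.unpack(">bI", value[:5])


framed = frame(42, {"name": "example", "price": 1.5})  # 42 is a made-up schema ID
magic, schema_id = parse_header(framed)

# Same result as the int.from_bytes(...) slice used in the cell above.
assert schema_id == int.from_bytes(framed[1:5], "big")
print(magic, schema_id)  # 0 42
```

### A real serializer additionally validates the payload against the registered schema; this sketch only shows the byte layout.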

### Ok, let's use that schema ID to append the backed-up messages to the end of the products topic - correctly JSONSchema-serialized this time.

In [None]:
l.cp("products_backup", c, "products", target_value_type="jsonschema", target_value_schema_id=sid)

### Argh, does not work: SerializationError: 'price' is a required property
### What's up?
### Let's check in Excel...

In [None]:
l.to_file("products_backup", l, "products_backup.xlsx", n=100)
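
### If Excel is not at hand, counting the keys of the decoded values finds the same problem.
### A rough sketch on hypothetical data: "price" being required comes from the error message, while the "name" field and the sample values are assumptions.

```python
from collections import Counter

# Assumption: "price" is required (per the error); "name" is made up.
REQUIRED = {"name", "price"}

# Hypothetical decoded message values standing in for the backup topic.
values = [
    {"name": "apple", "pryce": 1.2},
    {"name": "pear", "pryce": 0.9},
]

# Required keys that are absent, and keys the schema does not know about.
missing = Counter(key for v in values for key in REQUIRED - set(v))
unexpected = Counter(key for v in values for key in set(v) - REQUIRED)

print(dict(missing), dict(unexpected))  # {'price': 2} {'pryce': 2}
```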

### Argh, they have misspelled "price" as "pryce" (you might know the movie "Brazil" with Jonathan Pryce; it's about as dystopian as our task at hand...)
### So let's fix that typo in-place in the Kafi Kafka-emulated topic on disk...
### (fix topic in VSC)
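
### For more than a handful of messages, the same fix can be scripted instead of edited by hand.
### A minimal sketch on decoded values; `rename_key` is a hypothetical helper and the sample data is made up. How the result is written back to the emulated topic is left out, since that depends on Kafi's on-disk format.

```python
def rename_key(value: dict, old: str, new: str) -> dict:
    """Return a copy of `value` with key `old` renamed to `new`, keeping key order."""
    return {(new if key == old else key): val for key, val in value.items()}


# Hypothetical decoded message values.
values = [{"name": "apple", "pryce": 1.2}]

fixed = [rename_key(v, "pryce", "price") for v in values]
print(fixed)  # [{'name': 'apple', 'price': 1.2}]
```

### If a value already had both keys, the later one in key order would win, so check for that case first on real data.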

### And try to copy it to the back of the "products" topic again...

In [None]:
l.cp("products_backup", c, "products", target_value_type="jsonschema", target_value_schema_id=sid)

In [None]:
c.watermarks("products")

### Yeah.
### Now let's check if the consumer application can read all the messages, including the fixed ones...
### (run consumer.py)

### We did it :) Now the qualification test can go through. Phew.

### Very last step: Let's create a copy of that fixed topic in Parquet format for the analytics team - on S3.

In [None]:
s = S3("local")
c.to_file("products", s, "products.parquet", type="jsonschema")

### Let's see...
### (download Parquet file, show it, open a URL)

### That's really it.
### Thanks to all my colleagues from Migros in Zürich, in particular the Data Integration team, and Martin Muggli and Jason Nguyen - the Kafka guys.

### Get your copy of Kafi from GitHub: https://github.com/xdgrulez/kafi

### And get your copy of the new O'Reilly book "Streaming Databases" by Hubert Dulay and me.

### And submit exciting abstracts to the new, non-vendor-centric conference about everything events and streaming: EventCentric 2025 (Antwerp, Belgium, June 2-5, 2025)
### https://aardling.eu/en/eventcentric-2025-coming-soon
