In [2]:
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
md = admin.list_topics(timeout=5)
print("Topics:", list(md.topics.keys())[:20])  # muestra algunos

t = "org.chicago.cta.station.arrivals.v1"
if t in md.topics:
    tm = md.topics[t]
    print(f"{t} partitions:", len(tm.partitions))
else:
    print("El tópico no existe:", t)

Topics: ['_schemas', 'connect-configs', '_confluent-ksql-default_query_CTAS_TURNSTILE_SUMMARY_0-KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition', '_confluent-ksql-default__command_topic', '_confluent-ksql-default_query_CTAS_TURNSTILE_SUMMARY_0-KSTREAM-AGGREGATE-STATE-STORE-0000000006-changelog', 'com.udacity.streams.pages', 'org.chicago.cta.stations.table.v1', '_confluent-monitoring', 'connect-status', 'connect-offsets', '__consumer_offsets', 'stations-stream-__assignor-__leader', 'TURNSTILE_SUMMARY', 'com.udacity.streams.users', 'org.chicago.cta.stations', '_confluent-metrics', 'com.udacity.streams.clickevents', 'org.chicago.cta.station.arrivals.v1', 'com.udacity.streams.purchases', '__confluent.support.metrics']
org.chicago.cta.station.arrivals.v1 partitions: 3


In [4]:
from confluent_kafka import Consumer, TopicPartition

def read_from_beginning(topic, partition=0, max_msgs=10, timeout=5.0):
    c = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "ad-hoc-reader",   # no importa, no committeamos
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
    })
    tp = TopicPartition(topic, partition)
    c.assign([tp])
    # mover a earliest manualmente
    low, high = c.get_watermark_offsets(tp, timeout=5.0)
    c.seek(TopicPartition(topic, partition, low))
    print(f"Watermarks {topic}[{partition}]: low={low}, high={high}")

    seen = 0
    while seen < max_msgs:
        msg = c.poll(timeout)
        if msg is None:
            print("(timeout, no hay más mensajes)")
            break
        if msg.error():
            print("Error:", msg.error())
            break
        try:
            print(msg.value().decode("utf-8"))
        except Exception:
            print(msg.value())  # por si es Avro binario
        seen += 1
    c.close()

read_from_beginning("org.chicago.cta.station.arrivals.v1", partition=0, max_msgs=5)


KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 0: Local: Erroneous state"}

In [5]:
read_from_beginning("org.chicago.cta.turnstile.entries.v1", 0, 5)

KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 0: Local: Erroneous state"}

In [6]:
read_from_beginning("org.chicago.cta.weather.v1", 0, 5)

KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 0: Local: Erroneous state"}

In [7]:
read_from_beginning("org.chicago.cta.stations", 0, 5)

KafkaException: KafkaError{code=_STATE,val=-172,str="Failed to seek to offset 0: Local: Erroneous state"}

In [9]:
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

def dump_topic_all_partitions(topic, max_msgs_per_part=5, timeout=5.0):
    c = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": f"adhoc-dump-{topic}",
        "enable.auto.commit": False,
        "auto.offset.reset": "earliest",
    })

    # Descubrir particiones del tópico
    md = c.list_topics(topic, timeout=5.0)
    if topic not in md.topics:
        print(f"El tópico no existe: {topic}")
        c.close()
        return
    partitions = sorted(md.topics[topic].partitions.keys())
    print(f"Particiones de {topic}: {partitions}")

    for p in partitions:
        tp = TopicPartition(topic, p, OFFSET_BEGINNING)
        c.assign([tp])

        # MUY IMPORTANTE: permitir que la asignación se aplique
        c.poll(0.1)

        low, high = c.get_watermark_offsets(TopicPartition(topic, p), timeout=5.0)
        print(f"[{topic}][{p}] watermarks low={low} high={high}")

        seen = 0
        while seen < max_msgs_per_part:
            msg = c.poll(timeout)
            if msg is None:
                print(f"[{topic}][{p}] (timeout, no hay más mensajes)")
                break
            if msg.error():
                print(f"[{topic}][{p}] error:", msg.error())
                break
            try:
                print(f"[{topic}][{p}]", msg.value().decode("utf-8"))
            except Exception:
                # Si es Avro binario, no se puede decodificar a UTF-8
                print(f"[{topic}][{p}] (bytes Avro) len=", len(msg.value()))
            seen += 1

        # Desasignar antes de pasar a la siguiente partición
        c.unassign()

    c.close()

In [10]:
dump_topic_all_partitions("org.chicago.cta.station.arrivals.v1", 5)

Particiones de org.chicago.cta.station.arrivals.v1: [0, 1, 2]
[org.chicago.cta.station.arrivals.v1][0] watermarks low=0 high=557
[org.chicago.cta.station.arrivals.v1][0] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][0] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][0] (bytes Avro) len= 33
[org.chicago.cta.station.arrivals.v1][0] (bytes Avro) len= 33
[org.chicago.cta.station.arrivals.v1][0] (bytes Avro) len= 35
[org.chicago.cta.station.arrivals.v1][1] watermarks low=0 high=559
[org.chicago.cta.station.arrivals.v1][1] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][1] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][1] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][1] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][1] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][2] watermarks low=0 high=534
[org.chicago.cta.station.arrivals.v1][2] (bytes Avro) len= 34
[org.chicago.cta.station.arrivals.v1][2] (bytes Avro) l

In [11]:
dump_topic_all_partitions("org.chicago.cta.turnstile.entries.v1", 5)

Particiones de org.chicago.cta.turnstile.entries.v1: [0, 1, 2]
[org.chicago.cta.turnstile.entries.v1][0] watermarks low=0 high=1673
[org.chicago.cta.turnstile.entries.v1][0] (bytes Avro) len= 20
[org.chicago.cta.turnstile.entries.v1][0] (bytes Avro) len= 28
[org.chicago.cta.turnstile.entries.v1][0] (bytes Avro) len= 35
[org.chicago.cta.turnstile.entries.v1][0] (bytes Avro) len= 21
[org.chicago.cta.turnstile.entries.v1][0] (bytes Avro) len= 21
[org.chicago.cta.turnstile.entries.v1][1] watermarks low=0 high=1822
[org.chicago.cta.turnstile.entries.v1][1] (bytes Avro) len= 28
[org.chicago.cta.turnstile.entries.v1][1] (bytes Avro) len= 22
[org.chicago.cta.turnstile.entries.v1][1] (bytes Avro) len= 22
[org.chicago.cta.turnstile.entries.v1][1] (bytes Avro) len= 24
[org.chicago.cta.turnstile.entries.v1][1] (bytes Avro) len= 24
[org.chicago.cta.turnstile.entries.v1][2] watermarks low=0 high=1867
[org.chicago.cta.turnstile.entries.v1][2] (bytes Avro) len= 20
[org.chicago.cta.turnstile.entries.v1

In [12]:
dump_topic_all_partitions("org.chicago.cta.weather.v1", 5)

Particiones de org.chicago.cta.weather.v1: [0, 1, 2]
[org.chicago.cta.weather.v1][0] watermarks low=0 high=0
[org.chicago.cta.weather.v1][0] (timeout, no hay más mensajes)
[org.chicago.cta.weather.v1][1] watermarks low=0 high=3
[org.chicago.cta.weather.v1][1] (bytes Avro) len= 16
[org.chicago.cta.weather.v1][1] (bytes Avro) len= 15
[org.chicago.cta.weather.v1][1] (timeout, no hay más mensajes)
[org.chicago.cta.weather.v1][2] watermarks low=0 high=2
[org.chicago.cta.weather.v1][2] (bytes Avro) len= 23
[org.chicago.cta.weather.v1][2] (bytes Avro) len= 23
[org.chicago.cta.weather.v1][2] (timeout, no hay más mensajes)


In [13]:
from confluent_kafka.avro import AvroConsumer
from confluent_kafka import TopicPartition, OFFSET_BEGINNING

def dump_avro_all_partitions(topic, max_msgs_per_part=5, timeout=5.0):
    c = AvroConsumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": f"adhoc-avro-{topic}",
        "enable.auto.commit": False,
        "schema.registry.url": "http://localhost:8081",
        "auto.offset.reset": "earliest",
    })
    md = c.list_topics(topic, timeout=5.0)
    parts = sorted(md.topics[topic].partitions.keys())
    print(f"Particiones de {topic}: {parts}")
    for p in parts:
        tp = TopicPartition(topic, p, OFFSET_BEGINNING)
        c.assign([tp])
        c.poll(0.1)  # importante para aplicar la asignación
        low, high = c.get_watermark_offsets(TopicPartition(topic, p), timeout=5.0)
        print(f"[{topic}][{p}] low={low} high={high}")
        seen = 0
        while seen < max_msgs_per_part:
            msg = c.poll(timeout)
            if msg is None:
                print(f"[{topic}][{p}] (timeout)")
                break
            if msg.error():
                print(f"[{topic}][{p}] error:", msg.error())
                break
            print(f"[{topic}][{p}] value=", msg.value())  # <-- dict Avro
            seen += 1
        c.unassign()
    c.close()

In [14]:
dump_avro_all_partitions("org.chicago.cta.station.arrivals.v1", 3)

Particiones de org.chicago.cta.station.arrivals.v1: [0, 1, 2]
[org.chicago.cta.station.arrivals.v1][0] low=0 high=846
[org.chicago.cta.station.arrivals.v1][0] value= {'station_id': 40890, 'train_id': 'BL000', 'direction': 'b', 'line': 'blue', 'train_status': 'in_service', 'prev_station_id': None, 'prev_direction': None}
[org.chicago.cta.station.arrivals.v1][0] value= {'station_id': 41410, 'train_id': 'BL008', 'direction': 'a', 'line': 'blue', 'train_status': 'in_service', 'prev_station_id': None, 'prev_direction': None}
[org.chicago.cta.station.arrivals.v1][0] value= {'station_id': 41420, 'train_id': 'RL002', 'direction': 'b', 'line': 'red', 'train_status': 'in_service', 'prev_station_id': None, 'prev_direction': None}
[org.chicago.cta.station.arrivals.v1][1] low=0 high=889
[org.chicago.cta.station.arrivals.v1][1] value= {'station_id': 40590, 'train_id': 'BL002', 'direction': 'b', 'line': 'blue', 'train_status': 'in_service', 'prev_station_id': None, 'prev_direction': None}
[org.chicag

In [15]:
dump_avro_all_partitions("org.chicago.cta.turnstile.entries.v1", 3)

Particiones de org.chicago.cta.turnstile.entries.v1: [0, 1, 2]
[org.chicago.cta.turnstile.entries.v1][0] low=0 high=2672
[org.chicago.cta.turnstile.entries.v1][0] value= {'station_id': 40750, 'station_name': 'Harlem', 'line': 'blue'}
[org.chicago.cta.turnstile.entries.v1][0] value= {'station_id': 41280, 'station_name': 'Jefferson Park', 'line': 'blue'}
[org.chicago.cta.turnstile.entries.v1][0] value= {'station_id': 40670, 'station_name': "Western/O'Hare Branch", 'line': 'blue'}
[org.chicago.cta.turnstile.entries.v1][1] low=0 high=2935
[org.chicago.cta.turnstile.entries.v1][1] value= {'station_id': 41330, 'station_name': 'Montrose', 'line': 'blue'}
[org.chicago.cta.turnstile.entries.v1][1] value= {'station_id': 41330, 'station_name': 'Montrose', 'line': 'blue'}
[org.chicago.cta.turnstile.entries.v1][1] value= {'station_id': 40570, 'station_name': 'California', 'line': 'blue'}
[org.chicago.cta.turnstile.entries.v1][2] low=0 high=2985
[org.chicago.cta.turnstile.entries.v1][2] value= {'sta

In [16]:
dump_avro_all_partitions("org.chicago.cta.weather.v1", 3)

Particiones de org.chicago.cta.weather.v1: [0, 1, 2]
[org.chicago.cta.weather.v1][0] low=0 high=0
[org.chicago.cta.weather.v1][0] (timeout)
[org.chicago.cta.weather.v1][1] low=0 high=5
[org.chicago.cta.weather.v1][1] value= {'temperature': 68.16808319091797, 'status': 'cloudy'}
[org.chicago.cta.weather.v1][1] value= {'temperature': 67.00574493408203, 'status': 'windy'}
[org.chicago.cta.weather.v1][1] value= {'temperature': 68.1431655883789, 'status': 'sunny'}
[org.chicago.cta.weather.v1][2] low=0 high=3
[org.chicago.cta.weather.v1][2] value= {'temperature': 66.28805541992188, 'status': 'precipitation'}
[org.chicago.cta.weather.v1][2] value= {'temperature': 68.92756652832031, 'status': 'precipitation'}
[org.chicago.cta.weather.v1][2] (timeout)


In [17]:
dump_topic_all_partitions("org.chicago.cta.stations", 5)  # el helper JSON que ya usaste

Particiones de org.chicago.cta.stations: [0]
[org.chicago.cta.stations][0] watermarks low=0 high=230
[org.chicago.cta.stations][0] {"stop_id":30001,"direction_id":"E","stop_name":"Austin (O'Hare-bound)","station_name":"Austin","station_descriptive_name":"Austin (Blue Line)","station_id":40010,"order":29,"red":false,"blue":true,"green":false}
[org.chicago.cta.stations][0] {"stop_id":30002,"direction_id":"W","stop_name":"Austin (Forest Pk-bound)","station_name":"Austin","station_descriptive_name":"Austin (Blue Line)","station_id":40010,"order":29,"red":false,"blue":true,"green":false}
[org.chicago.cta.stations][0] {"stop_id":30003,"direction_id":"E","stop_name":"Harlem (63rd-bound)","station_name":"Harlem/Lake","station_descriptive_name":"Harlem/Lake (Green Line)","station_id":40020,"order":0,"red":false,"blue":false,"green":true}
[org.chicago.cta.stations][0] {"stop_id":30004,"direction_id":"W","stop_name":"Harlem (Terminal arrival)","station_name":"Harlem/Lake","station_descriptive_nam

In [18]:
dump_topic_all_partitions("org.chicago.cta.stations.table.v1", 5)

Particiones de org.chicago.cta.stations.table.v1: [0]
[org.chicago.cta.stations.table.v1][0] watermarks low=0 high=460
[org.chicago.cta.stations.table.v1][0] {"station_id": 40010, "station_name": "Austin", "order": 29, "line": "blue", "__faust": {"ns": "faust_stream.TransformedStation"}}
[org.chicago.cta.stations.table.v1][0] {"station_id": 40010, "station_name": "Austin", "order": 29, "line": "blue", "__faust": {"ns": "faust_stream.TransformedStation"}}
[org.chicago.cta.stations.table.v1][0] {"station_id": 40010, "station_name": "Austin", "order": 29, "line": "blue"}
[org.chicago.cta.stations.table.v1][0] {"station_id": 40010, "station_name": "Austin", "order": 29, "line": "blue"}
[org.chicago.cta.stations.table.v1][0] {"station_id": 40020, "station_name": "Harlem/Lake", "order": 0, "line": "green", "__faust": {"ns": "faust_stream.TransformedStation"}}


In [19]:
dump_topic_all_partitions("TURNSTILE_SUMMARY", 5)

Particiones de TURNSTILE_SUMMARY: [0, 1, 2, 3]
[TURNSTILE_SUMMARY][0] watermarks low=0 high=753
[TURNSTILE_SUMMARY][0] {"STATION_ID":40220,"COUNT":3}
[TURNSTILE_SUMMARY][0] {"STATION_ID":40970,"COUNT":6}
[TURNSTILE_SUMMARY][0] {"STATION_ID":40020,"COUNT":5}
[TURNSTILE_SUMMARY][0] {"STATION_ID":40390,"COUNT":3}
[TURNSTILE_SUMMARY][0] {"STATION_ID":40330,"COUNT":15}
[TURNSTILE_SUMMARY][1] watermarks low=0 high=1206
[TURNSTILE_SUMMARY][1] {"STATION_ID":40180,"COUNT":2}
[TURNSTILE_SUMMARY][1] {"STATION_ID":40760,"COUNT":4}
[TURNSTILE_SUMMARY][1] {"STATION_ID":40900,"COUNT":3}
[TURNSTILE_SUMMARY][1] {"STATION_ID":41280,"COUNT":3}
[TURNSTILE_SUMMARY][1] {"STATION_ID":41080,"COUNT":1}
[TURNSTILE_SUMMARY][2] watermarks low=0 high=1015
[TURNSTILE_SUMMARY][2] {"STATION_ID":40470,"COUNT":2}
[TURNSTILE_SUMMARY][2] {"STATION_ID":41120,"COUNT":1}
[TURNSTILE_SUMMARY][2] {"STATION_ID":40940,"COUNT":1}
[TURNSTILE_SUMMARY][2] {"STATION_ID":41360,"COUNT":3}
[TURNSTILE_SUMMARY][2] {"STATION_ID":41400,"COU