In [3]:
from kafka import KafkaConsumer
import json
import time

# Configuración del consumidor
consumer = KafkaConsumer(
    'rigcore-data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id='rigcore-consumer-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("Esperando mensajes agrupados por tabla cada 3 segundos...")

try:
    while True:
        # Inicializar buffer para este intervalo
        message_buffer = {}

        # Leer todos los mensajes disponibles durante este intervalo
        start_time = time.time()
        while time.time() - start_time < 3:
            raw_msgs = consumer.poll(timeout_ms=500)
            for tp, messages in raw_msgs.items():
                for msg in messages:
                    data = msg.value
                    table = data.get('table')
                    row_data = data.get('data')

                    if table and row_data:
                        message_buffer[table] = {
                            'table': table, 'data': row_data}
                    else:
                        print(f"[WARN] Formato inesperado: {data}")

        # Imprimir todos los mensajes agrupados
        if message_buffer:
            print("\n=== Lote de datos recibido ===")
            for table, full_data in message_buffer.items():
                print(f"[Kafka] Sent from {table}: {full_data}")
            print("==============================\n")

        time.sleep(3)  # Esperar el siguiente ciclo

except KeyboardInterrupt:
    print("\nLectura interrumpida por el usuario.")

finally:
    consumer.close()
    print("Consumer cerrado correctamente.")

Esperando mensajes agrupados por tabla cada 3 segundos...

=== Lote de datos recibido ===
[Kafka] Sent from calculos_config: {'table': 'calculos_config', 'data': {'id': 15, 'id_codigo': 'codigo_01', 'session_timestamp': '2025-06-08T13:00:00', 'nombre': 'suma', 'descripcion': 'Descripción del cálculo1', 'formula': 'var1+var2', 'timestamp_created': '2025-06-15T02:55:35'}}
[Kafka] Sent from calculos_resultado: {'table': 'calculos_resultado', 'data': {'id': 909, 'timestamp': '2025-06-14T19:27:24', 'session_timestamp': '2025-06-08T13:00:00', 'codigo_01': 'gene', 'resultado_01': '0.0000', 'unidad_01': '', 'codigo_02': None, 'resultado_02': None, 'unidad_02': None, 'codigo_03': None, 'resultado_03': None, 'unidad_03': None, 'codigo_04': None, 'resultado_04': None, 'unidad_04': None, 'codigo_05': None, 'resultado_05': None, 'unidad_05': None, 'codigo_06': None, 'resultado_06': None, 'unidad_06': None, 'codigo_07': None, 'resultado_07': None, 'unidad_07': None, 'codigo_08': None, 'resultado_08'

In [1]:
from kafka import KafkaConsumer
import json
import time

# Configuración del consumidor
consumer = KafkaConsumer(
    'rigcore-data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id='rigcore-consumer-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("Escuchando mensajes en el topic 'rigcore-data'...")

try:
    for message in consumer:
        data = message.value
        table = data.get('table')
        row_data = data.get('data')

        if table and row_data:
            print(f"\n=== Mensaje de la tabla: {table} ===")
            for key, value in row_data.items():
                print(f"{key}: {value}")
        else:
            print(f"[WARN] Formato inesperado en el mensaje: {data}")

        time.sleep(1)

except KeyboardInterrupt:
    print("\nLectura interrumpida por el usuario.")

finally:
    consumer.close()
    print("Consumer cerrado correctamente.")

Escuchando mensajes en el topic 'rigcore-data'...

=== Mensaje de la tabla: fgenerador5_data ===
id: 1440
timestamp: 2025-06-01T23:59:00
rig_config_id: 1
session_timestamp: 2025-06-05T00:00:00
channel_01: 98.78
name_1: Canal 1
unit_01: kg/m³
channel_02: 111.17
name_2: Canal 2
unit_02: deg/s
channel_03: 122.77
name_3: Canal 3
unit_03: l
channel_04: 127.67
name_4: Canal 4
unit_04: lb
channel_05: 143.26
name_5: Canal 5
unit_05: ft
channel_06: 151.09
name_6: Canal 6
unit_06: in
channel_07: 157.77
name_7: Canal 7
unit_07: kg
channel_08: 172.0
name_8: Canal 8
unit_08: m
channel_09: 176.8
name_9: Canal 9
unit_09: bar
channel_10: 188.13
name_10: Canal 10
unit_10: in
channel_11: 196.02
name_11: Canal 11
unit_11: l
channel_12: 210.05
name_12: Canal 12
unit_12: kg/m³
channel_13: 223.19
name_13: Canal 13
unit_13: m
channel_14: 233.74
name_14: Canal 14
unit_14: kPa
channel_15: 242.98
name_15: Canal 15
unit_15: kg/m³
channel_16: 252.94
name_16: Canal 16
unit_16: kg/m³
channel_17: 263.1
name_17: Cana

In [7]:
from kafka import KafkaConsumer
import json
import time

# Configurar el consumidor
consumer = KafkaConsumer(
    'rigcore-data',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    enable_auto_commit=True,
    group_id='rigcore-consumer-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)


def main():
    print("Iniciando consumer en formato JSON...")
    try:
        while True:
            start_time = time.time()
            data_batch = {}

            # Recolectar mensajes durante 3 segundos
            while time.time() - start_time < 3:
                raw_msgs = consumer.poll(timeout_ms=100)
                for tp, messages in raw_msgs.items():
                    for message in messages:
                        value = message.value
                        table = value.get("table")
                        data = value.get("data")
                        if table and data:
                            # Reemplaza con la última versión
                            data_batch[table] = data

            # Imprimir el batch completo como JSON
            if data_batch:
                json_output = []
                for table, data in data_batch.items():
                    json_output.append({
                        "table": table,
                        "data": data
                    })

                print(json.dumps(json_output, indent=2, ensure_ascii=False))
            else:
                print("[]")

    except KeyboardInterrupt:
        print("Consumer detenido manualmente.")
    finally:
        consumer.close()


if __name__ == "__main__":
    main()

Iniciando consumer en formato JSON...
[]
[]
[]
[]
[]
[]
[]
[]
Consumer detenido manualmente.
