In [None]:
from kafka import KafkaConsumer
import json
import pandas as pd
import os

TOPIC = "hydro2"
BROKER = "localhost:9092"
CSV_FILE = "hydro2_dane.csv"

# Przygotuj plik CSV jeśli nie istnieje
if not os.path.isfile(CSV_FILE):
    df = pd.DataFrame()
    df.to_csv(CSV_FILE, index=False, encoding='utf-8')

# Konsument Kafka
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=[BROKER],
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset='earliest',
    enable_auto_commit=True
)

print("▶️ Oczekiwanie na dane z Kafka...")

try:
    for message in consumer:
        record = message.value
        print("📥 Otrzymano rekord:", record)

        # Zapisz rekord do CSV
        df = pd.DataFrame([record])
        df.to_csv(CSV_FILE, mode='a', index=False, header=not os.path.isfile(CSV_FILE) or os.stat(CSV_FILE).st_size == 0, encoding='utf-8')
except KeyboardInterrupt:
    print("🛑 Zatrzymano konsumenta.")
finally:
    consumer.close()


▶️ Oczekiwanie na dane z Kafka...
📥 Otrzymano rekord: {'kod_stacji': '150160330', 'nazwa_stacji': 'SZCZYTNA', 'lon': '16.443056', 'lat': '50.415556', 'stan': '133', 'stan_data': '2025-05-05 19:20:00', 'przelyw': None, 'przeplyw_data': None}
📥 Otrzymano rekord: {'kod_stacji': '150160340', 'nazwa_stacji': 'SARNY', 'lon': '16.465833', 'lat': '50.547778', 'stan': '74', 'stan_data': '2025-05-05 19:20:00', 'przelyw': None, 'przeplyw_data': None}
📥 Otrzymano rekord: {'kod_stacji': '150160350', 'nazwa_stacji': 'SZALEJÓW GÓRNY', 'lon': '16.537222', 'lat': '50.418333', 'stan': '118', 'stan_data': '2025-05-05 19:20:00', 'przelyw': None, 'przeplyw_data': None}
📥 Otrzymano rekord: {'kod_stacji': '150160360', 'nazwa_stacji': 'STARKÓW', 'lon': '16.58', 'lat': '50.3775', 'stan': '154', 'stan_data': '2025-05-05 19:20:00', 'przelyw': None, 'przeplyw_data': None}
📥 Otrzymano rekord: {'kod_stacji': '150160370', 'nazwa_stacji': 'TOPOLICE', 'lon': '16.609167', 'lat': '50.366944', 'stan': '140', 'stan_data':