In [None]:
import json
import os
import time
from datetime import datetime, timedelta, timezone

import requests
from kafka import KafkaProducer

# ------------------------------------------------------------------------------
# 1) Configuration
# ------------------------------------------------------------------------------

# Kafka settings
KAFKA_BOOTSTRAP_SERVERS = ["172.29.16.101:9092"]
KAFKA_TOPIC = "current-weather-api"

# OpenWeatherMap settings
# The key is read from the OPENWEATHERMAP_API_KEY environment variable when it
# is set to a non-empty value, so deployments do not have to edit this file.
# NOTE(review): the literal fallback keeps existing setups working, but it is a
# credential committed to source -- rotate it and drop the fallback.
API_KEY = os.environ.get("OPENWEATHERMAP_API_KEY") or "bd5e378503939ddaee76f12ad7a97608"

# OpenWeatherMap city IDs that are polled.
CITY_IDS = [
    2761369,  # Vienna
    2772400,  # Linz
    2778067,  # Graz
    2766824,  # Salzburg
    2775220,  # Innsbruck
    7871497,  # Klagenfurt
    2781503,
    2782045,  # Bregenz
    # ... up to 20 IDs in total ...
]

# Seconds to wait between two polling iterations.
POLL_INTERVAL_SECONDS = 10

# Daily budget of single-city lookups; the polling loop counts every ID sent
# in a group call as one lookup.
MAX_SINGLE_CITY_CALLS_PER_DAY = 960

# Each group call carries at most this many city IDs.
MAX_IDS_PER_CALL = 10

# ------------------------------------------------------------------------------
# 2) Initialise the KafkaProducer
# ------------------------------------------------------------------------------
def _encode_json_utf8(payload):
    """Serialise a message to JSON (non-ASCII characters kept as-is) and encode it as UTF-8."""
    text = json.dumps(payload, ensure_ascii=False)
    return text.encode("utf-8")


# Every value passed to producer.send() goes through _encode_json_utf8.
producer = KafkaProducer(
    value_serializer=_encode_json_utf8,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
)

# ------------------------------------------------------------------------------
# 3) Daily counter logic
# ------------------------------------------------------------------------------
# Tracks how many single-city lookups have been made today. Every group call
# adds the number of IDs it requested to `daily_city_calls`. As soon as
# `daily_city_calls` plus the size of the next batch would exceed
# MAX_SINGLE_CITY_CALLS_PER_DAY, the loop stops.

daily_city_calls = 0

# UTC date the counter belongs to; the counter is reset when this date changes.
current_day_utc = datetime.now(timezone.utc).date()

# Index into CITY_IDS at which the next batch starts. It advances after every
# successful group call and wraps around at the end of the list, so lists
# longer than MAX_IDS_PER_CALL are covered batch by batch instead of only the
# first MAX_IDS_PER_CALL entries being polled forever.
next_batch_start = 0


def _redact_api_key(text):
    """Return `text` with every occurrence of API_KEY replaced by '***'.

    The request URL contains the key as the `appid` query parameter, and
    exception messages raised by requests can include that URL, so error
    text is masked before it is printed.
    """
    return text.replace(API_KEY, "***") if API_KEY else text


def _build_message(city_obj):
    """Flatten one entry of the group response's "list" into the Kafka message.

    Example shape of `city_obj`:
        {
          "id": 2761369,
          "name": "Vienna",
          "coord": {"lat": 48.2082, "lon": 16.3738},
          "main": {"temp": 22.5, "humidity": 60, "temp_min": 21.0, "temp_max": 23.0, ...},
          "wind": {"speed": 3.1, "deg": 250, ...},
          "dt": 1622548800,  # UTC timestamp in seconds
          ...
        }

    Missing (or null) sections and fields yield None values instead of
    raising, so a single incomplete entry cannot stop the producer.
    """
    coord = city_obj.get("coord") or {}
    main = city_obj.get("main") or {}
    wind = city_obj.get("wind") or {}

    observed_at = city_obj.get("dt")
    wind_speed = wind.get("speed")

    return {
        "city_id":          city_obj.get("id"),
        "city_name":        city_obj.get("name"),
        "latitude":         coord.get("lat"),
        "longitude":        coord.get("lon"),
        # Timezone-aware conversion; formats the same UTC wall-clock time as
        # the deprecated datetime.utcfromtimestamp().
        "timestamp_utc":    (
            datetime.fromtimestamp(observed_at, tz=timezone.utc)
            .strftime("%Y-%m-%d %H:%M:%S")
            if observed_at is not None
            else None
        ),
        "temp_celsius":     main.get("temp"),
        "temp_min_c":       main.get("temp_min"),
        "temp_max_c":       main.get("temp_max"),
        "pressure_hpa":     main.get("pressure"),
        "humidity_pct":     main.get("humidity"),
        # The request uses units=metric; the speed is scaled by 3.6 (m/s -> km/h).
        "wind_speed_kph":   wind_speed * 3.6 if wind_speed is not None else None,
        "wind_direction":   wind.get("deg"),
    }


# ------------------------------------------------------------------------------
# 4) Polling loop
# ------------------------------------------------------------------------------
try:
    while True:
        now_utc = datetime.now(timezone.utc)

        # 4.1) Day change (UTC midnight): reset the counter
        if now_utc.date() != current_day_utc:
            daily_city_calls = 0
            current_day_utc = now_utc.date()
            print(f"[{now_utc.isoformat()}] Neuer Tag (UTC) – Tageszählung zurückgesetzt.")

        # 4.2) Pick the next batch of CITY_IDS (at most MAX_IDS_PER_CALL)
        ids_to_call = CITY_IDS[next_batch_start:next_batch_start + MAX_IDS_PER_CALL]

        # Nothing to poll: stop instead of sending requests with an empty id list
        if not ids_to_call:
            print(f"[{now_utc.isoformat()}] Keine CITY_IDS konfiguriert. Beende Producer.")
            break

        # Stop if the next group call would exceed the daily limit
        if daily_city_calls + len(ids_to_call) > MAX_SINGLE_CITY_CALLS_PER_DAY:
            print(
                f"[{now_utc.isoformat()}] Tageslimit von "
                f"{MAX_SINGLE_CITY_CALLS_PER_DAY} Stadt‐Abfragen erreicht oder überschritten. "
                "Beende Producer."
            )
            break

        # 4.3) API call: group endpoint with up to MAX_IDS_PER_CALL IDs
        ids_param = ",".join(str(i) for i in ids_to_call)
        url = (
            f"https://api.openweathermap.org/data/2.5/group"
            f"?id={ids_param}"
            f"&units=metric"
            f"&appid={API_KEY}"
        )

        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            # Best effort: report the (masked) error, wait one poll interval
            # and retry the same batch.
            print(f"[{now_utc.isoformat()}] Fehler bei API‐Call: {_redact_api_key(str(e))}")
            time.sleep(POLL_INTERVAL_SECONDS)
            continue

        # 4.4) Parse and push to the Kafka topic
        # The response is expected to carry a "list" field holding the city objects
        if "list" not in data:
            print(f"[{now_utc.isoformat()}] Unerwartete API‐Antwort (kein 'list'): {data}")
            time.sleep(POLL_INTERVAL_SECONDS)
            continue

        for city_obj in data["list"]:
            msg = _build_message(city_obj)

            # Test output on the console
            print(json.dumps(msg, indent=2))

            # Send to Kafka
            producer.send(KAFKA_TOPIC, msg)

        # 4.5) Flush to ensure publishing
        producer.flush()

        # 4.6) Update the daily counter and move on to the next batch
        daily_city_calls += len(ids_to_call)
        next_batch_start += MAX_IDS_PER_CALL
        if next_batch_start >= len(CITY_IDS):
            next_batch_start = 0
        print(
            f"[{now_utc.isoformat()}] Gruppen‐Call mit {len(ids_to_call)} IDs ausgeführt. "
            f"Tages‐Summe Einzel‐Abfragen: {daily_city_calls}"
        )

        # 4.7) Wait for the next polling interval
        time.sleep(POLL_INTERVAL_SECONDS)

finally:
    # 4.8) Graceful shutdown: close the Kafka producer
    producer.close()
    print("Producer geschlossen. Skript beendet.")

In [None]:
print("🟡 Start")

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# ----------------------------------------------------------------------------
# 1) Create the SparkSession
# ----------------------------------------------------------------------------
# The "Start" banner is printed once at the top of this cell; the identical
# second print that used to sit here was a duplicate and has been dropped.
#
# spark.jars.packages lists the Kafka source for Structured Streaming and the
# PostgreSQL JDBC driver.
# NOTE(review): the Kafka connector version (3.5.1, Scala 2.12) presumably has
# to match the installed Spark build -- confirm.
spark = (
    SparkSession.builder
    .appName("KafkaToPostgresStream")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
        "org.postgresql:postgresql:42.7.3",
    )
    .getOrCreate()
)

# DEBUG was left over from troubleshooting and buries the step messages and
# stream output under log lines; WARN keeps the output readable. Raise it to
# "INFO" or "DEBUG" temporarily when diagnosing a problem.
spark.sparkContext.setLogLevel("WARN")

print("✅ Schritt 1: SparkSession erstellt")

# ----------------------------------------------------------------------------
# 2) JSON schema
# ----------------------------------------------------------------------------
# One (field name, Spark type, nullable) entry per key of the JSON message,
# in the order the fields appear in the resulting struct.
_FIELD_SPECS = [
    ("city_id",        StringType, False),
    ("city_name",      StringType, False),
    ("latitude",       DoubleType, True),
    ("longitude",      DoubleType, True),
    ("timestamp_utc",  StringType, False),
    ("temp_celsius",   DoubleType, True),
    ("temp_min_c",     DoubleType, True),
    ("temp_max_c",     DoubleType, True),
    ("pressure_hpa",   DoubleType, True),
    ("humidity_pct",   DoubleType, True),
    ("wind_speed_kph", DoubleType, True),
    ("wind_direction", DoubleType, True),
]

schema = StructType([
    StructField(field_name, field_type(), nullable=is_nullable)
    for field_name, field_type, is_nullable in _FIELD_SPECS
])

print("✅ Schritt 2: JSON-Schema definiert")

# ----------------------------------------------------------------------------
# 3) Read the Kafka stream
# ----------------------------------------------------------------------------
df_raw = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "172.29.16.101:9092",
        "subscribe": "current-weather-api",
        "startingOffsets": "earliest",
    })
    .load()
)

print("✅ Schritt 3: Kafka-Stream wird gelesen")

# The Kafka `value` column is cast to a string and parsed with `schema`;
# the parsed struct is then expanded into one column per field.
df_str = df_raw.select(col("value").cast("string").alias("json_str"))
df_json = (
    df_str
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
)

print("✅ Schritt 4: Kafka-Daten in DataFrame mit Schema umgewandelt")

# Replace the textual `timestamp_utc` column with a real timestamp column `ts`.
# NOTE(review): to_timestamp interprets a zone-less string in the Spark session
# time zone; the column name says UTC -- confirm the session time zone matches.
df = (
    df_json
    .withColumn("ts", to_timestamp("timestamp_utc", "yyyy-MM-dd HH:mm:ss"))
    .drop("timestamp_utc")
)

print("✅ Schritt 5: Timestamp konvertiert")

# Debug view: echo the raw Kafka records (key and value as strings) to the
# console sink. This reuses the `df_raw` stream defined in step 3 of this cell
# instead of building a second, identical Kafka reader.
print("▶️ Starte Ausgabe des Kafka-Streams in der Konsole:")

raw_console_query = (
    df_raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", False)
    .start()
)

# Deliberately not awaited here: blocking on awaitTermination() inside this
# cell would keep everything after it from ever running. The query keeps
# running in the background; call raw_console_query.stop() to end it or
# raw_console_query.awaitTermination() to block on it explicitly.

In [None]:
# ----------------------------------------------------------------------------
# 4) Write to PostgreSQL (via foreachBatch)
# ----------------------------------------------------------------------------
def write_to_postgres(batch_df, batch_id):
    """Append one micro-batch to the `weather_api` table over JDBC.

    Connection settings can be overridden through the environment variables
    WEATHER_DB_URL, WEATHER_DB_USER and WEATHER_DB_PASSWORD; when a variable
    is unset or empty, the previous built-in value is used.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch (part of the foreachBatch
            callback signature; not used here).

    Returns:
        None.
    """
    import os  # local import: this cell has no import section of its own

    # NOTE(review): the fallbacks keep existing setups working, but they are
    # credentials committed to source -- move them out of the notebook.
    jdbc_url = os.environ.get("WEATHER_DB_URL") or "jdbc:postgresql://localhost:5432/weather_db"
    db_user = os.environ.get("WEATHER_DB_USER") or "bdeng"
    db_password = os.environ.get("WEATHER_DB_PASSWORD") or "bdengpass"

    (
        batch_df.write
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", "weather_api")
        .option("user", db_user)
        .option("password", db_password)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

# Stream the parsed records into PostgreSQL: every micro-batch of `df` is
# handed to write_to_postgres. Previously this query wrote to the console
# sink, which left write_to_postgres unused and nothing reached the database.
# NOTE(review): no checkpointLocation is set, and the source is read with
# startingOffsets=earliest, so a restart presumably re-appends records that
# were already written -- consider configuring a checkpoint directory.
# NOTE(review): assumes the `weather_api` table matches the columns of `df`
# (including `ts` in place of `timestamp_utc`) -- confirm against the table.
query = (
    df.writeStream
    .foreachBatch(write_to_postgres)
    .outputMode("append")
    .start()
)

# Block until the streaming query stops or fails.
query.awaitTermination()