In [3]:
from kafka import KafkaConsumer
import json
from datetime import datetime, timedelta
import requests

# Kafka & Telegram configuration
KAFKA_TOPIC = "parking-spot-status"
KAFKA_BROKER = "localhost:9092"
TELEGRAM_TOKEN = "7906484659:AAHdZBo35AJf1KY7ijMYUSSO_iCsDN4sTpo"
TELEGRAM_CHAT_ID = "6264976855"

# Initialize Kafka consumer
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BROKER,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))  # Deserialize JSON messages
)

# Track last known status and changes
last_status = {}             # Last known status for each parking spot
last_change_time = {}        # Timestamp when a status change occurred

# Stability condition: a change must persist for this long to trigger notification
STABILITY_SECONDS = 30

# Send a Telegram message containing the status of all parking spots
def send_telegram_message(status_dict):
    status_msg = "🅿️ Parking Spot Status:\n"
    for i in range(1, 4):
        status_msg += f"• Spot {i}: {status_dict.get(i, 'Unknown')}\n"

    requests.post(
        f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
        data={
            "chat_id": TELEGRAM_CHAT_ID,
            "text": status_msg
        }
    )

print("Kafka Telegram Consumer started...")

# Continuously listen to Kafka messages
for msg in consumer:
    data = msg.value  # Get the message payload
    current_time = datetime.now()

    # Convert list of spots to a dictionary: {1: 'Empty', 2: 'Occupied', 3: 'Empty'}
    current_status = {spot["id"]: spot["status"] for spot in data["spots"]}

    need_to_send = False  # Flag to determine if a Telegram message should be sent

    # Check each spot for status changes or sustained stability
    for spot_id, new_status in current_status.items():
        old_status = last_status.get(spot_id)

        if new_status != old_status:
            # Status changed → record the time of change
            last_change_time[spot_id] = current_time
            last_status[spot_id] = new_status
        else:
            # Status remains the same → check how long it's been stable
            if spot_id in last_change_time:
                elapsed = (current_time - last_change_time[spot_id]).total_seconds()
                if elapsed >= STABILITY_SECONDS:
                    print(f"Spot {spot_id} status stable for 30 sec.")
                    last_change_time.pop(spot_id)  # Remove tracking after stability is confirmed
                    need_to_send = True

    # If stability condition is met → send Telegram message
    if need_to_send:
        send_telegram_message(current_status)
        print("Telegram text message sent.")


Kafka Telegram Consumer started...
Spot 1 status stable for 30 sec.
Spot 2 status stable for 30 sec.
Spot 3 status stable for 30 sec.
Telegram text message sent.
Spot 3 status stable for 30 sec.
Telegram text message sent.
Spot 2 status stable for 30 sec.
Telegram text message sent.
Spot 2 status stable for 30 sec.
Telegram text message sent.
Spot 2 status stable for 30 sec.
Telegram text message sent.
Spot 2 status stable for 30 sec.
Telegram text message sent.
Spot 1 status stable for 30 sec.
Telegram text message sent.


KeyboardInterrupt: 