In [None]:
import json
import logging
import signal
import sys
from datetime import datetime, timezone

import paho.mqtt.client as mqtt
from pymongo import MongoClient

# Root logging setup: INFO level, timestamped records, one stream handler
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_LOG_HANDLERS = [logging.StreamHandler()]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_LOG_HANDLERS)

# Module-level logger used by the rest of this file
logger = logging.getLogger(__name__)

# Service settings, grouped by the system they configure
CONFIG = dict(
    # MQTT broker the service subscribes to
    MQTT=dict(
        BROKER_URL="broker.hivemq.com",
        PORT=1883,
        TOPIC="RIN",  # Kept in sync with the topic used by the ESP32
        KEEPALIVE=60,
    ),
    # MongoDB target for the stored events
    MONGODB=dict(
        URI="mongodb://localhost:27017",
        DB_NAME="TrabajoRIN",
        COLLECTION="parking_events",  # One collection holds every event
    ),
)

class MQTTToMongoDBService:
    """Store parking events received over MQTT as MongoDB documents.

    The service subscribes to ``CONFIG["MQTT"]["TOPIC"]`` and inserts one
    document per JSON message into the collection named in
    ``CONFIG["MONGODB"]``.
    """

    def __init__(self):
        """Create empty connection handles and install the signal handlers."""
        self.mongo_client = None
        self.db = None
        self.collection = None
        self.mqtt_client = None
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        """Route SIGINT and SIGTERM to :meth:`handle_shutdown`."""
        signal.signal(signal.SIGINT, self.handle_shutdown)
        signal.signal(signal.SIGTERM, self.handle_shutdown)

    def handle_shutdown(self, signum, frame):
        """Close both connections and exit the process with status 0.

        Args:
            signum: Number of the received signal (unused).
            frame: Stack frame active when the signal arrived (unused).
        """
        logger.info("Received termination signal. Closing connections...")
        # Compare against None explicitly rather than relying on the
        # truthiness of third-party client objects.
        if self.mqtt_client is not None:
            self.mqtt_client.disconnect()
        if self.mongo_client is not None:
            self.mongo_client.close()
        logger.info("Connections closed. Exiting...")
        sys.exit(0)

    def store_event_data(self, data):
        """Insert one parking event into the collection.

        Args:
            data: Decoded JSON object sent by the device. The keys
                ``evento``, ``hora``, ``totalCoches``, ``distanciaEntrada``
                and ``distanciaSalida`` are copied; a missing key is stored
                as ``None``.

        Returns:
            The ``inserted_id`` reported by ``insert_one``.

        Raises:
            Exception: Whatever ``insert_one`` raises is propagated
                unchanged so the caller decides how to report it.
        """
        doc = {
            "event_type": data.get("evento"),  # "entrada" or "salida"
            "event_time": data.get("hora"),    # Time reported by the ESP32
            "total_cars": data.get("totalCoches"),
            "entry_distance": data.get("distanciaEntrada"),
            "exit_distance": data.get("distanciaSalida"),
            # Timezone-aware UTC: a naive local datetime would be stored as
            # if it were UTC, shifting the value by the local offset.
            "received_at": datetime.now(timezone.utc),
            "source": "ESP32_RIN",             # Identifier for data source
        }

        result = self.collection.insert_one(doc)
        logger.info(
            "Event stored - Type: %s, Cars: %s",
            doc["event_type"],
            doc["total_cars"],
        )
        return result.inserted_id

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """Subscribe to the configured topic once the broker accepts us.

        Args:
            client: MQTT client that triggered the callback.
            userdata: User data attached to the client (unused).
            flags: Connection flags sent by the broker (unused).
            rc: Connection result code; ``0`` means success.
        """
        if rc != 0:
            logger.error("MQTT connection error. Code: %s", rc)
            return
        logger.info("Connected to MQTT broker")
        # Subscribing here means every successful connect re-subscribes.
        client.subscribe(CONFIG["MQTT"]["TOPIC"])

    def on_mqtt_message(self, client, userdata, msg):
        """Decode one MQTT message and store it; never raises.

        Payloads that are not valid UTF-8, not valid JSON, or not a JSON
        object are logged and dropped. Storage failures are logged with a
        traceback so a single bad message cannot stop the network loop.

        Args:
            client: MQTT client that received the message (unused).
            userdata: User data attached to the client (unused).
            msg: Received message; only ``msg.payload`` (bytes) is read.
        """
        try:
            data = json.loads(msg.payload.decode())
        except UnicodeDecodeError as e:
            logger.error("Payload is not valid UTF-8: %s", e)
            return
        except json.JSONDecodeError as e:
            logger.error("JSON decode error: %s", e)
            return

        logger.info("Received message: %s", data)

        # Valid JSON can still be a list, number or string, none of which
        # carry the event fields.
        if not isinstance(data, dict):
            logger.error(
                "Ignoring payload: expected a JSON object, got %s",
                type(data).__name__,
            )
            return

        try:
            self.store_event_data(data)
        except Exception:
            # Callback boundary: log once with traceback and keep running.
            logger.exception("Error storing event")

    def on_mqtt_disconnect(self, client, userdata, rc):
        """Log disconnections that were not requested by this service.

        Args:
            client: MQTT client that was disconnected (unused).
            userdata: User data attached to the client (unused).
            rc: Disconnect result code; non-zero means unexpected.
        """
        # NOTE(review): nothing here reconnects; this relies on the client's
        # network loop (loop_forever in run) to do so - confirm.
        if rc != 0:
            logger.warning(
                "Unexpected MQTT disconnection. Reconnecting... Code: %s", rc
            )

    def connect_to_mongodb(self):
        """Open the MongoDB connection and verify it with a ping.

        On success ``mongo_client``, ``db`` and ``collection`` are set. On
        failure they are left untouched and any client that was created is
        closed.

        Returns:
            ``True`` when the server answered the ping, ``False`` otherwise.
        """
        settings = CONFIG["MONGODB"]
        client = None
        try:
            client = MongoClient(settings["URI"])
            db = client[settings["DB_NAME"]]
            collection = db[settings["COLLECTION"]]

            # Test the connection
            db.command("ping")
        except Exception as e:
            logger.error("Error connecting to MongoDB: %s", e)
            if client is not None:
                # Do not leak the client when the server is unreachable.
                client.close()
            return False

        self.mongo_client = client
        self.db = db
        self.collection = collection
        logger.info("Connected to MongoDB")
        return True

    def setup_mqtt_client(self):
        """Create the MQTT client, attach callbacks and connect to the broker.

        Returns:
            ``True`` when ``connect`` returned without raising, ``False``
            otherwise.
        """
        # paho-mqtt 2.x takes a callback API version; VERSION1 matches the
        # callback signatures used by this class. Older releases have no
        # such enum and take no argument.
        api_versions = getattr(mqtt, "CallbackAPIVersion", None)
        if api_versions is not None:
            self.mqtt_client = mqtt.Client(api_versions.VERSION1)
        else:
            self.mqtt_client = mqtt.Client()

        self.mqtt_client.on_connect = self.on_mqtt_connect
        self.mqtt_client.on_message = self.on_mqtt_message
        self.mqtt_client.on_disconnect = self.on_mqtt_disconnect

        settings = CONFIG["MQTT"]
        try:
            self.mqtt_client.connect(
                settings["BROKER_URL"],
                port=settings["PORT"],
                keepalive=settings["KEEPALIVE"],
            )
        except Exception as e:
            logger.error("Error connecting to MQTT: %s", e)
            return False
        return True

    def run(self):
        """Connect to MongoDB and MQTT, then process messages until stopped.

        Exits the process with status 1 if either connection cannot be
        established.
        """
        if not self.connect_to_mongodb():
            logger.error("Could not connect to MongoDB. Exiting...")
            sys.exit(1)

        if not self.setup_mqtt_client():
            logger.error("Could not connect to MQTT. Exiting...")
            sys.exit(1)

        logger.info("Service started. Waiting for messages...")
        self.mqtt_client.loop_forever()

if __name__ == "__main__":
    # Script entry point: build the service and run it until it stops or fails
    service = MQTTToMongoDBService()
    failed = False
    try:
        service.run()
    except KeyboardInterrupt:
        logger.info("Service stopped by user")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        failed = True
    if failed:
        # Any failure other than a user interrupt ends with exit status 1
        sys.exit(1)

2025-05-27 15:14:58,179 - INFO - Connected to MongoDB
2025-05-27 15:14:58,248 - INFO - Service started. Waiting for messages...
2025-05-27 15:14:58,287 - INFO - Connected to MQTT broker
2025-05-27 15:15:02,850 - INFO - Received message: {'evento': 'entrada', 'hora': '15:15:03', 'totalCoches': 2, 'distanciaEntrada': 5, 'distanciaSalida': 17}
2025-05-27 15:15:02,852 - INFO - Event stored - Type: entrada, Cars: 2
2025-05-27 15:15:08,898 - INFO - Received message: {'evento': 'entrada', 'hora': '15:15:09', 'totalCoches': 3, 'distanciaEntrada': 6, 'distanciaSalida': 17}
2025-05-27 15:15:08,900 - INFO - Event stored - Type: entrada, Cars: 3
2025-05-27 15:15:16,993 - INFO - Received message: {'evento': 'entrada', 'hora': '15:15:17', 'totalCoches': 4, 'distanciaEntrada': 5, 'distanciaSalida': 17}
2025-05-27 15:15:16,995 - INFO - Event stored - Type: entrada, Cars: 4
2025-05-27 15:15:24,041 - INFO - Received message: {'evento': 'entrada', 'hora': '15:15:24', 'totalCoches': 5, 'distanciaEntrada':

[WinError 10054] Se ha forzado la interrupción de una conexión existente por el host remoto


2025-05-27 15:55:52,358 - INFO - Connected to MQTT broker
