In [1]:
# Importamos las bibliotecas necesarias
import pandas as pd
from sqlalchemy import create_engine, exc, text
import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

In [2]:
# Configuración de la cadena de conexión con la base de datos 
server = r'JORGE\SQLEXPRESS'
database = 'DrinkingTeamDB'
username = 'datafusionlatam'
password = 'DAFTHENRY'
driver = '{ODBC Driver 17 for SQL Server}'

try:
    engine = create_engine(f'mssql+pyodbc://{username}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server')
    conn = engine.connect()
except SQLAlchemyError as e:
    print(f"Error al conectar a la base de datos: {e}")
    exit()

### Codigo automatizado de ingreso de datos

In [3]:
# Definimos una clase que maneja los eventos de modificación en el sistema de archivos
class MyHandler(FileSystemEventHandler):
    def __init__(self, observer, ruta_datos):
        self.observer = observer
        self.ruta_datos = ruta_datos
        # Mapa que asocia archivos CSV con la tabla de base de datos correspondiente y la columna a utilizar para la comparación
        self.table_map = {
            r"C:\datos\datos_tiendas.csv": ('Tiendas', 'NumeroTienda'),
            r"C:\datos\datos_proveedores.csv": ('Proveedores', 'Nombre'),
            r"C:\datos\datos_productos.csv": ('Productos', 'Marca')
        }

    # Método que se ejecuta cuando un archivo es modificado
    def on_modified(self, event):
        # Verifica si el archivo modificado es un archivo CSV
        if event.src_path.endswith('.csv'):
            print(f"Archivo CSV modificado: {event.src_path}")
            # Lee el archivo CSV modificado en un DataFrame de pandas
            df = pd.read_csv(event.src_path, dtype={self.table_map[event.src_path][1]: float}, header=0)
            # Obtiene el nombre de la tabla y la columna principal desde el mapa de tablas
            tabla, primera_columna = self.table_map.get(event.src_path, (None, None))
            print(f"Procesando la tabla : {tabla}, usando las columna {primera_columna}")
            print(f"Columnas en el DataFrame: {df.columns.tolist()}")

            # Si se encuentra una tabla y una columna asociada
            if tabla and primera_columna:
                # Llama al método para ingresar los datos en la base de datos
                self.ingresar_datos(df, tabla, primera_columna)
                # Detiene el observador después de procesar el archivo
                self.observer.stop()
            else:
                print(f"No se encontró una tabla asociada para {event.src_path}")

    # Método para ingresar los datos nuevos en la base de datos
    def ingresar_datos(self, df, tabla, primera_columna):
        try:
            # Obtiene el último id de ingesta desde la tabla ingestion_control
            query = f"SELECT MAX(last_ingestion_id) AS last_id FROM ingestion_control WHERE table_name = '{tabla}'"
            last_ingestion_id = pd.read_sql(query, conn).iloc[0, 0]
            last_ingestion_id = last_ingestion_id if last_ingestion_id is not None else 0
            print(f"Comparando valores en la columna {primera_columna} con last_ingestion_id: {last_ingestion_id}")
            print(f"Valores en la columna {primera_columna}:\n{df[primera_columna].head()}")

            # Filtra los datos nuevos comparando con el último id de ingesta
            new_data = df[df[primera_columna] > last_ingestion_id].copy()
            print(f"Nuevos datos encontrados para la tabla {tabla}:\n{new_data}")

            # Asigna el valor de last_ingestion_id a los nuevos datos
            new_data['last_ingestion_id'] = new_data[primera_columna]

            # Si hay nuevos datos, se insertan en la base de datos
            if not new_data.empty:
                # Inserta los nuevos datos en la tabla correspondiente
                new_data.to_sql(tabla, engine, if_exists='append', index=False)

                # Obtiene el máximo id procesado en los nuevos datos
                last_processed_id = int(new_data[primera_columna].max())

                # Inserta el nuevo registro en la tabla ingestion_control
                insert_query = text("""
                    INSERT INTO ingestion_control (last_ingestion_id, created_at, updated_at, table_name)
                    VALUES (:last_ingestion_id, :created_at, :updated_at, :table_name)
                """)
                # Ejecuta la inserción en la base de datos dentro de una transacción
                with engine.begin() as connection:
                    connection.execute(insert_query, {
                        'last_ingestion_id': last_processed_id,
                        'created_at': datetime.datetime.now(),
                        'updated_at': datetime.datetime.now(),
                        'table_name': tabla
                    })
                print("Datos ingresados con éxito en la tabla y en ingestion_control")

            else:
                # Si no hay datos nuevos, imprime un mensaje indicando que no hay nada para insertar
                print("No hay nuevos datos para insertar.")
        except exc.SQLAlchemyError as e:
            # Captura cualquier error de SQLAlchemy y lo imprime
            print(f"Error durante la ingesta de datos: {e}")

# Bloque principal de ejecución del script
if __name__ == "__main__":
    # Define la ruta donde se monitorearán los archivos
    ruta_datos = r"C:\datos"
    # Crea una instancia del observador de archivos
    observer = Observer()
    # Asocia el manejador de eventos con el observador
    event_handler = MyHandler(observer, ruta_datos)
    # Programa el observador para monitorear la ruta definida y sus subdirectorios
    observer.schedule(event_handler, ruta_datos, recursive=True)
    # Inicia el observador
    observer.start()

    # Mantiene el script en ejecución hasta que el observador sea detenido
    observer.join()
    print("Proceso de monitoreo finalizado.")


Archivo CSV modificado: C:\datos\datos_tiendas.csv
Procesando la tabla : Tiendas, usando las columna NumeroTienda
Columnas en el DataFrame: ['NumeroTienda', 'Ciudad']
Comparando valores en la columna NumeroTienda con last_ingestion_id: 95
Valores en la columna NumeroTienda:
0    91.0
1    92.0
2    93.0
3    94.0
4    95.0
Name: NumeroTienda, dtype: float64
Nuevos datos encontrados para la tabla Tiendas:
Empty DataFrame
Columns: [NumeroTienda, Ciudad]
Index: []
No hay nuevos datos para insertar.
Proceso de monitoreo finalizado.
