La librería que vamos a usar para la automatización del monitoreo de los datos, es Watchdog. Estuvimos averiguando y es una muy buena herramienta para identificar cambios en el sistema de archivos en tiempo real. Es particularmente útil cuando necesitas realizar acciones automáticas en respuesta a eventos como la creación, modificación, o eliminación de archivos y directorios. Es por eso, que nos decidimos por Watchdog

Para interactual con la base de datos, vamos a usar pyodbc en conjunto con Sqlalchemy 

In [1]:
#Importacion de librerias usadas
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy import exc
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.engine import reflection
from watchdog.events import FileSystemEventHandler
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
import os
import datetime
import time

In [2]:
# Configuración de la cadena de conexión con la base de datos 
server = r'JORGE\SQLEXPRESS'
database = 'DrinkingTeamDB'
username = 'datafusionlatam'
password = 'DAFTHENRY'
driver = '{ODBC Driver 17 for SQL Server}'

try:
    engine = create_engine(f'mssql+pyodbc://{username}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server')
    conn = engine.connect()
except SQLAlchemyError as e:
    print(f"Error al conectar a la base de datos: {e}")
    exit()

### Codigo automatizado de ingreso de datos

In [3]:
class MyHandler(FileSystemEventHandler):
    def __init__(self, observer, ruta_datos):
        self.observer = observer
        self.ruta_datos = ruta_datos
        self.table_map = {
            r"C:\datos\datos_tiendas.csv": ('Tiendas', 'NumeroTienda'),
            r"C:\datos\datos_proveedores.csv": ('Proveedores', 'Nombre'),
            r"C:\datos\datos_productos.csv": ('Productos', 'Marca')
        }

    def on_modified(self, event):
        if event.src_path.endswith('.csv'):
            print(f"Archivo CSV modificado: {event.src_path}")
            df = pd.read_csv(event.src_path, dtype={self.table_map[event.src_path][1]:float}, header=0)
            tabla, primera_columna = self.table_map.get(event.src_path, (None, None))
            print(f"Procesando la tabla : {tabla}, usando las columna {primera_columna}")
            print(f"Columnas en el DataFrame: {df.columns.tolist()}")

            if tabla and primera_columna:
                self.ingresar_datos(df, tabla, primera_columna)
                # Detenemos el observador después de procesar el archivo
                self.observer.stop()
            else:
                print(f"No se encontró una tabla asociada para {event.src_path}")

    def ingresar_datos(self, df, tabla, primera_columna):
        try:
            # Obtén el último id de ingesta desde la tabla ingestion_control
            query = f"SELECT MAX(last_ingestion_id) AS last_id FROM ingestion_control WHERE table_name = '{tabla}'"
            last_ingestion_id = pd.read_sql(query, conn).iloc[0, 0]
            last_ingestion_id = last_ingestion_id if last_ingestion_id is not None else 0
            print(f"Comparando valores en la columna {primera_columna} con last_ingestion_id: {last_ingestion_id}")
            print(f"Valores en la columna {primera_columna}:\n{df[primera_columna].head()}")


            # Filtra los datos nuevos
            new_data = df[df[primera_columna] > last_ingestion_id].copy()
            print(f"Nuevos datos encontrados para la tabla {tabla}:\n{new_data}")


            # Asegúrate de que se está asignando last_ingestion_id correctamente
            new_data['last_ingestion_id'] = new_data[primera_columna]

            if not new_data.empty:
                # Inserta los nuevos datos en la tabla correspondiente
                new_data.to_sql(tabla, engine, if_exists='append', index=False)

                last_processed_id = int(new_data[primera_columna].max())

                # Inserta el nuevo registro en la tabla ingestion_control
                insert_query = text("""
                    INSERT INTO ingestion_control (last_ingestion_id, created_at, updated_at, table_name)
                    VALUES (:last_ingestion_id, :created_at, :updated_at, :table_name)
                """)
                with engine.begin() as connection:
                    connection.execute(insert_query, {
                        'last_ingestion_id': last_processed_id,
                        'created_at': datetime.datetime.now(),
                        'updated_at': datetime.datetime.now(),
                        'table_name': tabla
                    })
                print("Datos ingresados con éxito en la tabla y en ingestion_control")

            else:
                print("No hay nuevos datos para insertar.")
        except exc.SQLAlchemyError as e:
            print(f"Error durante la ingesta de datos: {e}")

if __name__ == "__main__":
    ruta_datos = r"C:\datos"
    observer = Observer()
    event_handler = MyHandler(observer, ruta_datos)
    observer.schedule(event_handler, ruta_datos, recursive=True)
    observer.start()

    # Espera a que el observador se detenga
    observer.join()
    print("Proceso de monitoreo finalizado.")

Archivo CSV modificado: C:\datos\datos_productos.csv
Procesando la tabla : Productos, usando las columna Marca
Columnas en el DataFrame: ['Marca', 'Nombre', 'Volumen']
Comparando valores en la columna Marca con last_ingestion_id: 80
Valores en la columna Marca:
0    81.0
1    82.0
Name: Marca, dtype: float64
Nuevos datos encontrados para la tabla Productos:
   Marca       Nombre Volumen
0   81.0       Fernet   Litro
1   82.0  Aguardiente   750ml
Datos ingresados con éxito en la tabla y en ingestion_control
Proceso de monitoreo finalizado.
