In [None]:
#!/usr/bin/env python
# coding: utf-8
import pandas as pd
import time
from datetime import datetime, timedelta, date
import sqlalchemy
import logging.handlers
import re
import os
import sys
import pathlib
import argparse
from typing import List, Optional, Dict, Union, Tuple
from IPython import get_ipython
from pathlib import Path
from contextlib import contextmanager
import gc
import json
from enum import Enum, auto

def charger_config(fichier_config: str) -> dict:
    try:
        with open(fichier_config, 'r', encoding='utf-8') as fichier:
            contenu = fichier.read()            
            return json.loads(contenu)        
    except json.JSONDecodeError as e:
        logger_info.error(f"Erreur de décodage JSON dans le fichier de configuration : {str(e)}")
        raise
    except FileNotFoundError:
        logger_info.error(f"Le fichier de configuration '{fichier_config}' n'a pas été trouvé")
        raise
    except PermissionError:
        logger_info.error(f"Permissions insuffisantes pour lire le fichier de configuration '{fichier_config}'")
        raise
    except Exception as e:
        logger_info.error(f"Erreur inattendue lors de la lecture du fichier de configuration : {str(e)}")
        raise    
########################################
# Constantes d'environnement à adapter
########################################

LOGLEVEL: int = logging.INFO

# Chargement du fichier de configuration
config = charger_config('config.json')

# Chemins des fichiers et logs
SQLPATH: Path = Path(config['paths']['sql_path'])
LOGPATH: Path = Path(config['paths']['log_path'])
LOGFILE: Path = Path(config['paths']['name_log_path'])
    
# Chemin odbc.ini
ODBC_INI: Path = Path('../.odbc.ini')  

# Chaînes de connexion ODBC
ODBC_STRING_R: str = "DTA_lecture"
ODBC_STRING_W: str = "DTA_lecture"

# Structure des tables de la BDD prod
SCHEMA: str = config['connexion']['schema_preprod']

SQL_MOIS: str = 'indic_mois_kpi'

# Colonnes attendues dans le résultat des requêtes SQL
SQL_FORMAT_LABELS: List[str] = config['module']['sql_format_labels']

# Fichiers flag pour ne pas recalculer les données journalières et mensuelles
FLAG_FILE_DAILY: Path = Path(config['paths']['flag_file_daily'])
FLAG_FILE_MONTHLY: Path = Path(config['paths']['flag_file_monthly'])

########################################
# Configuration du logger
########################################

logging.basicConfig() 
formatter_info = logging.Formatter(config['logging']['formatters']['default']['format'])

logger_info = logging.getLogger("hok")
fh = logging.handlers.TimedRotatingFileHandler(LOGPATH / LOGFILE
                                               , config['logging']['handlers']['file']['when']
                                               , config['logging']['handlers']['file']['interval']
                                               , config['logging']['handlers']['file']['backupCount'])

logger_info.setLevel(LOGLEVEL)
fh.setFormatter(formatter_info)
logger_info.addHandler(fh)

########################################
# Décorateur 
########################################
def log_execution_time(func):
    """Récupère le temps d'execution d'un traitement."""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        execution_time = end_time - start_time
        logger_info.info(f"Exécution de {func.__name__} terminée en {execution_time:.2f} secondes")
        return result
    return wrapper

# Fonction permettant la gestion des données avec df
# Une fois le traitement terminé, df sera automatiquement supprimé
@contextmanager
def manage_dataframe(df: pd.DataFrame):
    try:
        yield df
    finally:
        del df
        gc.collect()
        
########################################
# Classes
########################################

class HistorisationError:
    def __init__(self):
        self.error_messages: List[str] = []
        self.total_exitcode: int = 0

    def add_error(self, message: str) -> None:
        self.error_messages.append(message)
        self.total_exitcode += 1

    def get_summary(self) -> Tuple[List[str], int]:        
        return self.error_messages, self.total_exitcode

    def get_str(self) -> str:        
        return f"Nombre total d'erreurs : {self.total_exitcode}\nMessages d'erreur :\n" + \
               "\n".join(f"- {msg}" for msg in self.error_messages)
        
class Connection_DB:
    def __init__(self, user: str, password: str, host: str, port: int, bdd: str, historization_error: HistorisationError ):
        try:           
            # Utilisez les arguments s'ils sont fournis, sinon utilisez les valeurs par défaut           
            self.user =  user 
            self.password =  password
            self.host =  host 
            self.port =  port 
            self.bdd =  bdd   
            self.historization_error = historization_error        
            self.connection_string = f'postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.bdd}'    
            self.engine = sqlalchemy.create_engine(self.connection_string)
            self.connection : Optional[sqlalchemy.engine.base.Connection] = None
            logger_info.info(f"Initialisation réussie de Connection_DB")            
        except Exception as e:            
            error_msg =f"Erreur lors de la connexion : {str(e)}"
            logger_info.error(error_msg)            
            raise self.historization_error.add_error(error_msg)
            
    def connect(self)-> sqlalchemy.engine.base.Connection:
        """Établit une connexion à la base de données."""
        try:
            if not self.connection or self.connection.closed:
                self.connection = self.engine.connect()
            return self.connection
        except Exception as e:
            error_msg =f"Erreur lors de la connexion avec la bdd : {str(e)}. Le script s'arrete."
            logger_info.error(error_msg)                       
            raise self.historization_error.add_error(error_msg)
            SystemExit(self.historization_error.get_str())
            
                
    def disconnect(self)-> None:
        """Ferme la connexion à la base de données."""
        try:
            if self.connection and not self.connection.closed:
                self.connection.close()
            self.engine.dispose()
            logger_info.info(f"Connexion fermée avec succès")
        except Exception as e:
            error_msg =f"Erreur lors de la déconnexion avec la bdd : {str(e)}"
            logger_info.error(error_msg)                       
            raise self.historization_error.add_error(error_msg)

class DatabaseAccess:
    """Classe pour accéder et manipuler la base de données."""
    def __init__(self, historization_error: HistorisationError, user: str, password: str, host: str, port: int, bdd: str ):  
        self.historization_error = historization_error
        self.read = Connection_DB(user, password, host, port, bdd, self.historization_error)
        self.write = Connection_DB(user, password, host, port, bdd, self.historization_error)
        
    def disconnect(self) -> None:
        """Ferme toutes les connexions à la base de données."""
        self.read.disconnect()
        self.write.disconnect()
        
    @log_execution_time
    def read_sql_query(self, sql) -> pd.DataFrame:
        """Exécute une requête SQL en lecture."""       
        try:            
            query = sqlalchemy.text(sql)
            with self.read.connect() as conn:                
                return pd.read_sql(query, conn)
        except Exception as e:
            error_msg =f"Erreur lors de l'exécution de la requête SQL en lecture : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)
            
    @log_execution_time
    def execute_query(self, sql: str):# -> Result:
        """Exécute une requête SQL en écriture."""
        try:   
            with self.write.connect() as conn:
                return conn.execute(sql)
        except Exception as e:
            error_msg =f"Erreur lors de l'exécution de la requête SQL en écriture : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)
            
    @log_execution_time        
    def read_sql_query_file(self, sqlfilename : Path) -> pd.DataFrame:
        """Lit et exécute une requête SQL depuis un fichier."""
        try: 
            with sqlfilename.open('r', encoding='utf-8') as fd:
                requete = fd.read()
            if not isinstance(requete, str):
                raise ValueError(f"Le contenu du fichier {sqlfilename} n'est pas une chaîne de caractères valide.")           
            return self.read_sql_query(requete)   
        except Exception as e:
            error_msg =f"Erreur lors de la lecture et exécution du fichier SQL {sqlfilename}: {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)

    @log_execution_time        
    def insert_dataframe(self, df_out: pd.DataFrame, table: str, if_exists: str = 'append') -> None:        
        """Insère un DataFrame dans une table de la base de données."""
        try:
            df_out.to_sql(table, self.write.engine, schema=SCHEMA, index=False, if_exists=if_exists)
            logger_info.info(f"Insertion réussie de {len(df_out)} lignes dans {SCHEMA}.{table}")
        except Exception as e:           
            error_msg =f"Erreur lors de l'insertion d'un DataFrame dans {SCHEMA}.{table} : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)

    @log_execution_time        
    def delete_day_data(self, date: Union[str, datetime, date]) -> None:
        """Supprime les données d'un jour spécifique."""
        try:
            query = f"DELETE FROM {SCHEMA}.{Tables.JOURS} WHERE date = '{date}'"
            self.execute_query(query)
            logger_info.info(f"Données du {date} supprimées de la table {Tables.JOURS}")
        except Exception as e:            
            error_msg =f"Erreur lors de la suppression des données du jour : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)

    @log_execution_time        
    def delete_month_data(self, year: int, month: int) -> None:
        """Supprime les données d'un mois spécifique."""
        try:
            query = f"DELETE FROM {SCHEMA}.{Tables.MOIS} WHERE EXTRACT(YEAR FROM date) = {year} AND EXTRACT(MONTH FROM date) = {month}"
            self.execute_query(query)
            logger_info.info(f"Données du mois {month}/{year} supprimées de la table {Tables.MOIS}")            
        except Exception as e:
            error_msg =f"Erreur lors de la suppression des données du mois : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)
            
    @log_execution_time
    def delete_yesterday_data(self) -> None:
        """Supprime les données de la veille."""
        yesterday = (datetime.now() - timedelta(days=1)).date()
        self.delete_day_data(yesterday)
        
class FlagType(Enum):
    DAILY = "day"
    MONTHLY = "month"

class FlagAction(Enum):
    CHECK = "check"
    CREATE = "create"
    REMOVE = "remove"   
    
class FlagManager:
    """Gère les flags pour l'historisation des KPI"""
    def __init__(self,historization_error: HistorisationError):        
        self.historization_error = historization_error

    def manage_flag(self, flag_path: Path, action: FlagAction, flag_type: FlagType = FlagType.DAILY) -> bool:
        """Gère les opérations sur les flags (vérification, création, suppression)"""
        try:
            flag_file = flag_path.with_suffix('.flag')
            
            if action == FlagAction.CHECK:
                return self._check_flag(flag_file, flag_type)
            elif action == FlagAction.CREATE:
                return self._create_flag(flag_file)
            elif action == FlagAction.REMOVE:
                return self._remove_flag(flag_file)            
        except Exception as e:
            error_msg = f"Erreur lors de l'action {action.value} sur le flag {flag_path}: {str(e)}"
            logger_info.error(error_msg)
            self.historization_error.add_error(error_msg)
            return False
        
    def _check_flag(self, flag_file: Path, flag_type: FlagType) -> bool:
        """Vérifie l'existence et la validité du flag"""
        logger_info.info(f"Vérification de l'existence du flag : {flag_file}")
        
        if not flag_file.exists():
            return False

        with open(flag_file, 'r', encoding='utf-8') as f:
            flag_date = date.fromisoformat(f.read().strip())
        
        today = date.today()
        
        if flag_type == FlagType.MONTHLY:
            return flag_date.replace(day=1) == today.replace(day=1)
        return flag_date == today

    def _create_flag(self, flag_file: Path) -> bool:
        """Crée le flag avec la date du jour"""
        with open(flag_file, 'w') as f:
            f.write(date.today().isoformat())
        logger_info.info(f"Flag créé : {flag_file}")
        return True

    def _remove_flag(self, flag_file: Path) -> bool:
        """Supprime le flag s'il existe"""
        if flag_file.exists():
            flag_file.unlink()
            logger_info.info(f"Flag supprimé : {flag_file}")
        return True
        
class BaseTable:
    """Classe de base pour les opérations sur les tables."""
    def __init__(self, table_name: str, db_connection: DatabaseAccess, historization_error: HistorisationError):
        self.schema = SCHEMA
        self.table_name = table_name
        self.db_connection = db_connection
        self.historization_error = historization_error
        
    def get_data(self) -> pd.DataFrame:
        """Récupère toutes les données de la table."""
        query = f"SELECT * FROM {self.schema}.{self.table_name}"
        return self.db_connection.read_sql_query(query)     
    
    def update(self, new_data: pd.DataFrame) -> None:
        """Méthode abstraite pour mettre à jour les données."""
        raise NotImplementedError("La sous-classe doit implémenter la méthode abstraite")

# Centralisation des noms de tables
class Tables:
    CALC : str = config['connexion']['tables']['calc']
    JOURS : str = config['connexion']['tables']['jours']
    MOIS : str = config['connexion']['tables']['mois']
    MAILLES : str = config['connexion']['tables']['mailles']

class Maille(BaseTable):
    """Classe pour gérer les mailles (structures hiérarchiques)."""
    def __init__(self, db_connection: DatabaseAccess, historization_error: HistorisationError):
        self.historization_error = historization_error or HistorisationError()       
        super().__init__(Tables.MAILLES, db_connection, self.historization_error)        
        
    def update(self, new_data: pd.DataFrame) -> None:
        """Met à jour les mailles avec de nouvelles données."""
        exitcode = 0
        if not isinstance(new_data, pd.DataFrame) or new_data.empty:
            logger_info.warning("Aucune nouvelle donnée à mettre à jour pour les mailles")
        try:
            logger_info.info("Mise à jour des mailles")
            with manage_dataframe(self.get_data()) as df_mailles_existing:            
                with manage_dataframe(new_data.drop_duplicates(subset=['maille'])) as new_data:
                    new_mailles = new_data[~new_data['maille'].isin(df_mailles_existing['label'])]
                    if not new_mailles.empty:
                        self._insert_new_mailles(new_mailles, df_mailles_existing)
                    else:
                        logger_info.info("Aucune nouvelle maille à ajouter")
        except Exception as e:
            error_msg =f"Erreur lors de la mise à jour des mailles : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error(error_msg)

    def _insert_new_mailles(self, new_mailles: pd.DataFrame, existing_mailles: pd.DataFrame) -> None:
        try: 
            max_id = existing_mailles['id_maille'].max() if not existing_mailles.empty else 0
            label_to_id = dict(zip(existing_mailles['label'], existing_mailles['id_maille']))
            new_mailles_to_insert = []

            def insert_maille(label: str, parent_label: Optional[str]) -> Dict[str, Union[int, str]]:
                nonlocal max_id
                if label in label_to_id:
                    return  # La maille existe déjà
                max_id += 1
                parent_id = label_to_id.get(parent_label, 0)
                new_maille = {
                    'id_maille': max_id,
                    'label': label,
                    'id_parent': parent_id
                }
                new_mailles_to_insert.append(new_maille)
                label_to_id[label] = max_id
                return new_maille
            # D'abord, insérer toutes les mailles parents
            for _, row in new_mailles.iterrows():            
                parent_label = row['maille_parent']
                if parent_label and parent_label not in label_to_id:
                    insert_maille(parent_label, None)
                    logger_info.info(f"Maille parent créée : {parent_label}")
            # Ensuite, insérer les mailles enfants
            for _, row in new_mailles.iterrows():
                label = row['maille']
                parent_label = row['maille_parent']
                insert_maille(label, parent_label)
            if new_mailles_to_insert:            
                df_to_insert = pd.DataFrame(new_mailles_to_insert)
                self.db_connection.insert_dataframe(df_to_insert, self.table_name)
                logger_info.info(f"Ajout de {len(new_mailles_to_insert)} nouvelles mailles")                
        except Exception as e:           
            error_msg =f"Erreur lors de l'insertion de la maille : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)
                        
class Calc(BaseTable):
    """Classe pour gérer les calculs."""
    def __init__(self, db_connection: DatabaseAccess, maille: Maille, rapport: List[str], historization_error: HistorisationError):
        self.historization_error = historization_error or HistorisationError()        
        super().__init__(Tables.CALC, db_connection, self.historization_error)
        self.rapport = rapport
        self.maille = maille           

    def update(self, new_calcs: pd.DataFrame) -> None:
        """Met à jour les calculs avec de nouvelles données."""
        if not isinstance(new_calcs, pd.DataFrame) or new_calcs.empty:
            logger_info.warning("Aucune nouvelle donnée à mettre à jour pour les calculs")
        try:
            logger_info.info("Mise à jour des calculs")
            with manage_dataframe(self.get_data()) as df_calcs_existing:
                with manage_dataframe(new_calcs.drop_duplicates(subset=['indicateur'])) as new_calcs:
                    new_calcs_to_add = new_calcs[~new_calcs['indicateur'].isin(df_calcs_existing['label'])]
                    if not new_calcs_to_add.empty:
                        self._insert_new_calcs(new_calcs_to_add, df_calcs_existing)
                    else:
                        logger_info.info("Aucun nouveau calcul à ajouter")
        except Exception as e:            
            error_msg =f"Erreur lors de la mise à jour des calculs : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error(error_msg) 

    def _insert_new_calcs(self, new_calcs: pd.DataFrame, existing_calcs: pd.DataFrame) -> None: 
        try: 
            max_id = existing_calcs['id_calc'].max() if not existing_calcs.empty else 0
            calc_label_to_id = dict(zip(existing_calcs['label'], existing_calcs['id_calc']))
            maille_label_to_id = dict(zip(self.maille.get_data()['label'], self.maille.get_data()['id_maille']))
            new_calcs_to_insert = []
            def insert_calc(indicateur: str, parent: Optional[str], maille: str) -> Dict[str, Union[int, str, List[str]]]:
                nonlocal max_id
                if indicateur in calc_label_to_id:
                    return  # Le calcul existe déjà
                max_id += 1
                parent_id = calc_label_to_id.get(parent, 0)
                id_maille_groupe = maille_label_to_id.get(maille, 0)
                new_calc = {
                    'id_calc': max_id,
                    'label': indicateur,
                    'id_parent': parent_id,
                    'id_maille_groupe': id_maille_groupe,
                    'rapports': self.rapport
                }
                new_calcs_to_insert.append(new_calc)
                calc_label_to_id[indicateur] = max_id
                return new_calc
            # D'abord, insérer tous les calculs parents
            for _, row in new_calcs.iterrows():            
                parent_label = row['indicateur_parent']
                if parent_label and parent_label not in calc_label_to_id:
                    insert_calc(parent_label, None, row['maille_parent'])
                    logger_info.info(f"Calcul parent créé : {parent_label}")
            # Ensuite, insérer les calculs enfants
            for _, row in new_calcs.iterrows():
                label = row['indicateur']
                parent_label = row['indicateur_parent']
                maille_label = row['maille_parent']
                insert_calc(label, parent_label, maille_label)
            if new_calcs_to_insert:
                df_to_insert = pd.DataFrame(new_calcs_to_insert)
                self.db_connection.insert_dataframe(df_to_insert, self.table_name)
                logger_info.info(f"Ajout de {len(new_calcs_to_insert)} nouveaux calculs")                
        except Exception as e:
            error_msg =f"Erreur lors de l'insertion du calc : {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error(error_msg) 

class Jour(BaseTable):
    """Classe pour gérer les données journalières."""
    def __init__(self, db_connection: DatabaseAccess, rapport: List[str], date: date, historization_error: HistorisationError, maille: Maille):
        self.historization_error = historization_error or HistorisationError()
        super().__init__(Tables.JOURS, db_connection, self.historization_error)        
        self.rapport = rapport
        self.maille = maille
        self.date = date
        
    def update(self, new_data: pd.DataFrame) -> None:
        """Met à jour les calculs avec de nouvelles données."""
        if not isinstance(new_data, pd.DataFrame) or new_data.empty:
            logger_info.warning("Aucune nouvelle donnée à mettre à jour pour les jours")                    
        try:
            logger_info.info("Mise à jour des données journalières")                       
            with manage_dataframe(Calc(self.db_connection, self.maille, self.rapport,self.historization_error).get_data()) as calc_data:
                with manage_dataframe(self.maille.get_data()) as maille_data:
                    calc_data_unique = calc_data.drop_duplicates(subset='label', keep='first')
                    maille_data_unique = maille_data.drop_duplicates(subset='label', keep='first')
                    with manage_dataframe(new_data) as new_data:
                        new_data = new_data.merge(calc_data_unique[['label', 'id_calc']], 
                                                  left_on='indicateur', 
                                                  right_on='label', 
                                                  how='left')
                        new_data = new_data.merge(maille_data_unique[['label', 'id_maille']], 
                                                  left_on='maille', 
                                                  right_on='label', 
                                                  how='left')
                        new_data['date'] = self.date                        
                        to_insert = new_data[['id_calc', 'id_maille', 'date', 'valeur']].dropna()
                        if not to_insert.empty:
                            self.db_connection.insert_dataframe(to_insert, self.table_name)
                            logger_info.info(f"Insertion de {len(to_insert)} nouvelles lignes dans la table des jours")
                        else:
                            logger_info.warning("Aucune nouvelle donnée valide à insérer dans la table des jours")   
        except Exception as e:            
            error_msg =f"Erreur lors de la mise à jour des données journalières : {str(e)}"
            logger_info.error(error_msg)                        
            self.historization_error.add_error(error_msg)
            logger_info.error(f"HistorisationError après ajout : {self.historization_error.get_str()}")

class Mois(BaseTable):
    """Classe pour gérer les données mensuelles."""
    def __init__(self, db_connection: DatabaseAccess, rapport_dir: Path, historization_error: HistorisationError):
        self.historization_error = historization_error or HistorisationError()
        super().__init__(Tables.MOIS, db_connection, self.historization_error)
        self.rapport_dir = rapport_dir       
        
    def update(self) -> None:   
        """Met à jour les calculs avec de nouvelles données."""
        if not hasattr(self, 'historization_error'):
            print("historization_error n'existe pas")
        try:
            # Vérifier si le fichier indic_mois_kpi.sql existe
            sql_file = os.path.join(self.rapport_dir, SQL_MOIS)            
            if os.path.exists(sql_file):
                # Si le fichier existe, exécuter le script SQL
                df_mois = self.db_connection.read_sql_query_file(sql_file)                
            else:
                rapport = [os.path.basename(self.rapport_dir).upper()]
                # Sinon, utiliser la requête SQL fournie
                query = f"""
                SELECT id_calc, id_maille, 
                       date_trunc('month', (current_date - interval '1 month'))::date as "date", 
                       avg(valeur) as valeur 
                FROM {self.schema}.{Tables.JOURS} j 
                JOIN {self.schema}.{Tables.CALC} c USING(id_calc) 
                WHERE extract(month from "date") = extract(month from current_date - interval '1 month') 
                  AND c.rapports @> array[{rapport}]
                GROUP BY id_calc, id_maille, 
                         date_trunc('month', (current_date - interval '1 month'))::date
                """
                df_mois = self.db_connection.read_sql_query(query)
            if not df_mois.empty:
                self.db_connection.insert_dataframe(df_mois, self.table_name, if_exists='append')
                logger_info.info(f"Added {len(df_mois)} monthly records")                
            else:
                logger_info.warning("Pas de données mensuelles à historiser")            
        except Exception as e:            
            error_msg =f"Erreur lors de l'histiorisation des données mensuelles: {str(e)}"
            logger_info.error(error_msg)                        
            raise self.historization_error.add_error(error_msg)
            
@log_execution_time
class Rapport:
    def __init__(self, rapport_dir: Path, db_connection: DatabaseAccess, historization_error: HistorisationError, flag_manager: Optional[FlagManager] = None, date: Optional[date] = None):
        self.historization_error = historization_error
        self.db_connection = db_connection
        self.date = date or datetime.now().date()
        self.rapport_dir = Path(rapport_dir)
        self.rapport = [self.rapport_dir.name.upper()]
        self.maille = Maille(self.db_connection, self.historization_error)
        self.calc = Calc(self.db_connection, self.maille, self.rapport, self.historization_error)
        self.jour = Jour(self.db_connection, self.rapport, self.date, self.historization_error, self.maille)
        self.mois = Mois(self.db_connection, rapport_dir, self.historization_error)
        self.flag_manager = flag_manager
        self.exitcode = 0
        self.sql_processor = SQLFileProcessor(self.db_connection, self.historization_error)        

    def process_sql_files(self) -> int:
        try:
            self._process_daily_files()
            self._process_monthly_file()
            return self.exitcode
        except Exception as e:
            self._handle_error(f"Erreur lors du traitement processus des fichiers SQL : {str(e)}")
            return self.exitcode

    def _process_daily_files(self):
        for sql_file in self.rapport_dir.glob('*.sql'):
            if SQL_MOIS not in sql_file.name:
                self._process_single_file(sql_file)

    def _process_monthly_file(self):
        monthly_file = self.rapport_dir / SQL_MOIS
        if monthly_file.exists():
            self._process_single_file(monthly_file, is_monthly=True)
        elif not self.flag_manager.manage_flag(FLAG_FILE_MONTHLY, FlagAction.CHECK, FlagType.MONTHLY):
            self._update_monthly_data()

    def _process_single_file(self, sql_file: Path, is_monthly: bool = False):
        if not self._should_process_file(sql_file, is_monthly):
            return

        try:
            data = self.sql_processor.process_file(sql_file)
            if data is not None and not data.empty:
                self._update_data(data, is_monthly)                
                self.flag_manager.manage_flag(sql_file, FlagAction.CREATE, FlagType.MONTHLY if is_monthly else FlagType.DAILY)
            else:
                logger_info.warning(f"Le fichier {sql_file} n'a pas retourné de DataFrame valide ou est vide.")
        except Exception as e:
            self._handle_error(f"Erreur lors du traitement du fichier {sql_file}: {str(e)}")

    def _should_process_file(self, sql_file: Path, is_monthly: bool) -> bool:
        flag_type = FlagType.MONTHLY if is_monthly else FlagType.DAILY
        if self.flag_manager.manage_flag(sql_file, FlagAction.CHECK, flag_type):
            logger_info.warning(f"Le fichier {sql_file.name} a déjà été traité.")
            return False
        return True

    def _update_data(self, data: pd.DataFrame, is_monthly: bool):
        if is_monthly:
            self.mois.update()
        else:
            self.maille.update(data[['maille', 'maille_parent']])
            self.calc.update(data[['indicateur', 'indicateur_parent', 'maille_parent']])
            self.jour.update(data)
            
    def _update_monthly_data(self):
        self.mois.update()
        self.flag_manager.manage_flag(FLAG_FILE_MONTHLY, FlagAction.CREATE, FlagType.MONTHLY)

    def _handle_error(self, error_msg: str):
        logger_info.error(error_msg)
        self.historization_error.add_error(error_msg)
        self.exitcode += 1   

class SQLFileProcessor:
    def __init__(self, db_connection: DatabaseAccess, historization_error: HistorisationError):
        self.db_connection = db_connection
        self.historization_error = historization_error

    def process_file(self, sql_file: Path) -> Optional[pd.DataFrame]:
        try:
            data = self.db_connection.read_sql_query_file(sql_file)
            if set(SQL_FORMAT_LABELS).issubset(data.columns):
                return data
            else:               
                error_msg = f"Colonnes manquantes dans le fichier {sql_file}: {str(e)}"
                logger_info.error(error_msg)
                self.historization_error.add_error(error_msg)    
        except Exception as e:           
            error_msg = f"Erreur lors du traitement {sql_file}: {str(e)}"
            logger_info.error(error_msg)
            self.historization_error.add_error(error_msg)
            
class Traitement:
    def __init__(self, db_connection: DatabaseAccess, historization_error: HistorisationError, flag_manager: FlagManager):
        self.db_connection = db_connection
        self.historization_error = historization_error
        self.flag_manager = flag_manager
        self.data_cleaner = DataCleaner(self.db_connection, self.historization_error)

    def process_action(self, action: str, date: Optional[date] = None, rapport_dir: Optional[Path] = None) -> int:
        try:
            logger_info.info(f"Process_data, action souhaitée : {action}")
            
            is_monthly = action == 'mois'
            self._clean_old_data(is_monthly)
            
            if action == 'default':
                return self._process_default(date)
            elif action == 'jour':
                return self._process_jour(date)
            elif action == 'mois':
                return self._process_mois()
            elif action == 'rapport' and rapport_dir:
                return self._process_rapport(rapport_dir)
            else:
                error_msg = f"Action non reconnue : {action}"
                raise ValueError(error_msg)
        except Exception as e:
            error_msg = f"Erreur lors du traitement {action} : {str(e)}"
            logger_info.error(error_msg)
            self.historization_error.add_error(error_msg)
            return 1

    def _process_default(self, date: Optional[date] = None) -> int:
        logger_info.info("traitement par default")
        if not self.flag_manager.manage_flag(FLAG_FILE_DAILY, FlagAction.CHECK) or date:
            exitcode = 0
            for rapport_dir in SQLPATH.glob('*'):
                if rapport_dir.is_dir():
                    rapport = Rapport(Path(rapport_dir), self.db_connection, self.historization_error, self.flag_manager, date)
                    exitcode += rapport.process_sql_files()
            if not date:
                self.flag_manager.manage_flag(FLAG_FILE_DAILY, FlagAction.CREATE)
            return exitcode
        return 0

    def _process_jour(self, date: Optional[date] = None) -> int:
        date = date if date else datetime.now().date()
        self.db_connection.delete_day_data(date)
        self.flag_manager.manage_flag(FLAG_FILE_DAILY, FlagAction.REMOVE)
        return self._process_default(date)

    def _process_mois(self) -> int:
        last_month = (date.today().replace(day=1) - timedelta(days=1)).replace(day=1)
        self.db_connection.delete_month_data(last_month.year, last_month.month)
        self.flag_manager.manage_flag(FLAG_FILE_MONTHLY, FlagAction.REMOVE)
        self._clean_old_data(true)
        exitcode = 0
        for rapport_dir in SQLPATH.glob('*'):
            if rapport_dir.is_dir():
                exitcode += Mois(self.db_connection, str(rapport_dir), self.historization_error).update()
        self.flag_manager.manage_flag(FLAG_FILE_MONTHLY, FlagAction.CREATE, FlagType.MONTHLY)
        return exitcode

    def _process_rapport(self, rapport_dir: Path) -> int:
        rapport = Rapport(rapport_dir, self.db_connection, self.historization_error, self.flag_manager)
        return rapport.process_sql_files()  
    
    def _clean_old_data(self, is_monthly: Optional[bool] = False):
        if is_monthly:
            self.data_cleaner.clean_monthly_data()
        else:
            self.data_cleaner.clean_daily_data()

    
    
class DataCleaner:
    def __init__(self, db_connection: DatabaseAccess, historization_error: HistorisationError):
        self.db_connection = db_connection
        self.historization_error = historization_error

    def clean_daily_data(self):
        self._clean_data(SCHEMA, Tables.JOURS, config['data_cleanup']['daily_retention_months'])

    def clean_monthly_data(self):
        self._clean_data(SCHEMA, Tables.MOIS,config['data_cleanup']['monthly_retention_months'])

    def _clean_data(self, schema: str, table: str, interval: str):
        try:
            query = f"""
            DELETE FROM {schema}.{table}
            WHERE date < CURRENT_DATE - INTERVAL '{interval}'
            """
            result = self.db_connection.execute_query(query)
            affected_rows = result.rowcount if hasattr(result, 'rowcount') else 0
            logger_info.info(f"Suppression de {affected_rows} lignes de données de {table}")
        except Exception as e:
            raise Exception(f"Erreur lors du nettoyage des données de {table} : {str(e)}")  
    
def parse_arguments() -> Optional[argparse.Namespace]:
    """Parse les arguments de ligne de commande ou retourne None si exécuté dans Jupyter."""
    if 'ipykernel' in sys.modules:
        # Exécution dans Jupyter, pas de parsing d'arguments
        return None
    else:
        # Exécution comme script autonome, parser les arguments
        parser = argparse.ArgumentParser(description="Traitement des données journalières et mensuelles")
        parser.add_argument('--jour', action='store_true', help="Force le recalcul des données journalières")
        parser.add_argument('--mois', action='store_true', help="Force le recalcul des données mensuelles")
        parser.add_argument('--veille', action='store_true', help="Force le recalcul des données de la veille")
        parser.add_argument('--sup', action='store_true', help="Force la suppression des données journalières")
        parser.add_argument('--rapport', type=Path, help="Chemin vers le répertoire du rapport")
        parser.add_argument('--user', help="Nom d'utilisateur pour la connexion à la base de données")
        parser.add_argument('--password', help="Mot de passe pour la connexion à la base de données")
        parser.add_argument('--host', help="Hôte de la base de données")
        parser.add_argument('--port', type=int, help="Port de la base de données")
        parser.add_argument('--bdd', help="Nom de la base de données")
        return parser.parse_args()  

# Fonction principale    
@log_execution_time
def main() -> int:
    db_connection = None
    error_handler = HistorisationError()
    flag_manager = FlagManager(error_handler)
    try:
        args = parse_arguments()
        config = charger_config('config.json')
        
        connection_params = {
            'user': getattr(args, 'user', None) or config['connexion']['user'],
            'password': getattr(args, 'password', None) or config['connexion']['password'],
            'host': getattr(args, 'host', None) or config['connexion']['host'],
            'port': getattr(args, 'port', None) or config['connexion']['port'],
            'bdd': getattr(args, 'bdd', None) or config['connexion']['bdd']
        }
        
        db_connection = DatabaseAccess(error_handler, **connection_params)
        traitement = Traitement(db_connection, error_handler, flag_manager)
        actions = {
            
            'jour': lambda: traitement.process_action('jour'),
            'mois': lambda: traitement.process_action('mois'),
            'veille': lambda: traitement.process_action('jour', (datetime.now() - timedelta(days=1)).date()),
            'rapport': lambda: traitement.process_action( 'rapport', rapport_dir=Path(args.rapport)) if args.rapport else 0
        }        
        if args is None or not any(getattr(args, action, False) for action in actions):  
            logger_info.info('Actualisation manuelle du rapport R039')
            traitement.process_action('default')
            #process_data(db_connection, error_handler,flag_manager, 'default',rapport_dir=Path("sql/r039"))             
        else:
            for action, func in actions.items():
                if getattr(args, action, False):                   
                    func()  
        
        if args is None or not any(getattr(args, action, False) for action in ['jour', 'mois', 'veille', 'rapport']):
            logger_info.info('Actualisation manuelle des rapports')
            exitcode = traitement.process_action('default')
        else:
            exitcode = 0
            if args.jour:
                exitcode += traitement.process_action('jour')
            if args.mois:
                exitcode += traitement.process_action('mois')
            if args.veille:
                exitcode += traitement.process_action('jour', date=(datetime.now() - timedelta(days=1)).date())
            if args.rapport:
                exitcode += traitement.process_action('rapport', rapport_dir=Path(args.rapport))
        
        return exitcode, error_handler.get_summary()

    except Exception as e:
        error_msg = f"Erreur générale : {str(e)}"
        logger_info.error(error_msg)
        error_handler.add_error(error_msg)
        return 1, error_handler.get_summary()

    finally:
        if db_connection:
            try:
                db_connection.disconnect()
            except Exception as e:
                error_msg = f"Erreur lors de la déconnexion : {str(e)}"
                logger_info.error(error_msg)
                error_handler.add_error(error_msg)
        
        gc.collect()

if __name__ == "__main__":
    exitcode, (messages, _) = main()
    if exitcode > 0:
        print(messages)
    sys.exit(exitcode)