From 5b47ae18d75438a32c8b1e3c74306d72147c3f3c Mon Sep 17 00:00:00 2001 From: jfkneib Date: Fri, 14 Jun 2024 16:27:14 +0200 Subject: [PATCH] =?UTF-8?q?[agent]=20reprise=20du=20mode=20de=20marche=20d?= =?UTF-8?q?es=20agents=20machines=20et=20ars=20dans=20le=20cas=20d'une=20c?= =?UTF-8?q?oupure=20de=20reseau.=20l'agent=20essai=20des=20connections=20s?= =?UTF-8?q?uccesive.=20si=20au=20bout=20de=2015=20tentatives,=20l'agent=20?= =?UTF-8?q?recree=20son=20client=20xmpp.=20et=20recommence=20le=20cycle=20?= =?UTF-8?q?de=20connection.=20[agent]=20Am=C3=A9lioration=20de=20la=20r?= =?UTF-8?q?=C3=A9silience=20r=C3=A9seau=20pour=20les=20agents=20machines?= =?UTF-8?q?=20et=20ARS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit En cas de coupure réseau, l'agent tentera désormais de rétablir sa connexion client XMPP. Si l'agent n'est pas en mesure de se connecter après 15 tentatives, il recréera son client XMPP et redémarrera le cycle de connexion. Cela devrait améliorer la capacité de l'agent à se remettre d'une coupure réseau et à continuer de fonctionner de manière fluide. --- pulse_xmpp_agent/agentxmpp.py | 209 ++++++++++++++++++++++++---------- pulse_xmpp_agent/lib/utils.py | 161 ++++++++++++++++++++++++++ 2 files changed, 312 insertions(+), 58 deletions(-) diff --git a/pulse_xmpp_agent/agentxmpp.py b/pulse_xmpp_agent/agentxmpp.py index 10df6b4e..40b52086 100644 --- a/pulse_xmpp_agent/agentxmpp.py +++ b/pulse_xmpp_agent/agentxmpp.py @@ -102,7 +102,7 @@ from lib.grafcetdeploy import grafcet from zipfile import * from optparse import OptionParser -from multiprocessing import Queue, Process, Event +from multiprocessing import Queue, Process, Event, Value from multiprocessing.managers import SyncManager import multiprocessing from modulefinder import ModuleFinder @@ -253,15 +253,19 @@ def __init__( eventkilltcp, eventkillpipe, pidprogrammprincipal, + lockrestart, + PROCESS_RESTART ): logging.log( - DEBUGPULSE, "start machine %s Type %s" % (conf.jidagent, conf.agenttype) + DEBUGPULSE, "initialise agent xmpp %s Type %s" % (conf.jidagent, conf.agenttype) ) - + self.lockrestart = lockrestart + self.PROCESS_RESTART = PROCESS_RESTART + self.connectcount = 0 self.iq_msg = file_message_iq(dev_mod=True) self.pidprogrammprincipal = pidprogrammprincipal self.time_before_reinscription = 30 - + self.reconnect_time_wait = 3 # create mutex self.mutex = threading.Lock() self.mutexslotquickactioncount = threading.Lock() @@ -767,6 +771,7 @@ def __init__( # Parameters for the agent connexion self.add_event_handler("register", self.register) self.add_event_handler("connecting", self.handle_connecting) + self.add_event_handler("connected", self.handle_connected) self.add_event_handler("connection_failed", self.handle_connection_failed) self.add_event_handler("disconnected", self.handle_disconnected) @@ -1162,6 +1167,12 @@ def Mode_Marche_Arret_loop( self.config = tgconf(type_machine) self.address = (ipfromdns(self.config.Server), int(self.config.Port)) ctrlC = self.Mode_Marche_Arret_connect(forever=forever, timeout=timeout) + if self.connectcount > 2: + logger.debug("attente avant tentative de reconection %s s " % self.reconnect_time_wait) + time.sleep(self.reconnect_time_wait) + else: + # 3 seconde avant reconnection + time.sleep(3) if ctrlC: # ctrl+c on quit return False if self.brestartbot: @@ -1203,13 +1214,71 @@ def quit_application(self, wait=2): setgetrestart(0) self.disconnect(wait=wait) + def handle_connected(self, data): + # on reinitialise le nombre de connection en default + self.connectcount = 0 + self.set_connect_loop_wait(3) + def handle_connection_failed(self, data): """ on connection failed on libere la connection a savoir apres "CONNECTION FAILED" il faut reinitialiser adress et port de connection. """ - logger.error("CONNECTION FAILED") + logger.error("CONNECTION FAILED %s " % self.connectcount) + self.connectcount = self.connectcount + 1 + self.set_connect_loop_wait(3) + if self.connectcount%5 == 0: + logger.warning("tentative de connection failed %s " % self.connectcount) + if self.connectcount >= 5 and self.connectcount < 10: + self.set_connect_loop_wait(5) + if self.connectcount >=11 and self.connectcount <= 15: + self.set_connect_loop_wait(10) + if self.connectcount >= 15: + self.set_connect_loop_wait(15) + # Terminer le processus courant + logger.error("%s Tentatives de connexion échouées dans ce processus." % self.connectcount) + logger.error("Veuillez vérifier vos paramètres réseau.") + logger.error("Re-création du processus agent XMPP en cours.") + try: + with self.lockrestart: # Verrouillage pour éviter les conflits d'accès + self.PROCESS_RESTART.value = 1 + logger.debug("donne ordre de recreation client xmpp.") + except Exception as e: + logger.error(f"An error occurred: {e}") + self.process_restartbot = False + self.queue_read_event_from_command.put("quit") + # termine server kiosk + self.eventkiosk.quit() + self.eventkilltcp.set() + self.eventkillpipe.set() + time.sleep(1) + if sys.platform.startswith("win"): + try: + # on debloque le pipe + fileHandle = win32file.CreateFile( + "\\\\.\\pipe\\interfacechang", + win32file.GENERIC_READ | win32file.GENERIC_WRITE, + 0, + None, + win32file.OPEN_EXISTING, + 0, + None, + ) + win32file.WriteFile(fileHandle, "terminate") + fileHandle.Close() + time.sleep(2) + except Exception as e: + logger.error("\n%s" % (traceback.format_exc())) + pass + self.startdata = -1 + logger.debug("byby session xmpp") + logger.error("PROCESS_RESTART %s" % (self.PROCESS_RESTART.value)) + self.disconnect() + self.readconfig_Marche_Arret = False + self.loop.stop() + time.sleep(1) + return False if self.brestartbot: # on force 1 restart bot xmpp self.startdata = -1 @@ -1321,6 +1390,7 @@ def Mode_Marche_Arret_connect(self, forever=False, timeout=10): """ ctrlC = False try: + logger.debug(f"Tentative de connexion en cours. Tentative numéro {self.connectcount}.") self.connect(address=self.address, force_starttls=None) self.process(forever=False) ctrlC = False @@ -1360,17 +1430,20 @@ def get_connect_loop_wait(self): def set_connect_loop_wait(self, int_time): # connect_loop_wait in "xmlstream: make connect_loop_wait private" + # attention slixmpp modifie cette valeur private # cf commit d3063a0368503 + self.reconnect_time_wait = int_time try: self._connect_loop_wait self._connect_loop_wait = int_time + except AttributeError: self.connect_loop_wait = int_time def handle_disconnected(self, data): logger.debug( "We got disconnected. We will reconnect in %s seconds" - % self.get_connect_loop_wait() + % self.reconnect_time_wait() ) def register(self, iq): @@ -3892,8 +3965,14 @@ def module_needed(self): return False -def createDaemon( - optstypemachine, optsconsoledebug, optsdeamon, tgfichierconf, tglevellog, tglogfile +def createDaemon( optstypemachine, + optsconsoledebug, + optsdeamon, + tgfichierconf, + tglevellog, + tglogfile, + lockrestart, + PROCESS_RESTART ): """ This function create a service/Daemon that will execute a det. task @@ -3910,6 +3989,8 @@ def createDaemon( tgfichierconf, tglevellog, tglogfile, + lockrestart, + PROCESS_RESTART ), ) p.daemon = True @@ -3977,6 +4058,8 @@ def createDaemon( tgfichierconf, tglevellog, tglogfile, + lockrestart, + PROCESS_RESTART ) except OSError as error: logging.error("Unable to fork. Error: %d (%s)" % (error.errno, error.strerror)) @@ -4124,6 +4207,35 @@ def doTask( tgnamefileconfig, tglevellog, tglogfile, + lockrestart, + PROCESS_RESTART +): + + + while PROCESS_RESTART.value: + with lockrestart: # Verrouillage pour éviter les conflits d'accès + PROCESS_RESTART.value = 0 + logger.debug("START CLIENT AGENT XMPP") + doTask1( + optstypemachine, + optsconsoledebug, + optsdeamon, + tgnamefileconfig, + tglevellog, + tglogfile, + lockrestart, + PROCESS_RESTART + ) + +def doTask1( + optstypemachine, + optsconsoledebug, + optsdeamon, + tgnamefileconfig, + tglevellog, + tglogfile, + lockrestart, + PROCESS_RESTART ): processes = [] listpid = [] @@ -4201,6 +4313,8 @@ def doTask( eventkilltcp, eventkillpipe, os.getpid(), + lockrestart, + PROCESS_RESTART ), ) processes.append(p) @@ -4232,20 +4346,9 @@ def doTask( try: programrun = True while True: - time.sleep(120) - try: - process = psutil.Process(processes[0].pid) - # Vérifier si le processus est en cours d'exécution - continue - except psutil.NoSuchProcess: - programrun = False - if not programrun: - logging.debug("END PROGRAMM") - for p in processes: - p.terminate() - cmd = "kill -s kill %s" % os.getpid() - result = simplecommand(cmd) - break + time.sleep(10) + if PROCESS_RESTART.value == 1: + return except KeyboardInterrupt: logging.debug("CTRL+C have been asked.") logging.debug("The Pulse Xmpp Agent Relay is now stopped") @@ -4256,40 +4359,11 @@ def doTask( elif sys.platform.startswith("win"): try: - windowfilepid = os.path.join( - os.path.dirname(os.path.realpath(__file__)), - "INFOSTMP", - "pidagentwintree", - ) - dd = process_agent_search(os.getpid()) - processwin = json.dumps(dd.pidlist(), indent=4) - file_put_contents(windowfilepid, "%s" % processwin) - logging.debug("Process agent list : %s" % processwin) + programrun = True while True: - time.sleep(120) - dd = process_agent_search(os.getpid()) - processwin = json.dumps(dd.pidlist(), indent=4) - file_put_contents(windowfilepid, "%s" % processwin) - logging.debug("Process agent list : %s" % processwin) - # list python process - lpidsearch = [] - for k, v in dd.get_pid().items(): - if "python.exe" in v: - lpidsearch.append(int(k)) - logging.debug("Process python list : %s" % lpidsearch) - for pr in processes: - logging.debug("search %s in %s" % (pr.pid, lpidsearch)) - if pr.pid not in lpidsearch: - logging.debug( - "Process %s pid %s is missing %s" - % (pr.name, pr.pid, lpidsearch) - ) - for p in processes: - p.terminate() - logging.debug("END PROGRAMM") - cmd = "taskkill /F /PID %s" % os.getpid() - result = simplecommand(cmd) - break + time.sleep(10) + if PROCESS_RESTART.value == 1: + return except KeyboardInterrupt: logging.debug("CTRL+C have been asked.") logging.debug("The Pulse Xmpp Agent Relay is now stopped") @@ -4300,8 +4374,11 @@ def doTask( else: # completing process try: - for p in processes: - p.join() + programrun = True + while True: + time.sleep(10) + if PROCESS_RESTART.value == 1: + return except KeyboardInterrupt: logging.debug("CTRL+C have been asked.") sys.exit(1) @@ -4325,6 +4402,8 @@ def __init__( eventkilltcp, eventkillpipe, pidprogrammprincipal, + lockrestart, + PROCESS_RESTART ): # parameter permet arret programme complet ICI PASSER PARAMETRE DANS XMPPBOT @@ -4377,6 +4456,8 @@ def __init__( eventkilltcp, eventkillpipe, self.pidprogrammprincipal, + lockrestart, + PROCESS_RESTART ) xmpp.auto_reconnect = False xmpp.register_plugin("xep_0030") # Service Discovery @@ -4403,6 +4484,9 @@ def __init__( ) ) self.logger.debug("TERMINATE") + if xmpp.PROCESS_RESTART.value: + self.logger.debug("fin process") + return terminateserver(xmpp) @@ -4474,7 +4558,7 @@ def terminateserver(xmpp): logging.log(DEBUGPULSE, "terminate scheduler") logging.log(DEBUGPULSE, "Waiting to stop kiosk server") logging.log(DEBUGPULSE, "QUIT") - logging.log(DEBUGPULSE, "bye bye Agent") + logging.log(DEBUGPULSE, "bye bye client xmpp Agent") if sys.platform.startswith("win"): windowfilepid = os.path.join( os.path.dirname(os.path.realpath(__file__)), "INFOSTMP", "pidagentwintree" @@ -4499,6 +4583,11 @@ def terminateserver(xmpp): if __name__ == "__main__": + # Création d'un lock pour synchroniser l'accès à la valeur partagée + lockrestart = multiprocessing.Lock() + # Création d'une valeur partagée (initialisée à 0) + PROCESS_RESTART = Value('i', 1) # 'i' pour entier (int) + if sys.platform.startswith("linux") and os.getuid() != 0: print("Agent must be running as root") sys.exit(0) @@ -4599,6 +4688,8 @@ def terminateserver(xmpp): tg.namefileconfig, tg.levellog, tg.logfile, + lockrestart, + PROCESS_RESTART ) else: createDaemon( @@ -4608,4 +4699,6 @@ def terminateserver(xmpp): tg.namefileconfig, tg.levellog, tg.logfile, + lockrestart, + PROCESS_RESTART ) diff --git a/pulse_xmpp_agent/lib/utils.py b/pulse_xmpp_agent/lib/utils.py index e8b19066..3c57ba18 100644 --- a/pulse_xmpp_agent/lib/utils.py +++ b/pulse_xmpp_agent/lib/utils.py @@ -2613,6 +2613,167 @@ def stopallprogram(self): self.programlist.clear() +def check_socket_status(port): + """ + Vérifie l'état d'un port socket. + + Cette fonction utilise la commande `netstat` pour vérifier l'état d'un port + spécifié sur Windows, Linux ou macOS. + + Args: + port (int): Le port à vérifier. + + Returns: + str: Le statut du port (e.g., 'LISTEN', 'ESTABLISHED'). + None: Si le port n'est pas trouvé. + + Raises: + OSError: Si le système d'exploitation n'est pas supporté. + """ + # Déterminer la commande appropriée pour le système d'exploitation + system = platform.system().lower() + if system in ['windows', 'linux', 'darwin']: + # Commande netstat pour les systèmes Windows, Linux et macOS + if system == 'windows': + result = subprocess.run(['netstat', '-an'], capture_output=True, text=True) + else: + result = subprocess.run(['netstat', '-tan'], capture_output=True, text=True) + + # Filtrer les lignes contenant le port + lines = result.stdout.splitlines() + port_line = [line for line in lines if f":{port} " in line] + + if port_line: + # Extraire le statut de la ligne + status = port_line[0].split()[-1] + return status + else: + return None + else: + raise OSError(f"Système d'exploitation non supporté : {system}") + +def get_process_using_port(port): + """ + Obtient le processus utilisant un port spécifié. + + Cette fonction utilise `lsof` sur Unix-like et `netstat` sur Windows + pour trouver le processus qui utilise un port donné. + + Args: + port (int): Le port à vérifier. + + Returns: + str: Les informations du processus utilisant le port ou un message d'erreur. + """ + system = platform.system() + + if system in ['Linux', 'Darwin']: + command = ['lsof', '-i', f':{port}'] + elif system == 'Windows': + command = ['netstat', '-ano'] + else: + return f"Système d'exploitation non supporté : {system}" + + try: + result = subprocess.run(command, capture_output=True, text=True, check=True) + if system == 'Windows': + # Filtrer la sortie pour trouver le port spécifique + lines = result.stdout.splitlines() + for line in lines: + if f':{port} ' in line: # espace après le port pour éviter les mauvaises correspondances + return line + return f"Aucun processus n'utilise le port {port}." + else: + return result.stdout + except subprocess.CalledProcessError as e: + return f"Erreur : {e}" + +def process_exists(pid): + """ + Vérifie si un processus existe. + + Args: + pid (int): L'identifiant du processus à vérifier. + + Returns: + bool: True si le processus existe, False sinon. + """ + try: + process = psutil.Process(pid) + except psutil.NoSuchProcess: + return False + else: + return True + +def kill_process(pid): + """ + Tue un processus par son PID. + + Cette fonction tente de tuer un processus donné par son PID sur Windows, + Linux et macOS. + + Args: + pid (int): L'identifiant du processus à tuer. + + Returns: + bool: True si le processus a été tué avec succès, False sinon. + """ + system = platform.system() + if system == 'Windows': + try: + subprocess.run(['taskkill', '/F', '/T', '/PID', str(pid)], check=True) + logger.debug(f"Le processus avec le PID {pid} a été tué sur Windows.") + return True + except subprocess.CalledProcessError as e: + logger.error(f"Échec de la tentative de tuer le processus avec le PID {pid} sur Windows : {e}") + return False + elif system in ['Linux', 'Darwin']: + try: + subprocess.run(['kill', '-9', str(pid)], check=True) + logger.debug(f"Le processus avec le PID {pid} a été tué sur un système Unix-like.") + return True + except subprocess.CalledProcessError as e: + logger.error(f"Échec de la tentative de tuer le processus avec le PID {pid} sur un système Unix-like : {e}") + return False + else: + logger.error(f"Système d'exploitation non supporté : {system}") + return False + +def kill_process_tree(pid, parentprocess=False): + """ + Tue un processus et tous ses processus enfants. + + Args: + pid (int): L'identifiant du processus parent à tuer. + parentprocess (bool): Indique si le processus parent doit également être tué. + + Returns: + None + """ + try: + # Trouver tous les processus enfants du processus parent donné + parent = psutil.Process(pid) + children = parent.children(recursive=True) + + # Tuer chaque processus enfant récursivement + for child in children: + kill_process_tree(child.pid) + + # Tuer le processus enfant lui-même + for child in children: + child.terminate() + child.wait(timeout=5) # Attendre que le processus soit terminé + if parentprocess: + # Tuer le processus parent lui-même + parent.terminate() + parent.wait(timeout=5) # Attendre que le processus soit terminé + except psutil.NoSuchProcess: + pass + except psutil.AccessDenied: + logger.error(f"Permission refusée pour terminer le processus {pid}") + except psutil.TimeoutExpired: + logger.error(f"Délai d'attente expiré lors de la tentative de terminer le processus {pid}") + class AESCipher: def __init__(self, key, BS=32): self.key = key.encode("utf-8") if isinstance(key, str) else key