Skip to content

Commit

Permalink
[agent] reprise du mode de marche des agents machines et ars
Browse files Browse the repository at this point in the history
dans le cas d'une coupure de reseau. l'agent essai des connections succesive.
si au bout de 15 tentatives, l'agent recree son client xmpp. et recommence le cycle de connection.
[agent] Amélioration de la résilience réseau pour les agents machines et ARS

En cas de coupure réseau,
l'agent tentera désormais de rétablir sa connexion client XMPP.
Si l'agent n'est pas en mesure de se connecter après 15 tentatives,
il recréera son client XMPP et redémarrera le cycle de connexion.
Cela devrait améliorer la capacité de l'agent à se remettre d'une coupure réseau et à continuer de fonctionner de manière fluide.
  • Loading branch information
jfkneib committed Jun 14, 2024
1 parent f2f9db7 commit 5b47ae1
Show file tree
Hide file tree
Showing 2 changed files with 312 additions and 58 deletions.
209 changes: 151 additions & 58 deletions pulse_xmpp_agent/agentxmpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
from lib.grafcetdeploy import grafcet
from zipfile import *
from optparse import OptionParser
from multiprocessing import Queue, Process, Event
from multiprocessing import Queue, Process, Event, Value
from multiprocessing.managers import SyncManager
import multiprocessing
from modulefinder import ModuleFinder
Expand Down Expand Up @@ -253,15 +253,19 @@ def __init__(
eventkilltcp,
eventkillpipe,
pidprogrammprincipal,
lockrestart,
PROCESS_RESTART
):
logging.log(
DEBUGPULSE, "start machine %s Type %s" % (conf.jidagent, conf.agenttype)
DEBUGPULSE, "initialise agent xmpp %s Type %s" % (conf.jidagent, conf.agenttype)
)

self.lockrestart = lockrestart
self.PROCESS_RESTART = PROCESS_RESTART
self.connectcount = 0
self.iq_msg = file_message_iq(dev_mod=True)
self.pidprogrammprincipal = pidprogrammprincipal
self.time_before_reinscription = 30

self.reconnect_time_wait = 3
# create mutex
self.mutex = threading.Lock()
self.mutexslotquickactioncount = threading.Lock()
Expand Down Expand Up @@ -767,6 +771,7 @@ def __init__(
# Parameters for the agent connexion
self.add_event_handler("register", self.register)
self.add_event_handler("connecting", self.handle_connecting)
self.add_event_handler("connected", self.handle_connected)
self.add_event_handler("connection_failed", self.handle_connection_failed)
self.add_event_handler("disconnected", self.handle_disconnected)

Expand Down Expand Up @@ -1162,6 +1167,12 @@ def Mode_Marche_Arret_loop(
self.config = tgconf(type_machine)
self.address = (ipfromdns(self.config.Server), int(self.config.Port))
ctrlC = self.Mode_Marche_Arret_connect(forever=forever, timeout=timeout)
if self.connectcount > 2:
logger.debug("attente avant tentative de reconection %s s " % self.reconnect_time_wait)
time.sleep(self.reconnect_time_wait)
else:
# 3 seconde avant reconnection
time.sleep(3)
if ctrlC: # ctrl+c on quit
return False
if self.brestartbot:
Expand Down Expand Up @@ -1203,13 +1214,71 @@ def quit_application(self, wait=2):
setgetrestart(0)
self.disconnect(wait=wait)

def handle_connected(self, data):
# on reinitialise le nombre de connection en default
self.connectcount = 0
self.set_connect_loop_wait(3)

def handle_connection_failed(self, data):
"""
on connection failed on libere la connection
a savoir apres "CONNECTION FAILED"
il faut reinitialiser adress et port de connection.
"""
logger.error("CONNECTION FAILED")
logger.error("CONNECTION FAILED %s " % self.connectcount)
self.connectcount = self.connectcount + 1
self.set_connect_loop_wait(3)
if self.connectcount%5 == 0:
logger.warning("tentative de connection failed %s " % self.connectcount)
if self.connectcount >= 5 and self.connectcount < 10:
self.set_connect_loop_wait(5)
if self.connectcount >=11 and self.connectcount <= 15:
self.set_connect_loop_wait(10)
if self.connectcount >= 15:
self.set_connect_loop_wait(15)
# Terminer le processus courant
logger.error("%s Tentatives de connexion échouées dans ce processus." % self.connectcount)
logger.error("Veuillez vérifier vos paramètres réseau.")
logger.error("Re-création du processus agent XMPP en cours.")
try:
with self.lockrestart: # Verrouillage pour éviter les conflits d'accès
self.PROCESS_RESTART.value = 1
logger.debug("donne ordre de recreation client xmpp.")
except Exception as e:
logger.error(f"An error occurred: {e}")
self.process_restartbot = False
self.queue_read_event_from_command.put("quit")
# termine server kiosk
self.eventkiosk.quit()
self.eventkilltcp.set()
self.eventkillpipe.set()
time.sleep(1)
if sys.platform.startswith("win"):
try:
# on debloque le pipe
fileHandle = win32file.CreateFile(
"\\\\.\\pipe\\interfacechang",
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
0,
None,
)
win32file.WriteFile(fileHandle, "terminate")
fileHandle.Close()
time.sleep(2)
except Exception as e:
logger.error("\n%s" % (traceback.format_exc()))
pass
self.startdata = -1
logger.debug("byby session xmpp")
logger.error("PROCESS_RESTART %s" % (self.PROCESS_RESTART.value))
self.disconnect()
self.readconfig_Marche_Arret = False
self.loop.stop()
time.sleep(1)
return False
if self.brestartbot:
# on force 1 restart bot xmpp
self.startdata = -1
Expand Down Expand Up @@ -1321,6 +1390,7 @@ def Mode_Marche_Arret_connect(self, forever=False, timeout=10):
"""
ctrlC = False
try:
logger.debug(f"Tentative de connexion en cours. Tentative numéro {self.connectcount}.")
self.connect(address=self.address, force_starttls=None)
self.process(forever=False)
ctrlC = False
Expand Down Expand Up @@ -1360,17 +1430,20 @@ def get_connect_loop_wait(self):

def set_connect_loop_wait(self, int_time):
# connect_loop_wait in "xmlstream: make connect_loop_wait private"
# attention slixmpp modifie cette valeur private
# cf commit d3063a0368503
self.reconnect_time_wait = int_time
try:
self._connect_loop_wait
self._connect_loop_wait = int_time

except AttributeError:
self.connect_loop_wait = int_time

def handle_disconnected(self, data):
logger.debug(
"We got disconnected. We will reconnect in %s seconds"
% self.get_connect_loop_wait()
% self.reconnect_time_wait()
)

def register(self, iq):
Expand Down Expand Up @@ -3892,8 +3965,14 @@ def module_needed(self):
return False


def createDaemon(
optstypemachine, optsconsoledebug, optsdeamon, tgfichierconf, tglevellog, tglogfile
def createDaemon( optstypemachine,
optsconsoledebug,
optsdeamon,
tgfichierconf,
tglevellog,
tglogfile,
lockrestart,
PROCESS_RESTART
):
"""
This function create a service/Daemon that will execute a det. task
Expand All @@ -3910,6 +3989,8 @@ def createDaemon(
tgfichierconf,
tglevellog,
tglogfile,
lockrestart,
PROCESS_RESTART
),
)
p.daemon = True
Expand Down Expand Up @@ -3977,6 +4058,8 @@ def createDaemon(
tgfichierconf,
tglevellog,
tglogfile,
lockrestart,
PROCESS_RESTART
)
except OSError as error:
logging.error("Unable to fork. Error: %d (%s)" % (error.errno, error.strerror))
Expand Down Expand Up @@ -4124,6 +4207,35 @@ def doTask(
tgnamefileconfig,
tglevellog,
tglogfile,
lockrestart,
PROCESS_RESTART
):


while PROCESS_RESTART.value:
with lockrestart: # Verrouillage pour éviter les conflits d'accès
PROCESS_RESTART.value = 0
logger.debug("START CLIENT AGENT XMPP")
doTask1(
optstypemachine,
optsconsoledebug,
optsdeamon,
tgnamefileconfig,
tglevellog,
tglogfile,
lockrestart,
PROCESS_RESTART
)

def doTask1(
optstypemachine,
optsconsoledebug,
optsdeamon,
tgnamefileconfig,
tglevellog,
tglogfile,
lockrestart,
PROCESS_RESTART
):
processes = []
listpid = []
Expand Down Expand Up @@ -4201,6 +4313,8 @@ def doTask(
eventkilltcp,
eventkillpipe,
os.getpid(),
lockrestart,
PROCESS_RESTART
),
)
processes.append(p)
Expand Down Expand Up @@ -4232,20 +4346,9 @@ def doTask(
try:
programrun = True
while True:
time.sleep(120)
try:
process = psutil.Process(processes[0].pid)
# Vérifier si le processus est en cours d'exécution
continue
except psutil.NoSuchProcess:
programrun = False
if not programrun:
logging.debug("END PROGRAMM")
for p in processes:
p.terminate()
cmd = "kill -s kill %s" % os.getpid()
result = simplecommand(cmd)
break
time.sleep(10)
if PROCESS_RESTART.value == 1:
return
except KeyboardInterrupt:
logging.debug("CTRL+C have been asked.")
logging.debug("The Pulse Xmpp Agent Relay is now stopped")
Expand All @@ -4256,40 +4359,11 @@ def doTask(

elif sys.platform.startswith("win"):
try:
windowfilepid = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"INFOSTMP",
"pidagentwintree",
)
dd = process_agent_search(os.getpid())
processwin = json.dumps(dd.pidlist(), indent=4)
file_put_contents(windowfilepid, "%s" % processwin)
logging.debug("Process agent list : %s" % processwin)
programrun = True
while True:
time.sleep(120)
dd = process_agent_search(os.getpid())
processwin = json.dumps(dd.pidlist(), indent=4)
file_put_contents(windowfilepid, "%s" % processwin)
logging.debug("Process agent list : %s" % processwin)
# list python process
lpidsearch = []
for k, v in dd.get_pid().items():
if "python.exe" in v:
lpidsearch.append(int(k))
logging.debug("Process python list : %s" % lpidsearch)
for pr in processes:
logging.debug("search %s in %s" % (pr.pid, lpidsearch))
if pr.pid not in lpidsearch:
logging.debug(
"Process %s pid %s is missing %s"
% (pr.name, pr.pid, lpidsearch)
)
for p in processes:
p.terminate()
logging.debug("END PROGRAMM")
cmd = "taskkill /F /PID %s" % os.getpid()
result = simplecommand(cmd)
break
time.sleep(10)
if PROCESS_RESTART.value == 1:
return
except KeyboardInterrupt:
logging.debug("CTRL+C have been asked.")
logging.debug("The Pulse Xmpp Agent Relay is now stopped")
Expand All @@ -4300,8 +4374,11 @@ def doTask(
else:
# completing process
try:
for p in processes:
p.join()
programrun = True
while True:
time.sleep(10)
if PROCESS_RESTART.value == 1:
return
except KeyboardInterrupt:
logging.debug("CTRL+C have been asked.")
sys.exit(1)
Expand All @@ -4325,6 +4402,8 @@ def __init__(
eventkilltcp,
eventkillpipe,
pidprogrammprincipal,
lockrestart,
PROCESS_RESTART
):
# parameter permet arret programme complet ICI PASSER PARAMETRE DANS XMPPBOT

Expand Down Expand Up @@ -4377,6 +4456,8 @@ def __init__(
eventkilltcp,
eventkillpipe,
self.pidprogrammprincipal,
lockrestart,
PROCESS_RESTART
)
xmpp.auto_reconnect = False
xmpp.register_plugin("xep_0030") # Service Discovery
Expand All @@ -4403,6 +4484,9 @@ def __init__(
)
)
self.logger.debug("TERMINATE")
if xmpp.PROCESS_RESTART.value:
self.logger.debug("fin process")
return
terminateserver(xmpp)


Expand Down Expand Up @@ -4474,7 +4558,7 @@ def terminateserver(xmpp):
logging.log(DEBUGPULSE, "terminate scheduler")
logging.log(DEBUGPULSE, "Waiting to stop kiosk server")
logging.log(DEBUGPULSE, "QUIT")
logging.log(DEBUGPULSE, "bye bye Agent")
logging.log(DEBUGPULSE, "bye bye client xmpp Agent")
if sys.platform.startswith("win"):
windowfilepid = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "INFOSTMP", "pidagentwintree"
Expand All @@ -4499,6 +4583,11 @@ def terminateserver(xmpp):


if __name__ == "__main__":
# Création d'un lock pour synchroniser l'accès à la valeur partagée
lockrestart = multiprocessing.Lock()
# Création d'une valeur partagée (initialisée à 0)
PROCESS_RESTART = Value('i', 1) # 'i' pour entier (int)

if sys.platform.startswith("linux") and os.getuid() != 0:
print("Agent must be running as root")
sys.exit(0)
Expand Down Expand Up @@ -4599,6 +4688,8 @@ def terminateserver(xmpp):
tg.namefileconfig,
tg.levellog,
tg.logfile,
lockrestart,
PROCESS_RESTART
)
else:
createDaemon(
Expand All @@ -4608,4 +4699,6 @@ def terminateserver(xmpp):
tg.namefileconfig,
tg.levellog,
tg.logfile,
lockrestart,
PROCESS_RESTART
)
Loading

0 comments on commit 5b47ae1

Please sign in to comment.