From 256e1e0b0b69975d953f6db401ee48efc9c93807 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Wed, 29 May 2019 22:19:59 +0200 Subject: [PATCH 1/2] More thread safety on boot, resets and forks --- instana/agent.py | 19 ++++++++++++------- instana/fsm.py | 27 +++++++++++++++++++++------ instana/meter.py | 21 ++++++++------------- instana/recorder.py | 21 ++++++++------------- 4 files changed, 49 insertions(+), 39 deletions(-) diff --git a/instana/agent.py b/instana/agent.py index a3709f6a..5a6f9cfa 100644 --- a/instana/agent.py +++ b/instana/agent.py @@ -47,6 +47,7 @@ class Agent(object): secrets_matcher = 'contains-ignore-case' secrets_list = ['key', 'password', 'secret'] client = requests.Session() + should_threads_shutdown = False def __init__(self): logger.debug("initializing agent") @@ -54,8 +55,13 @@ def __init__(self): self.machine = TheMachine(self) def start(self, e): - """ Starts the agent and required threads """ + """ + Starts the agent and required threads + + This method is called after a successful announce. See fsm.py + """ logger.debug("Spawning metric & span reporting threads") + self.should_threads_shutdown = False self.sensor.start() instana.singletons.tracer.recorder.start() @@ -66,15 +72,14 @@ def handle_fork(self): # Reset the Agent self.reset() - # Ask the sensor to handle the fork - self.sensor.handle_fork() - - # Ask the tracer to handle the fork - instana.singletons.tracer.handle_fork() - def reset(self): + # Will signal to any running background threads to shutdown. + self.should_threads_shutdown = True + self.last_seen = None self.from_ = From() + + # Will schedule a restart of the announce cycle in the future self.machine.reset() def to_json(self, o): diff --git a/instana/fsm.py b/instana/fsm.py index 3f2fdb89..7be23280 100644 --- a/instana/fsm.py +++ b/instana/fsm.py @@ -37,6 +37,7 @@ def to_dict(self): class TheMachine(object): RETRY_PERIOD = 30 + THREAD_NAME = "Instana Machine" agent = None fsm = None @@ -71,7 +72,7 @@ def __init__(self, agent): self.timer = t.Timer(5, self.fsm.lookup) self.timer.daemon = True - self.timer.name = "Instana Machine" + self.timer.name = self.THREAD_NAME self.timer.start() @staticmethod @@ -80,9 +81,25 @@ def print_state_change(e): (os.getpid(), t.current_thread().name, e.event, e.src, e.dst)) def reset(self): - self.fsm.lookup() + """ + reset is called to start from scratch in a process. It may be called on first boot or + after a detected fork. + + Here we time a new announce cycle in the future so that any existing threads have time + to exit before we re-create them. + + :return: void + """ + logger.debug("State machine being reset. Will schedule new announce cycle 6 seconds from now.") + + self.timer = t.Timer(6, self.fsm.lookup) + self.timer.daemon = True + self.timer.name = self.THREAD_NAME + self.timer.start() def lookup_agent_host(self, e): + self.agent.should_threads_shutdown = False + host, port = self.__get_agent_host_port() if self.agent.is_agent_listening(host, port): @@ -103,14 +120,12 @@ def lookup_agent_host(self, e): logger.warn("Instana Host Agent couldn't be found. Will retry periodically...") self.warnedPeriodic = True - self.schedule_retry(self.lookup_agent_host, e, "agent_lookup") + self.schedule_retry(self.lookup_agent_host, e, self.THREAD_NAME + ": agent_lookup") return False def announce_sensor(self, e): logger.debug("Announcing sensor to the agent") - sock = None pid = os.getpid() - cmdline = [] try: if os.path.isfile("/proc/self/cmdline"): @@ -152,7 +167,7 @@ def announce_sensor(self, e): return True else: logger.debug("Cannot announce sensor. Scheduling retry.") - self.schedule_retry(self.announce_sensor, e, "announce") + self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce") return False def schedule_retry(self, fun, e, name): diff --git a/instana/meter.py b/instana/meter.py index 0e1f8496..9f4bd0a8 100644 --- a/instana/meter.py +++ b/instana/meter.py @@ -142,9 +142,7 @@ def start(self): Calling this directly more than once without an actual fork will cause errors. """ self.reset() - - if self.thread.isAlive() is False: - self.thread.start() + self.thread.start() def reset(self): """" Reset the state as new """ @@ -154,16 +152,9 @@ def reset(self): self.snapshot_countdown = 0 self.thread = None - # Prepare the thread for metric collection/reporting - for thread in threading.enumerate(): - if thread.getName() == self.THREAD_NAME: - # Metric thread already exists; Make sure we re-use this one. - self.thread = thread - - if self.thread is None: - self.thread = threading.Thread(target=self.collect_and_report) - self.thread.daemon = True - self.thread.name = self.THREAD_NAME + self.thread = threading.Thread(target=self.collect_and_report) + self.thread.daemon = True + self.thread.name = self.THREAD_NAME def handle_fork(self): self.start() @@ -176,6 +167,10 @@ def collect_and_report(self): logger.debug(" -> Metric reporting thread is now alive") def metric_work(): + if self.agent.should_threads_shutdown is True: + logger.debug("Thread shutdown signal from agent is active: Shutting down metric reporting thread") + return False + self.process() if self.agent.is_timed_out(): diff --git a/instana/recorder.py b/instana/recorder.py index 81b13b67..f25f5e37 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -58,22 +58,13 @@ def start(self): Calling this directly more than once without an actual fork will cause errors. """ self.reset() - - if self.thread.isAlive() is False: - self.thread.start() + self.thread.start() def reset(self): - # Prepare the thread for metric collection/reporting - for thread in threading.enumerate(): - if thread.getName() == self.THREAD_NAME: - # Span reporting thread already exists; Make sure we re-use this one. - self.thread = thread - # Prepare the thread for span collection/reporting - if self.thread is None: - self.thread = threading.Thread(target=self.report_spans) - self.thread.daemon = True - self.thread.name = self.THREAD_NAME + self.thread = threading.Thread(target=self.report_spans) + self.thread.daemon = True + self.thread.name = self.THREAD_NAME def handle_fork(self): self.start() @@ -83,6 +74,10 @@ def report_spans(self): logger.debug(" -> Span reporting thread is now alive") def span_work(): + if instana.singletons.agent.should_threads_shutdown is True: + logger.debug("Thread shutdown signal from agent is active: Shutting down span reporting thread") + return False + queue_size = self.queue.qsize() if queue_size > 0 and instana.singletons.agent.can_send(): response = instana.singletons.agent.report_traces(self.queued_spans()) From 34d7124cf5a497091ca4b38d14c248dfe5264566 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 30 May 2019 11:07:56 +0200 Subject: [PATCH 2/2] Use Event() as a thread sentinel --- instana/agent.py | 12 ++++++++---- instana/fsm.py | 2 +- instana/meter.py | 2 +- instana/recorder.py | 2 +- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/instana/agent.py b/instana/agent.py index 5a6f9cfa..cd25e0eb 100644 --- a/instana/agent.py +++ b/instana/agent.py @@ -3,7 +3,7 @@ import json import os from datetime import datetime -from threading import Timer +import threading import requests import instana.singletons @@ -47,7 +47,7 @@ class Agent(object): secrets_matcher = 'contains-ignore-case' secrets_list = ['key', 'password', 'secret'] client = requests.Session() - should_threads_shutdown = False + should_threads_shutdown = threading.Event() def __init__(self): logger.debug("initializing agent") @@ -61,7 +61,7 @@ def start(self, e): This method is called after a successful announce. See fsm.py """ logger.debug("Spawning metric & span reporting threads") - self.should_threads_shutdown = False + self.should_threads_shutdown.clear() self.sensor.start() instana.singletons.tracer.recorder.start() @@ -73,8 +73,12 @@ def handle_fork(self): self.reset() def reset(self): + """ + This will reset the agent to a fresh unannounced state. + :return: None + """ # Will signal to any running background threads to shutdown. - self.should_threads_shutdown = True + self.should_threads_shutdown.set() self.last_seen = None self.from_ = From() diff --git a/instana/fsm.py b/instana/fsm.py index 7be23280..9daab25b 100644 --- a/instana/fsm.py +++ b/instana/fsm.py @@ -98,7 +98,7 @@ def reset(self): self.timer.start() def lookup_agent_host(self, e): - self.agent.should_threads_shutdown = False + self.agent.should_threads_shutdown.clear() host, port = self.__get_agent_host_port() diff --git a/instana/meter.py b/instana/meter.py index 9f4bd0a8..0959e0d6 100644 --- a/instana/meter.py +++ b/instana/meter.py @@ -167,7 +167,7 @@ def collect_and_report(self): logger.debug(" -> Metric reporting thread is now alive") def metric_work(): - if self.agent.should_threads_shutdown is True: + if self.agent.should_threads_shutdown.is_set(): logger.debug("Thread shutdown signal from agent is active: Shutting down metric reporting thread") return False diff --git a/instana/recorder.py b/instana/recorder.py index f25f5e37..b435130c 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -74,7 +74,7 @@ def report_spans(self): logger.debug(" -> Span reporting thread is now alive") def span_work(): - if instana.singletons.agent.should_threads_shutdown is True: + if instana.singletons.agent.should_threads_shutdown.is_set(): logger.debug("Thread shutdown signal from agent is active: Shutting down span reporting thread") return False