diff --git a/instana/agent.py b/instana/agent.py index a3709f6a..cd25e0eb 100644 --- a/instana/agent.py +++ b/instana/agent.py @@ -3,7 +3,7 @@ import json import os from datetime import datetime -from threading import Timer +import threading import requests import instana.singletons @@ -47,6 +47,7 @@ class Agent(object): secrets_matcher = 'contains-ignore-case' secrets_list = ['key', 'password', 'secret'] client = requests.Session() + should_threads_shutdown = threading.Event() def __init__(self): logger.debug("initializing agent") @@ -54,8 +55,13 @@ def __init__(self): self.machine = TheMachine(self) def start(self, e): - """ Starts the agent and required threads """ + """ + Starts the agent and required threads + + This method is called after a successful announce. See fsm.py + """ logger.debug("Spawning metric & span reporting threads") + self.should_threads_shutdown.clear() self.sensor.start() instana.singletons.tracer.recorder.start() @@ -66,15 +72,18 @@ def handle_fork(self): # Reset the Agent self.reset() - # Ask the sensor to handle the fork - self.sensor.handle_fork() - - # Ask the tracer to handle the fork - instana.singletons.tracer.handle_fork() - def reset(self): + """ + This will reset the agent to a fresh unannounced state. + :return: None + """ + # Will signal to any running background threads to shutdown. + self.should_threads_shutdown.set() + self.last_seen = None self.from_ = From() + + # Will schedule a restart of the announce cycle in the future self.machine.reset() def to_json(self, o): diff --git a/instana/fsm.py b/instana/fsm.py index 3f2fdb89..9daab25b 100644 --- a/instana/fsm.py +++ b/instana/fsm.py @@ -37,6 +37,7 @@ def to_dict(self): class TheMachine(object): RETRY_PERIOD = 30 + THREAD_NAME = "Instana Machine" agent = None fsm = None @@ -71,7 +72,7 @@ def __init__(self, agent): self.timer = t.Timer(5, self.fsm.lookup) self.timer.daemon = True - self.timer.name = "Instana Machine" + self.timer.name = self.THREAD_NAME self.timer.start() @staticmethod @@ -80,9 +81,25 @@ def print_state_change(e): (os.getpid(), t.current_thread().name, e.event, e.src, e.dst)) def reset(self): - self.fsm.lookup() + """ + reset is called to start from scratch in a process. It may be called on first boot or + after a detected fork. + + Here we time a new announce cycle in the future so that any existing threads have time + to exit before we re-create them. + + :return: void + """ + logger.debug("State machine being reset. Will schedule new announce cycle 6 seconds from now.") + + self.timer = t.Timer(6, self.fsm.lookup) + self.timer.daemon = True + self.timer.name = self.THREAD_NAME + self.timer.start() def lookup_agent_host(self, e): + self.agent.should_threads_shutdown.clear() + host, port = self.__get_agent_host_port() if self.agent.is_agent_listening(host, port): @@ -103,14 +120,12 @@ def lookup_agent_host(self, e): logger.warn("Instana Host Agent couldn't be found. Will retry periodically...") self.warnedPeriodic = True - self.schedule_retry(self.lookup_agent_host, e, "agent_lookup") + self.schedule_retry(self.lookup_agent_host, e, self.THREAD_NAME + ": agent_lookup") return False def announce_sensor(self, e): logger.debug("Announcing sensor to the agent") - sock = None pid = os.getpid() - cmdline = [] try: if os.path.isfile("/proc/self/cmdline"): @@ -152,7 +167,7 @@ def announce_sensor(self, e): return True else: logger.debug("Cannot announce sensor. Scheduling retry.") - self.schedule_retry(self.announce_sensor, e, "announce") + self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce") return False def schedule_retry(self, fun, e, name): diff --git a/instana/meter.py b/instana/meter.py index 0e1f8496..0959e0d6 100644 --- a/instana/meter.py +++ b/instana/meter.py @@ -142,9 +142,7 @@ def start(self): Calling this directly more than once without an actual fork will cause errors. """ self.reset() - - if self.thread.isAlive() is False: - self.thread.start() + self.thread.start() def reset(self): """" Reset the state as new """ @@ -154,16 +152,9 @@ def reset(self): self.snapshot_countdown = 0 self.thread = None - # Prepare the thread for metric collection/reporting - for thread in threading.enumerate(): - if thread.getName() == self.THREAD_NAME: - # Metric thread already exists; Make sure we re-use this one. - self.thread = thread - - if self.thread is None: - self.thread = threading.Thread(target=self.collect_and_report) - self.thread.daemon = True - self.thread.name = self.THREAD_NAME + self.thread = threading.Thread(target=self.collect_and_report) + self.thread.daemon = True + self.thread.name = self.THREAD_NAME def handle_fork(self): self.start() @@ -176,6 +167,10 @@ def collect_and_report(self): logger.debug(" -> Metric reporting thread is now alive") def metric_work(): + if self.agent.should_threads_shutdown.is_set(): + logger.debug("Thread shutdown signal from agent is active: Shutting down metric reporting thread") + return False + self.process() if self.agent.is_timed_out(): diff --git a/instana/recorder.py b/instana/recorder.py index 81b13b67..b435130c 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -58,22 +58,13 @@ def start(self): Calling this directly more than once without an actual fork will cause errors. """ self.reset() - - if self.thread.isAlive() is False: - self.thread.start() + self.thread.start() def reset(self): - # Prepare the thread for metric collection/reporting - for thread in threading.enumerate(): - if thread.getName() == self.THREAD_NAME: - # Span reporting thread already exists; Make sure we re-use this one. - self.thread = thread - # Prepare the thread for span collection/reporting - if self.thread is None: - self.thread = threading.Thread(target=self.report_spans) - self.thread.daemon = True - self.thread.name = self.THREAD_NAME + self.thread = threading.Thread(target=self.report_spans) + self.thread.daemon = True + self.thread.name = self.THREAD_NAME def handle_fork(self): self.start() @@ -83,6 +74,10 @@ def report_spans(self): logger.debug(" -> Span reporting thread is now alive") def span_work(): + if instana.singletons.agent.should_threads_shutdown.is_set(): + logger.debug("Thread shutdown signal from agent is active: Shutting down span reporting thread") + return False + queue_size = self.queue.qsize() if queue_size > 0 and instana.singletons.agent.can_send(): response = instana.singletons.agent.report_traces(self.queued_spans())