Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions instana/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import os
from datetime import datetime
from threading import Timer
import threading
import requests

import instana.singletons
Expand Down Expand Up @@ -47,15 +47,21 @@ class Agent(object):
secrets_matcher = 'contains-ignore-case'
secrets_list = ['key', 'password', 'secret']
client = requests.Session()
should_threads_shutdown = threading.Event()

def __init__(self):
logger.debug("initializing agent")
self.sensor = Sensor(self)
self.machine = TheMachine(self)

def start(self, e):
""" Starts the agent and required threads """
"""
Starts the agent and required threads

This method is called after a successful announce. See fsm.py
"""
logger.debug("Spawning metric & span reporting threads")
self.should_threads_shutdown.clear()
self.sensor.start()
instana.singletons.tracer.recorder.start()

Expand All @@ -66,15 +72,18 @@ def handle_fork(self):
# Reset the Agent
self.reset()

# Ask the sensor to handle the fork
self.sensor.handle_fork()

# Ask the tracer to handle the fork
instana.singletons.tracer.handle_fork()

def reset(self):
"""
This will reset the agent to a fresh unannounced state.
:return: None
"""
# Will signal to any running background threads to shutdown.
self.should_threads_shutdown.set()

self.last_seen = None
self.from_ = From()

# Will schedule a restart of the announce cycle in the future
self.machine.reset()

def to_json(self, o):
Expand Down
27 changes: 21 additions & 6 deletions instana/fsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def to_dict(self):

class TheMachine(object):
RETRY_PERIOD = 30
THREAD_NAME = "Instana Machine"

agent = None
fsm = None
Expand Down Expand Up @@ -71,7 +72,7 @@ def __init__(self, agent):

self.timer = t.Timer(5, self.fsm.lookup)
self.timer.daemon = True
self.timer.name = "Instana Machine"
self.timer.name = self.THREAD_NAME
self.timer.start()

@staticmethod
Expand All @@ -80,9 +81,25 @@ def print_state_change(e):
(os.getpid(), t.current_thread().name, e.event, e.src, e.dst))

def reset(self):
self.fsm.lookup()
"""
reset is called to start from scratch in a process. It may be called on first boot or
after a detected fork.

Here we time a new announce cycle in the future so that any existing threads have time
to exit before we re-create them.

:return: void
"""
logger.debug("State machine being reset. Will schedule new announce cycle 6 seconds from now.")

self.timer = t.Timer(6, self.fsm.lookup)
self.timer.daemon = True
self.timer.name = self.THREAD_NAME
self.timer.start()

def lookup_agent_host(self, e):
self.agent.should_threads_shutdown.clear()

host, port = self.__get_agent_host_port()

if self.agent.is_agent_listening(host, port):
Expand All @@ -103,14 +120,12 @@ def lookup_agent_host(self, e):
logger.warn("Instana Host Agent couldn't be found. Will retry periodically...")
self.warnedPeriodic = True

self.schedule_retry(self.lookup_agent_host, e, "agent_lookup")
self.schedule_retry(self.lookup_agent_host, e, self.THREAD_NAME + ": agent_lookup")
return False

def announce_sensor(self, e):
logger.debug("Announcing sensor to the agent")
sock = None
pid = os.getpid()
cmdline = []

try:
if os.path.isfile("/proc/self/cmdline"):
Expand Down Expand Up @@ -152,7 +167,7 @@ def announce_sensor(self, e):
return True
else:
logger.debug("Cannot announce sensor. Scheduling retry.")
self.schedule_retry(self.announce_sensor, e, "announce")
self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce")
return False

def schedule_retry(self, fun, e, name):
Expand Down
21 changes: 8 additions & 13 deletions instana/meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ def start(self):
Calling this directly more than once without an actual fork will cause errors.
"""
self.reset()

if self.thread.isAlive() is False:
self.thread.start()
self.thread.start()

def reset(self):
"""" Reset the state as new """
Expand All @@ -154,16 +152,9 @@ def reset(self):
self.snapshot_countdown = 0
self.thread = None

# Prepare the thread for metric collection/reporting
for thread in threading.enumerate():
if thread.getName() == self.THREAD_NAME:
# Metric thread already exists; Make sure we re-use this one.
self.thread = thread

if self.thread is None:
self.thread = threading.Thread(target=self.collect_and_report)
self.thread.daemon = True
self.thread.name = self.THREAD_NAME
self.thread = threading.Thread(target=self.collect_and_report)
self.thread.daemon = True
self.thread.name = self.THREAD_NAME

def handle_fork(self):
self.start()
Expand All @@ -176,6 +167,10 @@ def collect_and_report(self):
logger.debug(" -> Metric reporting thread is now alive")

def metric_work():
if self.agent.should_threads_shutdown.is_set():
logger.debug("Thread shutdown signal from agent is active: Shutting down metric reporting thread")
return False

self.process()

if self.agent.is_timed_out():
Expand Down
21 changes: 8 additions & 13 deletions instana/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,13 @@ def start(self):
Calling this directly more than once without an actual fork will cause errors.
"""
self.reset()

if self.thread.isAlive() is False:
self.thread.start()
self.thread.start()

def reset(self):
# Prepare the thread for metric collection/reporting
for thread in threading.enumerate():
if thread.getName() == self.THREAD_NAME:
# Span reporting thread already exists; Make sure we re-use this one.
self.thread = thread

# Prepare the thread for span collection/reporting
if self.thread is None:
self.thread = threading.Thread(target=self.report_spans)
self.thread.daemon = True
self.thread.name = self.THREAD_NAME
self.thread = threading.Thread(target=self.report_spans)
self.thread.daemon = True
self.thread.name = self.THREAD_NAME

def handle_fork(self):
self.start()
Expand All @@ -83,6 +74,10 @@ def report_spans(self):
logger.debug(" -> Span reporting thread is now alive")

def span_work():
if instana.singletons.agent.should_threads_shutdown.is_set():
logger.debug("Thread shutdown signal from agent is active: Shutting down span reporting thread")
return False

queue_size = self.queue.qsize()
if queue_size > 0 and instana.singletons.agent.can_send():
response = instana.singletons.agent.report_traces(self.queued_spans())
Expand Down