Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion instana/agent/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
class BaseAgent(object):
""" Base class for all agent flavors """
client = None
sensor = None
options = None

def __init__(self):
Expand Down
80 changes: 57 additions & 23 deletions instana/collector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(self, agent):
# The agent for this process. Can be Standard, AWSLambda or Fargate
self.agent = agent

# The name assigned to the spawned thread
self.THREAD_NAME = "Instana Collector"

# The Queue where we store finished spans before they are sent
self.span_queue = queue.Queue()

Expand All @@ -50,30 +53,56 @@ def __init__(self, agent):
# Reporting interval for the background thread(s)
self.report_interval = 1

# Flag to indicate if start/shutdown state
self.started = False

def is_reporting_thread_running(self):
"""
Indicates if there is a thread running with the name self.THREAD_NAME
"""
for thread in threading.enumerate():
if thread.name == self.THREAD_NAME:
return True
return False

def start(self):
"""
Starts the collector and starts reporting as long as the agent is in a ready state.
@return: None
"""
if self.is_reporting_thread_running():
if self.thread_shutdown.is_set():
# Shutdown still in progress; Reschedule this start in 5 seconds from now
timer = threading.Timer(5, self.start)
timer.daemon = True
timer.name = "Collector Timed Start"
timer.start()
return
logger.debug("Collecter.start non-fatal: call but thread already running (started: %s)", self.started)
return

if self.agent.can_send():
logger.debug("BaseCollector.start: launching collection thread")
self.thread_shutdown.clear()
self.reporting_thread = threading.Thread(target=self.thread_loop, args=())
self.reporting_thread.setDaemon(True)
self.reporting_thread.setName(self.THREAD_NAME)
self.reporting_thread.start()
self.started = True
else:
logger.warning("BaseCollector.start: the agent tells us we can't send anything out.")
logger.warning("BaseCollector.start: the agent tells us we can't send anything out")

def shutdown(self, report_final=True):
"""
Shuts down the collector and reports any final data.
Shuts down the collector and reports any final data (if possible).
e.g. If the host agent disappeared, we won't be able to report final data.
@return: None
"""
logger.debug("Collector.shutdown: Reporting final data.")
self.thread_shutdown.set()

if report_final is True:
self.prepare_and_report_data()
self.started = False

def thread_loop(self):
"""
Expand All @@ -90,15 +119,31 @@ def background_report(self):
if self.thread_shutdown.is_set():
logger.debug("Thread shutdown signal is active: Shutting down reporting thread")
return False
return self.prepare_and_report_data()

def should_send_snapshot_data(self):
self.prepare_and_report_data()

if self.thread_shutdown.is_set():
logger.debug("Thread shutdown signal is active: Shutting down reporting thread")
return False

return True

def prepare_and_report_data(self):
"""
Determines if snapshot data should be sent
Prepare and report the data payload.
@return: Boolean
"""
logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden")
return False
if env_is_test is False:
lock_acquired = self.background_report_lock.acquire(False)
if lock_acquired:
try:
payload = self.prepare_payload()
self.agent.report_data_payload(payload)
finally:
self.background_report_lock.release()
else:
logger.debug("prepare_and_report_data: Couldn't acquire lock")
return True

def prepare_payload(self):
"""
Expand All @@ -108,24 +153,13 @@ def prepare_payload(self):
logger.debug("BaseCollector: prepare_payload needs to be overridden")
return DictionaryOfStan()

def prepare_and_report_data(self):
def should_send_snapshot_data(self):
"""
Prepare and report the data payload.
Determines if snapshot data should be sent
@return: Boolean
"""
if env_is_test is True:
return True

lock_acquired = self.background_report_lock.acquire(False)
if lock_acquired:
try:
payload = self.prepare_payload()
self.agent.report_data_payload(payload)
finally:
self.background_report_lock.release()
else:
logger.debug("prepare_and_report_data: Couldn't acquire lock")
return True
logger.debug("BaseCollector: should_send_snapshot_data needs to be overridden")
return False

def collect_snapshot(self, *argv, **kwargs):
logger.debug("BaseCollector: collect_snapshot needs to be overridden")
Expand Down
2 changes: 1 addition & 1 deletion instana/collector/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ def prepare_payload(self):
if with_snapshot is True:
self.snapshot_data_last_sent = int(time())
except Exception:
logger.debug("collect_snapshot error", exc_info=True)
logger.debug("non-fatal prepare_payload:", exc_info=True)

return payload