Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 61 additions & 32 deletions instana/agent.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
""" The in-process Instana agent that manages monitoring state and reporting that data. """
from __future__ import absolute_import

import json
Expand All @@ -8,16 +9,17 @@
import requests

import instana.singletons
from instana.collector import Collector

from .fsm import TheMachine
from .log import logger
from .sensor import Sensor
from .util import to_json, get_py_source, package_version
from .options import StandardOptions, AWSLambdaOptions
from instana.collector import Collector


class AnnounceData(object):
""" The Announce Payload """
pid = 0
agentUuid = ""

Expand All @@ -26,6 +28,7 @@ def __init__(self, **kwds):


class AWSLambdaFrom(object):
""" The source identifier for AWSLambdaAgent """
hl = True
cp = "aws"
e = "qualifiedARN"
Expand All @@ -35,6 +38,7 @@ def __init__(self, **kwds):


class BaseAgent(object):
""" Base class for all agent flavors """
client = requests.Session()
sensor = None

Expand Down Expand Up @@ -108,13 +112,22 @@ def reset(self):
self.machine.reset()

def is_timed_out(self):
    """
    Report whether the Instana host agent has gone quiet.

    If we haven't heard from the Instana host agent in more than 60
    seconds, this method will return True.  When ``last_seen`` has never
    been set, there is nothing to time out and False is returned.
    @return: Boolean
    """
    # NOTE(review): `self.can_send` is referenced here, not called, so as a
    # bound method this operand is always truthy.  Left untouched because
    # calling it could have side effects -- confirm the intent.
    if self.last_seen and self.can_send:
        elapsed = datetime.now() - self.last_seen
        # total_seconds() measures the whole gap.  `.seconds` only holds the
        # within-day remainder, so it under-reports gaps longer than a day and
        # reports a huge value when the clock steps backwards.
        return elapsed.total_seconds() > 60
    return False

def can_send(self):
"""
Are we in a state where we can send data?
@return: Boolean
"""
# Watch for pid change (fork)
current_pid = os.getpid()
if self._boot_pid != current_pid:
Expand All @@ -129,6 +142,11 @@ def can_send(self):
return False

def set_from(self, json_string):
"""
Sets the source identifiers given to us by the Instana Host agent.
@param json_string: source identifiers
@return: None
"""
if type(json_string) is bytes:
raw_json = json_string.decode("UTF-8")
else:
Expand All @@ -147,33 +165,37 @@ def set_from(self, json_string):
self.announce_data = AnnounceData(pid=res_data['pid'], agentUuid=res_data['agentUuid'])

def get_from_structure(self):
    """
    Build the From data that is reported alongside monitoring data.

    Under test (INSTANA_TEST set to a non-empty value) a fixed fake host id
    is paired with the current pid; otherwise the pid and agent UUID stored
    in the announce data are used.
    @return: dict()
    """
    running_under_test = os.environ.get("INSTANA_TEST", False)
    if running_under_test:
        return {'e': os.getpid(), 'h': 'fake'}

    announced = self.announce_data
    return {'e': announced.pid, 'h': announced.agentUuid}

def is_agent_listening(self, host, port):
    """
    Check if the Instana Agent is listening on <host> and <port>.

    Issues a GET against http://<host>:<port>/ with a 0.8 second timeout and
    compares the response's Server header with AGENT_HEADER.
    @param host: host name or address to probe
    @param port: port to probe
    @return: Boolean - True only when the Server header matches AGENT_HEADER
    """
    result = False
    try:
        url = "http://%s:%s/" % (host, port)
        response = self.client.get(url, timeout=0.8)

        server_header = response.headers["Server"]
        if server_header == self.AGENT_HEADER:
            logger.debug("Instana host agent found on %s:%d", host, port)
            result = True
        else:
            logger.debug("...something is listening on %s:%d but it's not the Instana Host Agent: %s",
                         host, port, server_header)
    except Exception:
        # Best effort: any failure while probing (connection error, timeout,
        # missing Server header) is treated as "no agent here".
        logger.debug("Instana Host Agent not found on %s:%d", host, port)
    # Return after the try block instead of from `finally`, so that
    # KeyboardInterrupt / SystemExit are no longer silently discarded.
    return result

def announce(self, discovery):
"""
Expand All @@ -190,23 +212,25 @@ def announce(self, discovery):

if response.status_code == 200:
self.last_seen = datetime.now()
except (requests.ConnectTimeout, requests.ConnectionError):
logger.debug("announce", exc_info=True)
except:
logger.debug("announce: ", exc_info=True)
finally:
return response

def is_agent_ready(self):
    """
    Used after making a successful announce to test when the agent is ready to accept data.

    Sends a HEAD request to the data URL with a 0.8 second timeout.
    @return: Boolean - True only when the agent answers with HTTP 200
    """
    ready = False
    try:
        response = self.client.head(self.__data_url(), timeout=0.8)
        ready = response.status_code == 200
    except Exception:
        # Best effort: a failed request simply means "not ready yet".
        logger.debug("is_agent_ready: ", exc_info=True)
    # Return after the try block instead of from `finally`, so that
    # KeyboardInterrupt / SystemExit are no longer silently discarded.
    return ready

def report_data_payload(self, entity_data):
"""
Expand All @@ -219,12 +243,12 @@ def report_data_payload(self, entity_data):
headers={"Content-Type": "application/json"},
timeout=0.8)

# logger.warn("report_data: response.status_code is %s" % response.status_code)
# logger.warning("report_data: response.status_code is %s" % response.status_code)

if response.status_code == 200:
self.last_seen = datetime.now()
except (requests.ConnectTimeout, requests.ConnectionError):
logger.debug("report_data: Instana host agent connection error")
except:
logger.debug("report_data: Instana host agent connection error", exc_info=True)
finally:
return response

Expand All @@ -244,12 +268,12 @@ def report_traces(self, spans):
headers={"Content-Type": "application/json"},
timeout=0.8)

# logger.warn("report_traces: response.status_code is %s" % response.status_code)
# logger.debug("report_traces: response.status_code is %s" % response.status_code)

if response.status_code == 200:
self.last_seen = datetime.now()
except (requests.ConnectTimeout, requests.ConnectionError):
logger.debug("report_traces: Instana host agent connection error")
except:
logger.debug("report_traces: ", exc_info=True)
finally:
return response

Expand Down Expand Up @@ -286,10 +310,8 @@ def __task_response(self, message_id, data):
data=payload,
headers={"Content-Type": "application/json"},
timeout=0.8)
except (requests.ConnectTimeout, requests.ConnectionError):
logger.debug("task_response", exc_info=True)
except Exception:
logger.debug("task_response Exception", exc_info=True)
except:
logger.debug("task_response: ", exc_info=True)
finally:
return response

Expand Down Expand Up @@ -322,6 +344,7 @@ def __response_url(self, message_id):


class AWSLambdaAgent(BaseAgent):
""" In-process agent for AWS Lambda """
def __init__(self):
super(AWSLambdaAgent, self).__init__()

Expand All @@ -340,13 +363,21 @@ def __init__(self):
self.collector = Collector(self)
self.collector.start()
else:
logger.warn("Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. "
"We will not be able monitor this function.")
logger.warning("Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. "
"We will not be able monitor this function.")

def can_send(self):
    """
    Tell the caller whether this agent is currently able to send data.
    @return: the value of the `_can_send` attribute
    """
    sendable = self._can_send
    return sendable

def get_from_structure(self):
    """
    Build the From data that is reported alongside monitoring data.

    The entity id is the invoked function ARN read from the collector's
    context.
    @return: dict()
    """
    function_arn = self.collector.context.invoked_function_arn
    return dict(hl=True, cp='aws', e=function_arn)

def report_data_payload(self, payload):
Expand All @@ -363,7 +394,7 @@ def report_data_payload(self, payload):
self.report_headers["X-Instana-Key"] = self.options.agent_key
self.report_headers["X-Instana-Time"] = str(round(time.time() * 1000))

logger.debug("using these headers: %s" % self.report_headers)
logger.debug("using these headers: %s", self.report_headers)

if 'INSTANA_DISABLE_CA_CHECK' in os.environ:
ssl_verify = False
Expand All @@ -376,9 +407,7 @@ def report_data_payload(self, payload):
timeout=self.options.timeout,
verify=ssl_verify)

logger.debug("report_data_payload: response.status_code is %s" % response.status_code)
except (requests.ConnectTimeout, requests.ConnectionError):
logger.debug("report_data_payload: ", exc_info=True)
logger.debug("report_data_payload: response.status_code is %s", response.status_code)
except:
logger.debug("report_data_payload: ", exc_info=True)
finally:
Expand All @@ -394,4 +423,4 @@ def __data_bundle_url(self):
"""
URL for posting metrics to the host agent. Only valid when announced.
"""
return "%s/bundle" % self.options.endpoint_url
return "%s/bundle" % self.options.endpoint_url
22 changes: 11 additions & 11 deletions instana/fsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,15 @@ def lookup_agent_host(self, e):
port = self.agent.options.agent_port

if self.agent.is_agent_listening(host, port):
self.agent.host = host
self.agent.port = port
self.fsm.announce()
return True
elif os.path.exists("/proc/"):

if os.path.exists("/proc/"):
host = get_default_gateway()
if host:
if self.agent.is_agent_listening(host, port):
self.agent.host = host
self.agent.port = port
self.agent.options.agent_host = host
self.agent.options.agent_port = port
self.fsm.announce()
return True

Expand All @@ -120,7 +119,8 @@ def lookup_agent_host(self, e):
return False

def announce_sensor(self, e):
logger.debug("Announcing sensor to the agent")
logger.debug("Attempting to make an announcement to the agent on %s:%d",
self.agent.options.agent_host, self.agent.options.agent_port)
pid = os.getpid()

try:
Expand All @@ -135,7 +135,7 @@ def announce_sensor(self, e):
# psutil which requires dev packages, gcc etc...
proc = subprocess.Popen(["ps", "-p", str(pid), "-o", "command"],
stdout=subprocess.PIPE)
(out, err) = proc.communicate()
(out, _) = proc.communicate()
parts = out.split(b'\n')
cmdline = [parts[1].decode("utf-8")]
except Exception:
Expand All @@ -149,7 +149,7 @@ def announce_sensor(self, e):
# If we're on a system with a procfs
if os.path.exists("/proc/"):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.agent.host, 42699))
sock.connect((self.agent.options.agent_host, self.agent.options.agent_port))
path = "/proc/%d/fd/%d" % (pid, sock.fileno())
d.fd = sock.fileno()
d.inode = os.readlink(path)
Expand All @@ -162,9 +162,9 @@ def announce_sensor(self, e):
logger.debug("Announced pid: %s (true pid: %s). Waiting for Agent Ready...",
str(pid), str(self.agent.announce_data.pid))
return True
else:
logger.debug("Cannot announce sensor. Scheduling retry.")
self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce")

logger.debug("Cannot announce sensor. Scheduling retry.")
self.schedule_retry(self.announce_sensor, e, self.THREAD_NAME + ": announce")
return False

def schedule_retry(self, fun, e, name):
Expand Down
2 changes: 2 additions & 0 deletions instana/options.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
""" Options for the in-process Instana agent """
import logging
import os


class StandardOptions(object):
""" Configurable option bits for this package """
service = None
service_name = None
agent_host = None
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def check_setuptools():
' and then try the install again.\n'
'Also:\n'
' `pip show setuptools` - shows the current version\n'
' To see the setuptool releases: \n'
' To see the setuptools releases: \n'
' https://setuptools.readthedocs.io/en/latest/history.html')


Expand Down