-
Notifications
You must be signed in to change notification settings - Fork 155
Preliminary refactor for supporting spans over HTTP #186
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,16 +20,13 @@ | |
import tornado.gen | ||
import tornado.ioloop | ||
import tornado.queues | ||
import socket | ||
from tornado.concurrent import Future | ||
from .constants import DEFAULT_FLUSH_INTERVAL | ||
from . import thrift | ||
from . import ioloop_util | ||
from .metrics import Metrics, LegacyMetricsFactory | ||
from .senders import UDPSender | ||
from .utils import ErrorReporter | ||
|
||
from thrift.protocol import TCompactProtocol | ||
from jaeger_client.thrift_gen.agent import Agent | ||
|
||
default_logger = logging.getLogger('jaeger_tracing') | ||
|
||
|
@@ -78,9 +75,11 @@ class Reporter(NullReporter): | |
def __init__(self, channel, queue_capacity=100, batch_size=10, | ||
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, | ||
error_reporter=None, metrics=None, metrics_factory=None, | ||
**kwargs): | ||
sender=None, **kwargs): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would making sender a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @black-adder I think that putting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree |
||
""" | ||
:param channel: a communication channel to jaeger-agent | ||
:param sender: senders.Sender subclass implementing send method, | ||
for sending batch of spans to jaeger. | ||
:param queue_capacity: how many spans we can hold in memory before | ||
starting to drop spans | ||
:param batch_size: how many spans we can submit at once to Collector | ||
|
@@ -97,19 +96,20 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, | |
""" | ||
from threading import Lock | ||
|
||
self._channel = channel | ||
# TODO for next major rev: remove channel param in favor of sender | ||
self._sender = sender or self._create_default_sender(channel) | ||
self.queue_capacity = queue_capacity | ||
self.batch_size = batch_size | ||
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics()) | ||
self.metrics = ReporterMetrics(self.metrics_factory) | ||
self.error_reporter = error_reporter or ErrorReporter(Metrics()) | ||
self.logger = kwargs.get('logger', default_logger) | ||
self.agent = Agent.Client(self._channel, self) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is technically a breaking change, since it was already public before. |
||
|
||
if queue_capacity < batch_size: | ||
raise ValueError('Queue capacity cannot be less than batch size') | ||
|
||
self.io_loop = io_loop or channel.io_loop | ||
self.io_loop = io_loop or self.fetch_io_loop(channel, self._sender) | ||
|
||
if self.io_loop is None: | ||
self.logger.error('Jaeger Reporter has no IOLoop') | ||
else: | ||
|
@@ -120,14 +120,24 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, | |
self.flush_interval = flush_interval or None | ||
self.io_loop.spawn_callback(self._consume_queue) | ||
|
||
self._process_lock = Lock() | ||
self._process = None | ||
@staticmethod | ||
def fetch_io_loop(channel, sender): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this be private? |
||
if channel: | ||
return channel.io_loop | ||
elif sender: | ||
return sender.io_loop | ||
return None | ||
|
||
def _create_default_sender(self, channel): | ||
sender = UDPSender( | ||
port=channel.reporting_port, | ||
host=channel.host, | ||
io_loop=channel.io_loop | ||
) | ||
return sender | ||
|
||
def set_process(self, service_name, tags, max_length): | ||
with self._process_lock: | ||
self._process = thrift.make_process( | ||
service_name=service_name, tags=tags, max_length=max_length, | ||
) | ||
self._sender.set_process(service_name, tags, max_length) | ||
|
||
def report_span(self, span): | ||
# We should not be calling `queue.put_nowait()` from random threads, | ||
|
@@ -150,14 +160,14 @@ def _report_span_from_ioloop(self, span): | |
|
||
@tornado.gen.coroutine | ||
def _consume_queue(self): | ||
spans = [] | ||
stopped = False | ||
|
||
while not stopped: | ||
while len(spans) < self.batch_size: | ||
while self._sender.span_count < self.batch_size: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As Yuri mentioned, this is fine for now, but ideally the reporter only appends to the sender, and the HTTP sender and the UDP sender each maintain their own batch size. |
||
try: | ||
# using timeout allows periodic flush with smaller packet | ||
timeout = self.flush_interval + self.io_loop.time() \ | ||
if self.flush_interval and spans else None | ||
if self.flush_interval and self._sender.span_count else None | ||
span = yield self.queue.get(timeout=timeout) | ||
except tornado.gen.TimeoutError: | ||
break | ||
|
@@ -168,52 +178,23 @@ def _consume_queue(self): | |
# don't return yet, submit accumulated spans first | ||
break | ||
else: | ||
spans.append(span) | ||
if spans: | ||
yield self._submit(spans) | ||
for _ in spans: | ||
self.queue.task_done() | ||
spans = spans[:0] | ||
self.metrics.reporter_queue_length(self.queue.qsize()) | ||
self.logger.info('Span publisher exited') | ||
self._sender.append(span) | ||
|
||
# method for protocol factory | ||
def getProtocol(self, transport): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Breaking change. |
||
""" | ||
Implements Thrift ProtocolFactory interface | ||
:param: transport: | ||
:return: Thrift compact protocol | ||
""" | ||
return TCompactProtocol.TCompactProtocol(transport) | ||
if self._sender.span_count: | ||
num_spans = self._sender.span_count | ||
try: | ||
yield self._sender.flush() | ||
except Exception as exc: | ||
self.metrics.reporter_failure(num_spans) | ||
self.error_reporter.error(exc) | ||
else: | ||
self.metrics.reporter_success(num_spans) | ||
|
||
@tornado.gen.coroutine | ||
def _submit(self, spans): | ||
if not spans: | ||
return | ||
with self._process_lock: | ||
process = self._process | ||
if not process: | ||
return | ||
try: | ||
batch = thrift.make_jaeger_batch(spans=spans, process=process) | ||
yield self._send(batch) | ||
self.metrics.reporter_success(len(spans)) | ||
except socket.error as e: | ||
self.metrics.reporter_failure(len(spans)) | ||
self.error_reporter.error( | ||
'Failed to submit traces to jaeger-agent socket: %s', e) | ||
except Exception as e: | ||
self.metrics.reporter_failure(len(spans)) | ||
self.error_reporter.error( | ||
'Failed to submit traces to jaeger-agent: %s', e) | ||
for _ in range(num_spans): | ||
self.queue.task_done() | ||
|
||
@tornado.gen.coroutine | ||
def _send(self, batch): | ||
""" | ||
Send batch of spans out via thrift transport. Any exceptions thrown | ||
will be caught above in the exception handler of _submit(). | ||
""" | ||
return self.agent.emitBatch(batch) | ||
self.metrics.reporter_queue_length(self.queue.qsize()) | ||
self.logger.info('Span publisher exited') | ||
|
||
def close(self): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
# Copyright (c) 2018 Uber Technologies, Inc. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
from __future__ import absolute_import | ||
|
||
import sys | ||
import socket | ||
import logging | ||
import tornado.gen | ||
from six import reraise | ||
from threadloop import ThreadLoop | ||
|
||
from . import thrift | ||
from .local_agent_net import LocalAgentSender | ||
from thrift.protocol import TCompactProtocol | ||
|
||
from jaeger_client.thrift_gen.agent import Agent | ||
|
||
|
||
DEFAULT_SAMPLING_PORT = 5778 | ||
|
||
logger = logging.getLogger('jaeger_tracing') | ||
|
||
|
||
class Sender(object): | ||
def __init__(self, io_loop=None): | ||
from threading import Lock | ||
self.io_loop = io_loop or self._create_new_thread_loop() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can these variables be private? |
||
self._process_lock = Lock() | ||
self._process = None | ||
self.spans = [] | ||
|
||
def append(self, span): | ||
"""Queue a span for subsequent submission calls to flush()""" | ||
self.spans.append(span) | ||
|
||
@property | ||
def span_count(self): | ||
return len(self.spans) | ||
|
||
@tornado.gen.coroutine | ||
def flush(self): | ||
"""Examine span and process state before yielding to _flush() for batching and transport.""" | ||
if self.spans: | ||
with self._process_lock: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For another PR, but the locking here seems a little weird looking at it now. I wonder why we only lock on the process but skip locking on the spans, which could be flushed and emptied in another thread. |
||
process = self._process | ||
if process: | ||
try: | ||
yield self._flush(self.spans, self._process) | ||
finally: | ||
self.spans = [] | ||
|
||
@tornado.gen.coroutine | ||
def _flush(self, spans, process): | ||
"""Batch spans and invokes send(). Override with specific batching logic, if desired.""" | ||
batch = thrift.make_jaeger_batch(spans=spans, process=process) | ||
yield self.send(batch) | ||
|
||
@tornado.gen.coroutine | ||
def send(self, batch): | ||
raise NotImplementedError('This method should be implemented by subclasses') | ||
|
||
def set_process(self, service_name, tags, max_length): | ||
with self._process_lock: | ||
self._process = thrift.make_process( | ||
service_name=service_name, tags=tags, max_length=max_length, | ||
) | ||
|
||
def _create_new_thread_loop(self): | ||
""" | ||
Create a daemonized thread that will run Tornado IOLoop. | ||
:return: the IOLoop backed by the new thread. | ||
""" | ||
self._thread_loop = ThreadLoop() | ||
if not self._thread_loop.is_ready(): | ||
self._thread_loop.start() | ||
return self._thread_loop._io_loop | ||
|
||
|
||
class UDPSender(Sender): | ||
def __init__(self, host, port, io_loop=None): | ||
super(UDPSender, self).__init__(io_loop=io_loop) | ||
self.host = host | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we make all these variables private? |
||
self.port = port | ||
self.channel = self._create_local_agent_channel(self.io_loop) | ||
self.agent = Agent.Client(self.channel, self) | ||
|
||
@tornado.gen.coroutine | ||
def send(self, batch): | ||
""" | ||
Send batch of spans out via thrift transport. | ||
""" | ||
try: | ||
yield self.agent.emitBatch(batch) | ||
except socket.error as e: | ||
reraise(type(e), | ||
type(e)('Failed to submit traces to jaeger-agent socket: {}'.format(e)), | ||
sys.exc_info()[2]) | ||
except Exception as e: | ||
reraise(type(e), type(e)('Failed to submit traces to jaeger-agent: {}'.format(e)), | ||
sys.exc_info()[2]) | ||
|
||
def _create_local_agent_channel(self, io_loop): | ||
""" | ||
Create an out-of-process channel communicating to local jaeger-agent. | ||
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled | ||
via JSON HTTP. | ||
|
||
:param self: instance of Config | ||
""" | ||
logger.info('Initializing Jaeger Tracer with UDP reporter') | ||
return LocalAgentSender( | ||
host=self.host, | ||
sampling_port=DEFAULT_SAMPLING_PORT, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since we're only using the |
||
reporting_port=self.port, | ||
io_loop=io_loop | ||
) | ||
|
||
# method for protocol factory | ||
def getProtocol(self, transport): | ||
""" | ||
Implements Thrift ProtocolFactory interface | ||
:param: transport: | ||
:return: Thrift compact protocol | ||
""" | ||
return TCompactProtocol.TCompactProtocol(transport) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that the sole purpose of saving these is to initialize the UDP sender, I'd rather make these private. We're going to nuke this class in the future anyway, so let's not increase the public API if we don't have to.