diff --git a/jaeger_client/config.py b/jaeger_client/config.py index c8aaf249..80229267 100644 --- a/jaeger_client/config.py +++ b/jaeger_client/config.py @@ -340,6 +340,7 @@ def new_tracer(self, io_loop=None): Create a new Jaeger Tracer based on the passed `jaeger_client.Config`. Does not set `opentracing.tracer` global variable. """ + channel = self._create_local_agent_channel(io_loop=io_loop) sampler = self.sampler if not sampler: @@ -360,7 +361,8 @@ def new_tracer(self, io_loop=None): flush_interval=self.reporter_flush_interval, logger=logger, metrics_factory=self._metrics_factory, - error_reporter=self.error_reporter) + error_reporter=self.error_reporter + ) if self.logging: reporter = CompositeReporter(reporter, LoggingReporter(logger)) diff --git a/jaeger_client/local_agent_net.py b/jaeger_client/local_agent_net.py index 5266fc51..4ffd6651 100644 --- a/jaeger_client/local_agent_net.py +++ b/jaeger_client/local_agent_net.py @@ -69,6 +69,8 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling # IOLoop self._thread_loop = None self.io_loop = io_loop or self._create_new_thread_loop() + self.reporting_port = reporting_port + self.host = host # HTTP sampling self.local_agent_http = LocalAgentHTTP(host, sampling_port) diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py index 1fedb995..3ee066da 100644 --- a/jaeger_client/reporter.py +++ b/jaeger_client/reporter.py @@ -20,16 +20,13 @@ import tornado.gen import tornado.ioloop import tornado.queues -import socket from tornado.concurrent import Future from .constants import DEFAULT_FLUSH_INTERVAL -from . import thrift from . import ioloop_util from .metrics import Metrics, LegacyMetricsFactory +from .senders import UDPSender from .utils import ErrorReporter -from thrift.protocol import TCompactProtocol -from jaeger_client.thrift_gen.agent import Agent default_logger = logging.getLogger('jaeger_tracing') @@ -78,9 +75,11 @@ class Reporter(NullReporter): def __init__(self, channel, queue_capacity=100, batch_size=10, flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, error_reporter=None, metrics=None, metrics_factory=None, - **kwargs): + sender=None, **kwargs): """ :param channel: a communication channel to jaeger-agent + :param sender: senders.Sender subclass implementing send method, + for sending batch of spans to jaeger. :param queue_capacity: how many spans we can hold in memory before starting to drop spans :param batch_size: how many spans we can submit at once to Collector @@ -97,19 +96,20 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, """ from threading import Lock - self._channel = channel + # TODO for next major rev: remove channel param in favor of sender + self._sender = sender or self._create_default_sender(channel) self.queue_capacity = queue_capacity self.batch_size = batch_size self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics()) self.metrics = ReporterMetrics(self.metrics_factory) self.error_reporter = error_reporter or ErrorReporter(Metrics()) self.logger = kwargs.get('logger', default_logger) - self.agent = Agent.Client(self._channel, self) if queue_capacity < batch_size: raise ValueError('Queue capacity cannot be less than batch size') - self.io_loop = io_loop or channel.io_loop + self.io_loop = io_loop or self.fetch_io_loop(channel, self._sender) + if self.io_loop is None: self.logger.error('Jaeger Reporter has no IOLoop') else: @@ -120,14 +120,24 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, self.flush_interval = flush_interval or None self.io_loop.spawn_callback(self._consume_queue) - self._process_lock = Lock() - self._process = None + @staticmethod + def fetch_io_loop(channel, sender): + if channel: + return channel.io_loop + elif sender: + return sender.io_loop + return None + + def _create_default_sender(self, channel): + sender = UDPSender( + port=channel.reporting_port, + host=channel.host, + io_loop=channel.io_loop + ) + return sender def set_process(self, service_name, tags, max_length): - with self._process_lock: - self._process = thrift.make_process( - service_name=service_name, tags=tags, max_length=max_length, - ) + self._sender.set_process(service_name, tags, max_length) def report_span(self, span): # We should not be calling `queue.put_nowait()` from random threads, @@ -150,14 +160,14 @@ def _report_span_from_ioloop(self, span): @tornado.gen.coroutine def _consume_queue(self): - spans = [] stopped = False + while not stopped: - while len(spans) < self.batch_size: + while self._sender.span_count < self.batch_size: try: # using timeout allows periodic flush with smaller packet timeout = self.flush_interval + self.io_loop.time() \ - if self.flush_interval and spans else None + if self.flush_interval and self._sender.span_count else None span = yield self.queue.get(timeout=timeout) except tornado.gen.TimeoutError: break @@ -168,52 +178,23 @@ def _consume_queue(self): # don't return yet, submit accumulated spans first break else: - spans.append(span) - if spans: - yield self._submit(spans) - for _ in spans: - self.queue.task_done() - spans = spans[:0] - self.metrics.reporter_queue_length(self.queue.qsize()) - self.logger.info('Span publisher exited') + self._sender.append(span) - # method for protocol factory - def getProtocol(self, transport): - """ - Implements Thrift ProtocolFactory interface - :param: transport: - :return: Thrift compact protocol - """ - return TCompactProtocol.TCompactProtocol(transport) + if self._sender.span_count: + num_spans = self._sender.span_count + try: + yield self._sender.flush() + except Exception as exc: + self.metrics.reporter_failure(num_spans) + self.error_reporter.error(exc) + else: + self.metrics.reporter_success(num_spans) - @tornado.gen.coroutine - def _submit(self, spans): - if not spans: - return - with self._process_lock: - process = self._process - if not process: - return - try: - batch = thrift.make_jaeger_batch(spans=spans, process=process) - yield self._send(batch) - self.metrics.reporter_success(len(spans)) - except socket.error as e: - self.metrics.reporter_failure(len(spans)) - self.error_reporter.error( - 'Failed to submit traces to jaeger-agent socket: %s', e) - except Exception as e: - self.metrics.reporter_failure(len(spans)) - self.error_reporter.error( - 'Failed to submit traces to jaeger-agent: %s', e) + for _ in range(num_spans): + self.queue.task_done() - @tornado.gen.coroutine - def _send(self, batch): - """ - Send batch of spans out via thrift transport. Any exceptions thrown - will be caught above in the exception handler of _submit(). - """ - return self.agent.emitBatch(batch) + self.metrics.reporter_queue_length(self.queue.qsize()) + self.logger.info('Span publisher exited') def close(self): """ diff --git a/jaeger_client/senders.py b/jaeger_client/senders.py new file mode 100644 index 00000000..f88ddc35 --- /dev/null +++ b/jaeger_client/senders.py @@ -0,0 +1,136 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import absolute_import + +import sys +import socket +import logging +import tornado.gen +from six import reraise +from threadloop import ThreadLoop + +from . import thrift +from .local_agent_net import LocalAgentSender +from thrift.protocol import TCompactProtocol + +from jaeger_client.thrift_gen.agent import Agent + + +DEFAULT_SAMPLING_PORT = 5778 + +logger = logging.getLogger('jaeger_tracing') + + +class Sender(object): + def __init__(self, io_loop=None): + from threading import Lock + self.io_loop = io_loop or self._create_new_thread_loop() + self._process_lock = Lock() + self._process = None + self.spans = [] + + def append(self, span): + """Queue a span for subsequent submission calls to flush()""" + self.spans.append(span) + + @property + def span_count(self): + return len(self.spans) + + @tornado.gen.coroutine + def flush(self): + """Examine span and process state before yielding to _flush() for batching and transport.""" + if self.spans: + with self._process_lock: + process = self._process + if process: + try: + yield self._flush(self.spans, self._process) + finally: + self.spans = [] + + @tornado.gen.coroutine + def _flush(self, spans, process): + """Batch spans and invokes send(). Override with specific batching logic, if desired.""" + batch = thrift.make_jaeger_batch(spans=spans, process=process) + yield self.send(batch) + + @tornado.gen.coroutine + def send(self, batch): + raise NotImplementedError('This method should be implemented by subclasses') + + def set_process(self, service_name, tags, max_length): + with self._process_lock: + self._process = thrift.make_process( + service_name=service_name, tags=tags, max_length=max_length, + ) + + def _create_new_thread_loop(self): + """ + Create a daemonized thread that will run Tornado IOLoop. + :return: the IOLoop backed by the new thread. + """ + self._thread_loop = ThreadLoop() + if not self._thread_loop.is_ready(): + self._thread_loop.start() + return self._thread_loop._io_loop + + +class UDPSender(Sender): + def __init__(self, host, port, io_loop=None): + super(UDPSender, self).__init__(io_loop=io_loop) + self.host = host + self.port = port + self.channel = self._create_local_agent_channel(self.io_loop) + self.agent = Agent.Client(self.channel, self) + + @tornado.gen.coroutine + def send(self, batch): + """ + Send batch of spans out via thrift transport. + """ + try: + yield self.agent.emitBatch(batch) + except socket.error as e: + reraise(type(e), + type(e)('Failed to submit traces to jaeger-agent socket: {}'.format(e)), + sys.exc_info()[2]) + except Exception as e: + reraise(type(e), type(e)('Failed to submit traces to jaeger-agent: {}'.format(e)), + sys.exc_info()[2]) + + def _create_local_agent_channel(self, io_loop): + """ + Create an out-of-process channel communicating to local jaeger-agent. + Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled + via JSON HTTP. + + :param self: instance of Config + """ + logger.info('Initializing Jaeger Tracer with UDP reporter') + return LocalAgentSender( + host=self.host, + sampling_port=DEFAULT_SAMPLING_PORT, + reporting_port=self.port, + io_loop=io_loop + ) + + # method for protocol factory + def getProtocol(self, transport): + """ + Implements Thrift ProtocolFactory interface + :param: transport: + :return: Thrift compact protocol + """ + return TCompactProtocol.TCompactProtocol(transport) diff --git a/tests/test_reporter.py b/tests/test_reporter.py index b7e8e69e..ca767805 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -96,7 +96,7 @@ def test_composite_reporter(): class FakeSender(object): """ - Mock the _send() method of the reporter by capturing requests + Mock the send() method of the reporter's Sender by capturing requests and returning incomplete futures that can be completed from inside the test. """ @@ -158,7 +158,7 @@ def _new_reporter(batch_size, flush=None, queue_cap=100): queue_capacity=queue_cap) reporter.set_process('service', {}, max_length=0) sender = FakeSender() - reporter._send = sender + reporter._sender.send = sender return reporter, sender @tornado.gen.coroutine @@ -198,17 +198,12 @@ def test_submit_failure(self): reporter_failure_key = 'jaeger:reporter_spans.result_err' assert reporter_failure_key not in reporter.metrics_factory.counters - # simulate exception in send - reporter._send = mock.MagicMock(side_effect=ValueError()) + reporter._sender.send = mock.MagicMock(side_effect=ValueError()) reporter.report_span(self._new_span('1')) - yield self._wait_for( lambda: reporter_failure_key in reporter.metrics_factory.counters) assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key) - # silly test, for code coverage only - yield reporter._submit([]) - @gen_test def test_submit_queue_full_batch_size_1(self): reporter, sender = self._new_reporter(batch_size=1, queue_cap=1) @@ -275,7 +270,7 @@ def send(_): count[0] += 1 return future_result(True) - reporter._send = send + reporter._sender.send = send reporter.batch_size = 3 for i in range(10): reporter.report_span(self._new_span('%s' % i)) @@ -291,3 +286,22 @@ def send(_): yield reporter.close() assert reporter.queue.qsize() == 0, 'all spans drained' assert count[0] == 4, 'last span submitted in one extrac batch' + + +class TestReporterUnit: + @pytest.mark.parametrize( + 'channel, sender, expected', + [ + (None, None, None), + (None, type('X', (object,), {'io_loop': 'foo'}), 'foo'), + (type('X', (object,), {'io_loop': 'bar'}), None, 'bar'), + ( + type('X', (object,), {'io_loop': 'bar'}), + type('X', (object,), {'io_loop': 'foo'}), + 'bar' + ), + ] + ) + def test_reporter_fetch_io_loop_works_as_expected(self, channel, sender, expected): + result = Reporter.fetch_io_loop(channel, sender) + assert expected == result diff --git a/tests/test_senders.py b/tests/test_senders.py new file mode 100644 index 00000000..25717187 --- /dev/null +++ b/tests/test_senders.py @@ -0,0 +1,161 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function + +import time +import socket +import collections + +import mock +import pytest +from tornado import ioloop +from tornado import gen +from tornado.testing import AsyncTestCase, gen_test + +from jaeger_client import senders +from jaeger_client import Span, SpanContext +from jaeger_client.local_agent_net import LocalAgentSender +from jaeger_client.thrift_gen.agent import Agent +from jaeger_client.thrift_gen.jaeger import ttypes +from thrift.protocol import TCompactProtocol + + +def test_base_sender_create_io_loop_if_not_provided(): + + sender = senders.Sender() + + assert sender.io_loop is not None + assert isinstance(sender.io_loop, ioloop.IOLoop) + + +def test_base_sender_send_not_implemented(): + sender = senders.Sender() + with pytest.raises(NotImplementedError): + sender.send(1).result() + + +def test_base_sender_set_process_instantiate_jaeger_process(): + sender = senders.Sender() + sender.set_process('service', {}, max_length=0) + assert isinstance(sender._process, ttypes.Process) + assert sender._process.serviceName == 'service' + + +def test_base_sender_spanless_flush_is_noop(): + sender = senders.Sender() + flushed = sender.flush().result() + assert flushed is None + + +def test_base_sender_processless_flush_is_noop(): + sender = senders.Sender() + sender.spans.append('foo') + flushed = sender.flush().result() + assert flushed is None + + +class CustomException(Exception): + pass + + +class SenderFlushTest(AsyncTestCase): + + def span(self): + FakeTracer = collections.namedtuple('FakeTracer', ['ip_address', 'service_name']) + tracer = FakeTracer(ip_address='127.0.0.1', service_name='reporter_test') + ctx = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1) + span = Span(context=ctx, tracer=tracer, operation_name='foo') + span.start_time = time.time() + span.end_time = span.start_time + 0.001 # 1ms + return span + + @gen_test + def test_base_sender_flush_raises_exceptions(self): + sender = senders.Sender() + sender.set_process('service', {}, max_length=0) + + sender.spans = [self.span()] + + sender.send = mock.MagicMock(side_effect=CustomException('Failed to send batch.')) + assert sender.span_count == 1 + + try: + yield sender.flush() + except Exception as exc: + assert isinstance(exc, CustomException) + assert str(exc) == 'Failed to send batch.' + else: + assert False, "Didn't Raise" + assert sender.span_count == 0 + + @gen_test + def test_udp_sender_flush_reraises_exceptions(self): + exceptions = ((CustomException, 'Failed to send batch.', + 'Failed to submit traces to jaeger-agent: Failed to send batch.'), + (socket.error, 'Connection Failed', + 'Failed to submit traces to jaeger-agent socket: Connection Failed')) + for exception, value, expected_value in exceptions: + sender = senders.UDPSender(host='mock', port=4242) + sender.set_process('service', {}, max_length=0) + + sender.spans = [self.span()] + + sender.agent.emitBatch = mock.MagicMock(side_effect=exception(value)) + assert sender.span_count == 1 + + try: + yield sender.flush() + except Exception as exc: + assert isinstance(exc, exception) + assert str(exc) == expected_value + else: + assert False, "Didn't Raise" + assert sender.span_count == 0 + + +def test_udp_sender_instantiate_thrift_agent(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert sender.agent is not None + assert isinstance(sender.agent, Agent.Client) + + +def test_udp_sender_intantiate_local_agent_channel(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert sender.channel is not None + assert sender.channel.io_loop == sender.io_loop + assert isinstance(sender.channel, LocalAgentSender) + + +def test_udp_sender_calls_agent_emitBatch_on_send(): + + test_data = {'foo': 'bar'} + sender = senders.UDPSender(host='mock', port=4242) + sender.agent = mock.Mock() + + sender.send(test_data) + + sender.agent.emitBatch.assert_called_once_with(test_data) + + +def test_udp_sender_implements_thrift_protocol_factory(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert callable(sender.getProtocol) + protocol = sender.getProtocol(mock.MagicMock()) + assert isinstance(protocol, TCompactProtocol.TCompactProtocol)