diff --git a/jaeger_client/config.py b/jaeger_client/config.py index 4658a7ab..bb0276cb 100644 --- a/jaeger_client/config.py +++ b/jaeger_client/config.py @@ -306,6 +306,7 @@ def new_tracer(self, io_loop=None): Create a new Jaeger Tracer based on the passed `jaeger_client.Config`. Does not set `opentracing.tracer` global variable. """ + channel = self._create_local_agent_channel(io_loop=io_loop) sampler = self.sampler if sampler is None: @@ -326,7 +327,8 @@ def new_tracer(self, io_loop=None): flush_interval=self.reporter_flush_interval, logger=logger, metrics_factory=self._metrics_factory, - error_reporter=self.error_reporter) + error_reporter=self.error_reporter + ) if self.logging: reporter = CompositeReporter(reporter, LoggingReporter(logger)) diff --git a/jaeger_client/local_agent_net.py b/jaeger_client/local_agent_net.py index 5627d2e9..625008ff 100644 --- a/jaeger_client/local_agent_net.py +++ b/jaeger_client/local_agent_net.py @@ -55,6 +55,8 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None): # IOLoop self._thread_loop = None self.io_loop = io_loop or self._create_new_thread_loop() + self.reporting_port = reporting_port + self.host = host # http sampling self.local_agent_http = LocalAgentHTTP(host, sampling_port) diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py index 1fedb995..c1855d43 100644 --- a/jaeger_client/reporter.py +++ b/jaeger_client/reporter.py @@ -26,10 +26,9 @@ from . import thrift from . import ioloop_util from .metrics import Metrics, LegacyMetricsFactory +from .senders import UDPSender from .utils import ErrorReporter -from thrift.protocol import TCompactProtocol -from jaeger_client.thrift_gen.agent import Agent default_logger = logging.getLogger('jaeger_tracing') @@ -78,9 +77,11 @@ class Reporter(NullReporter): def __init__(self, channel, queue_capacity=100, batch_size=10, flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, error_reporter=None, metrics=None, metrics_factory=None, - **kwargs): + sender=None, **kwargs): """ :param channel: a communication channel to jaeger-agent + :param sender: senders.Sender subclass implementing send method, + for sending batch of spans to jaeger. :param queue_capacity: how many spans we can hold in memory before starting to drop spans :param batch_size: how many spans we can submit at once to Collector @@ -97,19 +98,20 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, """ from threading import Lock - self._channel = channel + # TODO for next major rev: remove channel param in favor of sender + self._sender = sender or self._create_default_sender(channel) self.queue_capacity = queue_capacity self.batch_size = batch_size self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics()) self.metrics = ReporterMetrics(self.metrics_factory) self.error_reporter = error_reporter or ErrorReporter(Metrics()) self.logger = kwargs.get('logger', default_logger) - self.agent = Agent.Client(self._channel, self) if queue_capacity < batch_size: raise ValueError('Queue capacity cannot be less than batch size') - self.io_loop = io_loop or channel.io_loop + self.io_loop = io_loop or self.fetch_io_loop(channel, self._sender) + if self.io_loop is None: self.logger.error('Jaeger Reporter has no IOLoop') else: @@ -123,6 +125,23 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, self._process_lock = Lock() self._process = None + @staticmethod + def fetch_io_loop(channel, sender): + if channel: + return channel.io_loop + elif sender: + return sender.io_loop + return None + + + def _create_default_sender(self, channel): + sender = UDPSender( + port=channel.reporting_port, + host=channel.host, + io_loop=channel.io_loop + ) + return sender + def set_process(self, service_name, tags, max_length): with self._process_lock: self._process = thrift.make_process( @@ -177,15 +196,6 @@ def _consume_queue(self): self.metrics.reporter_queue_length(self.queue.qsize()) self.logger.info('Span publisher exited') - # method for protocol factory - def getProtocol(self, transport): - """ - Implements Thrift ProtocolFactory interface - :param: transport: - :return: Thrift compact protocol - """ - return TCompactProtocol.TCompactProtocol(transport) - @tornado.gen.coroutine def _submit(self, spans): if not spans: @@ -213,7 +223,7 @@ def _send(self, batch): Send batch of spans out via thrift transport. Any exceptions thrown will be caught above in the exception handler of _submit(). """ - return self.agent.emitBatch(batch) + return self._sender.send(batch) def close(self): """ diff --git a/jaeger_client/senders.py b/jaeger_client/senders.py new file mode 100644 index 00000000..30eb5277 --- /dev/null +++ b/jaeger_client/senders.py @@ -0,0 +1,91 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import absolute_import + +import logging +from threadloop import ThreadLoop + +from .local_agent_net import LocalAgentSender +from thrift.protocol import TCompactProtocol + +from jaeger_client.thrift_gen.agent import Agent + + +DEFAULT_SAMPLING_PORT = 5778 + +logger = logging.getLogger('jaeger_tracing') + + +class Sender(object): + def __init__(self, host, port, io_loop=None): + self.host = host + self.port = port + self.io_loop = io_loop or self._create_new_thread_loop() + + def send(self, batch): + raise NotImplementedError('This method should be implemented by subclasses') + + def _create_new_thread_loop(self): + """ + Create a daemonized thread that will run Tornado IOLoop. + :return: the IOLoop backed by the new thread. + """ + self._thread_loop = ThreadLoop() + if not self._thread_loop.is_ready(): + self._thread_loop.start() + return self._thread_loop._io_loop + + +class UDPSender(Sender): + def __init__(self, host, port, io_loop=None): + super(UDPSender, self).__init__( + host=host, + port=port, + io_loop=io_loop + ) + self.channel = self._create_local_agent_channel(self.io_loop) + self.agent = Agent.Client(self.channel, self) + + def send(self, batch): + """ Send batch of spans out via thrift transport. + + Any exceptions thrown will be caught by the caller. + """ + + return self.agent.emitBatch(batch) + + def _create_local_agent_channel(self, io_loop): + """ + Create an out-of-process channel communicating to local jaeger-agent. + Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled + via JSON HTTP. + + :param self: instance of Config + """ + logger.info('Initializing Jaeger Tracer with UDP reporter') + return LocalAgentSender( + host=self.host, + sampling_port=DEFAULT_SAMPLING_PORT, + reporting_port=self.port, + io_loop=io_loop + ) + + # method for protocol factory + def getProtocol(self, transport): + """ + Implements Thrift ProtocolFactory interface + :param: transport: + :return: Thrift compact protocol + """ + return TCompactProtocol.TCompactProtocol(transport) diff --git a/tests/test_reporter.py b/tests/test_reporter.py index b7e8e69e..ce875467 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -291,3 +291,38 @@ def send(_): yield reporter.close() assert reporter.queue.qsize() == 0, 'all spans drained' assert count[0] == 4, 'last span submitted in one extrac batch' + + +class TestReporterUnit: + def test_reporter_calls_sender_correctly(self): + reporter = Reporter( + channel=None, + sender=mock.MagicMock(), + io_loop=IOLoop.current(), + batch_size=10, + flush_interval=None, + metrics_factory=FakeMetricsFactory(), + error_reporter=HardErrorReporter(), + queue_capacity=100 + ) + test_data = {'foo': 'bar'} + + reporter._send(test_data) + reporter._sender.send.assert_called_once_with(test_data) + + @pytest.mark.parametrize( + 'channel, sender, expected', + [ + (None, None, None), + (None, type('X', (object,), {'io_loop': 'foo'}), 'foo'), + (type('X', (object,), {'io_loop': 'bar'}), None, 'bar'), + ( + type('X', (object,), {'io_loop': 'bar'}), + type('X', (object,), {'io_loop': 'foo'}), + 'bar' + ), + ] + ) + def test_reporter_fetch_io_loop_works_as_expected(self, channel, sender, expected): + result = Reporter.fetch_io_loop(channel, sender) + assert expected == result diff --git a/tests/test_senders.py b/tests/test_senders.py new file mode 100644 index 00000000..41b5860d --- /dev/null +++ b/tests/test_senders.py @@ -0,0 +1,66 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function + + +import mock +from tornado import ioloop + +from jaeger_client import senders +from jaeger_client.local_agent_net import LocalAgentSender +from jaeger_client.thrift_gen.agent import Agent +from thrift.protocol import TCompactProtocol + + +def test_base_sender_create_io_loop_if_not_provided(): + + sender = senders.Sender(host='mock', port=4242) + + assert sender.io_loop is not None + assert isinstance(sender.io_loop, ioloop.IOLoop) + +def test_udp_sender_instantiate_thrift_agent(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert sender.agent is not None + assert isinstance(sender.agent, Agent.Client) + + +def test_udp_sender_intantiate_local_agent_channel(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert sender.channel is not None + assert sender.channel.io_loop == sender.io_loop + assert isinstance(sender.channel, LocalAgentSender) + +def test_udp_sender_calls_agent_emitBatch_on_sending(): + + test_data = {'foo': 'bar'} + sender = senders.UDPSender(host='mock', port=4242) + sender.agent = mock.Mock() + + sender.send(test_data) + + sender.agent.emitBatch.assert_called_once_with(test_data) + + +def test_udp_sender_implements_thrift_protocol_factory(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert callable(sender.getProtocol) + protocol = sender.getProtocol(mock.MagicMock()) + assert isinstance(protocol, TCompactProtocol.TCompactProtocol)