diff --git a/crossdock/server/endtoend.py b/crossdock/server/endtoend.py index 03d603e1..b1f277ff 100644 --- a/crossdock/server/endtoend.py +++ b/crossdock/server/endtoend.py @@ -27,6 +27,7 @@ SAMPLER_TYPE_REMOTE, ) from jaeger_client.sampler import RemoteControlledSampler, ConstSampler +from jaeger_client.senders import UDPSender from jaeger_client.reporter import Reporter from jaeger_client.tracer import Tracer @@ -65,8 +66,13 @@ def __init__(self): init_sampler = cfg.sampler channel = self.local_agent_sender + sender = UDPSender( + io_loop=channel.io_loop, + host=os.getenv('AGENT_HOST', 'jaeger-agent'), + port=cfg.local_agent_reporting_port, + ) reporter = Reporter( - channel=channel, + sender=sender, flush_interval=cfg.reporter_flush_interval) remote_sampler = RemoteControlledSampler( diff --git a/jaeger_client/config.py b/jaeger_client/config.py index 4658a7ab..608945f0 100644 --- a/jaeger_client/config.py +++ b/jaeger_client/config.py @@ -33,6 +33,7 @@ RateLimitingSampler, RemoteControlledSampler, ) +from .senders import UDPSender from .constants import ( DEFAULT_SAMPLING_INTERVAL, DEFAULT_FLUSH_INTERVAL, @@ -306,6 +307,7 @@ def new_tracer(self, io_loop=None): Create a new Jaeger Tracer based on the passed `jaeger_client.Config`. Does not set `opentracing.tracer` global variable. """ + channel = self._create_local_agent_channel(io_loop=io_loop) sampler = self.sampler if sampler is None: @@ -319,8 +321,14 @@ def new_tracer(self, io_loop=None): max_operations=self.max_operations) logger.info('Using sampler %s', sampler) + sender = UDPSender( + host=self.local_agent_reporting_host, + port=self.local_agent_reporting_port, + io_loop=io_loop + ) + reporter = Reporter( - channel=channel, + sender=sender, queue_capacity=self.reporter_queue_size, batch_size=self.reporter_batch_size, flush_interval=self.reporter_flush_interval, diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py index 1fedb995..8515c9b3 100644 --- a/jaeger_client/reporter.py +++ b/jaeger_client/reporter.py @@ -28,8 +28,6 @@ from .metrics import Metrics, LegacyMetricsFactory from .utils import ErrorReporter -from thrift.protocol import TCompactProtocol -from jaeger_client.thrift_gen.agent import Agent default_logger = logging.getLogger('jaeger_tracing') @@ -75,12 +73,13 @@ def report_span(self, span): class Reporter(NullReporter): """Receives completed spans from Tracer and submits them out of process.""" - def __init__(self, channel, queue_capacity=100, batch_size=10, + def __init__(self, sender, queue_capacity=100, batch_size=10, flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None, error_reporter=None, metrics=None, metrics_factory=None, **kwargs): """ - :param channel: a communication channel to jaeger-agent + :param sender: senders.Sender subclass implementing send method, + for sending batch of spans to jaeger. :param queue_capacity: how many spans we can hold in memory before starting to drop spans :param batch_size: how many spans we can submit at once to Collector @@ -97,19 +96,18 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, """ from threading import Lock - self._channel = channel + self.sender = sender self.queue_capacity = queue_capacity self.batch_size = batch_size self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics()) self.metrics = ReporterMetrics(self.metrics_factory) self.error_reporter = error_reporter or ErrorReporter(Metrics()) self.logger = kwargs.get('logger', default_logger) - self.agent = Agent.Client(self._channel, self) if queue_capacity < batch_size: raise ValueError('Queue capacity cannot be less than batch size') - self.io_loop = io_loop or channel.io_loop + self.io_loop = io_loop or self.sender.io_loop if self.io_loop is None: self.logger.error('Jaeger Reporter has no IOLoop') else: @@ -177,15 +175,6 @@ def _consume_queue(self): self.metrics.reporter_queue_length(self.queue.qsize()) self.logger.info('Span publisher exited') - # method for protocol factory - def getProtocol(self, transport): - """ - Implements Thrift ProtocolFactory interface - :param: transport: - :return: Thrift compact protocol - """ - return TCompactProtocol.TCompactProtocol(transport) - @tornado.gen.coroutine def _submit(self, spans): if not spans: @@ -213,7 +202,7 @@ def _send(self, batch): Send batch of spans out via thrift transport. Any exceptions thrown will be caught above in the exception handler of _submit(). """ - return self.agent.emitBatch(batch) + return self.sender.send(batch) def close(self): """ diff --git a/jaeger_client/senders.py b/jaeger_client/senders.py new file mode 100644 index 00000000..30eb5277 --- /dev/null +++ b/jaeger_client/senders.py @@ -0,0 +1,91 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import absolute_import + +import logging +from threadloop import ThreadLoop + +from .local_agent_net import LocalAgentSender +from thrift.protocol import TCompactProtocol + +from jaeger_client.thrift_gen.agent import Agent + + +DEFAULT_SAMPLING_PORT = 5778 + +logger = logging.getLogger('jaeger_tracing') + + +class Sender(object): + def __init__(self, host, port, io_loop=None): + self.host = host + self.port = port + self.io_loop = io_loop or self._create_new_thread_loop() + + def send(self, batch): + raise NotImplementedError('This method should be implemented by subclasses') + + def _create_new_thread_loop(self): + """ + Create a daemonized thread that will run Tornado IOLoop. + :return: the IOLoop backed by the new thread. + """ + self._thread_loop = ThreadLoop() + if not self._thread_loop.is_ready(): + self._thread_loop.start() + return self._thread_loop._io_loop + + +class UDPSender(Sender): + def __init__(self, host, port, io_loop=None): + super(UDPSender, self).__init__( + host=host, + port=port, + io_loop=io_loop + ) + self.channel = self._create_local_agent_channel(self.io_loop) + self.agent = Agent.Client(self.channel, self) + + def send(self, batch): + """ Send batch of spans out via thrift transport. + + Any exceptions thrown will be caught by the caller. + """ + + return self.agent.emitBatch(batch) + + def _create_local_agent_channel(self, io_loop): + """ + Create an out-of-process channel communicating to local jaeger-agent. + Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled + via JSON HTTP. + + :param self: instance of Config + """ + logger.info('Initializing Jaeger Tracer with UDP reporter') + return LocalAgentSender( + host=self.host, + sampling_port=DEFAULT_SAMPLING_PORT, + reporting_port=self.port, + io_loop=io_loop + ) + + # method for protocol factory + def getProtocol(self, transport): + """ + Implements Thrift ProtocolFactory interface + :param: transport: + :return: Thrift compact protocol + """ + return TCompactProtocol.TCompactProtocol(transport) diff --git a/tests/test_reporter.py b/tests/test_reporter.py index b7e8e69e..6b3dfb45 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -149,7 +149,7 @@ def _new_span(name): @staticmethod def _new_reporter(batch_size, flush=None, queue_cap=100): - reporter = Reporter(channel=mock.MagicMock(), + reporter = Reporter(sender=mock.MagicMock(), io_loop=IOLoop.current(), batch_size=batch_size, flush_interval=flush, @@ -291,3 +291,17 @@ def send(_): yield reporter.close() assert reporter.queue.qsize() == 0, 'all spans drained' assert count[0] == 4, 'last span submitted in one extrac batch' + + @gen_test + def test_reporter_calls_sender_correctly(self): + reporter = Reporter(sender=mock.MagicMock(), + io_loop=IOLoop.current(), + batch_size=10, + flush_interval=None, + metrics_factory=FakeMetricsFactory(), + error_reporter=HardErrorReporter(), + queue_capacity=100) + test_data = {'foo': 'bar'} + + reporter._send(test_data) + reporter.sender.send.assert_called_once_with(test_data) diff --git a/tests/test_senders.py b/tests/test_senders.py new file mode 100644 index 00000000..41b5860d --- /dev/null +++ b/tests/test_senders.py @@ -0,0 +1,66 @@ +# Copyright (c) 2018 Uber Technologies, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import print_function + + +import mock +from tornado import ioloop + +from jaeger_client import senders +from jaeger_client.local_agent_net import LocalAgentSender +from jaeger_client.thrift_gen.agent import Agent +from thrift.protocol import TCompactProtocol + + +def test_base_sender_create_io_loop_if_not_provided(): + + sender = senders.Sender(host='mock', port=4242) + + assert sender.io_loop is not None + assert isinstance(sender.io_loop, ioloop.IOLoop) + +def test_udp_sender_instantiate_thrift_agent(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert sender.agent is not None + assert isinstance(sender.agent, Agent.Client) + + +def test_udp_sender_intantiate_local_agent_channel(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert sender.channel is not None + assert sender.channel.io_loop == sender.io_loop + assert isinstance(sender.channel, LocalAgentSender) + +def test_udp_sender_calls_agent_emitBatch_on_sending(): + + test_data = {'foo': 'bar'} + sender = senders.UDPSender(host='mock', port=4242) + sender.agent = mock.Mock() + + sender.send(test_data) + + sender.agent.emitBatch.assert_called_once_with(test_data) + + +def test_udp_sender_implements_thrift_protocol_factory(): + + sender = senders.UDPSender(host='mock', port=4242) + + assert callable(sender.getProtocol) + protocol = sender.getProtocol(mock.MagicMock()) + assert isinstance(protocol, TCompactProtocol.TCompactProtocol)