Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Preliminary refactor for supporting spans over HTTP
Browse files Browse the repository at this point in the history
`Reporter` now delegate span sending to new `Sender` classes .

Signed-off-by: Vittorio Camisa <vittorio.camisa@gmail.com>
  • Loading branch information
ProvoK committed May 31, 2018
1 parent 9429e29 commit b37df0a
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 17 deletions.
4 changes: 3 additions & 1 deletion jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def new_tracer(self, io_loop=None):
Create a new Jaeger Tracer based on the passed `jaeger_client.Config`.
Does not set `opentracing.tracer` global variable.
"""

channel = self._create_local_agent_channel(io_loop=io_loop)
sampler = self.sampler
if sampler is None:
Expand All @@ -326,7 +327,8 @@ def new_tracer(self, io_loop=None):
flush_interval=self.reporter_flush_interval,
logger=logger,
metrics_factory=self._metrics_factory,
error_reporter=self.error_reporter)
error_reporter=self.error_reporter
)

if self.logging:
reporter = CompositeReporter(reporter, LoggingReporter(logger))
Expand Down
2 changes: 2 additions & 0 deletions jaeger_client/local_agent_net.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None):
# IOLoop
self._thread_loop = None
self.io_loop = io_loop or self._create_new_thread_loop()
self.reporting_port = reporting_port
self.host = host

# http sampling
self.local_agent_http = LocalAgentHTTP(host, sampling_port)
Expand Down
32 changes: 16 additions & 16 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
from . import thrift
from . import ioloop_util
from .metrics import Metrics, LegacyMetricsFactory
from .senders import UDPSender
from .utils import ErrorReporter

from thrift.protocol import TCompactProtocol
from jaeger_client.thrift_gen.agent import Agent

default_logger = logging.getLogger('jaeger_tracing')

Expand Down Expand Up @@ -78,9 +77,11 @@ class Reporter(NullReporter):
def __init__(self, channel, queue_capacity=100, batch_size=10,
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
error_reporter=None, metrics=None, metrics_factory=None,
**kwargs):
sender=None, **kwargs):
"""
:param channel: a communication channel to jaeger-agent
:param sender: senders.Sender subclass implementing send method,
for sending batch of spans to jaeger.
:param queue_capacity: how many spans we can hold in memory before
starting to drop spans
:param batch_size: how many spans we can submit at once to Collector
Expand All @@ -97,19 +98,19 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
"""
from threading import Lock

self._channel = channel
# TODO for next major rev: remove channel param in favor of sender
self.sender = sender or self._create_default_sender(channel)
self.queue_capacity = queue_capacity
self.batch_size = batch_size
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
self.metrics = ReporterMetrics(self.metrics_factory)
self.error_reporter = error_reporter or ErrorReporter(Metrics())
self.logger = kwargs.get('logger', default_logger)
self.agent = Agent.Client(self._channel, self)

if queue_capacity < batch_size:
raise ValueError('Queue capacity cannot be less than batch size')

self.io_loop = io_loop or channel.io_loop
self.io_loop = io_loop or channel.io_loop or self.sender.io_loop
if self.io_loop is None:
self.logger.error('Jaeger Reporter has no IOLoop')
else:
Expand All @@ -123,6 +124,14 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
self._process_lock = Lock()
self._process = None

def _create_default_sender(self, channel):
sender = UDPSender(
port=channel.reporting_port,
host=channel.host,
io_loop=channel.io_loop
)
return sender

def set_process(self, service_name, tags, max_length):
with self._process_lock:
self._process = thrift.make_process(
Expand Down Expand Up @@ -177,15 +186,6 @@ def _consume_queue(self):
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)

@tornado.gen.coroutine
def _submit(self, spans):
if not spans:
Expand Down Expand Up @@ -213,7 +213,7 @@ def _send(self, batch):
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self.agent.emitBatch(batch)
return self.sender.send(batch)

def close(self):
"""
Expand Down
91 changes: 91 additions & 0 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

import logging
from threadloop import ThreadLoop

from .local_agent_net import LocalAgentSender
from thrift.protocol import TCompactProtocol

from jaeger_client.thrift_gen.agent import Agent


DEFAULT_SAMPLING_PORT = 5778

logger = logging.getLogger('jaeger_tracing')


class Sender(object):
def __init__(self, host, port, io_loop=None):
self.host = host
self.port = port
self.io_loop = io_loop or self._create_new_thread_loop()

def send(self, batch):
raise NotImplementedError('This method should be implemented by subclasses')

def _create_new_thread_loop(self):
"""
Create a daemonized thread that will run Tornado IOLoop.
:return: the IOLoop backed by the new thread.
"""
self._thread_loop = ThreadLoop()
if not self._thread_loop.is_ready():
self._thread_loop.start()
return self._thread_loop._io_loop


class UDPSender(Sender):
def __init__(self, host, port, io_loop=None):
super(UDPSender, self).__init__(
host=host,
port=port,
io_loop=io_loop
)
self.channel = self._create_local_agent_channel(self.io_loop)
self.agent = Agent.Client(self.channel, self)

def send(self, batch):
""" Send batch of spans out via thrift transport.
Any exceptions thrown will be caught by the caller.
"""

return self.agent.emitBatch(batch)

def _create_local_agent_channel(self, io_loop):
"""
Create an out-of-process channel communicating to local jaeger-agent.
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
via JSON HTTP.
:param self: instance of Config
"""
logger.info('Initializing Jaeger Tracer with UDP reporter')
return LocalAgentSender(
host=self.host,
sampling_port=DEFAULT_SAMPLING_PORT,
reporting_port=self.port,
io_loop=io_loop
)

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)
17 changes: 17 additions & 0 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,20 @@ def send(_):
yield reporter.close()
assert reporter.queue.qsize() == 0, 'all spans drained'
assert count[0] == 4, 'last span submitted in one extrac batch'

@gen_test
def test_reporter_calls_sender_correctly(self):
reporter = Reporter(
channel=None,
sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=10,
flush_interval=None,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=100
)
test_data = {'foo': 'bar'}

reporter._send(test_data)
reporter.sender.send.assert_called_once_with(test_data)
66 changes: 66 additions & 0 deletions tests/test_senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function


import mock
from tornado import ioloop

from jaeger_client import senders
from jaeger_client.local_agent_net import LocalAgentSender
from jaeger_client.thrift_gen.agent import Agent
from thrift.protocol import TCompactProtocol


def test_base_sender_create_io_loop_if_not_provided():

sender = senders.Sender(host='mock', port=4242)

assert sender.io_loop is not None
assert isinstance(sender.io_loop, ioloop.IOLoop)

def test_udp_sender_instantiate_thrift_agent():

sender = senders.UDPSender(host='mock', port=4242)

assert sender.agent is not None
assert isinstance(sender.agent, Agent.Client)


def test_udp_sender_intantiate_local_agent_channel():

sender = senders.UDPSender(host='mock', port=4242)

assert sender.channel is not None
assert sender.channel.io_loop == sender.io_loop
assert isinstance(sender.channel, LocalAgentSender)

def test_udp_sender_calls_agent_emitBatch_on_sending():

test_data = {'foo': 'bar'}
sender = senders.UDPSender(host='mock', port=4242)
sender.agent = mock.Mock()

sender.send(test_data)

sender.agent.emitBatch.assert_called_once_with(test_data)


def test_udp_sender_implements_thrift_protocol_factory():

sender = senders.UDPSender(host='mock', port=4242)

assert callable(sender.getProtocol)
protocol = sender.getProtocol(mock.MagicMock())
assert isinstance(protocol, TCompactProtocol.TCompactProtocol)

0 comments on commit b37df0a

Please sign in to comment.