Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Preliminary refactor for supporting spans over HTTP
Browse files Browse the repository at this point in the history
`Reporter` now delegate span sending to new `Sender` classes .

Signed-off-by: vitto <vittorio.camisa@gmail.com>
  • Loading branch information
ProvoK committed May 29, 2018
1 parent 9429e29 commit dfd6548
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 20 deletions.
8 changes: 7 additions & 1 deletion crossdock/server/endtoend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SAMPLER_TYPE_REMOTE,
)
from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
from jaeger_client.senders import UDPSender
from jaeger_client.reporter import Reporter
from jaeger_client.tracer import Tracer

Expand Down Expand Up @@ -65,8 +66,13 @@ def __init__(self):
init_sampler = cfg.sampler
channel = self.local_agent_sender

sender = UDPSender(
io_loop=channel.io_loop,
host=os.getenv('AGENT_HOST', 'jaeger-agent'),
port=cfg.local_agent_reporting_port,
)
reporter = Reporter(
channel=channel,
sender=sender,
flush_interval=cfg.reporter_flush_interval)

remote_sampler = RemoteControlledSampler(
Expand Down
10 changes: 9 additions & 1 deletion jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
RateLimitingSampler,
RemoteControlledSampler,
)
from .senders import UDPSender
from .constants import (
DEFAULT_SAMPLING_INTERVAL,
DEFAULT_FLUSH_INTERVAL,
Expand Down Expand Up @@ -306,6 +307,7 @@ def new_tracer(self, io_loop=None):
Create a new Jaeger Tracer based on the passed `jaeger_client.Config`.
Does not set `opentracing.tracer` global variable.
"""

channel = self._create_local_agent_channel(io_loop=io_loop)
sampler = self.sampler
if sampler is None:
Expand All @@ -319,8 +321,14 @@ def new_tracer(self, io_loop=None):
max_operations=self.max_operations)
logger.info('Using sampler %s', sampler)

sender = UDPSender(
host=self.local_agent_reporting_host,
port=self.local_agent_reporting_port,
io_loop=io_loop
)

reporter = Reporter(
channel=channel,
sender=sender,
queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
Expand Down
23 changes: 6 additions & 17 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
from .metrics import Metrics, LegacyMetricsFactory
from .utils import ErrorReporter

from thrift.protocol import TCompactProtocol
from jaeger_client.thrift_gen.agent import Agent

default_logger = logging.getLogger('jaeger_tracing')

Expand Down Expand Up @@ -75,12 +73,13 @@ def report_span(self, span):

class Reporter(NullReporter):
"""Receives completed spans from Tracer and submits them out of process."""
def __init__(self, channel, queue_capacity=100, batch_size=10,
def __init__(self, sender, queue_capacity=100, batch_size=10,
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
error_reporter=None, metrics=None, metrics_factory=None,
**kwargs):
"""
:param channel: a communication channel to jaeger-agent
:param sender: senders.Sender subclass implementing send method,
for sending batch of spans to jaeger.
:param queue_capacity: how many spans we can hold in memory before
starting to drop spans
:param batch_size: how many spans we can submit at once to Collector
Expand All @@ -97,19 +96,18 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
"""
from threading import Lock

self._channel = channel
self.sender = sender
self.queue_capacity = queue_capacity
self.batch_size = batch_size
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
self.metrics = ReporterMetrics(self.metrics_factory)
self.error_reporter = error_reporter or ErrorReporter(Metrics())
self.logger = kwargs.get('logger', default_logger)
self.agent = Agent.Client(self._channel, self)

if queue_capacity < batch_size:
raise ValueError('Queue capacity cannot be less than batch size')

self.io_loop = io_loop or channel.io_loop
self.io_loop = io_loop or self.sender.io_loop
if self.io_loop is None:
self.logger.error('Jaeger Reporter has no IOLoop')
else:
Expand Down Expand Up @@ -177,15 +175,6 @@ def _consume_queue(self):
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)

@tornado.gen.coroutine
def _submit(self, spans):
if not spans:
Expand Down Expand Up @@ -213,7 +202,7 @@ def _send(self, batch):
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self.agent.emitBatch(batch)
return self.sender.send(batch)

def close(self):
"""
Expand Down
91 changes: 91 additions & 0 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

import logging
from threadloop import ThreadLoop

from .local_agent_net import LocalAgentSender
from thrift.protocol import TCompactProtocol

from jaeger_client.thrift_gen.agent import Agent


DEFAULT_SAMPLING_PORT = 5778

logger = logging.getLogger('jaeger_tracing')


class Sender(object):
def __init__(self, host, port, io_loop=None):
self.host = host
self.port = port
self.io_loop = io_loop or self._create_new_thread_loop()

def send(self, batch):
raise NotImplementedError('This method should be implemented by subclasses')

def _create_new_thread_loop(self):
"""
Create a daemonized thread that will run Tornado IOLoop.
:return: the IOLoop backed by the new thread.
"""
self._thread_loop = ThreadLoop()
if not self._thread_loop.is_ready():
self._thread_loop.start()
return self._thread_loop._io_loop


class UDPSender(Sender):
def __init__(self, host, port, io_loop=None):
super(UDPSender, self).__init__(
host=host,
port=port,
io_loop=io_loop
)
self.channel = self._create_local_agent_channel(self.io_loop)
self.agent = Agent.Client(self.channel, self)

def send(self, batch):
""" Send batch of spans out via thrift transport.
Any exceptions thrown will be caught by the caller.
"""

return self.agent.emitBatch(batch)

def _create_local_agent_channel(self, io_loop):
"""
Create an out-of-process channel communicating to local jaeger-agent.
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
via JSON HTTP.
:param self: instance of Config
"""
logger.info('Initializing Jaeger Tracer with UDP reporter')
return LocalAgentSender(
host=self.host,
sampling_port=DEFAULT_SAMPLING_PORT,
reporting_port=self.port,
io_loop=io_loop
)

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)
16 changes: 15 additions & 1 deletion tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _new_span(name):

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = Reporter(channel=mock.MagicMock(),
reporter = Reporter(sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
Expand Down Expand Up @@ -291,3 +291,17 @@ def send(_):
yield reporter.close()
assert reporter.queue.qsize() == 0, 'all spans drained'
assert count[0] == 4, 'last span submitted in one extrac batch'

@gen_test
def test_reporter_calls_sender_correctly(self):
reporter = Reporter(sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=10,
flush_interval=None,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=100)
test_data = {'foo': 'bar'}

reporter._send(test_data)
reporter.sender.send.assert_called_once_with(test_data)
66 changes: 66 additions & 0 deletions tests/test_senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function


import mock
from tornado import ioloop

from jaeger_client import senders
from jaeger_client.local_agent_net import LocalAgentSender
from jaeger_client.thrift_gen.agent import Agent
from thrift.protocol import TCompactProtocol


def test_base_sender_create_io_loop_if_not_provided():

sender = senders.Sender(host='mock', port=4242)

assert sender.io_loop is not None
assert isinstance(sender.io_loop, ioloop.IOLoop)

def test_udp_sender_instantiate_thrift_agent():

sender = senders.UDPSender(host='mock', port=4242)

assert sender.agent is not None
assert isinstance(sender.agent, Agent.Client)


def test_udp_sender_intantiate_local_agent_channel():

sender = senders.UDPSender(host='mock', port=4242)

assert sender.channel is not None
assert sender.channel.io_loop == sender.io_loop
assert isinstance(sender.channel, LocalAgentSender)

def test_udp_sender_calls_agent_emitBatch_on_sending():

test_data = {'foo': 'bar'}
sender = senders.UDPSender(host='mock', port=4242)
sender.agent = mock.Mock()

sender.send(test_data)

sender.agent.emitBatch.assert_called_once_with(test_data)


def test_udp_sender_implements_thrift_protocol_factory():

sender = senders.UDPSender(host='mock', port=4242)

assert callable(sender.getProtocol)
protocol = sender.getProtocol(mock.MagicMock())
assert isinstance(protocol, TCompactProtocol.TCompactProtocol)

0 comments on commit dfd6548

Please sign in to comment.