Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Refactor for future span sending over http #176

Merged
merged 1 commit into from
May 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion crossdock/server/endtoend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SAMPLER_TYPE_REMOTE,
)
from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
from jaeger_client.senders import UDPSender
from jaeger_client.reporter import Reporter
from jaeger_client.tracer import Tracer

Expand Down Expand Up @@ -65,8 +66,13 @@ def __init__(self):
init_sampler = cfg.sampler
channel = self.local_agent_sender

sender = UDPSender(
io_loop=channel.io_loop,
host=os.getenv('AGENT_HOST', 'jaeger-agent'),
port=cfg.local_agent_reporting_port,
)
reporter = Reporter(
channel=channel,
sender=sender,
flush_interval=cfg.reporter_flush_interval)

remote_sampler = RemoteControlledSampler(
Expand Down
10 changes: 9 additions & 1 deletion jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
RateLimitingSampler,
RemoteControlledSampler,
)
from .senders import UDPSender
from .constants import (
DEFAULT_SAMPLING_INTERVAL,
DEFAULT_FLUSH_INTERVAL,
Expand Down Expand Up @@ -306,6 +307,7 @@ def new_tracer(self, io_loop=None):
Create a new Jaeger Tracer based on the passed `jaeger_client.Config`.
Does not set `opentracing.tracer` global variable.
"""

channel = self._create_local_agent_channel(io_loop=io_loop)
sampler = self.sampler
if sampler is None:
Expand All @@ -319,8 +321,14 @@ def new_tracer(self, io_loop=None):
max_operations=self.max_operations)
logger.info('Using sampler %s', sampler)

sender = UDPSender(
host=self.local_agent_reporting_host,
port=self.local_agent_reporting_port,
io_loop=io_loop
)

reporter = Reporter(
channel=channel,
sender=sender,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@black-adder renaming the field is a breaking change. Are you sure this can be merged?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doh reverting

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's find a way to let this PR to get merged, but in backwards compatible way.

Copy link
Contributor Author

@ProvoK ProvoK May 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did changes to make it backwards compatible. Should I open another PR?

edit: Still I don't like much the workaround I did. If you got better ideas I'm open

queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
Expand Down
23 changes: 6 additions & 17 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
from .metrics import Metrics, LegacyMetricsFactory
from .utils import ErrorReporter

from thrift.protocol import TCompactProtocol
from jaeger_client.thrift_gen.agent import Agent

default_logger = logging.getLogger('jaeger_tracing')

Expand Down Expand Up @@ -75,12 +73,13 @@ def report_span(self, span):

class Reporter(NullReporter):
"""Receives completed spans from Tracer and submits them out of process."""
def __init__(self, channel, queue_capacity=100, batch_size=10,
def __init__(self, sender, queue_capacity=100, batch_size=10,
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
error_reporter=None, metrics=None, metrics_factory=None,
**kwargs):
"""
:param channel: a communication channel to jaeger-agent
:param sender: senders.Sender subclass implementing send method,
for sending batch of spans to jaeger.
:param queue_capacity: how many spans we can hold in memory before
starting to drop spans
:param batch_size: how many spans we can submit at once to Collector
Expand All @@ -97,19 +96,18 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
"""
from threading import Lock

self._channel = channel
self.sender = sender
self.queue_capacity = queue_capacity
self.batch_size = batch_size
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
self.metrics = ReporterMetrics(self.metrics_factory)
self.error_reporter = error_reporter or ErrorReporter(Metrics())
self.logger = kwargs.get('logger', default_logger)
self.agent = Agent.Client(self._channel, self)

if queue_capacity < batch_size:
raise ValueError('Queue capacity cannot be less than batch size')

self.io_loop = io_loop or channel.io_loop
self.io_loop = io_loop or self.sender.io_loop
if self.io_loop is None:
self.logger.error('Jaeger Reporter has no IOLoop')
else:
Expand Down Expand Up @@ -177,15 +175,6 @@ def _consume_queue(self):
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)

@tornado.gen.coroutine
def _submit(self, spans):
if not spans:
Expand Down Expand Up @@ -213,7 +202,7 @@ def _send(self, batch):
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self.agent.emitBatch(batch)
return self.sender.send(batch)

def close(self):
"""
Expand Down
91 changes: 91 additions & 0 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

import logging
from threadloop import ThreadLoop

from .local_agent_net import LocalAgentSender
from thrift.protocol import TCompactProtocol

from jaeger_client.thrift_gen.agent import Agent


DEFAULT_SAMPLING_PORT = 5778
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default should probably be moved to constants.
Actually is defined only in config (and here ofc)


logger = logging.getLogger('jaeger_tracing')


class Sender(object):
def __init__(self, host, port, io_loop=None):
self.host = host
self.port = port
self.io_loop = io_loop or self._create_new_thread_loop()

def send(self, batch):
raise NotImplementedError('This method should be implemented by subclasses')

def _create_new_thread_loop(self):
"""
Create a daemonized thread that will run Tornado IOLoop.
:return: the IOLoop backed by the new thread.
"""
self._thread_loop = ThreadLoop()
if not self._thread_loop.is_ready():
self._thread_loop.start()
return self._thread_loop._io_loop


class UDPSender(Sender):
def __init__(self, host, port, io_loop=None):
super(UDPSender, self).__init__(
host=host,
port=port,
io_loop=io_loop
)
self.channel = self._create_local_agent_channel(self.io_loop)
self.agent = Agent.Client(self.channel, self)

def send(self, batch):
""" Send batch of spans out via thrift transport.

Any exceptions thrown will be caught by the caller.
"""

return self.agent.emitBatch(batch)

def _create_local_agent_channel(self, io_loop):
"""
Create an out-of-process channel communicating to local jaeger-agent.
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
via JSON HTTP.

:param self: instance of Config
"""
logger.info('Initializing Jaeger Tracer with UDP reporter')
return LocalAgentSender(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said earlier here I would like to split responsibility between LocalAgentSender and LocalAgentHttp (which actually just perform an HTTP call for the sampling). I could do the separation in this PR, I removed in order to keep the changes small.
This would remove the need of the sampling_port argument

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did all but this one. This will touch a lot of things (Sampler and Config), so it's probably better do it in a separate PR

host=self.host,
sampling_port=DEFAULT_SAMPLING_PORT,
reporting_port=self.port,
io_loop=io_loop
)

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)
16 changes: 15 additions & 1 deletion tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _new_span(name):

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = Reporter(channel=mock.MagicMock(),
reporter = Reporter(sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
Expand Down Expand Up @@ -291,3 +291,17 @@ def send(_):
yield reporter.close()
assert reporter.queue.qsize() == 0, 'all spans drained'
assert count[0] == 4, 'last span submitted in one extrac batch'

@gen_test
def test_reporter_calls_sender_correctly(self):
reporter = Reporter(sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=10,
flush_interval=None,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=100)
test_data = {'foo': 'bar'}

reporter._send(test_data)
reporter.sender.send.assert_called_once_with(test_data)
66 changes: 66 additions & 0 deletions tests/test_senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function


import mock
from tornado import ioloop

from jaeger_client import senders
from jaeger_client.local_agent_net import LocalAgentSender
from jaeger_client.thrift_gen.agent import Agent
from thrift.protocol import TCompactProtocol


def test_base_sender_create_io_loop_if_not_provided():

sender = senders.Sender(host='mock', port=4242)

assert sender.io_loop is not None
assert isinstance(sender.io_loop, ioloop.IOLoop)

def test_udp_sender_instantiate_thrift_agent():

sender = senders.UDPSender(host='mock', port=4242)

assert sender.agent is not None
assert isinstance(sender.agent, Agent.Client)


def test_udp_sender_intantiate_local_agent_channel():

sender = senders.UDPSender(host='mock', port=4242)

assert sender.channel is not None
assert sender.channel.io_loop == sender.io_loop
assert isinstance(sender.channel, LocalAgentSender)

def test_udp_sender_calls_agent_emitBatch_on_sending():

test_data = {'foo': 'bar'}
sender = senders.UDPSender(host='mock', port=4242)
sender.agent = mock.Mock()

sender.send(test_data)

sender.agent.emitBatch.assert_called_once_with(test_data)


def test_udp_sender_implements_thrift_protocol_factory():

sender = senders.UDPSender(host='mock', port=4242)

assert callable(sender.getProtocol)
protocol = sender.getProtocol(mock.MagicMock())
assert isinstance(protocol, TCompactProtocol.TCompactProtocol)