Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Revert "Preliminary refactor for supporting spans over HTTP (#176)"
Browse files Browse the repository at this point in the history
This reverts commit 51df6ab.
  • Loading branch information
black-adder committed May 30, 2018
1 parent 108b626 commit d0dd4b2
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 194 deletions.
8 changes: 1 addition & 7 deletions crossdock/server/endtoend.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
SAMPLER_TYPE_REMOTE,
)
from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
from jaeger_client.senders import UDPSender
from jaeger_client.reporter import Reporter
from jaeger_client.tracer import Tracer

Expand Down Expand Up @@ -66,13 +65,8 @@ def __init__(self):
init_sampler = cfg.sampler
channel = self.local_agent_sender

sender = UDPSender(
io_loop=channel.io_loop,
host=os.getenv('AGENT_HOST', 'jaeger-agent'),
port=cfg.local_agent_reporting_port,
)
reporter = Reporter(
sender=sender,
channel=channel,
flush_interval=cfg.reporter_flush_interval)

remote_sampler = RemoteControlledSampler(
Expand Down
10 changes: 1 addition & 9 deletions jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
RateLimitingSampler,
RemoteControlledSampler,
)
from .senders import UDPSender
from .constants import (
DEFAULT_SAMPLING_INTERVAL,
DEFAULT_FLUSH_INTERVAL,
Expand Down Expand Up @@ -307,7 +306,6 @@ def new_tracer(self, io_loop=None):
Create a new Jaeger Tracer based on the passed `jaeger_client.Config`.
Does not set `opentracing.tracer` global variable.
"""

channel = self._create_local_agent_channel(io_loop=io_loop)
sampler = self.sampler
if sampler is None:
Expand All @@ -321,14 +319,8 @@ def new_tracer(self, io_loop=None):
max_operations=self.max_operations)
logger.info('Using sampler %s', sampler)

sender = UDPSender(
host=self.local_agent_reporting_host,
port=self.local_agent_reporting_port,
io_loop=io_loop
)

reporter = Reporter(
sender=sender,
channel=channel,
queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
Expand Down
23 changes: 17 additions & 6 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from .metrics import Metrics, LegacyMetricsFactory
from .utils import ErrorReporter

from thrift.protocol import TCompactProtocol
from jaeger_client.thrift_gen.agent import Agent

default_logger = logging.getLogger('jaeger_tracing')

Expand Down Expand Up @@ -73,13 +75,12 @@ def report_span(self, span):

class Reporter(NullReporter):
"""Receives completed spans from Tracer and submits them out of process."""
def __init__(self, sender, queue_capacity=100, batch_size=10,
def __init__(self, channel, queue_capacity=100, batch_size=10,
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
error_reporter=None, metrics=None, metrics_factory=None,
**kwargs):
"""
:param sender: senders.Sender subclass implementing send method,
for sending batch of spans to jaeger.
:param channel: a communication channel to jaeger-agent
:param queue_capacity: how many spans we can hold in memory before
starting to drop spans
:param batch_size: how many spans we can submit at once to Collector
Expand All @@ -96,18 +97,19 @@ def __init__(self, sender, queue_capacity=100, batch_size=10,
"""
from threading import Lock

self.sender = sender
self._channel = channel
self.queue_capacity = queue_capacity
self.batch_size = batch_size
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
self.metrics = ReporterMetrics(self.metrics_factory)
self.error_reporter = error_reporter or ErrorReporter(Metrics())
self.logger = kwargs.get('logger', default_logger)
self.agent = Agent.Client(self._channel, self)

if queue_capacity < batch_size:
raise ValueError('Queue capacity cannot be less than batch size')

self.io_loop = io_loop or self.sender.io_loop
self.io_loop = io_loop or channel.io_loop
if self.io_loop is None:
self.logger.error('Jaeger Reporter has no IOLoop')
else:
Expand Down Expand Up @@ -175,6 +177,15 @@ def _consume_queue(self):
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

# method for protocol factory
def getProtocol(self, transport):
"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)

@tornado.gen.coroutine
def _submit(self, spans):
if not spans:
Expand Down Expand Up @@ -202,7 +213,7 @@ def _send(self, batch):
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self.sender.send(batch)
return self.agent.emitBatch(batch)

def close(self):
"""
Expand Down
91 changes: 0 additions & 91 deletions jaeger_client/senders.py

This file was deleted.

16 changes: 1 addition & 15 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _new_span(name):

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = Reporter(sender=mock.MagicMock(),
reporter = Reporter(channel=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
Expand Down Expand Up @@ -291,17 +291,3 @@ def send(_):
yield reporter.close()
assert reporter.queue.qsize() == 0, 'all spans drained'
assert count[0] == 4, 'last span submitted in one extrac batch'

@gen_test
def test_reporter_calls_sender_correctly(self):
reporter = Reporter(sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=10,
flush_interval=None,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=100)
test_data = {'foo': 'bar'}

reporter._send(test_data)
reporter.sender.send.assert_called_once_with(test_data)
66 changes: 0 additions & 66 deletions tests/test_senders.py

This file was deleted.

0 comments on commit d0dd4b2

Please sign in to comment.