Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Preliminary refactor for supporting spans over HTTP
Browse files Browse the repository at this point in the history
This includes changes to the `Reporter` and `RemoteControlledSampler`
init methods.
`Reporter` now delegates span sending to the new `Sender` classes.

Signed-off-by: vitto <vittorio.camisa@gmail.com>
  • Loading branch information
ProvoK committed May 20, 2018
1 parent dee0547 commit f6eb07b
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 51 deletions.
21 changes: 14 additions & 7 deletions crossdock/server/endtoend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import os

from jaeger_client.local_agent_net import LocalAgentSender
from jaeger_client.local_agent_net import LocalAgentHTTP, LocalAgentSender
from jaeger_client.config import (
Config,
DEFAULT_SAMPLING_PORT,
Expand All @@ -27,6 +27,7 @@
SAMPLER_TYPE_REMOTE,
)
from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
from jaeger_client.senders import UDPSender
from jaeger_client.reporter import Reporter
from jaeger_client.tracer import Tracer

Expand Down Expand Up @@ -63,13 +64,20 @@ class EndToEndHandler(object):
def __init__(self):
cfg = Config(config)
init_sampler = cfg.sampler
channel = self.local_agent_sender
channel = self.local_agent_http
io_loop = tornado.ioloop.IOLoop.current()

sender = UDPSender(
io_loop=io_loop,
host=cfg.local_agent_reporting_host,
port=cfg.local_agent_reporting_port
)
reporter = Reporter(
channel=channel,
sender=sender,
flush_interval=cfg.reporter_flush_interval)

remote_sampler = RemoteControlledSampler(
io_loop=io_loop,
channel=channel,
service_name=cfg.service_name,
sampling_refresh_interval=cfg.sampling_refresh_interval,
Expand Down Expand Up @@ -100,11 +108,10 @@ def tracers(self, tracers):
self._tracers = tracers

@property
def local_agent_sender(self):
return LocalAgentSender(
def local_agent_http(self):
return LocalAgentHTTP(
host=os.getenv('AGENT_HOST', 'jaeger-agent'),
sampling_port=DEFAULT_SAMPLING_PORT,
reporting_port=DEFAULT_REPORTING_PORT,
port=DEFAULT_SAMPLING_PORT,
)

@tornado.gen.coroutine
Expand Down
40 changes: 20 additions & 20 deletions jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import opentracing
from opentracing.propagation import Format
from . import Tracer
from .local_agent_net import LocalAgentSender
from .local_agent_net import LocalAgentHTTP
from .reporter import (
Reporter,
CompositeReporter,
Expand All @@ -33,6 +33,10 @@
RateLimitingSampler,
RemoteControlledSampler,
)
from .senders import (
HTTPSender,
UDPSender
)
from .constants import (
DEFAULT_SAMPLING_INTERVAL,
DEFAULT_FLUSH_INTERVAL,
Expand Down Expand Up @@ -306,11 +310,16 @@ def new_tracer(self, io_loop=None):
Create a new Jaeger Tracer based on the passed `jaeger_client.Config`.
Does not set `opentracing.tracer` global variable.
"""
channel = self._create_local_agent_channel(io_loop=io_loop)

sampler = self.sampler
if sampler is None:
sampling_channel = LocalAgentHTTP(
port=self.local_agent_sampling_port,
host=self.local_agent_reporting_host
)
sampler = RemoteControlledSampler(
channel=channel,
io_loop=io_loop,
channel=sampling_channel,
service_name=self.service_name,
logger=logger,
metrics_factory=self._metrics_factory,
Expand All @@ -319,8 +328,15 @@ def new_tracer(self, io_loop=None):
max_operations=self.max_operations)
logger.info('Using sampler %s', sampler)

sender_cls = UDPSender if not self.local_agent_enabled else HTTPSender
sender = sender_cls(
host=self.local_agent_reporting_host,
port=self.local_agent_reporting_port,
io_loop=io_loop
)

reporter = Reporter(
channel=channel,
sender=sender,
queue_capacity=self.reporter_queue_size,
batch_size=self.reporter_batch_size,
flush_interval=self.reporter_flush_interval,
Expand Down Expand Up @@ -354,19 +370,3 @@ def _initialize_global_tracer(self, tracer):
opentracing.tracer = tracer
logger.info('opentracing.tracer initialized to %s[app_name=%s]',
tracer, self.service_name)

def _create_local_agent_channel(self, io_loop):
"""
Create an out-of-process channel communicating to local jaeger-agent.
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
via JSON HTTP.
:param self: instance of Config
"""
logger.info('Initializing Jaeger Tracer with UDP reporter')
return LocalAgentSender(
host=self.local_agent_reporting_host,
sampling_port=self.local_agent_sampling_port,
reporting_port=self.local_agent_reporting_port,
io_loop=io_loop
)
10 changes: 1 addition & 9 deletions jaeger_client/local_agent_net.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,11 @@ class LocalAgentSender(TBufferedTransport):
end of the batch span submission call.
"""

def __init__(self, host, sampling_port, reporting_port, io_loop=None):
def __init__(self, host, reporting_port, io_loop=None):
# IOLoop
self._thread_loop = None
self.io_loop = io_loop or self._create_new_thread_loop()

# http sampling
self.local_agent_http = LocalAgentHTTP(host, sampling_port)

# udp reporting - this will only get written to after our flush() call.
# We are buffering things up because we are a TBufferedTransport.
udp = TUDPTransport(host, reporting_port)
Expand All @@ -77,8 +74,3 @@ def _create_new_thread_loop(self):
def readFrame(self):
"""Empty read frame that is never ready"""
return Future()

# Pass-through for the http
def request_sampling_strategy(self, service_name, timeout):
return self.local_agent_http.request_sampling_strategy(
service_name, timeout)
12 changes: 5 additions & 7 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from .utils import ErrorReporter

from thrift.protocol import TCompactProtocol
from jaeger_client.thrift_gen.agent import Agent

default_logger = logging.getLogger('jaeger_tracing')

Expand Down Expand Up @@ -75,12 +74,12 @@ def report_span(self, span):

class Reporter(NullReporter):
"""Receives completed spans from Tracer and submits them out of process."""
def __init__(self, channel, queue_capacity=100, batch_size=10,
def __init__(self, sender, queue_capacity=100, batch_size=10,
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
error_reporter=None, metrics=None, metrics_factory=None,
**kwargs):
"""
:param channel: a communication channel to jaeger-agent
:param sender: object used to submit span batches out of process; must provide `send(batch)` and an `io_loop` attribute
:param queue_capacity: how many spans we can hold in memory before
starting to drop spans
:param batch_size: how many spans we can submit at once to Collector
Expand All @@ -97,19 +96,18 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
"""
from threading import Lock

self._channel = channel
self.sender = sender
self.queue_capacity = queue_capacity
self.batch_size = batch_size
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
self.metrics = ReporterMetrics(self.metrics_factory)
self.error_reporter = error_reporter or ErrorReporter(Metrics())
self.logger = kwargs.get('logger', default_logger)
self.agent = Agent.Client(self._channel, self)

if queue_capacity < batch_size:
raise ValueError('Queue capacity cannot be less than batch size')

self.io_loop = io_loop or channel.io_loop
self.io_loop = io_loop or self.sender.io_loop
if self.io_loop is None:
self.logger.error('Jaeger Reporter has no IOLoop')
else:
Expand Down Expand Up @@ -213,7 +211,7 @@ def _send(self, batch):
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self.agent.emitBatch(batch)
return self.sender.send(batch)

def close(self):
"""
Expand Down
7 changes: 4 additions & 3 deletions jaeger_client/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,10 @@ def __str__(self):

class RemoteControlledSampler(Sampler):
"""Periodically loads the sampling strategy from a remote server."""
def __init__(self, channel, service_name, **kwargs):
def __init__(self, io_loop, channel, service_name, **kwargs):
"""
:param channel: channel for communicating with jaeger-agent
:param io_loop: IOLOOP to use for handling async sampling
:param channel: channel for communicating with jaeger
:param service_name: name of this application
:param kwargs: optional parameters
- init_sampler: initial value of the sampler,
Expand Down Expand Up @@ -356,7 +357,7 @@ def __init__(self, channel, service_name, **kwargs):
self.running = True
self.periodic = None

self.io_loop = channel.io_loop
self.io_loop = io_loop
if not self.io_loop:
self.logger.error(
'Cannot acquire IOLoop, sampler will not be updated')
Expand Down
65 changes: 65 additions & 0 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright (c) 2016 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from .local_agent_net import LocalAgentSender

from jaeger_client.thrift_gen.agent import Agent


logger = logging.getLogger('jaeger_tracing')


class Sender(object):
    """Base class for objects that submit batches of spans out of process.

    Subclasses provide the concrete transport by overriding :meth:`send`.
    """

    def __init__(self, host, port, io_loop=None):
        """
        :param host: host of the endpoint the spans are sent to
        :param port: port of the endpoint the spans are sent to
        :param io_loop: IOLoop associated with this sender; stored as given
            and may be None
        """
        self.host = host
        self.port = port
        self.io_loop = io_loop

    def send(self, batch):
        """
        Submit a batch of spans. Must be overridden by subclasses.

        :param batch: the batch of spans to submit
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError('This method should be implemented by subclasses')


class UDPSender(Sender):
    """Sender that emits span batches to a jaeger-agent over UDP (Thrift)."""

    def __init__(self, host, port, io_loop=None):
        """
        :param host: host of the jaeger-agent
        :param port: UDP reporting port of the jaeger-agent
        :param io_loop: IOLoop to use; when None, the loop created by the
            underlying LocalAgentSender channel is used instead
        """
        super(UDPSender, self).__init__(
            host=host,
            port=port,
            io_loop=io_loop
        )
        self.channel = self._create_local_agent_channel()
        # LocalAgentSender falls back to its own IOLoop when none is given.
        # Expose the loop actually in use, so that callers reading
        # `sender.io_loop` (e.g. Reporter) do not see None.
        self.io_loop = self.channel.io_loop
        # `self` is handed to the Thrift client as its protocol factory,
        # which is why this class implements getProtocol().
        self.agent = Agent.Client(self.channel, self)

    def getProtocol(self, transport):
        """
        Thrift protocol-factory hook used by the Agent client.

        :param transport: Thrift transport to wrap
        :return: a compact protocol bound to the given transport
        """
        # Imported locally so this module's top-level imports stay unchanged.
        from thrift.protocol import TCompactProtocol
        return TCompactProtocol.TCompactProtocol(transport)

    def send(self, batch):
        """
        Send batch of spans out via thrift transport.

        :param batch: the batch of spans to emit
        :return: whatever the Agent client's emitBatch() returns
        """
        return self.agent.emitBatch(batch)

    def _create_local_agent_channel(self):
        """
        Create an out-of-process channel communicating to local jaeger-agent.
        Spans are submitted as SOCK_DGRAM Thrift.

        :return: a LocalAgentSender built from this sender's host, port
            and io_loop
        """
        logger.info('Initializing Jaeger Tracer with UDP reporter')
        return LocalAgentSender(
            host=self.host,
            reporting_port=self.port,
            io_loop=self.io_loop
        )
7 changes: 3 additions & 4 deletions tests/test_local_agent_net.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest
import tornado.web
from urllib.parse import urlparse
from jaeger_client.local_agent_net import LocalAgentSender
from jaeger_client.local_agent_net import LocalAgentSender, LocalAgentHTTP
from jaeger_client.config import DEFAULT_REPORTING_PORT

test_strategy = """
Expand Down Expand Up @@ -45,10 +45,9 @@ def app():
@pytest.mark.gen_test
def test_request_sampling_strategy(http_client, base_url):
o = urlparse(base_url)
sender = LocalAgentSender(
sender = LocalAgentHTTP(
host='localhost',
sampling_port=o.port,
reporting_port=DEFAULT_REPORTING_PORT
port=o.port,
)
response = yield sender.request_sampling_strategy(service_name='svc', timeout=15)
assert response.body == test_strategy.encode('utf-8')
2 changes: 1 addition & 1 deletion tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def _new_span(name):

@staticmethod
def _new_reporter(batch_size, flush=None, queue_cap=100):
reporter = Reporter(channel=mock.MagicMock(),
reporter = Reporter(sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=batch_size,
flush_interval=flush,
Expand Down
5 changes: 5 additions & 0 deletions tests/test_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def test_sampler_equality():

def test_remotely_controlled_sampler():
sampler = RemoteControlledSampler(
io_loop=mock.MagicMock(),
channel=mock.MagicMock(),
service_name='x'
)
Expand All @@ -316,6 +317,7 @@ def test_remotely_controlled_sampler():
channel = mock.MagicMock()
channel.io_loop = None
sampler = RemoteControlledSampler(
io_loop=mock.MagicMock(),
channel=channel,
service_name='x',
init_sampler=init_sampler,
Expand Down Expand Up @@ -344,6 +346,7 @@ def test_sampling_request_callback():
error_reporter = mock.MagicMock()
error_reporter.error = mock.MagicMock()
sampler = RemoteControlledSampler(
io_loop=mock.MagicMock(),
channel=channel,
service_name='x',
error_reporter=error_reporter,
Expand Down Expand Up @@ -503,6 +506,7 @@ def test_update_sampler(response, init_sampler, expected_sampler, err_count, err
error_reporter = mock.MagicMock()
error_reporter.error = mock.MagicMock()
remote_sampler = RemoteControlledSampler(
io_loop=mock.MagicMock(),
channel=mock.MagicMock(),
service_name='x',
error_reporter=error_reporter,
Expand All @@ -526,6 +530,7 @@ def test_update_sampler_adaptive_sampler():
error_reporter = mock.MagicMock()
error_reporter.error = mock.MagicMock()
remote_sampler = RemoteControlledSampler(
io_loop=mock.MagicMock(),
channel=mock.MagicMock(),
service_name='x',
error_reporter=error_reporter,
Expand Down

0 comments on commit f6eb07b

Please sign in to comment.