Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Preliminary refactor for supporting spans over HTTP #186

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion jaeger_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def new_tracer(self, io_loop=None):
Create a new Jaeger Tracer based on the passed `jaeger_client.Config`.
Does not set `opentracing.tracer` global variable.
"""

channel = self._create_local_agent_channel(io_loop=io_loop)
sampler = self.sampler
if not sampler:
Expand All @@ -360,7 +361,8 @@ def new_tracer(self, io_loop=None):
flush_interval=self.reporter_flush_interval,
logger=logger,
metrics_factory=self._metrics_factory,
error_reporter=self.error_reporter)
error_reporter=self.error_reporter
)

if self.logging:
reporter = CompositeReporter(reporter, LoggingReporter(logger))
Expand Down
2 changes: 2 additions & 0 deletions jaeger_client/local_agent_net.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling
# IOLoop
self._thread_loop = None
self.io_loop = io_loop or self._create_new_thread_loop()
self.reporting_port = reporting_port
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the sole purpose of saving these is to initialize the UDP sender, I'd rather make these private. We're going to nuke this class in the future anyway, so let's not increase the public API if we don't have to.

self.host = host

# HTTP sampling
self.local_agent_http = LocalAgentHTTP(host, sampling_port)
Expand Down
101 changes: 41 additions & 60 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import tornado.gen
import tornado.ioloop
import tornado.queues
import socket
from tornado.concurrent import Future
from .constants import DEFAULT_FLUSH_INTERVAL
from . import thrift
from . import ioloop_util
from .metrics import Metrics, LegacyMetricsFactory
from .senders import UDPSender
from .utils import ErrorReporter

from thrift.protocol import TCompactProtocol
from jaeger_client.thrift_gen.agent import Agent

default_logger = logging.getLogger('jaeger_tracing')

Expand Down Expand Up @@ -78,9 +75,11 @@ class Reporter(NullReporter):
def __init__(self, channel, queue_capacity=100, batch_size=10,
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
error_reporter=None, metrics=None, metrics_factory=None,
**kwargs):
sender=None, **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would making sender a kwargs be the safest bet? Won't this still run into the issue where sender might be incorrectly interpreted as kwargs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think sender=None, **kwargs is good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@black-adder I think that putting sender in kwargs will hide it more than necessary, without any benefit.
Honestly, if a developer just writes down 8 or 9 positional arguments, all without naming them (i.e. not as kwargs), they actually deserve some kind of error 😆

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

"""
:param channel: a communication channel to jaeger-agent
:param sender: senders.Sender subclass implementing send method,
for sending batch of spans to jaeger.
:param queue_capacity: how many spans we can hold in memory before
starting to drop spans
:param batch_size: how many spans we can submit at once to Collector
Expand All @@ -97,19 +96,20 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
"""
from threading import Lock

self._channel = channel
# TODO for next major rev: remove channel param in favor of sender
self._sender = sender or self._create_default_sender(channel)
self.queue_capacity = queue_capacity
self.batch_size = batch_size
self.metrics_factory = metrics_factory or LegacyMetricsFactory(metrics or Metrics())
self.metrics = ReporterMetrics(self.metrics_factory)
self.error_reporter = error_reporter or ErrorReporter(Metrics())
self.logger = kwargs.get('logger', default_logger)
self.agent = Agent.Client(self._channel, self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is technically a breaking change since it was already public before


if queue_capacity < batch_size:
raise ValueError('Queue capacity cannot be less than batch size')

self.io_loop = io_loop or channel.io_loop
self.io_loop = io_loop or self.fetch_io_loop(channel, self._sender)

if self.io_loop is None:
self.logger.error('Jaeger Reporter has no IOLoop')
else:
Expand All @@ -120,14 +120,24 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
self.flush_interval = flush_interval or None
self.io_loop.spawn_callback(self._consume_queue)

self._process_lock = Lock()
self._process = None
@staticmethod
def fetch_io_loop(channel, sender):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be private?

if channel:
return channel.io_loop
elif sender:
return sender.io_loop
return None

def _create_default_sender(self, channel):
sender = UDPSender(
port=channel.reporting_port,
host=channel.host,
io_loop=channel.io_loop
)
return sender

def set_process(self, service_name, tags, max_length):
with self._process_lock:
self._process = thrift.make_process(
service_name=service_name, tags=tags, max_length=max_length,
)
self._sender.set_process(service_name, tags, max_length)

def report_span(self, span):
# We should not be calling `queue.put_nowait()` from random threads,
Expand All @@ -150,14 +160,14 @@ def _report_span_from_ioloop(self, span):

@tornado.gen.coroutine
def _consume_queue(self):
spans = []
stopped = False

while not stopped:
while len(spans) < self.batch_size:
while self._sender.span_count < self.batch_size:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Yuri mentioned, this is fine for now, but ideally the reporter only appends to the sender, and the HTTP sender and the UDP sender each maintain their own batch size.

try:
# using timeout allows periodic flush with smaller packet
timeout = self.flush_interval + self.io_loop.time() \
if self.flush_interval and spans else None
if self.flush_interval and self._sender.span_count else None
span = yield self.queue.get(timeout=timeout)
except tornado.gen.TimeoutError:
break
Expand All @@ -168,52 +178,23 @@ def _consume_queue(self):
# don't return yet, submit accumulated spans first
break
else:
spans.append(span)
if spans:
yield self._submit(spans)
for _ in spans:
self.queue.task_done()
spans = spans[:0]
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')
self._sender.append(span)

# method for protocol factory
def getProtocol(self, transport):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

breaking change

"""
Implements Thrift ProtocolFactory interface
:param: transport:
:return: Thrift compact protocol
"""
return TCompactProtocol.TCompactProtocol(transport)
if self._sender.span_count:
num_spans = self._sender.span_count
try:
yield self._sender.flush()
except Exception as exc:
self.metrics.reporter_failure(num_spans)
self.error_reporter.error(exc)
else:
self.metrics.reporter_success(num_spans)

@tornado.gen.coroutine
def _submit(self, spans):
if not spans:
return
with self._process_lock:
process = self._process
if not process:
return
try:
batch = thrift.make_jaeger_batch(spans=spans, process=process)
yield self._send(batch)
self.metrics.reporter_success(len(spans))
except socket.error as e:
self.metrics.reporter_failure(len(spans))
self.error_reporter.error(
'Failed to submit traces to jaeger-agent socket: %s', e)
except Exception as e:
self.metrics.reporter_failure(len(spans))
self.error_reporter.error(
'Failed to submit traces to jaeger-agent: %s', e)
for _ in range(num_spans):
self.queue.task_done()

@tornado.gen.coroutine
def _send(self, batch):
"""
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self.agent.emitBatch(batch)
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

def close(self):
"""
Expand Down
136 changes: 136 additions & 0 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import

import sys
import socket
import logging
import tornado.gen
from six import reraise
from threadloop import ThreadLoop

from . import thrift
from .local_agent_net import LocalAgentSender
from thrift.protocol import TCompactProtocol

from jaeger_client.thrift_gen.agent import Agent


# Sampling port handed to LocalAgentSender (as `sampling_port`) when
# UDPSender builds its local agent channel.
DEFAULT_SAMPLING_PORT = 5778

# Module-level logger registered under the 'jaeger_tracing' name.
logger = logging.getLogger('jaeger_tracing')


class Sender(object):
def __init__(self, io_loop=None):
from threading import Lock
self.io_loop = io_loop or self._create_new_thread_loop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these variables be private?

self._process_lock = Lock()
self._process = None
self.spans = []

def append(self, span):
    """Buffer ``span`` so that a later call to flush() can submit it."""
    pending = self.spans
    pending.append(span)

@property
def span_count(self):
    """Number of spans currently held in ``self.spans``."""
    pending = self.spans
    return len(pending)

@tornado.gen.coroutine
def flush(self):
"""Examine span and process state before yielding to _flush() for batching and transport."""
if self.spans:
with self._process_lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for another PR, but the locking here seems a little weird looking at it now. I wonder why we only lock on the process but skip locking on the spans which could be flushed and emptied in another thread.

process = self._process
if process:
try:
yield self._flush(self.spans, self._process)
finally:
self.spans = []

@tornado.gen.coroutine
def _flush(self, spans, process):
    """
    Build one Jaeger batch out of ``spans`` and ``process`` and pass it to
    send(). Subclasses may override this with their own batching logic.
    """
    jaeger_batch = thrift.make_jaeger_batch(process=process, spans=spans)
    pending_send = self.send(jaeger_batch)
    yield pending_send

@tornado.gen.coroutine
def send(self, batch):
    """Transport hook; concrete senders must override it to deliver ``batch``."""
    message = 'This method should be implemented by subclasses'
    raise NotImplementedError(message)

def set_process(self, service_name, tags, max_length):
    """
    Build the thrift process description from the given service name and
    tags and store it on the sender while holding the process lock.
    """
    process_args = dict(
        service_name=service_name,
        tags=tags,
        max_length=max_length,
    )
    with self._process_lock:
        self._process = thrift.make_process(**process_args)

def _create_new_thread_loop(self):
    """
    Create a ThreadLoop, keep it on ``self._thread_loop``, start it unless
    it already reports ready, and return the IOLoop it wraps.

    :return: the IOLoop backed by the new thread.
    """
    thread_loop = ThreadLoop()
    self._thread_loop = thread_loop
    if not thread_loop.is_ready():
        thread_loop.start()
    return thread_loop._io_loop


class UDPSender(Sender):
def __init__(self, host, port, io_loop=None):
super(UDPSender, self).__init__(io_loop=io_loop)
self.host = host
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make all these variables private?

self.port = port
self.channel = self._create_local_agent_channel(self.io_loop)
self.agent = Agent.Client(self.channel, self)

@tornado.gen.coroutine
def send(self, batch):
    """
    Send batch of spans out via thrift transport.

    A failure is re-raised as the same exception type, with a message
    saying that submission to jaeger-agent failed and with the original
    traceback attached.

    :param batch: the batch handed to the agent client's emitBatch()
    :raises Exception: the failure raised by emitBatch(), rebuilt with a
        descriptive message when the exception type allows it
    """
    try:
        yield self.agent.emitBatch(batch)
    except Exception as e:
        # Socket failures keep their own wording so they stay
        # distinguishable from every other failure.
        if isinstance(e, socket.error):
            template = 'Failed to submit traces to jaeger-agent socket: {}'
        else:
            template = 'Failed to submit traces to jaeger-agent: {}'
        tb = sys.exc_info()[2]
        try:
            wrapped = type(e)(template.format(e))
        except Exception:
            # Not every exception class can be rebuilt from a single
            # message argument; in that case surface the original failure
            # instead of an unrelated constructor error.
            wrapped = e
        reraise(type(e), wrapped, tb)

def _create_local_agent_channel(self, io_loop):
"""
Create an out-of-process channel communicating to local jaeger-agent.
Spans are submitted as SOCK_DGRAM Thrift, sampling strategy is polled
via JSON HTTP.

:param self: instance of Config
"""
logger.info('Initializing Jaeger Tracer with UDP reporter')
return LocalAgentSender(
host=self.host,
sampling_port=DEFAULT_SAMPLING_PORT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we're only using the LocalAgentSender here as a means to send spans and not using any of the HTTP functionality, let's not make DEFAULT_SAMPLING_PORT a global and just hard code here for now.

reporting_port=self.port,
io_loop=io_loop
)

# method for protocol factory
def getProtocol(self, transport):
    """
    Thrift ProtocolFactory hook.

    :param transport: the transport the protocol is built on
    :return: Thrift compact protocol wrapping ``transport``
    """
    compact_protocol = TCompactProtocol.TCompactProtocol(transport)
    return compact_protocol
Loading