Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Commit

Permalink
Merge ceae3df into 2e0b5bd
Browse files Browse the repository at this point in the history
  • Loading branch information
isaachier committed Jun 25, 2018
2 parents 2e0b5bd + ceae3df commit e915e2d
Show file tree
Hide file tree
Showing 17 changed files with 590 additions and 60 deletions.
46 changes: 38 additions & 8 deletions crossdock/server/endtoend.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2016 Uber Technologies, Inc.
# Copyright (c) 2016-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
# limitations under the License.

import tornado.web
import logging
import json
import os

Expand All @@ -28,6 +29,7 @@
)
from jaeger_client.sampler import RemoteControlledSampler, ConstSampler
from jaeger_client.reporter import Reporter
from jaeger_client.throttler import RemoteThrottler
from jaeger_client.tracer import Tracer

config = {
Expand Down Expand Up @@ -65,25 +67,28 @@ def __init__(self):
init_sampler = cfg.sampler
channel = self.local_agent_sender

reporter = Reporter(
channel=channel,
flush_interval=cfg.reporter_flush_interval)
reporter = Reporter(channel=channel,
flush_interval=cfg.reporter_flush_interval)

remote_sampler = RemoteControlledSampler(
channel=channel,
service_name=cfg.service_name,
sampling_refresh_interval=cfg.sampling_refresh_interval,
init_sampler=init_sampler)

throttler = RemoteThrottler(channel, cfg.service_name)

remote_tracer = Tracer(
service_name=cfg.service_name,
reporter=reporter,
sampler=remote_sampler)
sampler=remote_sampler,
throttler=throttler)

const_tracer = Tracer(
service_name=cfg.service_name,
reporter=reporter,
sampler=ConstSampler(decision=True)
sampler=ConstSampler(decision=True),
throttler=throttler
)

self._tracers = {
Expand All @@ -101,10 +106,12 @@ def tracers(self, tracers):

@property
def local_agent_sender(self):
host, port = _determine_host_port()
return LocalAgentSender(
host=os.getenv('AGENT_HOST', 'jaeger-agent'),
host=host,
sampling_port=DEFAULT_SAMPLING_PORT,
reporting_port=DEFAULT_REPORTING_PORT,
reporting_port=port,
throttling_port=DEFAULT_SAMPLING_PORT,
)

@tornado.gen.coroutine
Expand All @@ -118,3 +125,26 @@ def generate_traces(self, request, response_writer):
span.set_tag(k, v)
span.finish()
response_writer.finish()


def _determine_host_port():
host_port = os.environ.get('AGENT_HOST_PORT', None)
if host_port:
host, port = _parse_host_port(host_port,
'jaeger-agent',
DEFAULT_REPORTING_PORT)
else:
host, port = 'jaeger-agent', DEFAULT_REPORTING_PORT
return host, port


def _parse_host_port(host_port, default_host, default_port):
try:
host, port_str = host_port.split(':')
port = int(port_str)
return host, port
except ValueError:
logging.getLogger().error(
'Invalid host port (%s), using default host port (%s:%d)',
host_port, default_host, default_port)
return default_host, default_port
56 changes: 52 additions & 4 deletions jaeger_client/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2016 Uber Technologies, Inc.
# Copyright (c) 2016-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
from opentracing.propagation import Format
from . import Tracer
from .local_agent_net import LocalAgentSender
from .throttler import RemoteThrottler
from .reporter import (
Reporter,
CompositeReporter,
Expand All @@ -43,6 +44,7 @@
BAGGAGE_HEADER_PREFIX,
DEBUG_ID_HEADER_KEY,
MAX_TAG_VALUE_LENGTH,
DEFAULT_THROTTLER_REFRESH_INTERVAL,
)
from .metrics import LegacyMetricsFactory, MetricsFactory, Metrics
from .utils import get_boolean, ErrorReporter
Expand All @@ -51,6 +53,7 @@
DEFAULT_REPORTING_HOST = 'localhost'
DEFAULT_REPORTING_PORT = 6831
DEFAULT_SAMPLING_PORT = 5778
DEFAULT_THROTTLER_PORT = DEFAULT_SAMPLING_PORT
LOCAL_AGENT_DEFAULT_ENABLED = True

logger = logging.getLogger('jaeger_tracing')
Expand Down Expand Up @@ -119,7 +122,8 @@ def _validate_config(self, config):
'sampling_refresh_interval',
'trace_id_header',
'baggage_header_prefix',
'service_name']
'service_name',
'throttler']
config_keys = config.keys()
unexpected_config_keys = [k for k in config_keys if k not in allowed_keys]
if unexpected_config_keys:
Expand Down Expand Up @@ -242,7 +246,12 @@ def local_agent_reporting_port(self):
try:
return int(self.local_agent_group()['reporting_port'])
except:
return DEFAULT_REPORTING_PORT
pass

try:
return int(os.getenv('JAEGER_AGENT_PORT'))
except:
return DEFAULT_SAMPLING_PORT

@property
def local_agent_reporting_host(self):
Expand Down Expand Up @@ -278,6 +287,31 @@ def propagation(self):
return {Format.HTTP_HEADERS: B3Codec()}
return {}

def throttler_group(self):
return self.config.get('throttler', None)

@property
def throttler_port(self):
throttler_config = self.throttler_group()
if throttler_config is None:
return None
# noinspection PyBroadException
try:
return int(throttler_config['port'])
except:
return DEFAULT_THROTTLER_PORT

@property
def throttler_refresh_interval(self):
throttler_config = self.throttler_group()
if throttler_config is None:
return None
# noinspection PyBroadException
try:
return int(throttler_config['refresh_interval'])
except:
return DEFAULT_THROTTLER_REFRESH_INTERVAL

@staticmethod
def initialized():
with Config._initialized_lock:
Expand Down Expand Up @@ -331,12 +365,24 @@ def new_tracer(self, io_loop=None):
if self.logging:
reporter = CompositeReporter(reporter, LoggingReporter(logger))

if not self.throttler_group() is None:
throttler = RemoteThrottler(
channel,
self.service_name,
refresh_interval=self.throttler_refresh_interval,
logger=logger,
metrics_factory=self._metrics_factory,
error_reporter=self.error_reporter)
else:
throttler = None

return self.create_tracer(
reporter=reporter,
sampler=sampler,
throttler=throttler,
)

def create_tracer(self, reporter, sampler):
def create_tracer(self, reporter, sampler, throttler=None):
return Tracer(
service_name=self.service_name,
reporter=reporter,
Expand All @@ -348,6 +394,7 @@ def create_tracer(self, reporter, sampler):
tags=self.tags,
max_tag_value_length=self.max_tag_value_length,
extra_codecs=self.propagation,
throttler=throttler,
)

def _initialize_global_tracer(self, tracer):
Expand All @@ -368,5 +415,6 @@ def _create_local_agent_channel(self, io_loop):
host=self.local_agent_reporting_host,
sampling_port=self.local_agent_sampling_port,
reporting_port=self.local_agent_reporting_port,
throttling_port=self.throttler_port,
io_loop=io_loop
)
10 changes: 8 additions & 2 deletions jaeger_client/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2016 Uber Technologies, Inc.
# Copyright (c) 2016-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
# Max number of bits to use when generating random ID
MAX_ID_BITS = 64

# How often remotely controller sampler polls for sampling strategy
# How often remotely controlled sampler polls for sampling strategy
DEFAULT_SAMPLING_INTERVAL = 60

# How often remote reporter does a preemptive flush of its buffers
Expand Down Expand Up @@ -69,6 +69,9 @@
# noinspection SpellCheckingInspection
SAMPLER_TYPE_LOWER_BOUND = 'lowerbound'

# Tag key for unique client identifier. Used in throttler implementation.
CLIENT_UUID_TAG_KEY = 'client-uuid'

# max length for tag values. Longer values will be truncated.
MAX_TAG_VALUE_LENGTH = 1024

Expand All @@ -77,3 +80,6 @@

# Constant for debug flag
DEBUG_FLAG = 0x02

# How often throttler polls for credits
DEFAULT_THROTTLER_REFRESH_INTERVAL = 5
47 changes: 34 additions & 13 deletions jaeger_client/local_agent_net.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2016 Uber Technologies, Inc.
# Copyright (c) 2016-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -25,19 +25,33 @@

class LocalAgentHTTP(object):

DEFAULT_TIMEOUT = 15

def __init__(self, host, port):
self.agent_http_host = host
self.agent_http_port = int(port)

def request_sampling_strategy(self, service_name, timeout):
def _request(self, path, timeout=DEFAULT_TIMEOUT, args=None):
http_client = tornado.httpclient.AsyncHTTPClient(
defaults=dict(request_timeout=timeout))
# Properly url encode the params
url = url_concat(
'http://%s:%d/sampling' % (self.agent_http_host, self.agent_http_port),
[('service', service_name)])
url = 'http://%s:%d/%s' % (self.agent_http_host, self.agent_http_port, path)
if args:
url = url_concat(url, args)
return http_client.fetch(url)

def request_sampling_strategy(self, service_name, timeout=DEFAULT_TIMEOUT):
return self._request('sampling', timeout=timeout, args={'service': service_name})

def request_throttling_credits(self,
service_name,
client_id,
operations,
timeout=DEFAULT_TIMEOUT):
return self._request('credits', timeout=timeout, args=[
('service', service_name),
('uuid', client_id),
] + [('operations', op) for op in operations])


class LocalAgentSender(TBufferedTransport):
"""
Expand All @@ -51,15 +65,19 @@ class LocalAgentSender(TBufferedTransport):
end of the batch span submission call.
"""

def __init__(self, host, sampling_port, reporting_port, io_loop=None):
def __init__(self, host, sampling_port, reporting_port, io_loop=None, throttling_port=None):
# IOLoop
self._thread_loop = None
self.io_loop = io_loop or self._create_new_thread_loop()

# http sampling
# HTTP sampling
self.local_agent_http = LocalAgentHTTP(host, sampling_port)

# udp reporting - this will only get written to after our flush() call.
# HTTP throttling
if throttling_port:
self.throttling_http = LocalAgentHTTP(host, throttling_port)

# UDP reporting - this will only get written to after our flush() call.
# We are buffering things up because we are a TBufferedTransport.
udp = TUDPTransport(host, reporting_port)
TBufferedTransport.__init__(self, udp)
Expand All @@ -78,7 +96,10 @@ def readFrame(self):
"""Empty read frame that is never ready"""
return Future()

# Pass-through for the http
def request_sampling_strategy(self, service_name, timeout):
return self.local_agent_http.request_sampling_strategy(
service_name, timeout)
# Pass-through for HTTP sampling strategies request.
def request_sampling_strategy(self, *args, **kwargs):
return self.local_agent_http.request_sampling_strategy(*args, **kwargs)

# Pass-through for HTTP throttling credit request.
def request_throttling_credits(self, *args, **kwargs):
return self.throttling_http.request_throttling_credits(*args, **kwargs)

0 comments on commit e915e2d

Please sign in to comment.