Skip to content

Commit

Permalink
Add option zipkin-max-span-batch-size (#8075)
Browse files Browse the repository at this point in the history
### Problem
py_zipkin sends the spans of a trace to the Zipkin server in batches. Currently, the maximum batch size defaults to 100, which may not be optimal for the number of spans in a Pants trace.

### Result
Adding the `--zipkin-max-span-batch-size` option makes the maximum batch size configurable, so the right value can be found by experimentation.
  • Loading branch information
cattibrie authored and illicitonion committed Aug 29, 2019
1 parent 382674b commit 8831c0c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
9 changes: 7 additions & 2 deletions src/python/pants/reporting/reporting.py
Expand Up @@ -59,7 +59,10 @@ def register_options(cls, register):
register('--zipkin-trace-v2', advanced=True, type=bool, default=False,
help='If enabled, the zipkin spans are tracked for v2 engine execution progress.')
register('--zipkin-service-name-prefix', advanced=True, default='pants',
help='The prefix for service name for Zipkin spans.')
help='The prefix for service name for Zipkin spans.')
register('--zipkin-max-span-batch-size', advanced=True, type=int, default=100,
help='Spans in a Zipkin trace are sent to the Zipkin server in batches.'
'zipkin-max-span-batch-size sets the max size of one batch.')

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down Expand Up @@ -104,6 +107,7 @@ def initialize(self, run_tracker, all_options, start_time=None):
service_name_prefix = self.get_options().zipkin_service_name_prefix
if "{}" not in service_name_prefix:
service_name_prefix = service_name_prefix + "/{}"
max_span_batch_size = int(self.get_options().zipkin_max_span_batch_size)

if zipkin_endpoint is None and trace_id is not None and parent_id is not None:
raise ValueError(
Expand Down Expand Up @@ -132,7 +136,8 @@ def initialize(self, run_tracker, all_options, start_time=None):
if zipkin_endpoint is not None:
zipkin_reporter_settings = ZipkinReporter.Settings(log_level=Report.INFO)
zipkin_reporter = ZipkinReporter(
run_tracker, zipkin_reporter_settings, zipkin_endpoint, trace_id, parent_id, sample_rate, service_name_prefix
run_tracker, zipkin_reporter_settings, zipkin_endpoint, trace_id, parent_id, sample_rate,
service_name_prefix, max_span_batch_size
)
report.add_reporter('zipkin', zipkin_reporter)

Expand Down
13 changes: 10 additions & 3 deletions src/python/pants/reporting/zipkin_reporter.py
Expand Up @@ -39,7 +39,8 @@ class ZipkinReporter(Reporter):
Reporter that implements Zipkin tracing.
"""

def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_rate, service_name_prefix):
def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_rate,
service_name_prefix, max_span_batch_size):
"""
When trace_id and parent_id are set a Zipkin trace will be created with given trace_id
and parent_id. If trace_id and parent_id are set to None, a trace_id will be randomly
Expand All @@ -51,6 +52,10 @@ def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_
:param string trace_id: The overall 64 or 128-bit ID of the trace. May be None.
:param string parent_id: The 64-bit ID for a parent span that invokes Pants. May be None.
:param float sample_rate: Rate at which to sample Zipkin traces. Value 0.0 - 100.0.
:param string service_name_prefix: Prefix for service name.
:param int max_span_batch_size: Spans in a trace are sent in batches,
max_span_batch_size defines max size of one batch.
"""
super().__init__(run_tracker, settings)
# Create a transport handler
Expand All @@ -62,6 +67,7 @@ def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id, sample_
self.tracer = get_default_tracer()
self.run_tracker = run_tracker
self.service_name_prefix = service_name_prefix
self.max_span_batch_size = max_span_batch_size

def start_workunit(self, workunit):
"""Implementation of Reporter callback."""
Expand Down Expand Up @@ -105,11 +111,12 @@ def start_workunit(self, workunit):
self.parent_id = zipkin_attrs.span_id

span = local_tracer.zipkin_span(
service_name=self.service_name_prefix.format(service_name),
service_name=self.service_name_prefix.format("main"),
span_name=workunit.name,
transport_handler=self.handler,
encoding=Encoding.V1_THRIFT,
zipkin_attrs=zipkin_attrs,
max_span_batch_size=self.max_span_batch_size,
)
else:
# If start_workunit is called from a new thread local_tracer doesn't have zipkin attributes.
Expand All @@ -128,7 +135,7 @@ def start_workunit(self, workunit):
span.zipkin_attrs = span.zipkin_attrs._replace(
parent_span_id=workunit.parent.zipkin_span.zipkin_attrs.span_id
)
span.service_name = "pants background workunit"
span.service_name = self.service_name_prefix.format("background")

# Goals and tasks save their start time at the beginning of their run.
# This start time is passed to workunit, because the workunit may be created much later.
Expand Down

0 comments on commit 8831c0c

Please sign in to comment.