Skip to content

Commit

Permalink
Remove lingering distro pkg files (open-telemetry#2241)
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanielRN authored and xingzhaozhu committed Nov 2, 2021
1 parent 2109633 commit 81eb354
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 67 deletions.
62 changes: 0 additions & 62 deletions opentelemetry-distro/setup.cfg

This file was deleted.

21 changes: 16 additions & 5 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,8 @@ def __init__(
self.queue = collections.deque(
[], max_queue_size
) # type: typing.Deque[Span]
self.worker_thread = threading.Thread(
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
)
# at on_end delay init worker thread
self.worker_thread = None
self.condition = threading.Condition(threading.Lock())
self._flush_request = None # type: typing.Optional[_FlushRequest]
self.schedule_delay_millis = schedule_delay_millis
Expand All @@ -196,14 +195,20 @@ def __init__(
self.spans_list = [
None
] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()

def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
pass

def on_end(self, span: ReadableSpan) -> None:
# delay init worker_thread
if self.worker_thread is None:
self.worker_thread = threading.Thread(
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
)
self.worker_thread.start()

if self.done:
logger.warning("Already shutdown, dropping span.")
return
Expand Down Expand Up @@ -366,6 +371,10 @@ def force_flush(self, timeout_millis: int = None) -> bool:
logger.warning("Already shutdown, ignoring call to force_flush().")
return True

if self.worker_thread is None:
logger.warning("worker thread not init, ignoring call to force_flush().")
return True

with self.condition:
flush_request = self._get_or_create_flush_request()
# signal the worker thread to flush and wait for it to finish
Expand All @@ -382,7 +391,9 @@ def shutdown(self) -> None:
self.done = True
with self.condition:
self.condition.notify_all()
self.worker_thread.join()
# worker_thread maybe not init
if self.worker_thread:
self.worker_thread.join()
self.span_exporter.shutdown()


Expand Down
15 changes: 15 additions & 0 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,21 @@ def test_on_start_accepts_parent_context(self):
span, parent_context=context
)

def test_work_thread_delay_init(self):
spans_names_list = []
my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.BatchSpanProcessor(my_exporter)
self.assertIsNone(span_processor.worker_thread, "worker thread at __init__() not init")
span_names = ["xxx", "bar", "foo"]

for name in span_names:
_create_start_and_end_span(name, span_processor)
self.assertIsNotNone(span_processor.worker_thread, "worker thread at on_end() init")

span_processor.shutdown()
self.assertTrue(my_exporter.is_shutdown)
self.assertListEqual(span_names, spans_names_list)

def test_shutdown(self):
spans_names_list = []

Expand Down

0 comments on commit 81eb354

Please sign in to comment.