
Add LogProcessors implementation #1916

Merged
merged 24 commits into open-telemetry:logs on Jul 13, 2021

Conversation

@srikanthccv (Member) commented Jun 18, 2021

Description

Part 4 of #1890. This PR adds the log processors implementation.

@srikanthccv marked this pull request as ready for review June 22, 2021 15:04
@srikanthccv requested a review from a team as a code owner June 22, 2021 15:04
"""

def __init__(self):
# use a tuple to avoid race conditions when adding a new log and
Contributor

Is the tuple what avoids race conditions, or is it the lock in add_log_processor? A tuple is an ordered data structure; is the order meaningful for log processors? If not, I suggest this be a set instead.

Member Author

It's both the tuple and the lock. We are trying to be consistent with span processors, so the order is important here.
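
For context, a minimal sketch of the pattern under discussion; the class and attribute names follow the diff, but the rest is illustrative rather than the exact PR code:

```python
import threading


class SynchronousMultiLogProcessor:
    """Sketch of the tuple + lock pattern discussed above."""

    def __init__(self):
        # The tuple is an immutable snapshot: emit() can iterate it without
        # holding a lock, while writers replace the whole tuple under the lock.
        self._log_processors = ()
        self._lock = threading.Lock()

    def add_log_processor(self, log_processor) -> None:
        with self._lock:
            # Rebind to a new tuple; concurrent readers keep their old snapshot.
            self._log_processors = self._log_processors + (log_processor,)
```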

self._log_processors = self._log_processors + (log_processor,)

def emit(self, log_data: LogData) -> None:
for lp in self._log_processors:
Contributor

Is a lock also necessary here?

Member Author

No, we wouldn't want to hold the lock until all processors complete emit. This just iterates over the log processors available at the time of emitting and returns.
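
Continuing the sketch above, the read paths just iterate whatever tuple is bound at call time, so no lock is held while the processors run (illustrative only, not the exact PR code):

```python
    # Methods of the SynchronousMultiLogProcessor sketch above.

    def emit(self, log_data) -> None:
        # Snapshot read: processors added after this call starts are simply
        # not seen by this particular emit; no lock is held during iteration.
        for lp in self._log_processors:
            lp.emit(log_data)

    def shutdown(self) -> None:
        # The same snapshot-read reasoning applies to shutdown below.
        for lp in self._log_processors:
            lp.shutdown()
```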


def shutdown(self) -> None:
"""Shutdown the log processors one by one"""
for lp in self._log_processors:
Contributor

And here?

Member Author

Same as above

False otherwise.
"""
deadline_ns = _time_ns() + timeout_millis * 1000000
for lp in self._log_processors:
Contributor

And here?

Member Author

Same
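
For reference, a sketch of how the shared deadline from the diff context above (deadline_ns = _time_ns() + timeout_millis * 1000000) might be spent across the processors; this mirrors the span-processor pattern and is illustrative, not the exact PR code:

```python
    # Continuing the SynchronousMultiLogProcessor sketch; _time_ns is the
    # helper used in the diff context above.

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Flush each processor sequentially, sharing one overall deadline."""
        deadline_ns = _time_ns() + timeout_millis * 1000000
        for lp in self._log_processors:
            current_ns = _time_ns()
            if current_ns >= deadline_ns:
                return False  # time budget already exhausted
            # Give the next processor only the remaining budget, in milliseconds.
            if not lp.force_flush((deadline_ns - current_ns) // 1000000):
                return False
        return True
```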

@@ -112,6 +115,133 @@ def force_flush(self, timeout_millis: int = 30000):
"""


class SynchronousMultiLogProcessor(LogProcessor):
"""Implementation of class:`LogProcessor` that forwards all received
events to a list of log processors sequentially.
Contributor

Should this class be named SequentialMultiLogProcessor?

Member Author

I think the current name is more accurate, as we are synchronously calling one log processor after another.

return False

for future in done_futures:
if not future.result():
Contributor

Why is this necessary?

Member Author

If any of the processors fails to flush within the timeout, this should return False.
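
A sketch of where that check sits in a concurrent force_flush; the class name and the _executor attribute are assumptions mirroring the tracing SDK's ConcurrentMultiSpanProcessor, so treat this as illustrative only:

```python
import concurrent.futures


class ConcurrentMultiLogProcessor:
    """Sketch only; the real class also implements emit/shutdown and more."""

    def __init__(self, max_workers: int = 2):
        self._log_processors = ()
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        )

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # Kick off every processor's force_flush in parallel.
        futures = [
            self._executor.submit(lp.force_flush, timeout_millis)
            for lp in self._log_processors
        ]
        done_futures, not_done_futures = concurrent.futures.wait(
            futures, timeout=timeout_millis / 1e3
        )
        if not_done_futures:
            return False  # at least one flush missed the deadline
        for future in done_futures:
            if not future.result():
                # This is the check asked about above: a processor finished in
                # time but reported that its flush failed.
                return False
        return True
```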

@@ -51,3 +58,215 @@ def shutdown(self):

Called when the SDK is shut down.
"""


class SimpleLogProcessor(LogProcessor):
Contributor

Is Simple descriptive for this kind of processor?

Member Author

This is from the spec.

Contributor

Are you referring to the tracing spec? Are we trying to be consistent with tracing?

Member Author

No, I am referring to the logs spec (an OTEP at this point): https://github.com/open-telemetry/oteps/blob/main/text/logs/0150-logging-library-sdk.md#logprocessor

> Built-in implementations: SimpleLogProcessor, BatchLogProcessor.

And yes, the Logs SDK tries to be consistent with tracing as much as possible. From https://github.com/open-telemetry/oteps/blob/main/text/logs/0150-logging-library-sdk.md#motivation:

> The specification defines SDK elements that to some extent mirror the OpenTelemetry Trace SDK. This ensures uniformity and consistency of the OpenTelemetry specification and of the implementations across traces and logs. For additional clarity the definitions in this document refer to the Trace analogs where appropriate.

and instrumentation info.
"""
log_data = LogData(record, self._instrumentation_info)
self._multi_log_processor.emit(log_data)

def flush(self):
Contributor

Why does the LogEmitter have a flush method? Shouldn't the Provider be responsible for the processors?

Member Author

Both the provider and the emitter can flush. This exists to complete the Appender/Handler-like interfaces/ABCs. In our case, OTLPHandler has to implement two methods (emit and flush) after subclassing logging.Handler. The Handler will have access to the Emitter (analogous to the Tracer in instrumentation). So if the handler's .flush is called (by a high-level logging.shutdown() call or in any other manner), we make use of the LogEmitter flush method to flush the logs. Does this explanation make it clear why there is a flush on the Emitter?

Spec https://github.com/open-telemetry/oteps/blob/main/text/logs/0150-logging-library-sdk.md#logemitter.
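
A sketch of that delegation; the OTLPHandler shape follows the description above, but the body of emit is intentionally elided, so treat the details as assumptions rather than the exact PR code:

```python
import logging


class OTLPHandler(logging.Handler):
    """Sketch of the flush delegation discussed above."""

    def __init__(self, log_emitter):
        super().__init__()
        # The handler holds the LogEmitter, analogous to how instrumentation
        # holds a Tracer.
        self._log_emitter = log_emitter

    def emit(self, record: logging.LogRecord) -> None:
        # The real handler translates `record` into the OTLP data model and
        # hands it to the emitter's pipeline; details omitted here.
        ...

    def flush(self) -> None:
        # logging.shutdown() (or any direct handler.flush() call) lands here,
        # and the emitter's flush drains the attached log processors.
        self._log_emitter.flush()
```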

Contributor

The use case makes sense. However, I'm wondering what would happen if multiple LogEmitters are created via the Provider but all share the same multi log processor. If flush is called for a specific LogEmitter, would we flush the entire queue in the multi log processor, which might contain log records from different sources, i.e. different LogEmitters, correct? I don't think we have this problem in tracing because only the TracerProvider is allowed to flush. Thoughts? Unless I am missing something.

Member Author

Right, but how could one avoid that and only flush the logs from one lib, short of creating a new pipeline for each source? (That doesn't sound like a good idea.) On the other hand, the provider flush does the same thing: it just flushes everything out. Do you see any potential problems? This is probably worth sharing with the logs SIG to see what they have to say about it.

Contributor

Yeah, we should probably bring it up. I'll be out this week, so if you could bring it up during the SIG and relay what is discussed, that would be great.

Contributor

For now, I don't think this should block the PR; maybe just put a TODO next to flush?

self.num_log_records = 0


class BatchLogProcessor(LogProcessor):
Contributor

I don't think I quite understand the logic flow. How do the handlers (that pick up logging messages from the Python logger) communicate with the Processor? Are handlers added to the root log handler?

Member Author

> Are handlers added to the root log handler?

Correct. The end user will have to attach the OTLPHandler to the root logger.

> How do the handlers (that pick up logging messages from the Python logger) communicate with the Processor?

Every log statement from the logging lib will be propagated to the root logger. Users can attach any number of Handlers to a logger instance. They may already be using out-of-the-box handlers from the stdlib. OTLPHandler is one such handler, which sends the log data in the OTLP data model format to a configured destination (could be a network destination, a file, etc.).

The logic flow is Logger -> Handler -> LogEmitter -> LogProcessor -> LogExporter. Think of the Handler as instrumentation and the LogEmitter as the Tracer from the tracing SDK (somewhat inaccurate, but it helps in visualizing things). Does it make sense now?
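
A sketch of that wiring from the end user's side. The import paths and the console exporter below are assumptions based on this PR series (the logs SDK was experimental at the time and was later moved under opentelemetry.sdk._logs), so take this as illustrative rather than authoritative:

```python
import logging

# Assumed (experimental) import paths from this PR series.
from opentelemetry.sdk.logs import LogEmitterProvider, OTLPHandler
from opentelemetry.sdk.logs.export import BatchLogProcessor, ConsoleLogExporter

# Provider -> LogProcessor -> LogExporter pipeline.
provider = LogEmitterProvider()
provider.add_log_processor(BatchLogProcessor(ConsoleLogExporter()))
emitter = provider.get_log_emitter(__name__)

# Attach the OTel handler to the root logger so every stdlib log statement
# flows Logger -> Handler -> LogEmitter -> LogProcessor -> LogExporter.
logging.getLogger().addHandler(OTLPHandler(log_emitter=emitter))

logging.getLogger(__name__).error("something went wrong")
```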

Contributor

Ahh I see, each handler SHOULD have a reference to a log emitter. We would expect users to possibly create their own Handlers, correct? Maybe it would be good to have another abstraction layer on top of logging.Handler specific to OpenTelemetry log handlers (similar to BaseInstrumentor). Thoughts?

Member Author

@srikanthccv Jun 29, 2021

> We would expect users to possibly create their own Handlers, correct?

They don't need another handler for the same logging lib, but they can create Handler-like hooks for other third-party logging libs (as recommended by that module; for example, structlog has a concept called Processors, equivalent to logging.Handler).

> Maybe it would be good to have another abstraction layer on top of logging.Handler specific to OpenTelemetry log handlers (similar to BaseInstrumentor).

Yeah, I thought about it but couldn't come up with anything meaningful. Do you have a rough idea in your head of how you would like it to be?

Contributor

Nothing big for now, but similarly to instrumentations, BaseInstrumentor was eventually needed as we found more commonalities and shared functionality between instrumentations. If a custom Handler is created for OpenTelemetry, maybe we need it to have an emitter?

Contributor

Also, this probably shouldn't block the PR; it's just something to note if we ever get confusion about HOW a handler should look.

@srikanthccv (Member Author)

The failing tests are not related to this PR. The fix has been merged in open-telemetry/opentelemetry-python-contrib#560, but if I update the SHA there will be more failures because the logs branch is behind main. I think this can go in.

@lzchen (Contributor) commented Jul 12, 2021

@lonewolf3739
There are a couple of things we can do:

  1. Remove instrumentation tests from tox build for logs branch
  2. Create a logs feature branch in contrib and run tests against that
  3. Keep logs branch up to date with main, fixing all tests along the way

What do you suggest?

@srikanthccv (Member Author)

The first one looks good to me. We can update https://github.com/lonewolf3739/opentelemetry-python/blob/logs-processor-impl/.github/workflows/test.yml to not run the contrib-build tests. What do you think?

@lzchen (Contributor) commented Jul 12, 2021

@lonewolf3739
I'm leaning more towards (2). It is the same thing we are doing for metrics; it gives us a clear separation of the features, and we don't have to keep track of different testing matrices. If you agree, then I can go ahead and create that branch for you in contrib.

@lzchen merged commit 196a19c into open-telemetry:logs Jul 13, 2021
@srikanthccv deleted the logs-processor-impl branch September 24, 2021 08:39
lzchen pushed a commit to lzchen/opentelemetry-python that referenced this pull request Oct 15, 2021
codeboten pushed a commit that referenced this pull request Oct 29, 2021
owais added a commit that referenced this pull request Nov 3, 2021
* Add initial overall structure and classes for logs sdk (#1894)

* Add global LogEmitterProvider and convenience function get_log_emitter (#1901)

* Add OTLPHandler for standard library logging module (#1903)

* Add LogProcessors implementation (#1916)

* Fix typos in test_handler.py (#1953)

* Add support for OTLP Log exporter (#1943)

* Add support for user defined attributes in OTLPHandler (#1952)

* use timeout in force_flush (#2118)

* use timeout in force_flush

* fix lint

* Update opentelemetry-sdk/src/opentelemetry/sdk/logs/export/__init__.py

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* fix lint

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* add a ConsoleExporter for logging (#2099)

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>

* Update SDK docs and Add example with OTEL collector logging (debug) exporter (#2050)

* Fix exception in severity number transformation (#2208)

* Fix exception with warning message transformation

* Fix lint

* Fix lint

* fstring

* Demonstrate how to set the Resource for LogEmitterProvider (#2209)

* Demonstrate how to set the Resource for LogEmitterProvider

Added a Resource to the logs example to make it more complete.
Previously it was using the built-in Resource. Now it adds the
service.name and service.instance.id attributes.

The resulting emitted log records look like this:
```
Resource labels:
     -> telemetry.sdk.language: STRING(python)
     -> telemetry.sdk.name: STRING(opentelemetry)
     -> telemetry.sdk.version: STRING(1.5.0)
     -> service.name: STRING(shoppingcart)
     -> service.instance.id: STRING(instance-12)
InstrumentationLibraryLogs #0
InstrumentationLibrary __main__ 0.1
LogRecord #0
Timestamp: 2021-10-14 18:33:43.425820928 +0000 UTC
Severity: ERROR
ShortName:
Body: Hyderabad, we have a major problem.
Trace ID: ce1577e4a703f42d569e72593ad71888
Span ID: f8908ac4258ceff6
Flags: 1
```

* Fix linting

* Use batch processor in example (#2225)

* move logs to _logs (#2240)

* move logs to _logs

* fix lint

* move log_exporter to _log_exporter as it's still experimental (#2252)

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Co-authored-by: Leighton Chen <lechen@microsoft.com>
Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com>
Co-authored-by: Owais Lone <owais@users.noreply.github.com>
codeboten pushed a commit to codeboten/opentelemetry-python that referenced this pull request Nov 3, 2021
lzchen pushed a commit that referenced this pull request Nov 4, 2021