Description
Describe your environment
OS: Windows
Python version: Python 3.8.10
SDK version: (e.g., 1.25.0)
API version: (e.g., 1.25.0)
What happened?
I am using BatchLogRecordProcessor to export logs to an OTLP collector. I have observed that the BatchLogRecordProcessor drops some logs, resulting in data loss.
Here is the relevant code from the SDK:
```python
def emit(self, log_data: LogData) -> None:
    """Adds the `LogData` to queue and notifies the waiting threads
    when size of queue reaches max_export_batch_size.
    """
    if self._shutdown:
        return
    if self._pid != os.getpid():
        _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
    self._queue.appendleft(log_data)
    if len(self._queue) >= self._max_export_batch_size:
        with self._condition:
            self._condition.notify()
```
In the above code, `self._queue` is a bounded deque, so the line `self._queue.appendleft(log_data)` silently discards the oldest `log_data` when the queue is full.
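A minimal sketch of the underlying `collections.deque` behavior (assuming the queue is created with `maxlen=max_queue_size`, as in the SDK):

```python
from collections import deque

q = deque(maxlen=3)  # bounded queue, like self._queue in the processor
for i in range(5):
    q.appendleft(i)  # once full, the rightmost (oldest) item is silently discarded

print(q)  # deque([4, 3, 2], maxlen=3) -- items 0 and 1 were dropped with no warning
```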
I believe this happens when the export of the previous batch has not yet completed and the queue fills up in the meantime. Even though the code below

```python
if len(self._queue) >= self._max_export_batch_size:
    with self._condition:
        self._condition.notify()
```

notifies the worker thread, no new export starts because the worker is still busy with the previous export (it holds the lock and is not waiting on the condition). Eventually the queue is full, and since `appendleft` is used on a bounded deque, every new log is added by discarding the oldest one to maintain the maximum queue size.
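The key detail is that `threading.Condition.notify()` only wakes a thread that is currently waiting; a notification sent while the worker is mid-export is simply lost. A minimal standard-library sketch of this lost-wakeup behavior:

```python
import threading

cond = threading.Condition()

# Notify while no thread is waiting: the wake-up is lost, not queued.
with cond:
    cond.notify()

def worker():
    with cond:
        # wait() returns False on timeout; the earlier notify did not reach us.
        woken = cond.wait(timeout=0.5)
        print("woken by notify:", woken)  # prints: woken by notify: False

t = threading.Thread(target=worker)
t.start()
t.join()
```

The processor's worker thread also waits with a timeout (the schedule delay), so queued records are eventually exported even when a notification is missed; but while the worker is mid-export, new notifications accomplish nothing and the queue keeps filling.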
Steps to Reproduce
1. Create a data array of size 10000 of any log data. (A self-contained sketch of the whole reproduction is given after this list.)
2. Register the BatchLogRecordProcessor with the LoggerProvider:
```python
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor

logger_provider = LoggerProvider(resource=resource)
# Create an OTLPLogExporter (insecure=False keeps TLS/SSL enabled)
otlp_exporter = OTLPLogExporter(endpoint=endpoint, insecure=False, timeout=20)
# Add a BatchLogRecordProcessor to the LoggerProvider
# (note: the constructor argument is max_export_batch_size, not max_batch_size)
processor = BatchLogRecordProcessor(
    otlp_exporter,
    max_export_batch_size=MAX_EVENTS_IN_BATCH,
    max_queue_size=MAX_QUEUE_SIZE,
)
logger_provider.add_log_record_processor(processor)
set_logger_provider(logger_provider)
```
Set `MAX_EVENTS_IN_BATCH = 128` and `MAX_QUEUE_SIZE = 2048`.
3. Use `logger.emit` in a for loop:
```python
logger = logger_provider.get_logger(__name__)
for event in data:
    logger.emit(event)
```
4. Add a print statement such as `print("In export_batch")` inside the processor's batch-export method so each exported batch is counted.
5. Run the above Python code.
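For reference, here is a self-contained sketch of the reproduction, assuming SDK 1.25.0 module paths; the endpoint, service name, and `LogRecord` fields are illustrative assumptions:

```python
import time

from opentelemetry._logs import SeverityNumber, set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider, LogRecord
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource

MAX_EVENTS_IN_BATCH = 128
MAX_QUEUE_SIZE = 2048

resource = Resource.create({"service.name": "batch-drop-repro"})  # hypothetical name
logger_provider = LoggerProvider(resource=resource)
# endpoint is a placeholder; point it at a running OTLP collector
otlp_exporter = OTLPLogExporter(endpoint="localhost:4317", insecure=True, timeout=20)
processor = BatchLogRecordProcessor(
    otlp_exporter,
    max_export_batch_size=MAX_EVENTS_IN_BATCH,
    max_queue_size=MAX_QUEUE_SIZE,
)
logger_provider.add_log_record_processor(processor)
set_logger_provider(logger_provider)

logger = logger_provider.get_logger(__name__)
# Emit 10000 records as fast as possible: the queue (2048 slots) fills much
# faster than the worker drains it, so appendleft silently drops the oldest.
for i in range(10000):
    logger.emit(
        LogRecord(
            timestamp=time.time_ns(),
            body=f"log {i}",
            severity_number=SeverityNumber.INFO,
            resource=resource,
        )
    )

logger_provider.shutdown()  # flushes whatever is still in the queue
```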
Expected Result
All 10000 logs should be exported.
Since the batch size is 128, there should be 10000 // 128 = 78 complete batches plus one final batch of 16 logs (78 × 128 = 9984).
So there should be 79 print statements of `In export_batch`.
Actual Result
There are only around 35 print statements of `In export_batch`.
The queue size stays constant at 2048 once `emit` has been called more than 2048 times.
Further logs emitted via `logger.emit` are dropped from the queue until the worker thread exports another batch of 128 logs, freeing space for new ones.
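A possible mitigation (a sketch, not a fix for the underlying behavior): block the producer with `LoggerProvider.force_flush()` before the bounded queue can overflow, e.g. every `MAX_QUEUE_SIZE` emits:

```python
# Sketch: throttle the producer so the bounded queue never overflows.
for i, event in enumerate(data):
    logger.emit(event)
    if (i + 1) % MAX_QUEUE_SIZE == 0:
        logger_provider.force_flush()  # blocks until queued records are exported
```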
Additional context
No response
Would you like to implement a fix?
No