In [1]:
import logging
import time

from celery import shared_task

from asynclog.handler import AsyncLogHandler

In [2]:
def write_record(record):
    time.sleep(1)
    
class SyncLogHandler(logging.Handler):
    def emit(self, record):
        time.sleep(1)

### AsyncLogHandler (use thread)

In [3]:
async_logger = logging.getLogger('Async Logger')
async_logger.setLevel(logging.INFO)

In [4]:
async_handler = AsyncLogHandler(write_record)
async_handler.setLevel(logging.INFO)
async_logger.addHandler(async_handler)

In [5]:
async_logger.info('Test log')

In [6]:
%timeit async_logger.info('Test log')

49.3 µs ± 9 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


### SyncLogHandler

In [7]:
sync_logger = logging.getLogger('Sync Logger')
sync_logger.setLevel(logging.INFO)

sync_handler = SyncLogHandler()
sync_handler.setLevel(logging.INFO)
sync_logger.addHandler(sync_handler)

In [8]:
sync_logger.info('Test log')

In [9]:
%timeit sync_logger.info('Test log')

1 s ± 1.96 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### AsyncLogHandler (use celery)

In [14]:
from unittest import mock

In [15]:
from tests.test_handler import app, write_task, has_celery

celery_logger = logging.getLogger('Celery logger')
celery_logger.setLevel(logging.INFO)

if not has_celery:
    write_task.delay = mock.MagicMock()
celery_handler = AsyncLogHandler(write_task, use_thread=False, use_celery=True)
celery_handler.setLevel(logging.INFO)
celery_logger.addHandler(celery_handler)

In [16]:
celery_logger.info('Test log')

In [17]:
%timeit celery_logger.info('Test log')

1.58 ms ± 74.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
