Click [here]() to access the associated Medium article.

# Setup

In [1]:
!uv pip install loguru

[2mUsing Python 3.12.0 environment at /Users/mori/Desktop/portfolio/.venv3.12[0m
[2mAudited [1m1 package[0m [2min 10ms[0m[0m


# Getting Started with Loguru

In [2]:
from loguru import logger

# A simple log message
logger.info("Hello, Loguru!")

[32m2024-12-01 15:53:20.032[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m4[0m - [1mHello, Loguru![0m


In [3]:
import sys
from loguru import logger

# Remove the previous logger
logger.remove()

# Redirect standard output to Loguru
logger.add(sys.stdout, format="{time} {level} {message}", level="DEBUG")

# Now, even print statements go through Loguru
print("This is a standard print statement.")
logger.debug("And this is a debug log!")

This is a standard print statement.
2024-12-01T15:53:20.042745+0000 DEBUG And this is a debug log!


**NOTE:** Uncommenting the followings will write to the "app.log" file

In [4]:
# logger.remove()
# logger.add("app.log", rotation="1 MB", retention="10 days", compression="zip")

# logger.info("This log will be saved to 'app.log'!")

# Advanced Loguru Features

## Structured Logging with Context

In [5]:
logger.remove()
logger.add(sys.stderr, format="{extra[ip]} - {message}")

class Server:
    def __init__(self, ip):
        self.ip = ip
        self.logger = logger.bind(ip=ip)

    def call(self, message):
        self.logger.info(message)

instance_1 = Server("192.168.0.200")
instance_2 = Server("127.0.0.1")

instance_1.call("First instance")
instance_2.call("Second instance")

192.168.0.200 - First instance
127.0.0.1 - Second instance
127.0.0.1 - Second instance


In [6]:
from loguru import logger

logger.remove()
logger.add(sys.stderr, format="{extra[id]} - {message}")
logger = logger.bind(id=123)

logger.info("This is a log message with context.")
logger.debug("This is a debug message with context.")

123 - This is a log message with context.
123 - This is a debug message with context.
123 - This is a debug message with context.


## Logging to Files with Rotation and Retention

**NOTE:** Uncommenting the followings will write to the "logs/app.log" file

In [7]:
# logger.remove()
# logger.add(
#     "logs/app.log",
#     rotation="500 MB",    # Rotate after file reaches 500 MB
#     retention="2 weeks",  # Keep logs for 2 weeks
#     compression="zip",    # Compress old logs
# )

# logger.info("This will be written to a file with rotation and retention!")

## Customizing Log Formats

In [8]:
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}")

logger.warning("This is a custom log format!")



## Async Logging

**NOTE:** Uncommenting the followings will write to the "async.log" file

In [9]:
# from loguru import logger
# import asyncio

# async def main():
#     logger.remove()
#     logger.add("async.log")
#     await asyncio.gather(
#         asyncio.to_thread(logger.info, "Log from thread 1"),
#         asyncio.to_thread(logger.info, "Log from thread 2"),
#     )

# await main()

# Error Handling and Debugging with Loguru

## Logging Exceptions with Decorators

In [10]:
from loguru import logger

@logger.catch
def divide_numbers(a, b):
    return a / b

divide_numbers(10, 0)

2024-12-01 at 15:53:20 | ERROR | An error has been caught in function '<module>', process 'MainProcess' (71680), thread 'MainThread' (8559835200):
[33m[1mTraceback (most recent call last):[0m

  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/mori/Desktop/portfolio/.venv3.12/lib/python3.12/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
    │   └ <bound method Application.launch_instance of <class 'ipykernel.kernelapp.IPKernelApp'>>
    └ <module 'ipykernel.kernelapp' from '/Users/mori/Desktop/portfolio/.venv3.12/lib/python3.12/site-packages/ipykernel/kernelapp....
  File "/Users/mori/Desktop/portfolio/.venv3.12/lib/python3.12/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
    │   └ <function IPKernelApp.start at 0x107ebe700>
    └ <ipykernel.kernelapp.IPKernelApp object at 0x1049ef5f0>
  File "/Users/mori/Desktop/portfolio/.ve

## Catching Uncaught Exceptions Globally

In [11]:
from loguru import logger
import sys

logger.remove()

# Catch all uncaught exceptions
logger.add(sys.stderr, backtrace=True, diagnose=True)

def main():
    1 / 0  # Intentional error

main()

ZeroDivisionError: division by zero

## Adding Context to Exception Logs

In [12]:
from loguru import logger

logger.remove()
logger.add(sys.stderr, format="{extra[user_id]} - {message}")
logger = logger.bind(user_id=42)

@logger.catch
def process_data(data):
    if not data:
        raise ValueError("No data provided!")

logger.info("Starting data processing...")
process_data([])

42 - Starting data processing...
42 - An error has been caught in function '<module>', process 'MainProcess' (71680), thread 'MainThread' (8559835200):
[33m[1mTraceback (most recent call last):[0m

  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/mori/Desktop/portfolio/.venv3.12/lib/python3.12/site-packages/ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
    │   └ <bound method Application.launch_instance of <class 'ipykernel.kernelapp.IPKernelApp'>>
    └ <module 'ipykernel.kernelapp' from '/Users/mori/Desktop/portfolio/.venv3.12/lib/python3.12/site-packages/ipykernel/kernelapp....
  File "/Users/mori/Desktop/portfolio/.venv3.12/lib/python3.12/site-packages/traitlets/config/application.py", line 1075, in launch_instance
    app.start()
    │   └ <function IPKernelApp.start at 0x107ebe700>
    └ <ipykernel.kernelapp.IPKernelApp object at 0x1049ef5f0>
  File "/Users/mori/Desktop/portfoli

# Performance Considerations

## Asynchronous Logging for High Throughput

**NOTE:** Uncommenting the followings will write to the "async_performance.log" file

In [13]:
# from loguru import logger
# import nest_asyncio

# nest_asyncio.apply()

# logger.remove()

# # Add a log file for async logging
# logger.add("async_performance.log", enqueue=True)

# async def log_messages():
#     for i in range(10_000):
#         logger.info(f"Logging message {i}")

# await log_messages()

## Log Rotation to Manage Disk Usage

**NOTE:** Uncommenting the followings will write to the "performance_logs.log" file

In [14]:
# logger.remove()

# logger.add("performance_logs.log", rotation="100 MB")

## Fine-Grained Control Over Logging Levels

**NOTE:** Uncommenting the followings will write to the "filtered.log" file

In [15]:
# logger.remove()

# logger.add("filtered.log", level="WARNING")
# logger.debug("This debug message won't be logged.")
# logger.warning("This warning message will be logged.")

## Profiling Loguru's Impact

In [16]:
import time
from loguru import logger

start_time = time.time()
for _ in range(10_000):
    logger.info("Performance test log message.")
print(f"Logging completed in {time.time() - start_time:.2f} seconds.")

Logging completed in 0.00 seconds.


# Practical Tips for Using Loguru Effectively

## Use Contextual Logging for Traceability

In [17]:
from loguru import logger
import sys

def process_request(user_id, request_id):
    logger.remove()
    logger.add(
        sys.stderr,
        format="{time:YYYY-MM-DD at HH:mm:ss} | user_id={extra[user_id]}, request_id={extra[request_id]} - {message}",
    )
    bound_logger = logger.bind(user_id=user_id, request_id=request_id)
    
    bound_logger.info("Processing request...")
    # Your processing logic here
    bound_logger.info("Request completed.")


process_request(user_id=101, request_id="XYZ123")
process_request(user_id=102, request_id="ABC456")

2024-12-01 at 15:53:56 | user_id=101, request_id=XYZ123 - Processing request...
2024-12-01 at 15:53:56 | user_id=101, request_id=XYZ123 - Request completed.
2024-12-01 at 15:53:56 | user_id=102, request_id=ABC456 - Processing request...
2024-12-01 at 15:53:56 | user_id=102, request_id=ABC456 - Request completed.
2024-12-01 at 15:53:56 | user_id=101, request_id=XYZ123 - Request completed.
2024-12-01 at 15:53:56 | user_id=102, request_id=ABC456 - Processing request...
2024-12-01 at 15:53:56 | user_id=102, request_id=ABC456 - Request completed.


## Use Filters for Fine-Grained Control

In [18]:
from loguru import logger
import sys

def my_filter(record):
    return "user_id" in record["extra"] and record["extra"]["user_id"] == 42

logger.remove()
logger.add(
    sys.stderr,
    format="{time:YYYY-MM-DD at HH:mm:ss} | user_id={extra[user_id]} - {message}",
    filter=my_filter,
)
logger.bind(user_id=42).info("This will be logged.")
logger.bind(user_id=22).info("This won't be logged.")

2024-12-01 at 15:53:56 | user_id=42 - This will be logged.


## Secure Your Logs

In [19]:
from loguru import logger
import sys

logger.remove()
logger.add(sys.stderr)

email = "abcdef@ghi.com"
logger.info("User logged in with email: {}", email.replace(email[:5], "*****"))

[32m2024-12-01 15:53:56.162[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m8[0m - [1mUser logged in with email: *****f@ghi.com[0m


# Real-World Use Cases of Loguru

## Monitoring Data Pipelines

In [20]:
from loguru import logger

def extract():
    logger.info("Extracting data...")
    # Simulate data extraction
    return [1, 2, 3]

def transform(data):
    logger.info("Transforming data...")
    return [x * 2 for x in data]

def load(data):
    logger.info("Loading data...")
    if not data:
        raise ValueError("No data to load!")
    logger.info(f"Data loaded successfully: {data}")

try:
    data = extract()
    transformed_data = transform(data)
    load(transformed_data)
except Exception:
    logger.exception("An error occurred in the ETL pipeline.")

[32m2024-12-01 15:53:56.176[0m | [1mINFO    [0m | [36m__main__[0m:[36mextract[0m:[36m4[0m - [1mExtracting data...[0m
[32m2024-12-01 15:53:56.176[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransform[0m:[36m9[0m - [1mTransforming data...[0m
[32m2024-12-01 15:53:56.177[0m | [1mINFO    [0m | [36m__main__[0m:[36mload[0m:[36m13[0m - [1mLoading data...[0m
[32m2024-12-01 15:53:56.177[0m | [1mINFO    [0m | [36m__main__[0m:[36mload[0m:[36m16[0m - [1mData loaded successfully: [2, 4, 6][0m
[32m2024-12-01 15:53:56.176[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransform[0m:[36m9[0m - [1mTransforming data...[0m
[32m2024-12-01 15:53:56.177[0m | [1mINFO    [0m | [36m__main__[0m:[36mload[0m:[36m13[0m - [1mLoading data...[0m
[32m2024-12-01 15:53:56.177[0m | [1mINFO    [0m | [36m__main__[0m:[36mload[0m:[36m16[0m - [1mData loaded successfully: [2, 4, 6][0m


## Handling Distributed Systems

**NOTE:** Uncommenting the followings will write to the "distributed_logs.json" file

In [21]:
# from loguru import logger

# logger.remove()
# logger.add(
#     "distributed_logs.json",
#     format="{time:MMMM D, YYYY > HH:mm:ss!UTC} | {level} | {message}",
#     serialize=True,
# )
# logger.debug("Happy logging with Loguru!")