Official Python SDK for LogTide — self-hosted log management with async client, logging integration, batching, retry, circuit breaker, and middleware.
- Sync & async clients — LogTideClient (requests) and AsyncLogTideClient (aiohttp)
- stdlib logging integration — drop-in LogTideHandler for existing logging setups
- Automatic batching with configurable size and interval
- Retry logic with exponential backoff
- Circuit breaker pattern for fault tolerance
- Payload limits — field truncation, base64 removal, field exclusion, max entry size
- Max buffer size with silent drop policy to prevent memory leaks
- Query API for searching and filtering logs
- Live tail with Server-Sent Events (SSE)
- Trace ID context for distributed tracing
- Global metadata added to all logs
- Structured exception serialization with parsed stack frames
- Internal metrics (logs sent, errors, latency, circuit breaker trips)
- Flask, Django, FastAPI & Starlette middleware for auto-logging HTTP requests
- Full Python 3.8+ support with type hints
- Python 3.8 or higher
pip install logtide-sdk

# Async client (AsyncLogTideClient)
pip install logtide-sdk[async]
# Flask middleware
pip install logtide-sdk[flask]
# Django middleware
pip install logtide-sdk[django]
# FastAPI middleware
pip install logtide-sdk[fastapi]
# Starlette middleware (standalone, without FastAPI)
pip install logtide-sdk[starlette]
# Install all extras
pip install logtide-sdk[async,flask,django,fastapi,starlette]

from logtide_sdk import LogTideClient, ClientOptions
client = LogTideClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
)
)
client.info('api-gateway', 'Server started', {'port': 3000})
client.error('database', 'Connection failed', Exception('Timeout'))
# Graceful shutdown (also registered automatically via atexit)
client.close()

| Option | Type | Default | Description |
|---|---|---|---|
| api_url | str | required | Base URL of your LogTide instance |
| api_key | str | required | Project API key (starts with lp_) |
| batch_size | int | 100 | Logs per batch before an immediate flush |
| flush_interval | int | 5000 | Auto-flush interval in ms |
| Option | Type | Default | Description |
|---|---|---|---|
| max_buffer_size | int | 10000 | Max buffered logs; excess are silently dropped |
| max_retries | int | 3 | Max retry attempts on send failure |
| retry_delay_ms | int | 1000 | Initial retry delay (doubles each attempt) |
| circuit_breaker_threshold | int | 5 | Consecutive failures before opening circuit |
| circuit_breaker_reset_ms | int | 30000 | Time before testing a half-open circuit |
| debug | bool | False | Print debug output to console |
| global_metadata | dict | {} | Metadata merged into every log entry |
| auto_trace_id | bool | False | Auto-generate a UUID trace ID per log |
| payload_limits | PayloadLimitsOptions | see below | Safeguards against oversized payloads |
PayloadLimitsOptions prevents 413 errors from oversized entries.
| Field | Default | Description |
|---|---|---|
| max_field_size | 10 * 1024 (10 KB) | Max length of any single string field |
| max_log_size | 100 * 1024 (100 KB) | Max total serialized entry size |
| exclude_fields | [] | Field names replaced with "[EXCLUDED]" |
| truncation_marker | "...[TRUNCATED]" | Appended to truncated strings |
from logtide_sdk import LogTideClient, ClientOptions, PayloadLimitsOptions
client = LogTideClient(
ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
payload_limits=PayloadLimitsOptions(
max_field_size=5 * 1024,
exclude_fields=['password', 'token'],
),
)
)

Base64-encoded strings (data URIs or long base64 blobs) are automatically replaced with "[BASE64 DATA REMOVED]".
client.debug('service', 'Debug message')
client.info('service', 'Info message', {'userId': 123})
client.warn('service', 'Warning message')
client.error('service', 'Error message', {'custom': 'data'})
client.critical('service', 'Critical message')

Pass an Exception directly to error() or critical() — it is serialized automatically:
try:
raise RuntimeError('Database timeout')
except Exception as e:
    client.error('database', 'Query failed', e)

Generated metadata:
{
"exception": {
"type": "RuntimeError",
"message": "Database timeout",
"language": "python",
"stacktrace": [
{"file": "app.py", "function": "run_query", "line": 42}
],
"raw": "Traceback (most recent call last):\n ..."
}
}

AsyncLogTideClient is the async equivalent, using aiohttp. Best used as an async context manager.
pip install logtide-sdk[async]

import asyncio
from logtide_sdk import AsyncLogTideClient, ClientOptions
async def main():
async with AsyncLogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
)) as client:
await client.info('my-service', 'Hello from async!')
await client.error('my-service', 'Something failed', Exception('oops'))
asyncio.run(main())

Manual lifecycle (without context manager):
client = AsyncLogTideClient(options)
await client.start() # starts background flush loop
try:
await client.info('svc', 'message')
finally:
    await client.close()

All sync logging, query, stream, and metrics methods have async equivalents.
LogTideHandler is a standard logging.Handler — drop it into any existing logging setup.
import logging
from logtide_sdk import LogTideClient, ClientOptions, LogTideHandler
client = LogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
))
handler = LogTideHandler(client=client, service='my-service')
handler.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
# These are forwarded to LogTide automatically
logger.warning('Low disk space')
logger.error('Unhandled exception', exc_info=True)

Exception info is serialized with full structured stack frames when exc_info=True is used.
client.set_trace_id('request-123')
client.info('api', 'Request received')
client.info('db', 'Querying users')
client.info('api', 'Response sent')
client.set_trace_id(None)  # clear

with client.with_trace_id('request-456'):
client.info('api', 'Processing in context')
client.warn('cache', 'Cache miss')
# Trace ID automatically restored after block

with client.with_new_trace_id():
client.info('worker', 'Background job started')
    client.info('worker', 'Job completed')

from datetime import datetime, timedelta
from logtide_sdk import QueryOptions, LogLevel
result = client.query(
QueryOptions(
service='api-gateway',
level=LogLevel.ERROR,
from_time=datetime.now() - timedelta(hours=24),
to_time=datetime.now(),
limit=100,
offset=0,
)
)
print(f"Found {result.total} logs")
for log in result.logs:
    print(log)

result = client.query(QueryOptions(q='timeout', limit=50))

logs = client.get_by_trace_id('trace-123')

from logtide_sdk import AggregatedStatsOptions
stats = client.get_aggregated_stats(
AggregatedStatsOptions(
from_time=datetime.now() - timedelta(days=7),
to_time=datetime.now(),
interval='1h',
)
)
for service in stats.top_services:
    print(f"{service['service']}: {service['count']} logs")

stream() runs in a background daemon thread and returns immediately with a stop function.
def handle_log(log):
print(f"[{log['time']}] {log['level']}: {log['message']}")
stop = client.stream(
on_log=handle_log,
on_error=lambda e: print(f"Stream error: {e}"),
filters={'service': 'api-gateway', 'level': 'error'},
)
# ... later, to stop:
stop()

Async streaming runs as a cancellable coroutine:
task = asyncio.create_task(client.stream(on_log=handle_log))
# ... later:
task.cancel()

metrics = client.get_metrics()
print(f"Logs sent: {metrics.logs_sent}")
print(f"Logs dropped: {metrics.logs_dropped}")
print(f"Errors: {metrics.errors}")
print(f"Retries: {metrics.retries}")
print(f"Avg latency: {metrics.avg_latency_ms:.1f}ms")
print(f"Circuit breaker trips: {metrics.circuit_breaker_trips}")
print(client.get_circuit_breaker_state()) # CLOSED | OPEN | HALF_OPEN
client.reset_metrics()

from flask import Flask
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFlaskMiddleware
app = Flask(__name__)
client = LogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
))
LogTideFlaskMiddleware(
app,
client=client,
service_name='flask-api',
log_requests=True,
log_responses=True,
skip_paths=['/metrics'],
)

# settings.py
from logtide_sdk import LogTideClient, ClientOptions
LOGTIDE_CLIENT = LogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
))
LOGTIDE_SERVICE_NAME = 'django-api'
MIDDLEWARE = [
'logtide_sdk.middleware.LogTideDjangoMiddleware',
# ...
]

from fastapi import FastAPI
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideFastAPIMiddleware
app = FastAPI()
client = LogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
))
app.add_middleware(LogTideFastAPIMiddleware, client=client, service_name='fastapi-api')

pip install logtide-sdk[starlette]

from starlette.applications import Starlette
from logtide_sdk import LogTideClient, ClientOptions
from logtide_sdk.middleware import LogTideStarletteMiddleware
app = Starlette()
client = LogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
))
app.add_middleware(LogTideStarletteMiddleware, client=client, service_name='starlette-api')

All middleware auto-logs requests, responses (with duration and status code), and errors (with serialized exception metadata). Health check paths (/health, /healthz) are skipped by default.
See the examples/ directory for complete working examples:
- basic.py - Simple usage
- advanced.py - All advanced features
client = LogTideClient(ClientOptions(
api_url='http://localhost:8080',
api_key='lp_your_api_key_here',
global_metadata={
'env': os.getenv('APP_ENV', 'production'),
'version': '2.0.0',
'region': 'eu-west-1',
},
))

import threading
def _monitor():
while True:
m = client.get_metrics()
if m.logs_dropped > 0:
print(f"WARNING: {m.logs_dropped} logs dropped")
if m.circuit_breaker_trips > 0:
print("ERROR: Circuit breaker tripped")
time.sleep(60)
threading.Thread(target=_monitor, daemon=True).start()

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.
MIT License — see LICENSE for details.
