# log tracing details

> Manage correlation IDs and context propagation for distributed logging.

In [11]:
#| default_exp client.Log

In [12]:
#| exporti
from enum import Enum
from dataclasses import dataclass, field
import uuid
from typing import Optional, Dict, Any, Literal
from contextvars import ContextVar

import datetime as dt
import json

In [13]:
# | export

@dataclass
class Correlation:
    """Correlation information for distributed tracing"""

    trace_id: Optional[str] = None
    span_id: Optional[str] = None
    parent_span_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "trace_id": self.trace_id,
            "span_id": self.span_id,
            "parent_span_id": self.parent_span_id,
        }


class CorrelationManager:
    """Manages correlation IDs and context propagation"""

    def __init__(self):
        self.trace_id_var: ContextVar[Optional[str]] = ContextVar(
            "trace_id", default=None
        )
        self.request_id_var: ContextVar[Optional[str]] = ContextVar(
            "request_id", default=None
        )
        self.session_id_var: ContextVar[Optional[str]] = ContextVar(
            "session_id", default=None
        )
        self.span_id_var: ContextVar[Optional[str]] = ContextVar(
            "span_id", default=None
        )
        self.correlation_var: ContextVar[Optional[Correlation]] = ContextVar(
            "correlation", default=None
        )
        # Track last span_id per trace_id for proper parent span relationships
        self._trace_span_history: Dict[str, str] = {}

    def generate_trace_id(self) -> str:
        """Generate a new trace ID"""
        return str(uuid.uuid4())

    def generate_request_id(self) -> str:
        """Generate a new request ID"""
        return uuid.uuid4().hex[:12]

    def generate_span_id(self) -> str:
        """Generate a new span ID"""
        return uuid.uuid4().hex[:16]

    def generate_session_id(self) -> str:
        """Generate a new session ID"""
        # Simple random session ID
        # Auth-based session ID generation can be implemented in domain-specific libraries
        return uuid.uuid4().hex[:12]
    
    def get_or_create_correlation(self) -> Correlation:
        """Get or create correlation with automatic span chaining.
        
        Each call creates a NEW span that chains to the previous span,
        enabling span-per-log microservices-style tracing.
        """
        # Get or create trace_id (persists across logs)
        current_trace_id = self.trace_id_var.get()
        if not current_trace_id:
            current_trace_id = self.generate_trace_id()
            self.trace_id_var.set(current_trace_id)
        
        # Get previous span_id to set as parent
        previous_span_id = self._trace_span_history.get(current_trace_id)
        
        # ALWAYS generate a NEW span_id for this log
        new_span_id = self.generate_span_id()
        
        # Update context and history
        self.span_id_var.set(new_span_id)
        self._trace_span_history[current_trace_id] = new_span_id
        
        # Create correlation with chaining
        correlation = Correlation(
            trace_id=current_trace_id,
            span_id=new_span_id,
            parent_span_id=previous_span_id  # Chain to previous span
        )
        self.correlation_var.set(correlation)
        
        return correlation

    def start_request(
        self,
        parent_trace_id: Optional[str] = None,
        auth=None,
        is_pagination_request: bool = False,
    ) -> str:
        """Start a new request context"""
        # Use existing trace_id if available, otherwise generate new one
        # Only generate new trace_id if we don't have one in context AND no parent provided
        current_trace_id = self.trace_id_var.get()
        trace_id = parent_trace_id or current_trace_id or self.generate_trace_id()

        request_id = self.generate_request_id()

        # Use existing session_id or generate new one
        session_id = self.session_id_var.get() or self.generate_session_id()
        span_id = self.generate_span_id()

        # Handle parent span for pagination vs regular requests
        if is_pagination_request:
            # For pagination requests, use the original parent span for this trace
            # This ensures all pagination requests have the same parent
            parent_span_id = self._trace_span_history.get(f"{trace_id}_original_parent")
            if not parent_span_id:
                # If no original parent stored, this is the first pagination request
                # Store current span as original parent for future pagination requests
                parent_span_id = self._trace_span_history.get(trace_id)
                self._trace_span_history[f"{trace_id}_original_parent"] = (
                    parent_span_id or None
                )
        else:
            # For regular requests, use normal span chaining
            parent_span_id = self._trace_span_history.get(trace_id)
            # Store this as the original parent for future pagination requests
            self._trace_span_history[f"{trace_id}_original_parent"] = parent_span_id

        # Update the span history with the current span_id for this trace
        self._trace_span_history[trace_id] = span_id

        # Set context variables
        self.trace_id_var.set(trace_id)
        self.request_id_var.set(request_id)
        self.session_id_var.set(session_id)
        self.span_id_var.set(span_id)

        # Create correlation object
        correlation = Correlation(
            trace_id=trace_id, span_id=span_id, parent_span_id=parent_span_id
        )
        self.correlation_var.set(correlation)

        return request_id

    def get_current_context(self) -> Dict[str, Any]:
        """Get current correlation context"""
        correlation = self.correlation_var.get()
        return {
            "trace_id": self.trace_id_var.get(),
            "request_id": self.request_id_var.get(),
            "session_id": self.session_id_var.get(),
            "span_id": self.span_id_var.get(),
            "correlation": correlation,
        }

    def set_context_value(self, key: str, value: Any):
        """Set a value in the correlation context"""
        correlation = self.correlation_var.get()
        if correlation:
            correlation_dict = correlation.__dict__.copy()
            correlation_dict[key] = value
            self.correlation_var.set(Correlation(**correlation_dict))

In [14]:
#| export

@dataclass
class MultiTenant:
    """Multi-tenant information"""

    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tenant_id: Optional[str] = None
    organization_id: Optional[str] = None

    @classmethod
    def from_kwargs(cls, kwargs, user=None):
        mt = kwargs.get("multi_tenant")
        if isinstance(mt, dict) and mt:
            return cls(**mt)
        elif isinstance(mt, cls):
            return mt
        elif any(k in kwargs for k in ["user_id", "session_id", "tenant_id", "organization_id"]):
            return cls(
                user_id=kwargs.get("user_id") or user,
                session_id=kwargs.get("session_id"),
                tenant_id=kwargs.get("tenant_id"),
                organization_id=kwargs.get("organization_id"),
            )
        
        return None


    def to_dict(self) -> Dict[str, Any]:
        return {
            "user_id": self.user_id,
            "session_id": self.session_id,
            "tenant_id": self.tenant_id,
            "organization_id": self.organization_id,
        }


In [15]:
# | export

@dataclass
class HTTPDetails:
    """HTTP request/response details"""

    method: Optional[str] = None
    url: Optional[str] = None
    status_code: Optional[int] = None
    headers: Optional[Dict[str, str]] = None
    params: Optional[Dict[str, Any]] = None
    response_size: Optional[int] = None
    request_body: Optional[Any] = None
    response_body: Optional[Any] = None

    @classmethod
    def from_kwargs(cls, kwargs):
        hd = kwargs.get("http_details")
        
        if isinstance(hd, dict) and hd:
            return cls(**hd)
        elif isinstance(hd, cls):
            return hd
        elif any(k in kwargs for k in ["method", "url", "status_code", "headers", "response_size"]):
            return cls(
                method=kwargs.get("method"),
                url=kwargs.get("url"),
                status_code=kwargs.get("status_code"),
                headers=kwargs.get("headers"),
                response_size=kwargs.get("response_size"),
                request_body=kwargs.get("request_body"),
                response_body=kwargs.get("response_body"),
            )
        return None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "method": self.method,
            "url": self.url,
            "status_code": self.status_code,
            "headers": self.headers,
            "params": self.params,
            "response_size": self.response_size,
            "request_body": self.request_body,
            "response_body": self.response_body,
        }

In [16]:
#| export

@dataclass
class LogEntity:
    """Entity information for logging"""

    type: str 
    id: Optional[str] = None
    name: Optional[str] = None

    additional_info: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_any(cls, obj):
        if isinstance(obj, dict) and obj:
            return cls(**obj)
        elif isinstance(obj, cls):
            return obj
        return None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "type": self.type,
            "id": self.id,
            "name": self.name,
            "additional_info": self.additional_info,
        }

In [17]:
#| export 

class LogLevel(str, Enum):
    """Standard logging levels"""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

    @classmethod
    def from_string(cls, level_str: str) -> "LogLevel":
        """Convert string to LogLevel enum"""
        try:
            return cls(level_str.upper())
        except ValueError:
            return cls.INFO  # default fallback

    def should_log(self, other: "LogLevel") -> bool:
        """Check if this level should log the other level"""
        levels = list(LogLevel)
        return levels.index(self) <= levels.index(other)

In [18]:
#| export

LogMethod  = Literal["POST", "PUT", "DELETE", "PATCH", "COMMENT"]

@dataclass
class LogEntry:
    """Enhanced log entry with structured JSON format"""

    timestamp: str
    level: LogLevel
    message: str
    method : LogMethod = "COMMENT"

    app_name: str = 'default'
    user: Optional[str] = None
    action: Optional[str] = None
    entity: Optional[LogEntity] = None
    status: str = "info"
    duration_ms: Optional[int] = None
    correlation: Optional[Correlation] = None
    multi_tenant: Optional[MultiTenant] = None
    http_details: Optional[HTTPDetails] = None
    extra: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if self.http_details:
            self.method = self.http_details.method

    def to_dict(self) -> Dict[str, Any]:
        result = {
            "timestamp": self.timestamp,
            "level": self.level.value,
            "app_name": self.app_name,
            "message": self.message,
            "user": self.user
            or (
                self.multi_tenant.user_id
                if self.multi_tenant and self.multi_tenant.user_id
                else None
            ),
            "method" : self.method,
             "action": self.action,
            "status": self.status,
            "duration_ms": self.duration_ms,
            "entity": self.entity.to_dict() if self.entity else None,
            "correlation": self.correlation.__dict__ if self.correlation else None,
            "multi_tenant": self.multi_tenant.__dict__ if self.multi_tenant else None,
            "http_details": self.http_details.to_dict() if self.http_details else None,
            "extra": self.extra,
        }
        return {k: v for k, v in result.items() if v is not None}

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), default=str)

    @classmethod
    def create(cls, level: LogLevel, message: str, app_name: str = 'default', **kwargs) -> "Log":
        timestamp = dt.datetime.now().isoformat() + "Z"
        user = kwargs.get("user")
        action = kwargs.get("action")
        status = kwargs.get("status", "info")
        duration_ms = kwargs.get("duration_ms")
        extra = kwargs.get("extra", {})

        entity_obj = LogEntity.from_any(kwargs.get("entity"))
        
        # Handle correlation - can be dict or Correlation object
        correlation_param = kwargs.get("correlation")
        if isinstance(correlation_param, Correlation):
            correlation_obj = correlation_param
        elif isinstance(correlation_param, dict):
            correlation_obj = Correlation(**correlation_param)
        else:
            correlation_obj = None
            
        multi_tenant_obj = MultiTenant.from_kwargs(kwargs, user)
        http_details_obj = HTTPDetails.from_kwargs(kwargs)

        if not user and multi_tenant_obj and multi_tenant_obj.user_id:
            user = multi_tenant_obj.user_id

        return cls(
            timestamp=timestamp,
            level=level,
            app_name=app_name,
            message=message,
            user=user,
            action=action,
            entity=entity_obj,
            status=status,
            duration_ms=duration_ms,
            correlation=correlation_obj,
            multi_tenant=multi_tenant_obj,
            http_details=http_details_obj,
            extra=extra,
        )


In [19]:
log = LogEntry.create(level = LogLevel.INFO, message = 'hello world')

log.to_dict()

{'timestamp': '2025-10-10T10:20:20.056718Z',
 'level': 'INFO',
 'app_name': 'default',
 'message': 'hello world',
 'method': 'COMMENT',
 'status': 'info',
 'extra': {}}

In [20]:
#| hide
import nbdev; 
nbdev.nbdev_export('./Log.ipynb')