# Introduction

This notebook builds a log processing system that uses Fenic's text extraction and semantic enrichment capabilities to turn unstructured logs into actionable incident-response data.

This pipeline demonstrates log enrichment through multi-stage processing:

- Template-based parsing without regex
- Service metadata enrichment via joins
- LLM-powered error categorization and remediation
- Incident severity assessment with business context


## Session Configuration for Log Enrichment

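This cell sets up a fenic session for semantic log enrichment.

It defines the application name and the language model configuration: a single model registered under the alias `"mini"` that uses OpenAI's `gpt-4o-mini`, with rate limits of 500 requests per minute (`rpm`) and 200,000 tokens per minute (`tpm`). `fc.Session.get_or_create` then returns a session built from this configuration, and the semantic operations later in the notebook run against it. An OpenAI API key must be available, typically through the `OPENAI_API_KEY` environment variable.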

```python
import fenic as fc
from pydantic import BaseModel, Field

# Configure session with semantic capabilities
# (the OpenAI client reads the API key from the OPENAI_API_KEY environment variable)
config = fc.SessionConfig(
    app_name="log_enrichment",
    semantic=fc.SemanticConfig(
        language_models={
            # rpm = requests per minute, tpm = tokens per minute
            "mini": fc.OpenAIModelConfig(model_name="gpt-4o-mini", rpm=500, tpm=200_000)
        }
    )
)

# Create (or reuse) the session
session = fc.Session.get_or_create(config)
```
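The `rpm` and `tpm` values jointly cap throughput: whichever limit is reached first is the one that binds. The sketch below is a back-of-the-envelope helper (hypothetical, not part of fenic) that estimates effective requests per minute under the simplifying assumption that every request consumes the same number of tokens; the `avg_tokens_per_request` values are made-up inputs.

```python
def effective_requests_per_minute(rpm: int, tpm: int, avg_tokens_per_request: int) -> int:
    """Hypothetical helper: the binding limit is whichever of rpm or tpm runs out first."""
    if avg_tokens_per_request <= 0:
        raise ValueError("avg_tokens_per_request must be positive")
    # Requests the token budget alone would allow per minute
    token_bound = tpm // avg_tokens_per_request
    return min(rpm, token_bound)


# Using the limits from the config above
print(effective_requests_per_minute(500, 200_000, 300))    # → 500 (token budget allows 666, so rpm binds)
print(effective_requests_per_minute(500, 200_000, 1_000))  # → 200 (tpm binds)
```

Real requests vary in size, so this is only a rough guide for choosing limits that match your OpenAI account tier.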



## Raw Log and Service Metadata Preparation

This section defines two datasets:

**Raw application logs**: A list of log lines from various services, covering different log levels (ERROR, WARN, INFO, CRITICAL) and varied key=value message payloads, simulating real-world operational events. Every line follows the same `timestamp [LEVEL] service: message` layout.

**Service metadata**: A list of dictionaries providing additional context for each service, such as the responsible team, criticality level, and on-call communication channel.

These datasets are used as the foundation for log enrichment and analysis.


```python
# Raw application logs: shared line layout, varied log levels and message payloads
raw_logs_data = [
    "2024-01-15 14:32:01 [ERROR] payment-api: Connection timeout to db-primary.internal:5432 after 30s retries=3 connection_id=conn_789",
    "2024-01-15 14:32:15 [WARN] user-service: Rate limit exceeded client_ip=192.168.1.100 requests=1205/min endpoint=/api/v1/users",
    "2024-01-15 14:33:02 [ERROR] order-processor: Payment validation failed order_id=12345 payment_method=credit_card error_code=INVALID_CVV",
    "2024-01-15 14:35:22 [INFO] auth-service: User login successful user_id=user_67890 session_id=sess_abc123 ip=10.0.1.55",
    "2024-01-15 14:36:45 [ERROR] notification-service: Failed to send email to user@example.com smtp_error=Connection_refused retries=2",
    "2024-01-15 14:37:12 [WARN] inventory-service: Low stock alert product_id=SKU_9876 current_stock=5 threshold=10",
    "2024-01-15 14:38:33 [ERROR] payment-api: Database connection pool exhausted max_connections=50 active=50 pending=15",
    "2024-01-15 14:39:01 [CRITICAL] order-processor: Circuit breaker opened for payment-gateway failure_rate=85% threshold=80%",
    "2024-01-15 14:40:15 [ERROR] user-service: Authentication failed user_id=user_12345 reason=invalid_token attempts=3",
    "2024-01-15 14:41:22 [WARN] cache-service: Redis connection latency high avg_latency=250ms threshold=100ms",
    "2024-01-15 14:42:33 [ERROR] file-service: Disk space critical mount=/data/uploads usage=95% available=2GB",
    "2024-01-15 14:43:44 [INFO] metrics-service: Health check passed response_time=45ms status=healthy",
    "2024-01-15 14:44:55 [ERROR] search-service: Elasticsearch cluster unhealthy nodes_down=2 total_nodes=5",
    "2024-01-15 14:45:10 [WARN] api-gateway: Request timeout to upstream service=user-service timeout=10s endpoint=/api/v1/profile",
    "2024-01-15 14:46:20 [ERROR] backup-service: S3 upload failed file=backup_20240115.tar.gz error=AccessDenied bucket=prod-backups"
]

# Service metadata for classical enrichment
service_metadata_data = [
    {"service_name": "payment-api", "team_owner": "payments-team", "criticality": "critical", "on_call_channel": "#payments-oncall"},
    {"service_name": "user-service", "team_owner": "identity-team", "criticality": "high", "on_call_channel": "#identity-alerts"},
    {"service_name": "order-processor", "team_owner": "commerce-team", "criticality": "critical", "on_call_channel": "#commerce-oncall"},
    {"service_name": "auth-service", "team_owner": "identity-team", "criticality": "critical", "on_call_channel": "#identity-alerts"},
    {"service_name": "notification-service", "team_owner": "platform-team", "criticality": "medium", "on_call_channel": "#platform-alerts"},
    {"service_name": "inventory-service", "team_owner": "commerce-team", "criticality": "high", "on_call_channel": "#commerce-oncall"},
    {"service_name": "cache-service", "team_owner": "platform-team", "criticality": "high", "on_call_channel": "#platform-alerts"},
    {"service_name": "file-service", "team_owner": "platform-team", "criticality": "medium", "on_call_channel": "#platform-alerts"},
    {"service_name": "metrics-service", "team_owner": "observability-team", "criticality": "medium", "on_call_channel": "#observability"},
    {"service_name": "search-service", "team_owner": "data-team", "criticality": "high", "on_call_channel": "#data-oncall"},
    {"service_name": "api-gateway", "team_owner": "platform-team", "criticality": "critical", "on_call_channel": "#platform-alerts"},
    {"service_name": "backup-service", "team_owner": "platform-team", "criticality": "medium", "on_call_channel": "#platform-alerts"}
]
```
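Because the enrichment stage uses a left join, a log line whose service has no metadata record would quietly end up with null team and channel fields. A quick plain-Python check such as the one below can flag those gaps before the pipeline runs. The `services_missing_metadata` helper is hypothetical, it assumes every line has the `[LEVEL] service: message` layout shown above, and the sample data is a small excerpt of the lists above.

```python
def services_missing_metadata(raw_logs, metadata):
    """Hypothetical helper: return services that appear in the logs but have no metadata row."""
    known = {row["service_name"] for row in metadata}
    seen = set()
    for line in raw_logs:
        # "<date> <time> [LEVEL] service: message" -> text between "] " and the first ": "
        after_level = line.partition("] ")[2]
        service = after_level.partition(": ")[0]
        if service:
            seen.add(service)
    return sorted(seen - known)


sample_logs = [
    "2024-01-15 14:32:01 [ERROR] payment-api: Connection timeout to db-primary.internal:5432 after 30s retries=3 connection_id=conn_789",
    "2024-01-15 14:43:44 [INFO] metrics-service: Health check passed response_time=45ms status=healthy",
]
sample_metadata = [
    {"service_name": "payment-api", "team_owner": "payments-team", "criticality": "critical", "on_call_channel": "#payments-oncall"},
]
print(services_missing_metadata(sample_logs, sample_metadata))  # → ['metrics-service']
```

Run against the full `raw_logs_data` and `service_metadata_data`, it returns an empty list, since all twelve services in the logs have a metadata record.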

## Creating DataFrames for Logs and Metadata

Here we convert the raw log messages and service metadata into DataFrames, making them ready for further processing and enrichment. We also print a summary of the pipeline setup, including the number of log entries and metadata records being processed.

```python
# Create DataFrames
logs_df = session.create_dataframe({"raw_message": raw_logs_data})
metadata_df = session.create_dataframe(service_metadata_data)

print("🚀 Log Enrichment Pipeline")
print("=" * 70)
print(f"Processing {logs_df.count()} log entries with {metadata_df.count()} service metadata records\n")
```
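The two `create_dataframe` calls above receive data in different shapes: `logs_df` comes from a column-oriented dict (column name mapped to a list of values), while `metadata_df` comes from a row-oriented list of dicts. The hypothetical helper below, which is not part of fenic, converts rows into the columnar form to show that both shapes describe the same table; it assumes every row has the same keys.

```python
def rows_to_columns(rows):
    """Hypothetical helper: convert a list of row dicts into a dict of column lists."""
    if not rows:
        return {}
    columns = {key: [] for key in rows[0]}
    for row in rows:
        for key in columns:
            columns[key].append(row[key])
    return columns


rows = [
    {"service_name": "payment-api", "criticality": "critical"},
    {"service_name": "user-service", "criticality": "high"},
]
print(rows_to_columns(rows))
# → {'service_name': ['payment-api', 'user-service'], 'criticality': ['critical', 'high']}
```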

## Stage 1: Parsing Unstructured Log Messages

This section implements the first stage of the pipeline: parsing raw, unstructured log lines into structured fields (`timestamp`, `level`, `service`, and `message`) with template-based extraction via `fc.text.extract`, rather than regular expressions.

Rows whose `timestamp` is null, meaning the line did not match the template, are filtered out, and a sample of the structured output is displayed.

```python
# Stage 1: Parse unstructured logs using template extraction
print("🔍 Stage 1: Parsing unstructured log messages...")
log_template = "${timestamp:none} [${level:none}] ${service:none}: ${message:none}"

parsed_df = logs_df.select(
    fc.text.extract("raw_message", log_template).alias("parsed")
).select(
    fc.col("parsed").get_item("timestamp").alias("timestamp"),
    fc.col("parsed").get_item("level").alias("level"),
    fc.col("parsed").get_item("service").alias("service"),
    fc.col("parsed").get_item("message").alias("message")
).filter(
    # Drop lines that did not match the template
    fc.col("timestamp").is_not_null()
)

print("Sample parsed logs:")
parsed_df.select("timestamp", "level", "service").show(3)
```
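To build intuition for what the template does, the sketch below matches a line against the same layout using plain string partitioning, with the literal pieces `" ["`, `"] "` and `": "` acting as delimiters between fields. It is an approximation for illustration only, not fenic's implementation; the hypothetical `parse_log_line` returns `None` for a non-matching line, mirroring the null fields that the `is_not_null()` filter removes.

```python
def parse_log_line(line):
    """Illustrative stand-in for the template
    "${timestamp:none} [${level:none}] ${service:none}: ${message:none}".
    Each field is the text between consecutive literal delimiters."""
    fields = {}
    rest = line
    for name, delimiter in (("timestamp", " ["), ("level", "] "), ("service", ": ")):
        value, found, rest = rest.partition(delimiter)
        if not found:
            return None  # the line does not follow the template
        fields[name] = value
    fields["message"] = rest  # everything after the last delimiter
    return fields


line = "2024-01-15 14:32:01 [ERROR] payment-api: Connection timeout to db-primary.internal:5432 after 30s retries=3 connection_id=conn_789"
print(parse_log_line(line)["service"])   # → payment-api
print(parse_log_line("not a log line"))  # → None
```

Note that the `:` inside `db-primary.internal:5432` does not split the message, because only the first `": "` after the service name acts as a delimiter.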

## Stage 2: Classical Enrichment with Service Metadata

In this stage, the parsed log data is enriched by joining it with the service metadata.

Because the metadata identifies services by `service_name`, that column is first renamed to `service` so both DataFrames share the join key. A left join then adds team ownership, criticality, and on-call channel to each log entry; log lines whose service has no metadata record are kept, with nulls in the added columns.

A sample of the enriched logs is then displayed, showing how operational context is attached to the parsed log fields.

```python
# Stage 2: Classical enrichment with service metadata
print("\n🔗 Stage 2: Enriching with service metadata...")
# Rename service_name to service using select with alias
metadata_df_renamed = metadata_df.select(
    metadata_df.service_name.alias("service"),
    "team_owner",
    "criticality",
    "on_call_channel"
)
enriched_df = parsed_df.join(
    metadata_df_renamed,
    on="service",
    how="left"
).select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel"
)

print("Sample enriched logs:")
enriched_df.select("service", "timestamp", "team_owner", "criticality").show(3)
```
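Conceptually, the left join keeps every parsed log row and attaches the matching metadata fields, filling them with nulls when no metadata row exists. The plain-Python sketch below illustrates those semantics; the `left_join` helper and the `unknown-service` row are hypothetical, and the helper assumes the metadata has at most one row per service, as in the data above.

```python
def left_join(left_rows, right_rows, key):
    """Hypothetical helper: left join two lists of dicts on `key`.
    Assumes `right_rows` has at most one row per key value."""
    index = {row[key]: row for row in right_rows}
    right_columns = [c for c in (right_rows[0] if right_rows else {}) if c != key]
    joined = []
    for row in left_rows:
        match = index.get(row[key])
        # Unmatched rows get None (null) for every right-hand column
        extra = {c: (match[c] if match else None) for c in right_columns}
        joined.append({**row, **extra})
    return joined


logs = [
    {"service": "payment-api", "level": "ERROR"},
    {"service": "unknown-service", "level": "WARN"},
]
metadata = [{"service": "payment-api", "team_owner": "payments-team"}]
for row in left_join(logs, metadata, "service"):
    print(row)
# → {'service': 'payment-api', 'level': 'ERROR', 'team_owner': 'payments-team'}
# → {'service': 'unknown-service', 'level': 'WARN', 'team_owner': None}
```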

## Stage 3: Semantic Enrichment with Language Models

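In this stage, semantic enrichment is applied to the log data using a large language model (LLM), here the `gpt-4o-mini` model configured above.

Three semantic operators are used: `fc.semantic.extract` fills the fields of the `ErrorAnalysis` Pydantic model (error category, affected component, potential cause) from each message; `fc.semantic.classify` assigns each entry one of four severity labels (`low`, `medium`, `high`, `critical`) based on the message together with the service's criticality; and `fc.semantic.map` generates 2-3 remediation steps for the on-call team.

The final output combines structured metadata, semantic insights, and recommended actions for each log entry. It is cached so that the analytics cells below can reuse the LLM results instead of recomputing them.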

```python
# Stage 3: Semantic enrichment using LLM operations
print("\n🧠 Stage 3: Applying semantic enrichment with LLMs...")
print("This may take a few moments as we process logs with language models...")

# Define the Pydantic model for semantic error extraction
class ErrorAnalysis(BaseModel):
    """Pydantic model for semantic error extraction"""
    error_category: str = Field(..., description="Main category of the error (e.g., database, network, authentication, resource)")
    affected_component: str = Field(..., description="Specific component or resource affected")
    potential_cause: str = Field(..., description="Most likely root cause of the issue")

# Semantic extraction for error analysis using Pydantic model
final_df = enriched_df.select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel",
    # Extract error analysis information using Pydantic model
    fc.semantic.extract("message", ErrorAnalysis).alias("analysis"),
    # Classify incident severity based on message and service criticality
    fc.semantic.classify(
        fc.text.concat(fc.col("message"), fc.lit(" (criticality: "), fc.col("criticality"), fc.lit(")")),
        ["low", "medium", "high", "critical"]
    ).alias("incident_severity"),
    # Generate remediation steps
    fc.semantic.map(
        "Generate 2-3 specific remediation steps that the on-call team should take to resolve this issue: {message} | Service: {service} | Team: {team_owner}"
    ).alias("remediation_steps")
)

# Create readable final output with extracted fields
final_readable = final_df.select(
    "timestamp",
    "level",
    "service",
    "message",
    "team_owner",
    "criticality",
    "on_call_channel",
    final_df.analysis.error_category.alias("error_category"),
    final_df.analysis.affected_component.alias("affected_component"),
    final_df.analysis.potential_cause.alias("potential_cause"),
    "incident_severity",
    "remediation_steps"
).cache()
```
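The classifier sees each message with the service criticality appended, and the remediation instruction has its `{message}`, `{service}` and `{team_owner}` placeholders filled per row. The sketch below reproduces that string construction for one sample row in plain Python, using `str.format` as an analogy for the placeholder filling. It only approximates the text the two expressions build; fenic may wrap these strings in its own prompt text before sending them to the model.

```python
# Plain-Python approximation of the per-row text built in Stage 3 (not fenic's internals)
row = {
    "message": "Database connection pool exhausted max_connections=50 active=50 pending=15",
    "service": "payment-api",
    "team_owner": "payments-team",
    "criticality": "critical",
}

# Mirrors fc.text.concat(message, " (criticality: ", criticality, ")")
classify_input = row["message"] + " (criticality: " + row["criticality"] + ")"

# Mirrors the {column} placeholders in the semantic.map instruction
remediation_template = (
    "Generate 2-3 specific remediation steps that the on-call team should take "
    "to resolve this issue: {message} | Service: {service} | Team: {team_owner}"
)
# str.format ignores the unused "criticality" key
remediation_prompt = remediation_template.format(**row)

print(classify_input)
# → Database connection pool exhausted max_connections=50 active=50 pending=15 (criticality: critical)
print(remediation_prompt)
```

Appending the criticality lets the same message text be judged differently depending on how important the emitting service is.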

```python
# Display results
print("\n✅ Pipeline Complete! Final enriched logs:")
print("-" * 70)
final_readable.show()
```

## Analytics: Error Category Distribution

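This section demonstrates basic analytics on the enriched log data by calculating and displaying the distribution of error categories.

It groups the final results by `error_category` and counts the rows in each group, showing which kinds of errors are most common in the dataset. Because `error_category` is free text filled in by the LLM, and extraction runs on every row (including INFO and WARN entries), the exact labels and counts can vary between runs.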

```python
# Analytics examples
print("\n📈 Analytics Examples:")
print("-" * 70)

# Error category distribution
print("\nError Category Distribution:")
final_readable.group_by("error_category").agg(fc.count("*").alias("count")).show()
```
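`group_by(...).agg(fc.count("*"))` counts the rows in each group. Over a plain list of records, the equivalent computation is a `collections.Counter`; the category values below are made-up placeholders, since the real ones come from the LLM.

```python
from collections import Counter

# Made-up example records; in the notebook these values come from semantic.extract
records = [
    {"error_category": "database"},
    {"error_category": "network"},
    {"error_category": "database"},
    {"error_category": "authentication"},
]

# One count per distinct error_category, like group_by("error_category").agg(count("*"))
distribution = Counter(r["error_category"] for r in records)
for category, count in distribution.most_common():
    print(category, count)
# → database 2
# → network 1
# → authentication 1
```

Unlike this sketch, the notebook's grouped output is not sorted by count unless you add an explicit sort.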

## Analytics: Incident Severity by Service Criticality

This section analyzes the relationship between service criticality and incident severity.

It groups the enriched log data by both criticality and severity levels, displaying the count of incidents for each combination. 

This helps identify which critical services are experiencing the most severe issues.

```python
# Severity by service criticality
print("\nIncident Severity by Service Criticality:")
final_readable.group_by("criticality", "incident_severity").agg(fc.count("*").alias("count")).show()
```
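Grouping by two columns yields one row per (criticality, severity) pair that actually occurs. Pivoting those counts into a small table can make the pattern easier to read. The plain-Python sketch below does this with made-up records; the pivot layout is a presentation choice, not something the notebook produces.

```python
from collections import Counter

SEVERITIES = ["low", "medium", "high", "critical"]

# Made-up (criticality, incident_severity) pairs; in the notebook these come from final_readable
records = [
    ("critical", "critical"),
    ("critical", "high"),
    ("high", "medium"),
    ("critical", "critical"),
    ("medium", "low"),
]

# Same counts as group_by("criticality", "incident_severity").agg(count("*"))
pair_counts = Counter(records)

# Pivot: one row per service criticality, one column per incident severity
print("criticality".ljust(12) + "".join(s.rjust(10) for s in SEVERITIES))
for crit in ["critical", "high", "medium"]:
    # Counter returns 0 for pairs that never occur
    cells = "".join(str(pair_counts[(crit, sev)]).rjust(10) for sev in SEVERITIES)
    print(crit.ljust(12) + cells)
```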

## Identifying and Displaying High-Priority Incidents

This section filters the enriched log data down to incidents classified as "critical" or "high" severity.

For each high-priority incident, it displays the affected service, the responsible team, the severity, the on-call channel, and the recommended remediation steps.

It also reports the total number of urgent incidents, so on-call teams can see at a glance how many pressing issues need a response. Finally, the session is stopped.

```python
# High-priority incidents requiring immediate attention
print("\nHigh-Priority Incidents (Critical/High severity):")
print("-" * 70)
critical_incidents = final_readable.filter(
    (final_readable.incident_severity == "critical") | (final_readable.incident_severity == "high")
).select(
    "service",
    "team_owner",
    "incident_severity",
    "on_call_channel",
    "remediation_steps"
)
critical_incidents.show()

critical_count = critical_incidents.count()
print(f"\n🚨 Found {critical_count} high-priority incidents requiring immediate attention")
session.stop()
```
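The filter keeps rows whose severity is either "critical" or "high". In plain Python the same selection is a set-membership test. The sketch below uses made-up records and also sorts the result so the most severe incidents come first, which is an extra step the notebook does not perform.

```python
URGENT = {"critical", "high"}
SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "low": 3}

# Made-up example records; in the notebook these come from final_readable
incidents = [
    {"service": "payment-api", "incident_severity": "high"},
    {"service": "metrics-service", "incident_severity": "low"},
    {"service": "order-processor", "incident_severity": "critical"},
]

# Equivalent of (severity == "critical") | (severity == "high")
urgent = [i for i in incidents if i["incident_severity"] in URGENT]
# Extra step: most severe first
urgent.sort(key=lambda i: SEVERITY_RANK[i["incident_severity"]])

print([i["service"] for i in urgent])                  # → ['order-processor', 'payment-api']
print(f"Found {len(urgent)} high-priority incidents")  # → Found 2 high-priority incidents
```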