## MicroVision

This notebook covers the following: 

1. Loading and Preprocessing the raw log datasets
2. 

In [12]:
import os

if os.getcwd() == '/Users/matildamwendwa/Desktop/Desktop - Admin‚Äôs MacBook Pro/Python_Projects/microvision/notebooks':
    os.chdir('/Users/matildamwendwa/Desktop/Desktop - Admin‚Äôs MacBook Pro/Python_Projects/microvision')
    print("Changed!!")

print("Current working directory:", os.getcwd())

Current working directory: /Users/matildamwendwa/Desktop/Desktop - Admin‚Äôs MacBook Pro/Python_Projects/microvision


#### Install & Import Dependencies

In [13]:
%pip install drain3 pandas matplotlib tqdm --quiet


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [14]:
import os
import re
import json
import gzip
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import matplotlib.pyplot as plt
from drain3 import TemplateMiner
from drain3.file_persistence import FilePersistence
from drain3.template_miner_config import TemplateMinerConfig
from drain3.template_miner_config import MaskingInstruction

print("‚úÖ Drain3 and dependencies imported successfully.")

‚úÖ Drain3 and dependencies imported successfully.


#### Configurations and Setup

In [15]:
config = {
        "DATA_DIR": "data",
        "DATASET_NAME": "OpenStack",
        "DATASET_LOG": "_full.log",
        "ENRICHED_CSV": "_enriched.csv",
        "TEMPLATES_CSV": "_templates.csv",
        "TEMPLATES_JSON": "_templates.json",
        "MAX_LINES": None,  # Set to None to process all lines
        "PERSISTENCE_PATH": "persistence",
        "DRAIN_PATH": "drain3_state",
}


DATASET_PATH = f"{config['DATA_DIR']}/{config['DATASET_NAME']+config['DATASET_LOG']}"


ENRICHED_CSV = f"{config['DATA_DIR']}/{config['DATASET_NAME']}{config['DATASET_LOG']}{config['ENRICHED_CSV']}"
TEMPLATES_CSV = f"{config['DATA_DIR']}/{config['DATASET_NAME']}{config['DATASET_LOG']}{config['TEMPLATES_CSV']}"
TEMPLATES_JSON = f"{config['DATA_DIR']}/{config['DATASET_NAME']}{config['DATASET_LOG']}{config['TEMPLATES_JSON']}"

persistence_dir = os.path.join(f"{config['DATA_DIR']}/{config['PERSISTENCE_PATH']}", config['DRAIN_PATH'])
os.makedirs(persistence_dir, exist_ok=True)
persistence = FilePersistence(f"{persistence_dir}/drain3_state.bin")

print("Dataset Path:", DATASET_PATH)
print("The extracted templated will be written to:", TEMPLATES_CSV)
print("The extracted templates enriched with metadata will be written to:", ENRICHED_CSV)
print("Persistence path (Drain3)", f"{persistence_dir}")


Dataset Path: data/OpenStack_full.log
The extracted templated will be written to: data/OpenStack_full.log_templates.csv
The extracted templates enriched with metadata will be written to: data/OpenStack_full.log_enriched.csv
Persistence path (Drain3) data/persistence/drain3_state


### Utility Functions for Log Parsing Module

In [16]:
import glob
import re

# ------- UF: Loading Raw Log files

def load_raw_logs(log_dir: str, dataset_name: str):
    log_files = glob.glob(f"{log_dir}/{dataset_name}*.log")
    raw_logs = []
    for file_path in log_files:
        with open(file_path, "r") as f:
            raw_logs.extend(f.readlines())
    return raw_logs


# ------- UF: Convert LogPai-style format to regex

def log_format_to_regex(log_format: str) -> str:
    tokens = re.findall(r"<([^>]+)>", log_format)
    regex = re.escape(log_format)
    for t in tokens:
        esc = re.escape(f"<{t}>")
        if t.lower() == "content":
            repl = rf"(?P<{t}>.*)"
        else:
            repl = rf"(?P<{t}>.+?)"
        regex = regex.replace(esc, repl, 1)
    regex = regex.replace(r"\ ", r"\s+")
    return rf"^{regex}$"


# ------- UF: Parse log line with given format 

def parse_line_with_format(line: str, log_format: str):
    """Return dict of matched groups (or {'Content': line} fallback)."""
    if not log_format:
        return {"Content": line}
    regex = log_format_to_regex(log_format)
    line = line.strip()
    m = re.match(regex, line)
    if not m:
        return {"ParseError": True, "Raw": line, "Content": line}
    return m.groupdict()


### Dynamically Select Log Format Mapping for current dataset

In [17]:
# LOG FORMAT MAPPINGS FROM LogPai's GitHub
log_format_mappings = {
    "OpenStack": "<Date> <Time> <Pid> <Level> <Component> <Content>",
    "Hadoop": "<Date> <Time> <Pid> <Level> <Component>: <Content>",
    "HDFS": "<Date> <Time> <Level> <Component>: <Content>",
    "Spark": "<Date> <Time> <Level> <Component>: <Content>",
    "Zookeeper": "<Date> <Time> <Level> <Component>: <Content>"
}

# Auto-select format for our dataset
log_format = log_format_mappings.get(f"{config['DATASET_NAME']}", None)

if log_format:
    print(f"‚úÖ Log format for {config['DATASET_NAME']}: {log_format}")
else:
    print(f"‚ö†Ô∏è No log format found for {config['DATASET_NAME']}. Please update mapping.")


‚úÖ Log format for OpenStack: <Date> <Time> <Pid> <Level> <Component> <Content>


In [18]:
# Additional Metadata Enrichment Functions

ENRICHMENT_CONFIG = {
    "core_fields": ["Component", "Level", "Method", "URL"],             # Core log identifiers
    "enrich_fields": ["ReqID", "UserID", "TenantID", "IP", "Status"],   # Enrichment layer
    "metadata_fields": [
        "Component", "Level", "Pid", "ReqID", "UserID", 
        "TenantID", "IP", "Status", "Method", "URL", 
        # "Timestamp"
    ],  
}

import datetime
import pandas as pd
from typing import List, Dict

# ---------------- Semantic Text Builder ----------------
def build_semantic_text(row: pd.Series, 
                        core_fields: List[str], 
                        enrich_fields: List[str]) -> str:
    parts = []
    for f in core_fields:
        if f in row and pd.notna(row[f]):
            parts.append(f"[{row[f]}]")
    base = " ".join(parts) + " " + str(row.get("template", ""))

    extras = []
    for f in enrich_fields:
        if f in row and pd.notna(row[f]):
            extras.append(f"{f.lower()}={row[f]}")

    # Return combined text
    return base + (" " + " ".join(extras) if extras else "")


# ---------------- Structured Metadata Builder ----------------
def build_structured_metadata(row: pd.Series, 
                              metadata_fields: List[str]) -> Dict[str, str]:
    meta = {}
    for f in metadata_fields:
        if f in row and pd.notna(row[f]):
            meta[f.lower()] = row[f]

    # Step 2: Combine Date + Time into Timestamp 
    date_str = str(row.get("Date", "")).strip()
    time_str = str(row.get("Time", "")).strip()

    if date_str and time_str:
        # Use consistent format for speed and reliability
        ts = pd.to_datetime(
            f"{date_str} {time_str}",
            format="%Y-%m-%d %H:%M:%S.%f",   # consistent microsecond precision
            errors="coerce"                  # returns NaT if parsing fails
        )
        meta["timestamp"] = ts if pd.notna(ts) else None
    else:
        meta["timestamp"] = None

    return meta


# -------- Apply Enrichment to DataFrame ----------------
def apply_enrichment(df: pd.DataFrame, config: dict):

    print("üîß Applying Additional enrichment...")
    df["semantic_text"] = df.apply(
        lambda r: build_semantic_text(r, config["core_fields"], config["enrich_fields"]), axis=1
    )
    df["structured_metadata"] = df.apply(
        lambda r: build_structured_metadata(r, config["metadata_fields"]), axis=1
    )

    # Extract Timestamp from structured_metadata dict
    df["Timestamp"] = df["structured_metadata"].apply(
        lambda m: m.get("timestamp") if isinstance(m, dict) else None
    )
    # Sort by time for temporal consistency
    df = df.sort_values("Timestamp").reset_index(drop=True)
    
    return df


### MetaDataDrainParser Class - Custom class that 

In [19]:

class MetadataDrainParser:
    def __init__(self, template_miner, log_format: str, 
                 enriched_csv: str, save_every: int, templates_csv: str, templates_json: str, 
                 enrichment_config: dict, 
                 ):
        self.template_miner = template_miner
        self.log_format = log_format
        self.enriched_csv = enriched_csv
        self.save_every = save_every
        self.templates_csv = templates_csv
        self.templates_json = templates_json
        self.enrichment_config = enrichment_config or ENRICHMENT_CONFIG

        self.buffer = []
        self.total = 0
        self.unique_templates = set()

        # Clean up existing files

        for path in [self.enriched_csv, self.templates_csv, self.templates_json]:
            try:
                if os.path.exists(path):
                    os.remove(path)
                    print(f"üóëÔ∏è Removed existing {path} for a fresh run.")
            except Exception as e:
                print(f"‚ö†Ô∏è Could not remove {path}: {e}")

    def detect_log_format(self, sample_line: str, base_format: str) -> str:

        rotated_pattern = r'^[\w\-.]+\.log(?:\.\d+)?\.\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2}'
        if re.match(rotated_pattern, sample_line):
            if not base_format.startswith("<File>"):
                print("üß† Auto-detected rotated log prefix ‚Äî prepending <File> to log format.")
                return "<File> " + base_format
        return base_format

    def process_line(self, raw_line: str, line_no: int):
        """Parse one log line, enrich with metadata, and append to buffer."""
        raw = raw_line.rstrip("\n")
        
        if self.detect_log_format and line_no == 1:
            old_format = self.log_format
            self.log_format = self.detect_log_format(raw, old_format)
            if self.log_format != old_format:
                print(f"‚úÖ Adjusted log format ‚Üí {self.log_format}")

        # ------- Extract structured metadata
        parsed_meta = parse_line_with_format(raw, self.log_format)
        content = parsed_meta.get("Content") or raw

        # -------- Send to Drain3 for template extraction
        try:
            result = self.template_miner.add_log_message(content)
        except Exception as e:
            result = {"cluster_id": None, "template_mined": None, "change_type": f"error:{e}"}

        # --------- Collect Drain3 + metadata output
        template = result.get("template_mined")
        template_id = result.get("cluster_id")

        self.unique_templates.add(template or f"__none_{template_id}")

        # Merge all information
        row = {
            "line_no": line_no,
            "raw": raw,
            "content": content,
            "template_id": template_id,
            "template": template,
        }
        # Add extracted metadata fields (timestamp, service, level, etc.)
        row.update(parsed_meta)

        self.buffer.append(row)
        self.total += 1

        # Periodic flush
        if len(self.buffer) >= self.save_every:
            self.flush_to_csv()

    def flush_to_csv(self):
        """Write buffer to disk and clear memory."""
        df = pd.DataFrame(self.buffer)
        df = apply_enrichment(df, self.enrichment_config)

        header = not os.path.exists(self.enriched_csv)
        df.to_csv(self.enriched_csv, mode="a", index=False, header=header)
        print(f"[flush] wrote {len(self.buffer)} rows ‚Üí {self.enriched_csv} (total parsed {self.total})")
        self.buffer = []

    def export_templates(self):
        """Extract and persist Drain3 templates """
        clusters = self.template_miner.drain.clusters
        records = []
        for c in clusters:
            tmpl = (getattr(c, "template", None)
                    or (lambda f: f() if callable(f) else None)(getattr(c, "get_template", None))
                    or getattr(c, "template_str", None)
                    or getattr(c, "example_log", None))
            
            records.append({
                "template_id": c.cluster_id,
                "template": tmpl,
                "size": c.size,
            })

        # Export to CSV
        df = pd.DataFrame(records)
        df.to_csv(self.templates_csv, index=False)
        print(f"üß© Exported {len(records)} templates ‚Üí {self.templates_csv}")

        # Export to JSON for downstream semantic loading
        with open(self.templates_json, "w") as f:
            json.dump(records, f, indent=2)
        print(f"üì¶ Templates also saved as JSON ‚Üí {self.templates_json}")


    def finalize(self):
        """Final flush after finishing all lines."""
        if self.buffer:
            self.flush_to_csv()
        print(f"‚úÖ Parsing complete. Total parsed lines: {self.total}")
        print(f"üß© Unique templates discovered: {len(self.unique_templates)}")
        
        # Export template catalogue
        self.export_templates()
        print("üìä Template catalogue exported for validation and benchmarking.")


### Initialize and Configure Drain3

In [20]:
drain_config = TemplateMinerConfig()

# --- Core Parameters ---
drain_config.profiling_enabled = True
drain_config.drain_sim_th = 0.45         
drain_config.drain_depth = 5             
drain_config.mask_prefix = "<*>"         
drain_config.extra_delimiters = ["=", ",", " ", ":", "-"]

# --- Dynamic Field Masking ---

drain_config.masking_instructions = [
        MaskingInstruction(r"req-[0-9a-f-]+", "<REQ_ID>"),
        MaskingInstruction(r"[0-9a-f]{32}", "<HASH>"),
        MaskingInstruction(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", "<IP>"),
    ]

# --- Initialize TemplateMiner ---
try:
    template_miner = TemplateMiner(persistence, drain_config)
    print("‚úÖ Drain3 TemplateMiner initialized successfully.")
except Exception as e:
    print("‚ö†Ô∏è Error initializing TemplateMiner:", e)
    raise

‚úÖ Drain3 TemplateMiner initialized successfully.


#### Running the Log Parsing and Metadata Extraction Process

In [21]:
parser = MetadataDrainParser(
    template_miner=template_miner,
    log_format=log_format,
    enriched_csv=ENRICHED_CSV,
    save_every=50000,
    templates_csv=TEMPLATES_CSV,
    templates_json=TEMPLATES_JSON,
    enrichment_config=ENRICHMENT_CONFIG,
)

with open(DATASET_PATH, "r", encoding="utf-8", errors="ignore") as fh:
    for i, line in enumerate(tqdm(fh, desc="Parsing lines"), start=1):
        if config['MAX_LINES'] and i > config['MAX_LINES']:
            break
        parser.process_line(line, i)

# Finalize and save
parser.finalize()

Parsing lines: 0it [00:00, ?it/s]

üß† Auto-detected rotated log prefix ‚Äî prepending <File> to log format.
‚úÖ Adjusted log format ‚Üí <File> <Date> <Time> <Pid> <Level> <Component> <Content>


Parsing lines: 49918it [00:01, 28362.97it/s]

üîß Applying Additional enrichment...


Parsing lines: 52755it [00:07, 1703.05it/s] 

[flush] wrote 50000 rows ‚Üí data/OpenStack_full.log_enriched.csv (total parsed 50000)


Parsing lines: 98030it [00:08, 27535.24it/s]

üîß Applying Additional enrichment...


Parsing lines: 103863it [00:13, 2556.57it/s]

[flush] wrote 50000 rows ‚Üí data/OpenStack_full.log_enriched.csv (total parsed 100000)


Parsing lines: 149755it [00:15, 28105.60it/s]

üîß Applying Additional enrichment...


Parsing lines: 155598it [00:20, 2523.98it/s] 

[flush] wrote 50000 rows ‚Üí data/OpenStack_full.log_enriched.csv (total parsed 150000)


Parsing lines: 198334it [00:22, 27810.40it/s]

üîß Applying Additional enrichment...


Parsing lines: 204225it [00:27, 2538.74it/s] 

[flush] wrote 50000 rows ‚Üí data/OpenStack_full.log_enriched.csv (total parsed 200000)


Parsing lines: 207632it [00:27, 7516.75it/s]


üîß Applying Additional enrichment...
[flush] wrote 7632 rows ‚Üí data/OpenStack_full.log_enriched.csv (total parsed 207632)
‚úÖ Parsing complete. Total parsed lines: 207632
üß© Unique templates discovered: 31
üß© Exported 31 templates ‚Üí data/OpenStack_full.log_templates.csv
üì¶ Templates also saved as JSON ‚Üí data/OpenStack_full.log_templates.json
üìä Template catalogue exported for validation and benchmarking.
