In [181]:
## Library import
import os
import json
import pandas as pd
import numpy as np
import sqlite3
import re
import faiss
import textwrap
from tqdm import tqdm
import openai
from openai import OpenAI
pd.set_option('mode.chained_assignment', None)
openai.api_key = os.getenv("OPENAI_API_KEY")

In [2]:
# Some utils
def create_generic_template(template: str) -> str:
    """Turn an annotated template into its generic form.

    Surrounding whitespace is stripped, then every ``<...>`` placeholder
    (one or more characters between angle brackets) is replaced by ``<*>``.
    """
    trimmed = template.strip()
    placeholder = re.compile(r"<[^>]+>")
    return placeholder.sub("<*>", trimmed)

def build_regex_from_template(template: str) -> str:
    """Translate an annotated template into an anchored regular expression.

    Each ``<NAME>`` placeholder becomes a named group matching a run of
    non-space characters; the name is used verbatim as the group name.
    Every whitespace character in the template becomes ``\\s+`` and every
    other character is escaped literally.

    Raises:
        ValueError: if a ``<`` has no closing ``>``.
    """
    pieces = ["^"]
    cursor = 0
    size = len(template)
    while cursor < size:
        ch = template[cursor]
        if ch != "<":
            # Literal text: whitespace is matched loosely, everything else exactly.
            pieces.append(r"\s+" if ch.isspace() else re.escape(ch))
            cursor += 1
            continue
        closing = template.find(">", cursor)
        if closing < 0:
            raise ValueError("Unmatched '<' in template")
        group_name = template[cursor + 1:closing]
        pieces.append("(?P<" + group_name + ">\\S+)")
        cursor = closing + 1
    pieces.append("$")
    return "".join(pieces)

def extract_fields(log_line: str, template: str) -> dict:
    """Extract placeholder values from a log line using an annotated template.

    Returns a ``{placeholder_name: value}`` dict when the regex built from
    ``template`` matches ``log_line``, otherwise an empty dict.
    """
    found = re.match(build_regex_from_template(template), log_line)
    return found.groupdict() if found else {}



In [3]:
# Prepare gold for HDFS data

# Hand-annotated message body for each HDFS EventId (everything after the common header).
_HDFS_EVENT_CONTENT = {
    "E1": "<IP_ADDRESS_1>:<PORT> Served block blk_<BLOCK_ID> to /<IP_ADDRESS_2>",
    "E2": "<IP_ADDRESS_1>:<PORT_1> Starting thread to transfer block blk_<BLOCK_ID> to <IP_ADDRESS_2>:<PORT_2>",
    "E3": "<IP_ADDRESS_1>:<PORT>:Got exception while serving blk_<BLOCK_ID> to /<IP_ADDRESS_2>:",
    "E4": "BLOCK* ask <IP_ADDRESS>:<PORT> to delete  blk_<BLOCK_ID>",
    "E5": "BLOCK* ask <IP_ADDRESS_1>:<PORT_1> to replicate blk_<BLOCK_ID> to datanode(s) <IP_ADDRESS_2>:<PORT_2>",
    "E6": "BLOCK* NameSystem.addStoredBlock: blockMap updated: <IP_ADDRESS>:<PORT> is added to blk_<BLOCK_ID> size <SIZE>",
    "E7": "BLOCK* NameSystem.allocateBlock: /<FILE_PATH>/part-<PART_ID>. blk_<BLOCK_ID>",
    "E8": "BLOCK* NameSystem.delete: blk_<BLOCK_ID> is added to invalidSet of <IP_ADDRESS>:<PORT>",
    "E9": "Deleting block blk_<BLOCK_ID_1> file /<FILE_PATH>/blk_<BLOCK_ID_2>",
    "E10": "PacketResponder <PACKET_RESPONDER_NUM> for block blk_<BLOCK_ID> terminating",
    "E11": "Received block blk_<BLOCK_ID> of size <SIZE> from /<IP_ADDRESS>",
    "E12": "Received block blk_<BLOCK_ID> src: /<SRC_IP_ADDRESS>:<SRC_PORT> dest: /<DEST_IP_ADDRESS>:<DEST_PORT> of size <SIZE>",
    "E13": "Receiving block blk_<BLOCK_ID> src: /<SRC_IP_ADDRESS>:<SRC_PORT> dest: /<DEST_IP_ADDRESS>:<DEST_PORT>",
    "E14": "Verification succeeded for blk_<BLOCK_ID>",
}


def create_annotated_template_hdfs(row):
    """Return the annotated template for a row, keyed by its ``EventId``.

    The result is the common header followed by the event-specific body.
    An EventId outside E1..E14 yields the header alone.
    """
    header = "<DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: "
    body = _HDFS_EVENT_CONTENT.get(row["EventId"], "")
    return header + body


def get_gold(structured_csv_path: str = "HDFS_2k.log_structured.csv"):
    """Build the gold-standard DataFrame for the HDFS sample.

    Reads the structured HDFS CSV and returns one row per log line with:
      - LineId, EventId: copied from the CSV
      - OriginalLog: the raw line rebuilt from Date/Time/Pid/Level/Component/Content
      - TemplateGeneric: the CSV's EventTemplate behind a five-wildcard header
      - TemplateAnnotated: the hand-annotated template for the row's EventId
      - VariablesJson: JSON text of the fields extracted from OriginalLog using
        TemplateAnnotated (``{}`` when the template does not match the line)

    Args:
        structured_csv_path: Location of the structured CSV. The default is the
            path this function previously hard-coded.
    """
    hls = pd.read_csv(structured_csv_path)

    # Work on an explicit copy: columns are added below, and assigning into a
    # bare column selection is the chained-assignment pattern pandas warns about.
    hdfs_gold = hls[["LineId", "EventId"]].copy()

    # Single quotes inside the f-strings keep this valid on Python < 3.12, where
    # reusing the enclosing quote character inside a replacement field is a SyntaxError.
    hdfs_gold["OriginalLog"] = hls.apply(
        lambda row: f"{row['Date']} {row['Time']} {row['Pid']} {row['Level']} {row['Component']}: {row['Content']}",
        axis=1,
    )
    hdfs_gold["TemplateGeneric"] = hls.apply(
        lambda row: f"<*> <*> <*> <*> <*>: {row['EventTemplate']}",
        axis=1,
    )
    hdfs_gold["TemplateAnnotated"] = hls.apply(create_annotated_template_hdfs, axis=1)
    hdfs_gold["VariablesJson"] = hdfs_gold.apply(
        lambda row: json.dumps(extract_fields(row["OriginalLog"], row["TemplateAnnotated"])),
        axis=1,
    )
    return hdfs_gold

In [83]:
# SQLite storage for candidate few-shot examples: each row keeps the raw log line,
# its generic and annotated templates, the extracted variables as JSON text, and
# the embedding serialized as a BLOB.

# Database file shared by this setup code and by LogRetriever instances created with db_path=DBNAME.
DBNAME = "parsedlog.db"
# Schema of the examples table. IF NOT EXISTS makes re-running this cell harmless.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS log_examples (
    id INTEGER,
	log_line TEXT NOT NULL,
	template_generic TEXT NOT NULL,
	template_annotated TEXT NOT NULL,
	variables_json	TEXT NOT NULL,
    embedding BLOB NOT NULL,
	PRIMARY KEY(id AUTOINCREMENT)
)
"""
# Create the table up front so later inserts and selects can assume it exists.
# NOTE(review): this module-level connection is not closed anywhere in the visible
# cells, and LogRetriever opens its own connection — confirm whether `conn`/`c`
# are still needed after this point.
conn = sqlite3.connect(DBNAME)
c = conn.cursor()
c.execute(CREATE_TABLE_SQL)
conn.commit()

def insert_log_example(log_line: str, template_annotated: str, variables: "dict | str", emb, connection):
    """Insert one few-shot example into the ``log_examples`` table and commit.

    Args:
        log_line: Raw log line.
        template_annotated: Template with typed placeholders; its generic
            (``<*>``) form is derived and stored alongside it.
        variables: Extracted variables, either as a dict or as JSON text that
            is already serialized.
        emb: Embedding vector (array-like); stored as raw float32 bytes.
        connection: Open ``sqlite3`` connection to the examples database.
    """
    # The candidate-insertion helpers in this notebook pass the gold frame's
    # VariablesJson, which is already JSON text. Dumping it again would store a
    # JSON-encoded *string* instead of the object, so only serialize non-strings.
    variables_json = variables if isinstance(variables, str) else json.dumps(variables)

    # The loader decodes blobs with dtype float32, so pin the dtype on write.
    emb_blob = np.asarray(emb, dtype=np.float32).tobytes()

    cursor = connection.cursor()
    cursor.execute("""
    INSERT INTO log_examples(log_line, template_generic, template_annotated, variables_json, embedding)
    VALUES(?, ?, ?, ?, ?)
    """, (log_line, create_generic_template(template_annotated), template_annotated, variables_json, emb_blob))
    connection.commit()

# Embedding and top-k

class LogRetriever:
    """Retrieve stored log examples that are most similar to a query log line.

    Embeddings are read from the ``log_examples`` SQLite table into an in-memory
    FAISS inner-product index. Query vectors are L2-normalized, so the inner
    product equals cosine similarity provided the stored vectors were normalized
    too (the insertion helpers in this notebook store normalized vectors).

    Modes:
        "full"    -- every record is cached in memory next to its embedding.
        "partial" -- only ids and embeddings are cached; the record is fetched
                     from SQLite for each hit.

    ``load_embeddings()`` must be called before ``retrieve_top_k()``.
    """

    EMBEDDING_MODEL = "text-embedding-3-small"
    # Only used when the table is empty and the dimension cannot be read from data.
    # 1536 is the default output size of EMBEDDING_MODEL.
    DEFAULT_EMBEDDING_DIM = 1536

    def __init__(self, db_path, mode="full"):
        assert mode in ("partial", "full"), "mode must be 'partial' or 'full'"
        self.mode = mode
        self.conn = sqlite3.connect(db_path)
        self.client = OpenAI()
        self.c = self.conn.cursor()
        # Filled by load_embeddings(); defined here so an unloaded retriever
        # fails with a clear message instead of an AttributeError.
        self.index = None
        self.db_ids = []
        self.db_records = {}  # db_id -> record dict ("full" mode only)
        self.faiss_id_to_db_id = {}

    def truncate_log_examples_db(self):
        """Delete every stored example and reset the AUTOINCREMENT counter."""
        self.c.execute("DELETE FROM log_examples")
        self.c.execute("DELETE FROM sqlite_sequence WHERE name='log_examples'")
        self.conn.commit()

    def close_conn(self):
        """Close the SQLite connection owned by this retriever."""
        self.conn.close()

    def load_embeddings(self):
        """(Re)build the FAISS index from the database according to ``mode``."""
        if self.mode == "partial":
            self._load_partial()
        else:
            self._load_full()

    @staticmethod
    def _make_record(db_id, log_line, template_generic, template_annotated, variables_json):
        """Shape one database row as the record dict returned to callers."""
        return {
            "id": db_id,
            "original_log": log_line,
            "template_generic": template_generic,
            "template_annotated": template_annotated,
            "variables_json": variables_json,
        }

    def _build_index(self, embeddings):
        """Create ``self.index`` from a list of vectors aligned with ``self.db_ids``."""
        if embeddings:
            embeddings_np = np.array(embeddings).astype("float32")
            self.index = faiss.IndexFlatIP(embeddings_np.shape[1])
            self.index.add(embeddings_np)
            # FAISS positions are insertion order, i.e. the order of self.db_ids.
            self.faiss_id_to_db_id = dict(enumerate(self.db_ids))
        else:
            print("No data found in DB. Initializing empty FAISS index.")
            self.index = faiss.IndexFlatIP(self.DEFAULT_EMBEDDING_DIM)
            self.faiss_id_to_db_id = {}

    def _load_partial(self):
        """Load only ids and embeddings; records are fetched lazily per hit."""
        self.c.execute("SELECT id, embedding FROM log_examples")
        rows = self.c.fetchall()

        self.db_ids = [db_id for db_id, _ in rows]
        self._build_index([np.frombuffer(emb_blob, dtype=np.float32) for _, emb_blob in rows])

    def _load_full(self):
        """Load ids, embeddings and the full record of every example."""
        self.c.execute("""
        SELECT id, log_line, template_generic, template_annotated, variables_json, embedding
        FROM log_examples
        """)
        rows = self.c.fetchall()

        self.db_ids = []
        self.db_records = {}
        embeddings = []
        for db_id, log_line, template_generic, template_annotated, variables_json, emb_blob in rows:
            embeddings.append(np.frombuffer(emb_blob, dtype=np.float32))
            self.db_ids.append(db_id)
            self.db_records[db_id] = self._make_record(
                db_id, log_line, template_generic, template_annotated, variables_json
            )
        self._build_index(embeddings)

    def get_normalized_embedding(self, text: str):
        """
        Embedding needs to be normalized to use cosine similarity via faiss
        """
        response = self.client.embeddings.create(
            model=self.EMBEDDING_MODEL,
            input=[text],
        )
        emb = np.array(response.data[0].embedding, dtype="float32")
        norm = np.linalg.norm(emb)
        if norm > 0:  # an all-zero vector would otherwise turn into NaNs
            emb /= norm
        return emb

    def retrieve_top_k(self, query_log, k=3, score_threshold=0.8):
        """Return up to ``k`` stored examples ordered by similarity to ``query_log``.

        Args:
            query_log: Log line to embed and search for.
            k: Maximum number of examples to return; capped at the index size.
            score_threshold: Minimum similarity required of the best hit. When
                the best hit scores below it, a warning is printed and an empty
                list is returned so the caller falls back to zero-shot.

        Returns:
            A list of record dicts (``id``, ``original_log``, ``template_generic``,
            ``template_annotated``, ``variables_json``) each with an added
            ``similarity`` float. Empty when nothing is indexed.

        Raises:
            RuntimeError: if ``load_embeddings()`` has not been called.
            Exception: in "partial" mode, if an indexed id is missing from the table.
        """
        index = getattr(self, "index", None)
        if index is None:
            raise RuntimeError("Embeddings are not loaded; call load_embeddings() first.")

        safe_k = min(k, index.ntotal)
        if safe_k <= 0:
            # Nothing to search: skip the (billable) embedding request entirely.
            return []

        query_emb = self.get_normalized_embedding(query_log).astype("float32").reshape(1, -1)
        similarities, indices = index.search(query_emb, safe_k)

        results = []
        for sim, idx in zip(similarities[0], indices[0]):
            if idx < 0:  # FAISS pads missing neighbours with -1
                continue

            db_id = self.faiss_id_to_db_id[int(idx)]
            if self.mode == "full":
                # Copy so the per-query similarity never leaks into the cached record.
                record = dict(self.db_records[db_id])
            else:
                self.c.execute("""
                SELECT log_line, template_generic, template_annotated, variables_json
                FROM log_examples
                WHERE id = ?
                """, (db_id,))
                row = self.c.fetchone()
                if not row:
                    raise Exception(f"Cannot find row with id {db_id}")
                record = self._make_record(db_id, *row)

            record["similarity"] = float(sim)
            results.append(record)

        if results:
            top_score = results[0]["similarity"]
            if top_score < score_threshold:
                print(f"WARN: Top score similarity {top_score:.2f} below threshold {score_threshold}. Fallback to zero-shot.")
                # Actually perform the fallback the warning announces.
                return []

        return results




In [29]:
# Prompt creation

PROMPT_REQUIREMENT = """You are a log parser. For the given log line:
1. Identify a generalized log template by replacing variable parts (timestamp, usernames, IPs, IDs, numbers, etc.) with clearly typed placeholders like <TIMESTAMP>, <USERNAME>, <IP_ADDRESS>.
2. Extract the values of those variables in a JSON dictionary.
3. Only output the packed JSON object (no triple backticks and no new line) with fields: "template", "variables", "original_log"
4. The "template" field should be a plain string with <...> placeholders — **do not wrap the entire template with curly braces {}**.
5. If variables have the same name, separate them with an underscore and number suffix."""

def create_prompt(log_line: str, few_shot_string: str = "") -> str:
    """Assemble the full prompt for one log line.

    The prompt is three sections joined by blank lines: the fixed instructions,
    an optional "Few Shot Examples" section (empty when ``few_shot_string`` is
    falsy), and the log line to process.

    Raises:
        Exception: if ``log_line`` is empty.
    """
    if not log_line:
        raise Exception("Cannot create prompt for empty log_line.")

    few_shot_section = ""
    if few_shot_string:
        few_shot_section = f"Few Shot Examples:\n{few_shot_string}"
    task_section = f"Process the log line:\n{log_line}"

    sections = [PROMPT_REQUIREMENT, few_shot_section, task_section]
    return textwrap.dedent("\n\n".join(sections))

def create_few_shot_examples_string(examples: list) -> str:
    """Render retrieved examples as the few-shot section of the prompt.

    Each example contributes a blank line followed by three lines::

        Example <n>:
        Log Line: <original_log>
        Template: <template_annotated>

    Args:
        examples: Records with ``original_log`` and ``template_annotated`` keys.
            Any falsy value (empty list, ``""``, ``None``) yields ``""``.

    Returns:
        The concatenated example blocks, with no leading indentation.
    """
    if not examples:
        return ""
    # Build the text flush-left instead of dedenting an indented literal after
    # interpolation: a log line or template containing a newline would change the
    # common margin and leave every line indented.
    blocks = [
        f"\nExample {idx}:\nLog Line: {e['original_log']}\nTemplate: {e['template_annotated']}\n"
        for idx, e in enumerate(examples, start=1)
    ]
    return "".join(blocks)

In [84]:
# Key extraction function

MODEL = "gpt-4o"

class Extractor:
    """Ask the chat model to parse a log line, optionally with few-shot examples.

    Args:
        log_retriever: Retriever used to fetch few-shot examples; its OpenAI
            client is reused for the chat request. When omitted, a new client is
            created and few-shot retrieval is unavailable.
        few_shot_enabled: Whether to retrieve and include few-shot examples.
        k: Number of examples to request from the retriever.
        few_shot_threshold: Similarity threshold forwarded to the retriever.
        debug: When true, print the prompt and the raw model reply.
    """

    def __init__(self,
                 log_retriever: "LogRetriever" = None,
                 few_shot_enabled=True,
                 k = 3,
                 few_shot_threshold=0.0,
                 debug=False):
        # Use the retriever that was passed in. The previous code read the
        # notebook-global `lt`, which only worked when such a global existed
        # and silently ignored a differently named retriever.
        self.client = log_retriever.client if log_retriever is not None else OpenAI()
        self.log_retriever = log_retriever
        self.few_shot_enabled = few_shot_enabled
        self.k = k
        self.few_shot_threshold = few_shot_threshold
        self.debug = debug

    def extract_log(self, log_line: str) -> str:
        """Return the model's raw reply for ``log_line``, stripped of surrounding whitespace.

        The reply is expected to be JSON text, but it is returned unparsed.

        Raises:
            ValueError: if few-shot is enabled but no retriever was supplied.
        """
        few_shot_examples = []
        if self.few_shot_enabled:
            if self.log_retriever is None:
                raise ValueError("few_shot_enabled=True requires a log_retriever.")
            few_shot_examples = self.log_retriever.retrieve_top_k(log_line, self.k, self.few_shot_threshold)

        few_shot_string = create_few_shot_examples_string(few_shot_examples)
        prompt = create_prompt(log_line, few_shot_string)
        response = self.client.chat.completions.create(
            model=MODEL,
            temperature=0,
            messages=[{"role": "user", "content":prompt}],
        )
        # content can be None (e.g. a reply without text); treat that as empty.
        content = (response.choices[0].message.content or "").strip()
        if self.debug:
            print(prompt)
            print(content)
        return content



# Experiments

In [None]:
## Zero-shot
# Parse every gold log line without few-shot examples and write the raw and
# parsed model output to zero_shot.csv (checkpointed during the run).
pd.set_option('mode.chained_assignment', None)
lt = LogRetriever(db_path=DBNAME, mode="full")
try:
    ex = Extractor(log_retriever=lt, few_shot_enabled=False, debug=True)
    df = get_gold()
    for output_col in ("Output", "OutputTemplateGeneric", "OutputTemplateAnnotated", "OutputVariablesJson"):
        df[output_col] = None

    parse_failures = 0
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Zero-shot"):
        output = ex.extract_log(row["OriginalLog"])
        df.loc[idx, "Output"] = output
        try:
            d = json.loads(output)
            # Resolve every derived value before writing anything, so a reply
            # that is valid JSON but lacks a field never leaves a half-filled row.
            parsed_cells = {
                "OutputTemplateGeneric": create_generic_template(d["template"]),
                "OutputTemplateAnnotated": d["template"],
                "OutputVariablesJson": json.dumps(d["variables"]),
            }
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
            # Keep going (the raw output is already stored) but make the failure visible.
            parse_failures += 1
            print(f"WARN: unusable model output for row {idx}: {e!r}")
        else:
            for col, value in parsed_cells.items():
                df.loc[idx, col] = value

        # Periodic checkpoint so an interrupted run keeps most of its results.
        if idx % 10 == 0:
            df.to_csv("zero_shot.csv", index=False)

    # Final save: without it, rows processed after the last checkpoint are lost.
    df.to_csv("zero_shot.csv", index=False)
    print("Done with zero shot")
    print(f"Unparseable outputs: {parse_failures}")
finally:
    # Release the SQLite connection even when the run is interrupted.
    lt.close_conn()

In [None]:
## Perfect few-shot learning with example of same template provided
def insert_perfect_candidate_logs(log_retriever, gold_df):
    """Store one hand-picked gold example per listed LineId as a few-shot candidate.

    For each listed LineId present in ``gold_df``, the first matching row is
    embedded with ``log_retriever``; once all embeddings are computed, the
    examples are inserted through ``log_retriever.conn``. Missing LineIds are
    skipped.
    """
    line_ids = (74, 912, 78, 928, 1765, 3, 16, 292, 73, 1, 10, 1439, 12, 29)

    pending = []
    for wanted in line_ids:
        matches = gold_df[gold_df['LineId'] == wanted]
        if matches.empty:
            continue
        first = matches.iloc[0].to_dict()
        pending.append((
            first.get("OriginalLog", ""),
            first.get("TemplateAnnotated", ""),
            first.get("VariablesJson", ""),
            log_retriever.get_normalized_embedding(first.get("OriginalLog", "")),
        ))

    # Inserts happen only after every embedding has been obtained.
    for log_text, annotated, variables, embedding in pending:
        insert_log_example(log_text, annotated, variables, embedding, log_retriever.conn)
        


# Perfect few-shot run: the example store holds one candidate per listed gold
# line, and every gold line is parsed with k=3 retrieved examples.
pd.set_option('mode.chained_assignment', None)

df = get_gold()
for output_col in ("Output", "OutputTemplateGeneric", "OutputTemplateAnnotated", "OutputVariablesJson"):
    df[output_col] = None

# (The unused module-level `client = OpenAI()` was dropped; the extractor uses
# the retriever's client.)
lt = LogRetriever(db_path=DBNAME, mode="full")
try:
    lt.truncate_log_examples_db()
    insert_perfect_candidate_logs(lt, df)
    lt.load_embeddings()
    ex = Extractor(log_retriever=lt, few_shot_enabled=True, k=3, few_shot_threshold = 0.0, debug=True)

    parse_failures = 0
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Perfect few-shot"):
        output = ex.extract_log(row["OriginalLog"])
        df.loc[idx, "Output"] = output
        try:
            d = json.loads(output)
            # Resolve every derived value first so a malformed reply never
            # leaves a half-filled row.
            parsed_cells = {
                "OutputTemplateGeneric": create_generic_template(d["template"]),
                "OutputTemplateAnnotated": d["template"],
                "OutputVariablesJson": json.dumps(d["variables"]),
            }
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
            # Keep going (the raw output is already stored) but make the failure visible.
            parse_failures += 1
            print(f"WARN: unusable model output for row {idx}: {e!r}")
        else:
            for col, value in parsed_cells.items():
                df.loc[idx, col] = value

        # Periodic checkpoint so an interrupted run keeps most of its results.
        if idx % 10 == 0:
            df.to_csv("perfect_few_shot.csv", index=False)

    df.to_csv("perfect_few_shot.csv", index=False)
    print("Done with perfect few shot")
    print(f"Unparseable outputs: {parse_failures}")
finally:
    # Release the SQLite connection even when the run is interrupted.
    lt.close_conn()

In [None]:
## Partial few-shot learning with unseen test log templates
def insert_partial_candidate_logs(log_retriever, gold_df):
    """Store the reduced candidate set used by the partial few-shot experiments.

    Only the nine listed gold lines are offered as examples; the experiment
    cell then evaluates on EventIds E11-E14. LineIds absent from ``gold_df``
    are skipped. All embeddings are computed before the first insert.
    """
    # Template E1 - E10 in our test
    # Will test on samples E11 - E14
    seen_line_ids = [74, 912, 78, 928, 1765, 3, 16, 292, 73]

    selections = [gold_df[gold_df['LineId'] == lid] for lid in seen_line_ids]
    records = [hit.iloc[0].to_dict() for hit in selections if not hit.empty]
    embeddings = [
        log_retriever.get_normalized_embedding(rec.get("OriginalLog", ""))
        for rec in records
    ]

    for rec, emb in zip(records, embeddings):
        insert_log_example(
            rec.get("OriginalLog", ""),
            rec.get("TemplateAnnotated", ""),
            rec.get("VariablesJson", ""),
            emb,
            log_retriever.conn,
        )
        



# Partial few-shot run (k=3): the example store holds the reduced candidate set,
# and only lines of EventIds E11-E14 are parsed.
df = get_gold()
for output_col in ("Output", "OutputTemplateGeneric", "OutputTemplateAnnotated", "OutputVariablesJson"):
    df[output_col] = None

# (The unused module-level `client = OpenAI()` was dropped; the extractor uses
# the retriever's client.)
lt = LogRetriever(db_path=DBNAME, mode="full")
try:
    lt.truncate_log_examples_db()
    # Candidates are looked up in the full gold frame, before the test filter below.
    insert_partial_candidate_logs(lt, df)
    lt.load_embeddings()
    ex = Extractor(log_retriever=lt, few_shot_enabled=True, k=3, few_shot_threshold = 0.0, debug=True)

    ## Only test on log E11-E14
    ids_to_keep = ["E11", "E12", "E13", "E14"]
    # .copy(): rows are written into this frame below, so it must not be a view
    # of the unfiltered gold frame.
    df = df[df["EventId"].isin(ids_to_keep)].copy()

    parse_failures = 0
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Partial few-shot"):
        output = ex.extract_log(row["OriginalLog"])
        df.loc[idx, "Output"] = output
        try:
            d = json.loads(output)
            # Resolve every derived value first so a malformed reply never
            # leaves a half-filled row.
            parsed_cells = {
                "OutputTemplateGeneric": create_generic_template(d["template"]),
                "OutputTemplateAnnotated": d["template"],
                "OutputVariablesJson": json.dumps(d["variables"]),
            }
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
            # Keep going (the raw output is already stored) but make the failure visible.
            parse_failures += 1
            print(f"WARN: unusable model output for row {idx}: {e!r}")
        else:
            for col, value in parsed_cells.items():
                df.loc[idx, col] = value

        # Periodic checkpoint so an interrupted run keeps most of its results.
        if idx % 10 == 0:
            df.to_csv("partial_few_shot_e11_to_14.csv", index=False)

    df.to_csv("partial_few_shot_e11_to_14.csv", index=False)
    print("Done with partial few shot")
    print(f"Unparseable outputs: {parse_failures}")
finally:
    # Release the SQLite connection even when the run is interrupted.
    lt.close_conn()

In [None]:
## Partial few-shot learning with unseen test log templates
## K = 1
def insert_partial_candidate_logs(log_retriever, gold_df):
    """Insert the nine-line candidate set for the K=1 partial few-shot run.

    Each listed LineId found in ``gold_df`` contributes its first matching row;
    its log line is embedded via ``log_retriever`` and, after all candidates
    are prepared, each one is written through ``log_retriever.conn``.
    """
    # Template E1 - E10 in our test
    # Will test on samples E11 - E14
    candidate_log_ids = [74, 912, 78, 928, 1765, 3, 16, 292, 73]

    def _prepare(line_id):
        """Return the candidate dict for one LineId, or None when it is absent."""
        hits = gold_df[gold_df['LineId'] == line_id]
        if hits.empty:
            return None
        fields = hits.iloc[0].to_dict()
        return {
            "log": fields.get("OriginalLog", ""),
            "at": fields.get("TemplateAnnotated", ""),
            "v": fields.get("VariablesJson", ""),
            "nl": log_retriever.get_normalized_embedding(fields.get("OriginalLog", "")),
        }

    prepared = [_prepare(line_id) for line_id in candidate_log_ids]

    for item in prepared:
        if item is None:
            continue
        insert_log_example(item["log"], item["at"], item["v"], item["nl"], log_retriever.conn)
        



# Partial few-shot run with k=1: same setup as the k=3 run, but a single
# retrieved example per query.
df = get_gold()
for output_col in ("Output", "OutputTemplateGeneric", "OutputTemplateAnnotated", "OutputVariablesJson"):
    df[output_col] = None

# This run used to write to the same file as the k=3 run and overwrite its
# results; it now has its own output file.
K1_OUTPUT_CSV = "partial_few_shot_e11_to_14_k1.csv"

# (The unused module-level `client = OpenAI()` was dropped; the extractor uses
# the retriever's client.)
lt = LogRetriever(db_path=DBNAME, mode="full")
try:
    lt.truncate_log_examples_db()
    # Candidates are looked up in the full gold frame, before the test filter below.
    insert_partial_candidate_logs(lt, df)
    lt.load_embeddings()
    ex = Extractor(log_retriever=lt, few_shot_enabled=True, k=1, few_shot_threshold = 0.0, debug=True)

    ## Only test on log E11-E14
    ids_to_keep = ["E11", "E12", "E13", "E14"]
    # .copy(): rows are written into this frame below, so it must not be a view
    # of the unfiltered gold frame.
    df = df[df["EventId"].isin(ids_to_keep)].copy()

    parse_failures = 0
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Partial few-shot"):
        output = ex.extract_log(row["OriginalLog"])
        df.loc[idx, "Output"] = output
        try:
            d = json.loads(output)
            # Resolve every derived value first so a malformed reply never
            # leaves a half-filled row.
            parsed_cells = {
                "OutputTemplateGeneric": create_generic_template(d["template"]),
                "OutputTemplateAnnotated": d["template"],
                "OutputVariablesJson": json.dumps(d["variables"]),
            }
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
            # Keep going (the raw output is already stored) but make the failure visible.
            parse_failures += 1
            print(f"WARN: unusable model output for row {idx}: {e!r}")
        else:
            for col, value in parsed_cells.items():
                df.loc[idx, col] = value

        # Periodic checkpoint so an interrupted run keeps most of its results.
        if idx % 10 == 0:
            df.to_csv(K1_OUTPUT_CSV, index=False)

    df.to_csv(K1_OUTPUT_CSV, index=False)
    print("Done with partial few shot")
    print(f"Unparseable outputs: {parse_failures}")
finally:
    # Release the SQLite connection even when the run is interrupted.
    lt.close_conn()

In [None]:
## Partial few-shot learning stratified log templates
## K = 1
def insert_one_out_candidate_logs(log_retriever, gold_df, exclude_index=None):
    """Store the candidate examples with one of them optionally left out.

    Args:
        log_retriever: Provides ``get_normalized_embedding`` and the ``conn``
            used for the inserts.
        gold_df: Gold frame holding LineId, OriginalLog, TemplateAnnotated and
            VariablesJson.
        exclude_index: Position in the candidate list to drop before inserting.
            ``None`` or an out-of-range value keeps every candidate.
    """
    candidate_log_ids = [74, 912, 78, 928, 1765, 3, 16, 292, 73, 1, 10, 1439, 12, 29]
    # The default None previously crashed in the range comparison (TypeError).
    if exclude_index is not None and 0 <= exclude_index < len(candidate_log_ids):
        print(f"Removed candidate log of type E{exclude_index+1}")
        candidate_log_ids.pop(exclude_index)

    candidate_set = []
    for line_id in candidate_log_ids:
        row = gold_df[gold_df['LineId'] == line_id]
        if row.empty:
            continue
        row_dict = row.iloc[0].to_dict()
        candidate_set.append({
            "log": row_dict.get("OriginalLog", ""),
            "at": row_dict.get("TemplateAnnotated", ""),
            "v": row_dict.get("VariablesJson", ""),
            "nl": log_retriever.get_normalized_embedding(row_dict.get("OriginalLog", "")),
        })

    # Inserts happen only after every embedding has been obtained.
    for candidate in candidate_set:
        insert_log_example(
            candidate["log"],
            candidate["at"],
            candidate["v"],
            candidate["nl"],
            log_retriever.conn,
        )
        



# Leave-one-out run (k=1): for each event type, rebuild the example store
# without that type's candidate and parse only the lines of that type.
df = get_gold()
for output_col in ("Output", "OutputTemplateGeneric", "OutputTemplateAnnotated", "OutputVariablesJson"):
    df[output_col] = None

N_EVENT_TYPES = 14  # EventIds E1..E14, matching the candidate list positions

# (The unused module-level `client = OpenAI()` was dropped; the extractor uses
# the retriever's client.)
lt = LogRetriever(db_path=DBNAME, mode="full")
try:
    parse_failures = 0
    for i in range(N_EVENT_TYPES):
        event_id = f"E{i+1}"
        print(f"Working on {event_id}")
        lt.truncate_log_examples_db()
        insert_one_out_candidate_logs(lt, df, i)
        lt.load_embeddings()
        ex = Extractor(log_retriever=lt, few_shot_enabled=True, k=1, few_shot_threshold = 0.0, debug=True)

        # Iterate only this event's rows instead of scanning the whole frame and
        # skipping the rest; the progress bar then reflects the real amount of work.
        event_rows = df[df["EventId"] == event_id]
        for idx, row in tqdm(event_rows.iterrows(), total=len(event_rows), desc="Partial few-shot"):
            output = ex.extract_log(row["OriginalLog"])
            df.loc[idx, "Output"] = output
            try:
                d = json.loads(output)
                # Resolve every derived value first so a malformed reply never
                # leaves a half-filled row.
                parsed_cells = {
                    "OutputTemplateGeneric": create_generic_template(d["template"]),
                    "OutputTemplateAnnotated": d["template"],
                    "OutputVariablesJson": json.dumps(d["variables"]),
                }
            except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
                # Keep going (the raw output is already stored) but make the failure visible.
                parse_failures += 1
                print(f"WARN: unusable model output for row {idx}: {e!r}")
            else:
                for col, value in parsed_cells.items():
                    df.loc[idx, col] = value

            # Periodic checkpoint so an interrupted run keeps most of its results.
            if idx % 10 == 0:
                df.to_csv("partial_few_shot_leave_one_out.csv", index=False)

    df.to_csv("partial_few_shot_leave_one_out.csv", index=False)
    print("Done")
    print(f"Unparseable outputs: {parse_failures}")
finally:
    # Release the SQLite connection even when the run is interrupted.
    lt.close_conn()

# Analysis

In [None]:
def compare_generic_vs_annotated(df: pd.DataFrame):
    """Compute and print parsing metrics for generic and annotated templates.

    For each variant the ground-truth column is compared with the predicted
    column, giving:
      - Parsing Accuracy: share of rows whose prediction equals the ground truth.
      - Recall Template Accuracy: share of EventIds whose rows all match.
      - Precision Template Accuracy: number of fully matched EventIds divided by
        the number of distinct predicted templates (missing predictions are not
        counted as a template; 0.0 when there is no prediction at all).

    Side effect: adds ``ParsingAccuracy_Generic`` and ``ParsingAccuracy_Annotated``
    boolean columns to ``df``.

    Returns:
        ``{"Generic": {...}, "Annotated": {...}}`` with the three metrics rounded
        to four decimals.
    """
    result = {}

    for variant, gt_col, pred_col in [
        ("Generic", "TemplateGeneric", "OutputTemplateGeneric"),
        ("Annotated", "TemplateAnnotated", "OutputTemplateAnnotated"),
    ]:
        accuracy_col = f'ParsingAccuracy_{variant}'

        # Per-record parsing accuracy
        df[accuracy_col] = df[gt_col] == df[pred_col]
        parsing_accuracy = df[accuracy_col].mean()

        # Template recall: an EventId counts only when all of its rows match.
        event_group = df.groupby('EventId')[accuracy_col].agg(lambda x: x.all())
        template_recall = event_group.mean()

        # Template precision: rows whose output could not be parsed hold NaN/None,
        # which is not a predicted template and must not inflate the denominator.
        n_predicted_templates = df[pred_col].dropna().nunique()
        if n_predicted_templates:
            template_precision = event_group.sum() / n_predicted_templates
        else:
            template_precision = 0.0

        result[variant] = {
            "Parsing Accuracy": round(float(parsing_accuracy), 4),
            "Recall Template Accuracy": round(float(template_recall), 4),
            "Precision Template Accuracy": round(float(template_precision), 4),
        }

    # Display nicely
    print("\n=== ✅ Comparison: Generic vs Annotated ===\n")
    print(f"{'Metric':40} | {'Generic':8} | {'Annotated':8}")
    print("-" * 65)
    for metric in result['Generic'].keys():
        generic_val = result['Generic'][metric]
        annotated_val = result['Annotated'][metric]
        print(f"{metric:40} | {generic_val:<8} | {annotated_val:<8}")

    return result


def _analyze_template_breakdown(df: pd.DataFrame, gt_col: str, pred_col: str,
                                accuracy_col: str, wrong_pred_label: str):
    """Shared implementation of the generic and annotated breakdown reports.

    Adds the boolean ``accuracy_col`` to ``df``, prints the recall and precision
    breakdowns, and returns the failed EventIds plus the EventIds that map to
    more than one predicted template.
    """
    # Per-record parsing accuracy
    df[accuracy_col] = df[gt_col] == df[pred_col]

    # 1. Recall breakdown: EventIds where at least one row did not match.
    all_matched = df.groupby('EventId')[accuracy_col].all()
    failed_event_ids = all_matched[~all_matched].index.tolist()

    print("\n=== Recall Failures (EventIds with mismatched records) ===")
    for event_id in failed_event_ids:
        group_df = df[df['EventId'] == event_id]
        ground_truth_template = group_df[gt_col].iloc[0]

        print(f"\nEventId: {event_id}")
        print(f"  Ground Truth Template: {ground_truth_template}\n")

        mismatches = group_df[~group_df[accuracy_col]]
        for i, pred in enumerate(mismatches[pred_col].unique(), 1):
            print(f"    {wrong_pred_label} {i}: {pred}")

    # 2. Precision breakdown: EventIds that produced several distinct templates.
    template_counts = df.groupby('EventId')[pred_col].nunique()
    over_fragmented_event_ids = template_counts[template_counts > 1]

    print("\n=== Precision Issues (EventIds mapped to multiple templates) ===")
    for event_id in over_fragmented_event_ids.index:
        templates = df[df['EventId'] == event_id][pred_col].unique().tolist()
        print(f"\nEventId: {event_id} generated {len(templates)} templates:")
        for i, t in enumerate(templates, 1):
            print(f"  {i}. {t}")

    return {
        "FailedEventIds_Recall": failed_event_ids,
        "OverFragmentedEventIds_Precision": over_fragmented_event_ids.to_dict()
    }


def analyze_template_breakdown_generic(df: pd.DataFrame):
    """
    Breaks down parsing errors to illustrate Recall and Precision issues.
    Prints failed EventIds with mismatched records and fragmented output templates.

    Uses fields: TemplateGeneric (GT) and OutputTemplateGeneric (Prediction).
    Adds a ``ParsingAccuracyGeneric`` column to ``df``.
    """
    return _analyze_template_breakdown(
        df,
        gt_col='TemplateGeneric',
        pred_col='OutputTemplateGeneric',
        accuracy_col='ParsingAccuracyGeneric',
        wrong_pred_label="Incorrect Pred",
    )

def analyze_template_breakdown_annotated(df: pd.DataFrame):
    """
    Breaks down parsing errors for annotated templates.
    Uses fields: TemplateAnnotated (GT) and OutputTemplateAnnotated (Prediction).

    Adds a ``ParsingAccuracyAnnotated`` column to ``df``.
    """
    return _analyze_template_breakdown(
        df,
        gt_col='TemplateAnnotated',
        pred_col='OutputTemplateAnnotated',
        accuracy_col='ParsingAccuracyAnnotated',
        wrong_pred_label="Predicted",
    )



In [174]:
# Result files analysed below, one per experiment.
# NOTE(review): the experiment cells write `zero_shot.csv`, `perfect_few_shot.csv`
# and `partial_few_shot_leave_one_out.csv`; these `_final` files are presumably
# renamed copies of completed runs — confirm how they are produced.
ZERO_SHOT_OUTPUT = "zero_shot_final.csv"
PERFECT_FEW_SHOT_OUTPUT = "perfect_few_shot_final.csv"
PARTIAL_FEW_SHOT_LEAVE_ONE_OUT = "partial_few_shot_leave_one_out_final.csv"

# Load each run into its own frame for the analysis functions.
zero_df = pd.read_csv(ZERO_SHOT_OUTPUT)
perfect_df = pd.read_csv(PERFECT_FEW_SHOT_OUTPUT)
partial_df = pd.read_csv(PARTIAL_FEW_SHOT_LEAVE_ONE_OUT)



In [179]:
"""
To analyze, use these functions:

compare_generic_vs_annotated(df)
analyze_template_breakdown_generic(df)
analyze_template_breakdown_annotated(df)
"""


=== Recall Failures (EventIds with mismatched records) ===

EventId: E1
  Ground Truth Template: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: <IP_ADDRESS_1>:<PORT> Served block blk_<BLOCK_ID> to /<IP_ADDRESS_2>

    Predicted 1: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: <IP_ADDRESS>:<PORT> Served block blk_<BLOCK_ID> to /<CLIENT_IP_ADDRESS>
    Predicted 2: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: <IP_ADDRESS>:<PORT> Served block blk_<BLOCK_ID> to /<DEST_IP_ADDRESS>
    Predicted 3: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: <IP_ADDRESS_1>:<PORT_1> Served block blk_<BLOCK_ID> to /<IP_ADDRESS_2>

EventId: E10
  Ground Truth Template: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: PacketResponder <PACKET_RESPONDER_NUM> for block blk_<BLOCK_ID> terminating

    Predicted 1: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: PacketResponder <RESPONDER_ID> for block blk_<BLOCK_ID> terminating
    Predicted 2: <DATE> <TIME> <PID> <LOG_LEVEL> <COMPONENT>: PacketResponder <NUMBER> for block 

{'FailedEventIds_Recall': ['E1',
  'E10',
  'E11',
  'E2',
  'E3',
  'E4',
  'E5',
  'E7',
  'E9'],
 'OverFragmentedEventIds_Precision': {'E1': 4,
  'E10': 2,
  'E11': 3,
  'E3': 2,
  'E7': 25}}