In [2]:
from openai import OpenAI
from dotenv import load_dotenv
import pandas as pd
import os


current_dir = os.getcwd()
project_root = os.path.dirname(current_dir)
os.chdir(project_root)
from utils.helpers import parse_log_labels


In [3]:
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
MODEL = "meta-llama/llama-3-8b-instruct:free"

In [4]:
def read_logs(file_path):
    with open(file_path, "r") as f:
        return f.read()

In [5]:
def generate_chat_response(message, model = MODEL):
    client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY,
    )

    completion = client.chat.completions.create(
    model=model,
    messages=[
        {
        "role": "user",
        "content": message
        }
    ]
    )
    return completion.choices[0].message.content


## **Line-level Log anomaly Explanation**

In [7]:
df = pd.read_csv("data/Hadoop_2k.log_structured.csv")
df.Level.value_counts()

Level
INFO     1040
WARN      808
ERROR     150
FATAL       2
Name: count, dtype: int64

In [8]:
df

Unnamed: 0,LineId,Date,Time,Level,Process,Component,Content,EventId,EventTemplate
0,1,2015-10-18,"18:01:47,978",INFO,main,org.apache.hadoop.mapreduce.v2.app.MRAppMaster,Created MRAppMaster for application appattempt...,E29,Created MRAppMaster for application appattempt...
1,2,2015-10-18,"18:01:48,963",INFO,main,org.apache.hadoop.mapreduce.v2.app.MRAppMaster,Executing with tokens:,E42,Executing with tokens:
2,3,2015-10-18,"18:01:48,963",INFO,main,org.apache.hadoop.mapreduce.v2.app.MRAppMaster,"Kind: YARN_AM_RM_TOKEN, Service: , Ident: (app...",E61,"Kind: YARN_AM_RM_TOKEN, Service: , Ident: (app..."
3,4,2015-10-18,"18:01:49,228",INFO,main,org.apache.hadoop.mapreduce.v2.app.MRAppMaster,Using mapred newApiCommitter.,E111,Using mapred newApiCommitter.
4,5,2015-10-18,"18:01:50,353",INFO,main,org.apache.hadoop.mapreduce.v2.app.MRAppMaster,OutputCommitter set in config null,E76,OutputCommitter set in config null
...,...,...,...,...,...,...,...,...,...
1995,1996,2015-10-18,"18:10:54,202",WARN,LeaseRenewer:msrabi@msra-sa-41:9000,org.apache.hadoop.ipc.Client,Address change detected. Old: msra-sa-41/10.19...,E10,Address change detected. Old: <*>/<*>:<*> New:...
1996,1997,2015-10-18,"18:10:54,202",WARN,LeaseRenewer:msrabi@msra-sa-41:9000,org.apache.hadoop.hdfs.LeaseRenewer,Failed to renew lease for [DFSClient_NONMAPRED...,E44,Failed to renew lease for [DFSClient_NONMAPRED...
1997,1998,2015-10-18,"18:10:54,546",INFO,RMCommunicator Allocator,org.apache.hadoop.ipc.Client,Retrying connect to server: msra-sa-41:8030. A...,E91,Retrying connect to server: <*>:<*>. Already t...
1998,1999,2015-10-18,"18:10:54,546",ERROR,RMCommunicator Allocator,org.apache.hadoop.mapreduce.v2.app.rm.RMContai...,ERROR IN CONTACTING RM.,E38,ERROR IN CONTACTING RM.


In [9]:
categories = df.Level.value_counts().keys().tolist()
contents = []
for cat in categories:
    sample_logs = df[df.Level == cat].sample(2)
    for i, row in sample_logs.iterrows():
        contents.append(f"Log Level: {row.Level}. Process: {row.Process}. Component: {row.Component}. Content: {row.Content}. EventTemplate: {row.EventTemplate}")
contents



['Log Level: INFO. Process: main. Component: org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor. Content: nodeBlacklistingEnabled:true. EventTemplate: nodeBlacklistingEnabled:true',
 'Log Level: INFO. Process: RMCommunicator Allocator. Component: org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator. Content: Reduce slow start threshold not met. completedMapsForReduceSlowstart 1. EventTemplate: Reduce slow start threshold not met. completedMapsForReduceSlowstart <*>',
 'Log Level: WARN. Process: LeaseRenewer:msrabi@msra-sa-41:9000. Component: org.apache.hadoop.hdfs.LeaseRenewer. Content: Failed to renew lease for [DFSClient_NONMAPREDUCE_1537864556_1] for 238 seconds.  Will retry shortly .... EventTemplate: Failed to renew lease for [DFSClient_NONMAPREDUCE_<*>_<*>] for <*> seconds.  Will retry shortly ...',
 'Log Level: WARN. Process: LeaseRenewer:msrabi@msra-sa-41:9000. Component: org.apache.hadoop.hdfs.LeaseRenewer. Content: Failed to renew lease for [DFSClient_NONMAPREDU

In [20]:
PROMPT = "Detect whether or not there is an anomaly is the log. If so, explain the anomaly and why it happened?" + "\n Log: " + contents[0]
generate_chat_response(PROMPT)

'The joy of analyzing log messages!\n\nIn this log message, we are dealing with the Hadoop MapReduce framework. Specifically, this is a log message from the `RMContainerAllocator` component, which is responsible for allocating containers (i.e., tasks) to reduce nodes in a Hadoop cluster.\n\nThe log message is an `INFO` log level, which indicates that this is not an error message, but rather a informative message about the behavior of the system.\n\nThe anomaly that stands out is the fact that the "Reduce slow start threshold not met". This threshold refers to the maximum number of completed maps (i.e., map tasks) that a reduce task (i.e., reduce process) should allow before it starts processing. This threshold is designed to prevent a reduce task from being overwhelmed by too many map tasks completing too quickly, which could lead to a denial-of-service (DoS) attack or other performance issues.\n\nThe log message indicates that the reduce task has not met this threshold, which suggests

In [21]:
PROMPT = "Detect whether or not there is an anomaly is the log. If so, explain the anomaly and why it happened?" + "\n Log: " + contents[-1]
generate_chat_response(PROMPT)



## **Run-level Log anomaly Explanation**

first let's find the labels

In [17]:
#First method: Find the files given the labels

lables_dict = parse_log_labels("data/Hadoop/abnormal_label.txt")
records = []
for job_type, issues in lables_dict.items():
    for issue, applications in issues.items():
        for app in applications:
            records.append({"Application": app, "Job Type": job_type, "Issue": issue})

df_logs = pd.DataFrame(records)

print(len(df_logs))
print(df_logs.head())

55
                      Application   Job Type         Issue
0  application_1445087491445_0005  WordCount        Normal
1  application_1445087491445_0007  WordCount        Normal
2  application_1445175094696_0005  WordCount        Normal
3  application_1445087491445_0001  WordCount  Machine down
4  application_1445087491445_0002  WordCount  Machine down


In [62]:
# Second method: Given the files, parse the file names and find the corresponding labels
files  = os.listdir("data/Hadoop/")
data = []
for f in files: 
    if f.startswith("WordCount") or f.startswith("PageRank"):
        job_type = f.split("_")[0]
        issue = f.split("_")[1]
        id = "_".join(f.split("_")[2:])
        for container in os.listdir(f"data/Hadoop/{f}"):
            if container.endswith(".log"):
                content = read_logs(f"data/Hadoop/{f}/{container}")
                data.append({"Job Type": job_type, "Issue": issue, "run_id": id, "container_id": container, "Content": content})
df_files = pd.DataFrame(data)
print(len(df_files))
        
print(df_files.head())
df_files.to_csv("data/Hadoop_logs_combined.csv", index=False)



978
    Job Type     Issue                          run_id  \
0  WordCount  DiskFull  application_1445182159119_0001   
1  WordCount  DiskFull  application_1445182159119_0001   
2  WordCount  DiskFull  application_1445182159119_0001   
3  WordCount  DiskFull  application_1445182159119_0001   
4  WordCount  DiskFull  application_1445182159119_0001   

                                 container_id  \
0  container_1445182159119_0001_01_000001.log   
1  container_1445182159119_0001_01_000002.log   
2  container_1445182159119_0001_01_000003.log   
3  container_1445182159119_0001_01_000007.log   
4  container_1445182159119_0001_01_000006.log   

                                             Content  
0  2015-10-19 14:21:32,887 INFO [main] org.apache...  
1  2015-10-19 14:21:43,552 INFO [main] org.apache...  
2  2015-10-19 14:21:43,739 INFO [main] org.apache...  
3  2015-10-19 14:21:43,614 INFO [main] org.apache...  
4  2015-10-19 14:21:43,739 INFO [main] org.apache...  


In [69]:
df_files

Unnamed: 0,Job Type,Issue,run_id,container_id,Content
0,WordCount,DiskFull,application_1445182159119_0001,container_1445182159119_0001_01_000001.log,"2015-10-19 14:21:32,887 INFO [main] org.apache..."
1,WordCount,DiskFull,application_1445182159119_0001,container_1445182159119_0001_01_000002.log,"2015-10-19 14:21:43,552 INFO [main] org.apache..."
2,WordCount,DiskFull,application_1445182159119_0001,container_1445182159119_0001_01_000003.log,"2015-10-19 14:21:43,739 INFO [main] org.apache..."
3,WordCount,DiskFull,application_1445182159119_0001,container_1445182159119_0001_01_000007.log,"2015-10-19 14:21:43,614 INFO [main] org.apache..."
4,WordCount,DiskFull,application_1445182159119_0001,container_1445182159119_0001_01_000006.log,"2015-10-19 14:21:43,739 INFO [main] org.apache..."
...,...,...,...,...,...
973,PageRank,MachineDown,application_1445062781478_0014,container_1445062781478_0014_01_000011.log,"2015-10-17 15:41:50,715 INFO [main] org.apache..."
974,PageRank,MachineDown,application_1445062781478_0014,container_1445062781478_0014_01_000010.log,"2015-10-17 15:41:15,588 INFO [main] org.apache..."
975,PageRank,MachineDown,application_1445062781478_0014,container_1445062781478_0014_01_000004.log,"2015-10-17 15:40:39,486 INFO [main] org.apache..."
976,PageRank,MachineDown,application_1445062781478_0014,container_1445062781478_0014_01_000009.log,"2015-10-17 15:41:16,586 INFO [main] org.apache..."


In [63]:
df_files.Issue.value_counts()

Issue
MachineDown             523
Normal                  167
DiskFull                152
NetworkDisconnection    136
Name: count, dtype: int64

In [66]:
HADOOP_ANOMALIES = ["Machine Down", "Network Disconnection", "Disk Full"]

PROMPT = f"""You are a professional Hadoop software engineer. You are tasked with classifying the log types into one of the following categories: {HADOOP_ANOMALIES}.
Do not provide any explanation. Your output should be the class of the log.
"""

generate_chat_response(PROMPT + "\n" + df_files.Content[0])

"'Disk Full'"

In [67]:
print(df_files[df_files.Issue == "NetworkDisconnection"].iloc[0])

generate_chat_response(PROMPT + "\n" + df_files[df_files.Issue == "NetworkDisconnection"].iloc[0].Content)

Job Type                                                 PageRank
Issue                                        NetworkDisconnection
run_id                             application_1445144423722_0022
container_id           container_1445144423722_0022_01_000011.log
Content         2015-10-18 18:02:10,840 INFO [main] org.apache...
Name: 127, dtype: object


'Disk Full'

In [68]:
print(df_files[df_files.Issue == "MachineDown"].iloc[0])
generate_chat_response(PROMPT + "\n" + df_files[df_files.Issue == "MachineDown"].iloc[0].Content)

Job Type                                                WordCount
Issue                                                 MachineDown
run_id                             application_1445087491445_0008
container_id           container_1445087491445_0008_01_000013.log
Content         2015-10-17 22:31:25,692 INFO [main] org.apache...
Name: 12, dtype: object


"'Disk Full'"

# RAG based Anomaly Explanation