In [1]:
import gc
import os
import random
import re
import statistics
import time

import openai
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
from tqdm import tqdm

In [2]:
# Hook tqdm into pandas so the `progress_*` variants (e.g. `progress_apply`) become available.
# NOTE(review): no `progress_*` call appears in the cells shown here — confirm this is still needed.
tqdm.pandas()

In [3]:
# 1 for abnormal and 0 for normal
ABNORMAL = 1
NORMAL = 0

class base_CFG:
    """Base experiment configuration.

    Experiment classes subclass this and add their own `exp_name` and
    `system_prompt`.

    Note: the two `openai.*` assignments in the class body run once, when the
    class is defined, and configure the imported `openai` module globally.
    """
    data_path = 'data/bgl/bgl_structured_unique.csv'
    tmp_fold = 'cache/'
    # `request_logs` and `get_past_res` read `cfg.temp_fold`; keep both
    # spellings pointing at the same folder so neither lookup fails.
    temp_fold = tmp_fold

    ##########################################
    # openai.api setting
    ##########################################
    # Prefer credentials from the environment so real keys never have to be
    # written into the notebook; the placeholders remain the fallback.
    openai.organization = os.environ.get('OPENAI_ORGANIZATION', 'YOUR-ORG-ID')
    openai.api_key = os.environ.get('OPENAI_API_KEY', 'YOUR-API-KEY')

    ##########################################
    # openai model parameters setting
    ##########################################
    openai_model = "gpt-3.5-turbo"
    temperature = 0.3  # between 0 and 2
    response_num = 5   # How many chat completion choices to generate for each input message.
    frequency_penalty = 0.5

    # Number of few-shot examples: `pos_num` normal logs and `neg_num`
    # abnormal logs are prepended to the prompt by `get_user_prompt`.
    pos_num = 0
    neg_num = 0

    # Prompt style handled by `get_user_prompt`:
    # 'full_log', 'ask_reason' or 'explain_log'.
    user_prompt = 'explain_log'

    ##########################################
    # exponential fallback retry setting
    ##########################################
    max_retries = 10
    exponential_base = 2
    jitter = True  # multiplied with random.random() when growing the retry delay

# read data

In [5]:
def read_and_preprocess_data(cfg):
    """Load the CSV at `cfg.data_path` and derive the columns used downstream.

    - `target`: NORMAL when `label_info` is '-', otherwise ABNORMAL.
    - `full_log`: lower-cased in place.
    - `log_message`: the lower-cased log with its first five
      space-separated fields removed.

    Prints the resulting shape and columns, then returns the DataFrame.
    """
    def _label_to_target(label):
        if label == '-':
            return NORMAL
        return ABNORMAL

    def _drop_header_fields(line):
        # The first five space-separated tokens are skipped; the rest is the message.
        fields = line.split(' ')
        return ' '.join(fields[5:]).lower()

    df = pd.read_csv(cfg.data_path)
    df['target'] = df['label_info'].apply(_label_to_target)

    lowered_logs = df['full_log'].str.lower()
    df['full_log'] = lowered_logs
    df['log_message'] = lowered_logs.apply(_drop_header_fields)

    print(f"data shape: {df.shape}, data columns: {df.columns}")
    return df

# request

In [6]:
class RegenerateError(Exception):
    """
    Signals that some responses came back incomplete and must be
    generated again.

    Attributes:
        regenerate_cnt: how many responses still need to be regenerated.
    """
    def __init__(self, regenerate_cnt, *args: object) -> None:
        # Remaining positional args are forwarded untouched to Exception.
        Exception.__init__(self, *args)
        self.regenerate_cnt = regenerate_cnt

    def __str__(self) -> str:
        pending = self.regenerate_cnt
        return f'Error: {pending} responses need to be regenerated.'
    
class MaxRegenerateError(Exception):
    """
    Exception: maximum number of regenerations reached.

    The exception is raised without arguments, which would make `str(e)`
    (and therefore `print(e)`) empty. `__str__` falls back to
    `default_message` in that case; explicitly passed arguments are
    rendered exactly as a plain `Exception` would render them.
    """
    default_message = 'Maximum number of regenerations reached.'

    def __init__(self, *args: object) -> None:
        super().__init__(*args)

    def __str__(self) -> str:
        if self.args:
            return super().__str__()
        return self.default_message

In [8]:
def completion_one_log(system_prompt, user_prompt, model='gpt-3.5-turbo',
                       temperature=0.5, response_num=1, frequency_penalty=0):
    """
    Send one system/user message pair to openai.ChatCompletion.create and
    return the `choices` attribute of the completion it produces.

    `response_num` is passed to the API as `n`.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    request_options = {
        "model": model,
        "messages": conversation,
        "temperature": temperature,
        "n": response_num,
        "frequency_penalty": frequency_penalty,
    }

    # Chat Completion Response (example):
    # {
    #     "choices": [
    #     {
    #         "finish_reason": "stop", # or 'length' or 'function_call' or 'content_filter'
    #         "index": 0,
    #         "message": {
    #             "content": "The 2020 World Series was played in Texas at Globe Life Field in Arlington.",
    #             "role": "assistant"
    #         }
    #     }],
    #     "created": 1677664795,
    #     "id": "chatcmpl-7QyqpwdfhqwajicIEznoc6Q47XAyW",
    #     "model": "gpt-3.5-turbo-0613",
    #     "object": "chat.completion",
    #     "usage": {
    #         "completion_tokens": 17,
    #         "prompt_tokens": 57,
    #         "total_tokens": 74
    #     }
    # }
    return openai.ChatCompletion.create(**request_options).choices

In [22]:
def request_one_log_with_retry(system_prompt, user_prompt, cfg: "base_CFG"):
    """Request `cfg.response_num` completions for one prompt, retrying on failure.

    Two independent retry paths are used:

    * Truncated responses (`finish_reason == 'length'`) are discarded and only
      the missing number of responses is requested again after a fixed
      10-second pause. Complete responses from earlier rounds are kept.
    * Any other exception triggers a retry with a growing delay: the delay is
      multiplied by `cfg.exponential_base * (1 + cfg.jitter * random.random())`
      before every sleep.

    Args:
        system_prompt: content of the system message.
        user_prompt: content of the user message.
        cfg: configuration providing `openai_model`, `temperature`,
            `response_num`, `frequency_penalty`, `max_retries`,
            `exponential_base` and `jitter`.

    Returns:
        list of response texts (the `message.content` of every complete choice).

    Raises:
        MaxRegenerateError: more than `cfg.max_retries` regeneration rounds
            were needed.
        Exception: more than `cfg.max_retries` request failures occurred; the
            last failure is attached as `__cause__`.
    """
    # The annotation is a string so defining this function does not depend on
    # the configuration cell having been executed first.
    num_retries = 0
    delay_seconds = 10
    regenerate_retries = 0

    res = []
    response_num_cache = cfg.response_num

    while True:
        try:
            choices = completion_one_log(system_prompt, user_prompt, model=cfg.openai_model,
                                         temperature=cfg.temperature, response_num=response_num_cache,
                                         frequency_penalty=cfg.frequency_penalty)

            ##############################
            # Check the generated text. Regenerate if necessary.
            ##############################
            regenerate_cnt = 0
            complete = []
            for choice in choices:
                # generated text is truncated
                if choice['finish_reason'] == 'length':
                    print('trunc : ' + choice['message']['content'])
                    regenerate_cnt += 1
                    continue
                complete.append(choice['message']['content'])

            # Commit only after the whole batch was parsed, so a failure in the
            # middle of the loop cannot leave duplicates behind for the retry.
            res.extend(complete)

            if regenerate_cnt:
                raise RegenerateError(regenerate_cnt)
            return res

        except RegenerateError as e:
            # Next round only asks for the responses that are still missing.
            response_num_cache = e.regenerate_cnt

            regenerate_retries += 1
            if regenerate_retries > cfg.max_retries:
                raise MaxRegenerateError()
            time.sleep(delay_seconds)

        except Exception as e:
            num_retries += 1
            if num_retries > cfg.max_retries:
                raise Exception(
                        f"Maximum number of retries ({cfg.max_retries}) exceeded."
                    ) from e

            # Grow the delay first so the logged value is the one actually slept.
            delay_seconds *= cfg.exponential_base * (1 + cfg.jitter * random.random())
            print(f"{e}, retry No.{num_retries}, delay {delay_seconds}s")
            time.sleep(delay_seconds)

In [10]:
def _build_few_shot_prompt(log_df, cfg):
    """Return the "examples:" block built from the first `cfg.pos_num` normal
    and `cfg.neg_num` abnormal logs of `log_df`, or "" when both are 0."""
    few_shot_prompt = ""
    if cfg.pos_num != 0:
        normal_logs = log_df[log_df.target == NORMAL].head(cfg.pos_num).full_log
        few_shot_prompt += '\n'.join('- "' + log + '" is a normal log' for log in normal_logs)
        few_shot_prompt += '\n'
    if cfg.neg_num != 0:
        abnormal_logs = log_df[log_df.target == ABNORMAL].head(cfg.neg_num).full_log
        few_shot_prompt += '\n'.join('- "' + log + '" is an abnormal log' for log in abnormal_logs)
        few_shot_prompt += '\n'
    if few_shot_prompt != "":
        few_shot_prompt = "examples:\n" + few_shot_prompt
    return few_shot_prompt


def get_user_prompt(log_df, log_row, cfg):
    """Build the user message for one log row according to `cfg.user_prompt`.

    Supported styles:
        'full_log'    - optional few-shot examples followed by the full log.
        'ask_reason'  - optional few-shot examples, then a question asking why
                        the log carries its label (plus how to solve it when
                        the label is abnormal).
        'explain_log' - ask to expand and explain `log_row['log_message']`;
                        few-shot examples are not used by this style.

    Args:
        log_df: DataFrame the few-shot examples are taken from
            (reads the `target` and `full_log` columns).
        log_row: the row to build the prompt for.
        cfg: configuration providing `user_prompt`, `pos_num` and `neg_num`.

    Returns:
        The prompt string.

    Raises:
        ValueError: `cfg.user_prompt` is not one of the supported styles.
            Returning None here would only surface later as a failed API
            request that is retried with back-off.
    """
    prompt_style = cfg.user_prompt

    # ChatGPT expand and explain log. Log message include log level.
    if prompt_style == 'explain_log':
        log_message = log_row['log_message']
        return f'expand and explain this log : "{log_message}"'

    # full log as prompt
    if prompt_style == 'full_log':
        return _build_few_shot_prompt(log_df, cfg) + "question:\n" + log_row['full_log']

    # ask for reason with a log and its label
    if prompt_style == 'ask_reason':
        full_log, label = log_row['full_log'], log_row['target']
        label = 'normal' if label == NORMAL else 'abnormal'
        prompt = _build_few_shot_prompt(log_df, cfg) + f'why does log "{full_log}" is labeled as {label}?'
        if label == 'abnormal':
            prompt += ' how to solve this abnormal log?'
        return prompt

    raise ValueError(
        f"Unknown cfg.user_prompt {prompt_style!r}; "
        "expected 'full_log', 'ask_reason' or 'explain_log'."
    )

In [11]:
def request_logs(log_df, cfg, past_res=None):
    """Request responses for every row of `log_df` and store them in a new
    `responses` column (the frame is modified in place; nothing is returned).

    Args:
        log_df: DataFrame with the logs to send.
        cfg: configuration; `system_prompt` is read here, and the cache folder
            is taken from `temp_fold` (falling back to `tmp_fold`).
        past_res: optional list of results restored from an earlier run. The
            first `len(past_res)` rows are skipped and their stored results
            reused. The list itself is not modified.

    Error handling per row:
        * MaxRegenerateError -> the row's result is ['MaxRegenerateError'].
        * any other Exception -> the results collected so far are written to
          '<cache folder>response.csv' and the row's result is ['OtherError'].
    """
    # restore past result (copy, so the caller's list is left untouched)
    res = [] if past_res is None else list(past_res)

    # The config may spell the cache folder `temp_fold` or `tmp_fold`.
    temp_fold = getattr(cfg, 'temp_fold', None)
    if temp_fold is None:
        temp_fold = getattr(cfg, 'tmp_fold', '')
    temp_path = f'{temp_fold}response.csv'

    # requests
    # Rows are skipped by position rather than by index label, so resuming
    # also works when the frame does not carry a default 0..n-1 index.
    for position, (_, row) in tqdm(enumerate(log_df.iterrows()), total = len(log_df)):
        if position < len(res):
            continue
        try:
            user_prompt = get_user_prompt(log_df, row, cfg)
            res.append(request_one_log_with_retry(cfg.system_prompt, user_prompt, cfg))
        except MaxRegenerateError as e:
            print(e)
            res.append(['MaxRegenerateError'])
        except Exception as e:
            print(e)
            print(f"save temp result to {temp_path}, successful length is {len(res)}.")
            # Make sure the cache folder exists; failing here would lose
            # everything collected so far.
            os.makedirs(os.path.dirname(temp_path) or '.', exist_ok=True)
            pd.DataFrame(res).to_csv(temp_path, index=False, header=False)
            res.append(['OtherError'])
        # fixed pause between consecutive requests
        time.sleep(10)

    log_df['responses'] = res

In [12]:
def get_past_res(cfg):
    """Load cached results from '<cache folder>response.csv'.

    The cache folder is taken from `cfg.temp_fold`, falling back to
    `cfg.tmp_fold`. Each CSV row becomes one list of strings.

    Rows are padded to a common width when the cache is written, so shorter
    rows come back with trailing missing values; that padding is stripped
    here. An empty cache file yields an empty list.

    Returns:
        list of lists, one per cached row.

    Raises:
        FileNotFoundError: the cache file does not exist.
    """
    # The config may spell the cache folder `temp_fold` or `tmp_fold`.
    temp_fold = getattr(cfg, 'temp_fold', None)
    if temp_fold is None:
        temp_fold = getattr(cfg, 'tmp_fold', '')

    try:
        # dtype=str keeps numeric-looking responses as the text they were.
        res_df = pd.read_csv(f'{temp_fold}response.csv', header=None, dtype=str)
    except pd.errors.EmptyDataError:
        return []

    res = []
    for row in res_df.values.tolist():
        # Drop only trailing missing cells (the padding), never inner ones.
        while row and pd.isna(row[-1]):
            row.pop()
        res.append(row)
    return res

# pipeline

In [17]:
# Experiment 1: ask the model to expand and explain each raw log message.
# Inherits every model / retry setting from base_CFG and only adds the
# experiment name (used in the output file name) and the system prompt.
# NOTE(review): the prompt text contains typos ("unatural", "natual"). It is
# sent to the model verbatim, so it is left untouched here; fixing it changes
# the experiment input.
class CFG1(base_CFG):
    exp_name = 'June_exp1_explain_and_expand'
    system_prompt = 'You are a log analysis expert. Most of the raw logs are concise, so based on your extensive experience in log analysis, you should expand and explain the original logs. The better the information you provide, the more helpful it will be for downstream log anomaly detection tasks and operations engineers. To provide better information, you need to focus on the following aspects:\n\n1. Convert abbreviations to full words or use parentheses to explain them.\n2. Convert unatural string to natual language. For example convert "2005-06-03-15.42.50.363779" into "timestamp(2005-06-03-15.42.50.363779)"\n3. Pay attention to the content after the last colon of log message. Explain what is it and why does it happen and what will it cause potentially.\n4. Pay attention to the parameter part and the number part. For example, exit code, indicating 0 is normal, otherwise it is an exception. 0 and 1 may represent down and up in command. For file path parameters, explain the meaning of every sub-path. For other strings, explaining their meaning.\n5. For very short logs without colons nor parameters, focus on providing as much relevant information as possible related to that particular log entry.\n6. Do not summarize'

# Experiment configurations to run, in order.
CFG_list = [CFG1]

In [23]:
# Load and preprocess the dataset configured in base_CFG, then preview the first rows.
df = read_and_preprocess_data(base_CFG)
df.head()

data shape: (302, 7), data columns: Index(['line_id', 'label_info', 'full_log', 'log_message', 'template',
       'target', 'weight'],
      dtype='object')


Unnamed: 0,line_id,label_info,full_log,log_message,template,target,weight
0,1,-,1117838570 2005.06.03 r02-m1-n0-c:j12-u11 2005...,ras kernel info instruction cache parity error...,instruction cache parity error corrected,0,0.022309
1,1477,-,1117838840 2005.06.03 r27-m1-l3-u18-c 2005-06-...,ras linkcard info midplaneswitchcontroller per...,MidplaneSwitchController performing bit sparin...,0,0.000203
2,2735,-,1117839085 2005.06.03 r20-m1-n5-c:j17-u01 2005...,ras kernel info generating core.304,generating (.*),0,0.35947
3,4882,-,1117839710 2005.06.03 r16-m1-n2-c:j17-u01 2005...,ras kernel info 1 ddr errors(s) detected and c...,(.*) ddr errorss detected and corrected on ran...,0,0.007148
4,4883,-,1117839710 2005.06.03 r13-m0-na-c:j14-u01 2005...,ras kernel info 3450051 l3 edram error(s) (dcr...,(.*) L3 (.*) errors dcr (.*) detected and corr...,0,0.001048


In [24]:
# Sanity check: show the prompt CFG1 produces for the row labelled 0.
get_user_prompt(df, df.loc[0], CFG1)

'expand and explain this log : "ras kernel info instruction cache parity error corrected"'

In [None]:
# run
# Create the output folder up front: a missing directory would otherwise only
# surface in `to_csv`, after every API request has already been made.
output_dir = 'data/chatgpt_explain_data'
os.makedirs(output_dir, exist_ok=True)

for index, cfg in enumerate(CFG_list):
    # request_logs adds a `responses` column to df in place
    request_logs(df, cfg, past_res=None)
    df.to_csv(f'{output_dir}/{cfg.exp_name}_NO.{index+1}.csv', index=False)
    # df.to_csv(f'out/{cfg.exp_name}_temp{cfg.temperature}_n{cfg.response_num}_freq{cfg.frequency_penalty}_pos{cfg.pos_num}_neg{cfg.neg_num}.csv', index=None)
    # remove the column again so the next configuration starts from a clean frame
    df.drop('responses', axis=1, inplace=True)