In [1]:
import torch
from unixcoder import UniXcoder

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UniXcoder("microsoft/unixcoder-base")
model.to(device)

def unixcoder_sim(text1,text2):
# Encode maximum function
    tokens_ids = model.tokenize([text1],max_length=512,mode="<encoder-only>")
    source_ids = torch.tensor(tokens_ids).to(device)
    tokens_embeddings,max_func_embedding = model(source_ids)
    
    # Encode minimum function
    tokens_ids = model.tokenize([text2],max_length=512,mode="<encoder-only>")
    source_ids = torch.tensor(tokens_ids).to(device)
    tokens_embeddings,min_func_embedding = model(source_ids)

    norm_max_func_embedding = torch.nn.functional.normalize(max_func_embedding, p=2, dim=1)
    norm_min_func_embedding = torch.nn.functional.normalize(min_func_embedding, p=2, dim=1)
    return torch.einsum("ac,bc->ab",norm_min_func_embedding,norm_max_func_embedding)


In [2]:
import pandas
import numpy as np
import os
import javalang
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge import Rouge 
import re
import numpy as np
from sklearn import metrics

In [3]:
def level_acc(classification_pred, classification_label) -> float:
    level_map = {'trace':0., 'debug':1., 'info':2., 'warn':3., 'error':4.}
    new_pred = []
    new_label = []
    length = len(classification_pred)
    for idx in range(length):
        predict = classification_pred[idx]
        label = classification_label[idx]
        if predict in level_map.keys() and label in level_map.keys():
            pred_sum = level_map[predict]
            label_sum = level_map[label]
            new_pred.append(pred_sum)
            new_label.append(label_sum)
    matches = sum(x == y for x, y in zip(new_pred, new_label))
    total_elements = len(new_pred)
    accuracy = matches / total_elements
    return accuracy

def query_level(level: float) -> str:
    if level == 1.:
        return 'trace'
    elif level == 2.:
        return 'debug'
    elif level == 3.:
        return 'info'
    elif level == 4.:
        return 'warn'
    elif level == 5.:
        return 'error'
    else:
        return ''
        
def aod(classification_pred, classification_label) -> float:
    level_map = {'trace':1., 'debug':2., 'info':3., 'warn':4., 'error':5.}
    max_distance = {'trace':4., 'debug':3., 'info':2., 'warn':3., 'error':4.}

    distance_sum = 0.
    noise = 0.
    length = len(classification_pred)
    
    for idx in range(length):
        try:
            predict = classification_pred[idx]
            label = classification_label[idx]
            pred_sum = level_map[predict]
            label_sum = level_map[label]
            level = query_level(label_sum)
            _distance = abs(label_sum - pred_sum)
            distance_sum = distance_sum + (1 - _distance / max_distance[level])
        except Exception as e:
            noise = noise+1
    aod = distance_sum / (length-noise)    
    return aod

def precision_recall_f1(gt, pd):
    intersection = len(gt.intersection(pd))
    
    if len(gt) == 0 and len(pd) == 0:
        precision = 1
        recall = 1
    elif len(gt) == 0:
        precision = intersection / len(pd)
        recall = 1
    elif len(pd) == 0:
        precision = 1
        recall = intersection / len(gt)
    else:
        precision = intersection / len(pd)
        recall = intersection / len(gt)
    
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1



def tokenize_java_code(code):
    tokens = list(javalang.tokenizer.tokenize(code))
    return tokens

def is_java_string(token):
    return isinstance(token, javalang.tokenizer.String)

def get_list4bleu(java_code):
    tokenized_java_code = tokenize_java_code(java_code)
    my_list = []
    for token in tokenized_java_code:
        if is_java_string(token):
            my_list.extend(token.value.split())
        else:
            my_list.append(token.value)
    return my_list

from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def calculate_bleu_scores(reference, candidate):
    reference = get_list4bleu(reference)
    candidate = get_list4bleu(candidate)
    smooth = SmoothingFunction().method1

    bleu1 = sentence_bleu([reference], candidate, weights=(1, 0, 0, 0), smoothing_function=smooth)
    bleu2 = sentence_bleu([reference], candidate, weights=(0.5, 0.5, 0, 0), smoothing_function=smooth)
    bleu3 = sentence_bleu([reference], candidate, weights=(0.33, 0.33, 0.33, 0), smoothing_function=smooth)
    bleu4 = sentence_bleu([reference], candidate, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smooth)

    return bleu1, bleu2, bleu3, bleu4


def extract_quoted_strings(s):
    quoted_strings = re.findall(r'"([^"]*)"', s)
    " ".join(quoted_strings)
    remaining = re.sub(r'"[^"]*"', '', s)
    char_to_remove = ['+', ',']
    for char in char_to_remove:
        remaining = remaining.replace(char, '')
    var_list_origin = remaining.split(' ')
    var_list = [item for item in var_list_origin if (not item == ' ')]
    var_list = [item for item in var_list if item]
    return quoted_strings, var_list

def extract_outer_brackets(s):
    stack = []
    result = []

    for m in re.finditer(r"[()]", s):
        char, pos = m.group(0), m.start(0)
        if char == "(":
            stack.append(pos)
        elif char == ")":
            if len(stack) == 1:
                result.append(s[stack.pop() + 1:pos])
            else:
                stack.pop()
    return result

def average_precision_recall_f1(gt_list, pd_list):
    total_precision, total_recall, total_f1 = 0, 0, 0
    n = len(gt_list)

    for gt, pd in zip(gt_list, pd_list):
        precision, recall, f1 = precision_recall_f1(gt, pd)
        total_precision += precision
        total_recall += recall
        total_f1 += f1

    avg_precision = total_precision / n
    avg_recall = total_recall / n
    avg_f1 = total_f1 / n

    return avg_precision, avg_recall, avg_f1

In [4]:
import glob


In [6]:
#code_whisperer
#tabnine
#codeGeex

path = "./codeGeex_function_result/"
ground_truth_folder = './LogBench-O_prefix_1point/'
def extract_numbers(s):
    return re.findall(r'\d+', s)

results = {}

def parse_directory(dir_path):
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        if os.path.isfile(file_path) and file_path.endswith('.java'):
            ground_truth_path = ground_truth_folder+file_path.split('/')[-1][:-5]+'_config.txt'
            with open(ground_truth_path) as f:
                    lines = f.readlines()
                    if len(lines) >= 1:
                        line_number = int(extract_numbers(lines[0].strip(' ')[:-1])[0])
            with open(file_path) as f:
                lines = f.readlines()
                if len(lines) >= 4:
                    results[file_path.split("/")[-1]] = lines[line_number-1].strip(' ')[:-2]
                else:
                    pass
        elif os.path.isdir(file_path):
            parse_directory(file_path)
parse_directory(path)

In [7]:
#incoder
import os
path = "./Incoder_function_level_what2log_1point_infill_result/"

results = {}

def parse_directory(dir_path):
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        if os.path.isfile(file_path) and file_path.endswith('.java'):
            with open(file_path) as f:
                lines = f.readlines()
                if len(lines)>0:
                    results[file_path.split("/")[-1]] = "log."+lines[0].strip(' ')[:-1]
        elif os.path.isdir(file_path):
            parse_directory(file_path)
parse_directory(path)

In [13]:
#copilot
path = "./Copilot_Function-copilot_result/"

results = {}

def parse_directory(dir_path):
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        if os.path.isfile(file_path) and file_path.endswith('.txt'):
            with open(file_path) as f:
                lines = f.readlines()
                if len(lines) > 4:
                    results[file_path.split("/")[-1][:-4]] = lines[3].strip(' ')[:-1]
                else:
                    continue
        elif os.path.isdir(file_path):
            parse_directory(file_path)
parse_directory(path)

In [21]:
#llama
path = "./results_llamas/llama2/"

results = {}

def parse_directory(dir_path):
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        if os.path.isfile(file_path) and file_path.endswith('.java'):
            with open(file_path) as f:
                lines = f.readlines()
                if len(lines) == 1:
                    results[file_path.split("/")[-1]] = lines[0].strip(' ').split('statement: ')[-1]
                elif len(lines) == 3:
                    results[file_path.split("/")[-1]] = lines[2].strip(' ').split('statement: ')[-1]
                else:
                    continue
        elif os.path.isdir(file_path):
            parse_directory(file_path)
parse_directory(path)

In [5]:
#StarCoder
path = "./StarCoder_LogBench-O_prefix_1point/"

results = {}

def parse_directory(dir_path):
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        if os.path.isfile(file_path) and file_path.endswith('.java'):
            with open(file_path) as f:
                lines = f.readlines()
                for line in lines:
                    if '<fim_middle>' in line:
                        if '<|endoftext|>' in line:
                            results[file_path.split("/")[-1]] ='log.' + line.strip().split('<fim_middle>')[-1].split(';')[0]
                        else:
                            results[file_path.split("/")[-1]] ='log.' + line.strip().split('<fim_middle>')[-1]
        elif os.path.isdir(file_path):
            parse_directory(file_path)
parse_directory(path)

In [9]:
len(results)

3760

In [10]:
ground_truth_folder = './LogBench-O_prefix_1point/'
generated_level = []
real_level = []
generated_var = []
real_var = []
generated_string_list = []
truth_string_list = []
bleu_score_list = []
rouge_list = []
ulqi_list = []
emb_list = []
for key, value in results.items():
    temp_dict = dict()
    try:
        string = extract_outer_brackets(value)[0]
        quoted_strings, remaining = extract_quoted_strings(string)
        quoted_strings = ' '.join(quoted_strings)
        ground_truth_path = ground_truth_folder+key[:-5]+'_config.txt'
        with open(ground_truth_path) as f:
                lines = f.readlines()
                if len(lines) >= 2:
                    ground_truth = lines[2].strip(' ')[:-1]
                    # print(ground_truth)
                    # print(value)
        real_string = extract_outer_brackets(ground_truth)[0]
        real_quoted_strings, real_remaining = extract_quoted_strings(real_string)
        real_quoted_strings = ' '.join(real_quoted_strings)    
  
        try:
            real_log_level = ground_truth.split('(')[0].split('.')[1]
            log_level = value.split('(')[0].split('.')[1]
        except:
            log_level = ''
            real_log_level = ''
        generated_var.append(set(remaining))
        real_var.append(set(real_remaining))
        generated_level.append(log_level)
        real_level.append(real_log_level)
        quoted_strings = quoted_strings.replace('"', '')
        real_quoted_strings = real_quoted_strings.replace('"', '')
        quoted_strings = quoted_strings.lower()
        real_quoted_strings = real_quoted_strings.lower()
        generated_string_list.append(quoted_strings)
        truth_string_list.append(real_quoted_strings)
        if quoted_strings == real_quoted_strings:
            bleu_score_list.append((1.0,1.0,1.0,1.0))
            rouge_list.append([{
                'rouge-1': {'f': 1.0, 'p': 1.0, 'r': 1.0},
                'rouge-2': {'f': 1.0, 'p': 1.0, 'r': 1.0},
                'rouge-l': {'f': 1.0, 'p': 1.0, 'r': 1.0}
            }])
            emb_list.append(1)
        else:
            bleu_score_list.append(calculate_bleu_scores(quoted_strings, real_quoted_strings))
            rouge = Rouge()
            scores = rouge.get_scores(quoted_strings, real_quoted_strings)
            rouge_list.append(scores)
            emb_list.append(unixcoder_sim(quoted_strings,real_quoted_strings))
            #print(emb_list[-1])
    except Exception as e:
        pass

In [11]:
len(bleu_score_list)

3536

In [13]:
score_mean =  sum(emb_list) / len(emb_list)

In [12]:
len(emb_list)

2146

In [14]:
score_mean

tensor([[0.5978]], device='cuda:0', grad_fn=<DivBackward0>)

In [None]:
#BLEU
def column_averages(arr):
    return [sum(col) / len(col) for col in zip(*arr)]

print(column_averages(bleu_score_list))

In [None]:
#ROUGE
rouge1_f_scores = []
rouge2_f_scores = []
rougeL_f_scores = []
for score in rouge_list:
    scores = score[0]
    rouge1_f_scores.append(scores['rouge-1']['f'])
    rouge2_f_scores.append(scores['rouge-2']['f'])
    rougeL_f_scores.append(scores['rouge-l']['f'])
rouge1_f_score_mean = sum(rouge1_f_scores) / len(rouge1_f_scores)
rouge2_f_score_mean = sum(rouge2_f_scores) / len(rouge2_f_scores)
rougeL_f_score_mean = sum(rougeL_f_scores) / len(rougeL_f_scores)

print("ROUGE-1 F-score mean:", rouge1_f_score_mean)
print("ROUGE-2 F-score mean:", rouge2_f_score_mean)
print("ROUGE-L F-score mean:", rougeL_f_score_mean)

In [None]:
#level
print("AOD:", aod(generated_level,real_level))
print("Accuracy:", level_acc(generated_level,real_level))

In [None]:
#var avg_precision, avg_recall, avg_f1

pre = average_precision_recall_f1(real_var,generated_var)[0]
recall = average_precision_recall_f1(real_var,generated_var)[1]
f1 = 2*(pre*recall)/(pre+recall)
print(pre)
print(recall)
print(f1)