In [1]:
import re
import ast
import math
import json
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from collections import Counter, deque
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Compute group level code generation scores

In [2]:
def EM(codes_generated, codes_ground_truth):
    """
    Exact-match indicator for one generated/reference pair.

    Returns 1 when `codes_generated == codes_ground_truth` and 0 otherwise.
    The comparison is a plain `==`, so any difference (including whitespace)
    counts as a mismatch.
    """
    return 1 if codes_generated == codes_ground_truth else 0

def BLEU_corpus(codes_generated, codes_ground_truth):
    """
    Score one candidate string against one reference string with NLTK's
    `corpus_bleu`.

    Both strings are tokenized by whitespace (`str.split()`; a simplistic
    approach) and wrapped into a one-hypothesis "corpus". The weights passed
    to `corpus_bleu` are (0.5, 0.5, 0, 0), i.e. zero weight on the 3-gram and
    4-gram terms.
    """
    # corpus_bleu takes, for every hypothesis, a list of tokenized references.
    # Here there is exactly one hypothesis with exactly one reference.
    hypotheses = [codes_generated.split()]
    references_per_hypothesis = [[codes_ground_truth.split()]]
    return corpus_bleu(
        references_per_hypothesis,
        hypotheses,
        weights=(0.5, 0.5, 0, 0),
    )  # [0, 1]

def BLEU_sentence(codes_generated, codes_ground_truth):
    """
    Score one candidate string against one reference string with NLTK's
    `sentence_bleu`, using `SmoothingFunction().method4`.

    Tokenization splits each string into runs of [A-Za-z0-9_] and single
    non-whitespace characters (a simplistic approach).
    """
    token_pattern = r"[A-Za-z0-9_]+|\S"

    def tokenize_code(code_str):
        return re.findall(token_pattern, code_str)

    hypothesis = tokenize_code(codes_generated)
    # sentence_bleu takes a list of tokenized references; there is only one
    # reference here (the ground truth).
    single_reference = [tokenize_code(codes_ground_truth)]
    return sentence_bleu(
        single_reference,
        hypothesis,
        smoothing_function=SmoothingFunction().method4,
    )  # [0, 1]

##############################################################################
# 1) Simple Code Tokenizer
##############################################################################
def tokenize_code(code_str):
    """
    Break a code string into tokens: each maximal run of ASCII letters, digits
    and underscores is one token, and every other non-whitespace character is a
    token on its own. Whitespace is discarded.

    E.g., "def foo(x): return x+1" -> ["def", "foo", "(", "x", ")", ":", "return", "x", "+", "1"]
    """
    return [match.group(0) for match in re.finditer(r"[A-Za-z0-9_]+|\S", code_str)]

##############################################################################
# 2) N-Gram BLEU (token-based), approximate from-scratch
##############################################################################
def ngram_counts(tokens, n):
    """
    Count every contiguous window of `n` tokens in `tokens`.

    Each n-gram is stored as a tuple key in the returned Counter. When
    `tokens` has fewer than `n` items the Counter is empty.
    Example: tokens = ['def','foo','(',')']
             n=2 -> Counter({('def','foo'):1, ('foo','('):1, ('(',')'):1})
    """
    window_starts = range(len(tokens) - n + 1)
    return Counter(tuple(tokens[start:start + n]) for start in window_starts)

def clipped_ngram_count(candidate_tokens, reference_tokens, n):
    """
    Sum, over every distinct n-gram of the candidate, the smaller of its
    candidate count and its reference count (0 when the reference lacks it).
    """
    candidate_ngrams = ngram_counts(candidate_tokens, n)
    reference_ngrams = ngram_counts(reference_tokens, n)
    return sum(
        min(count, reference_ngrams.get(ngram, 0))
        for ngram, count in candidate_ngrams.items()
    )

def sentence_bleu_self(candidate_tokens, reference_tokens, max_n=4, smooth_eps=1e-14):
    """
    Approximate sentence-level BLEU over pre-tokenized inputs.

    For each n in 1..max_n the precision is
        (clipped_count + smooth_eps) / (max(len(candidate) - n + 1, 1) + smooth_eps)
    and the score is the geometric mean of those precisions. A simple brevity
    penalty exp(1 - 1/ratio) is applied when
        ratio = len(candidate) / (len(reference) + smooth_eps) < 1.

    Args:
        candidate_tokens: sequence of candidate tokens.
        reference_tokens: sequence of reference tokens.
        max_n: largest n-gram order included in the geometric mean.
        smooth_eps: additive smoothing term used in the precisions and in the
            length ratio.

    Returns:
        A float score; 0.0 when the candidate is empty.
    """
    # An empty candidate has nothing to match. Without this guard the length
    # ratio below is 0 and `1/ratio` raises ZeroDivisionError.
    if len(candidate_tokens) == 0:
        return 0.0

    # Precision for each n-gram order
    precisions = []
    for n in range(1, max_n+1):
        numerator = clipped_ngram_count(candidate_tokens, reference_tokens, n)
        # At least 1 so candidates shorter than n do not divide by zero.
        denominator = max(len(candidate_tokens) - n + 1, 1)
        # Add smoothing to avoid 0
        prec_n = (numerator + smooth_eps) / (denominator + smooth_eps)
        precisions.append(prec_n)

    # Geometric mean of precisions
    # BLEU = exp( (1/N) * sum(log p_n) ), N=4 typically
    # The zero check protects math.log when the caller passes smooth_eps=0.
    if any(p == 0 for p in precisions):
        bleu = 0.0
    else:
        avg_log = sum(math.log(p) for p in precisions) / len(precisions)
        bleu = math.exp(avg_log)

    # Simple brevity penalty for candidates shorter than the reference.
    ratio = len(candidate_tokens) / (len(reference_tokens) + smooth_eps)
    if ratio < 1.0:
        bleu *= math.exp(1 - 1/ratio)

    return bleu

##############################################################################
# 3) Weighted N-Gram BLEU: example weighting by token type
##############################################################################
# A minimal set of Python keywords
PY_KEYWORDS = {
    # definitions and scoping
    "def", "class", "lambda", "global", "del",
    # imports
    "import", "from", "as",
    # control flow
    "if", "elif", "else", "for", "while", "break", "continue", "pass",
    "return", "yield",
    # exceptions, assertions, context managers
    "try", "except", "raise", "assert", "with",
    # boolean / membership / identity operators
    "in", "and", "or", "not", "is",
}

def token_type(token):
    """
    Rough token classification. Checks run in this order, first hit wins:
    - "KEY" if the token is in PY_KEYWORDS
    - "STR" if it starts and ends with the same quote character (naive check)
    - "NUM" if `str.isdigit()` is true
    - "ID"  if it consists only of ASCII letters, digits and underscores
    - "OP"  for everything else (punctuation/operators)
    """
    if token in PY_KEYWORDS:
        return "KEY"

    single_quoted = token.startswith("'") and token.endswith("'")
    double_quoted = token.startswith('"') and token.endswith('"')
    if single_quoted or double_quoted:
        return "STR"

    if token.isdigit():
        return "NUM"

    # identifier-like tokens vs. operators/punctuation
    return "ID" if re.match(r"^[A-Za-z0-9_]+$", token) else "OP"

def weighted_ngram_bleu(candidate_tokens, reference_tokens, max_n=4, smooth_eps=1e-14):
    """
    Weighted BLEU: each n-gram contributes the average of its tokens'
    type weights (KEY=1.2, ID=1.0, NUM=1.0, OP=0.8, STR=1.0) instead of 1.

    For each n in 1..max_n the precision is the weighted clipped count divided
    by the weighted candidate n-gram total (both smoothed by `smooth_eps`), or
    0.0 when the candidate has no n-grams of that order. The score is the
    geometric mean of the precisions, or 0.0 if any precision is 0, multiplied
    by the brevity penalty exp(1 - 1/ratio) when
        ratio = len(candidate) / (len(reference) + smooth_eps) < 1.

    Args:
        candidate_tokens: sequence of candidate tokens.
        reference_tokens: sequence of reference tokens.
        max_n: largest n-gram order included in the geometric mean.
        smooth_eps: additive smoothing term.

    Returns:
        A float score; 0.0 when the candidate is empty.
    """
    # An empty candidate has nothing to match. Without this guard the length
    # ratio below is 0 and `1/ratio` raises ZeroDivisionError.
    if len(candidate_tokens) == 0:
        return 0.0

    # weights for each token-type
    weight_map = {"KEY":1.2, "ID":1.0, "NUM":1.0, "OP":0.8, "STR":1.0}

    def ngram_weight(ngram):
        # The n-gram weight is the average of its tokens' type weights.
        tw_sum = 0.0
        for tok in ngram:
            tw_sum += weight_map.get(token_type(tok), 1.0)
        return tw_sum / len(ngram)

    precisions = []
    for n in range(1, max_n+1):
        cand_counter = ngram_counts(candidate_tokens, n)
        ref_counter  = ngram_counts(reference_tokens, n)
        weighted_clip_sum = 0.0       # weighted matches (numerator)
        total_candidate_ngrams = 0.0  # weighted candidate n-grams (denominator)

        for ngram, cand_val in cand_counter.items():
            weight = ngram_weight(ngram)
            # Clip the candidate count to the reference count.
            clip_amount = min(cand_val, ref_counter.get(ngram, 0))
            if clip_amount > 0:
                weighted_clip_sum += clip_amount * weight
            total_candidate_ngrams += cand_val * weight

        if total_candidate_ngrams < smooth_eps:
            # Candidate is shorter than n: no n-grams of this order.
            prec_n = 0.0
        else:
            prec_n = (weighted_clip_sum + smooth_eps) / (total_candidate_ngrams + smooth_eps)
        precisions.append(prec_n)

    # geometric mean
    if any(p == 0 for p in precisions):
        wbleu = 0.0
    else:
        avg_log = sum(math.log(p) for p in precisions) / len(precisions)
        wbleu = math.exp(avg_log)

    # naive brevity penalty
    ratio = len(candidate_tokens) / (len(reference_tokens) + smooth_eps)
    if ratio < 1.0:
        wbleu *= math.exp(1 - 1/ratio)

    return wbleu

##############################################################################
# 4) Syntax (AST) Match: parse Python code + BFS of node types, do n-gram similarity
##############################################################################
def ast_node_bfs_types(root):
    """
    Walk the AST breadth-first from `root` and collect each node's class name.

    Returns a list of node type names in visiting order,
    e.g. ["Module", "FunctionDef", "arguments", "arg", ...].
    """
    type_names = []
    current_level = [root]
    # Process one depth level at a time; within a level nodes keep the order
    # in which their parents yielded them, which is the breadth-first order.
    while current_level:
        next_level = []
        for node in current_level:
            type_names.append(type(node).__name__)
            next_level.extend(ast.iter_child_nodes(node))
        current_level = next_level
    return type_names

def syntax_match_score(candidate_code, reference_code, smooth_eps=1e-14):
    """
    Structural similarity of two Python snippets.

    Both snippets are parsed with `ast.parse`, flattened into breadth-first
    sequences of AST node type names, and those sequences are compared with
    the 1-gram to 4-gram overlap computed by `sentence_bleu_self`.

    Args:
        candidate_code: candidate source code string.
        reference_code: reference source code string.
        smooth_eps: smoothing term forwarded to `sentence_bleu_self`.

    Returns:
        A float score; 0.0 if either snippet fails to parse.
    """
    try:
        cand_root = ast.parse(candidate_code)
        ref_root  = ast.parse(reference_code)
    except Exception:
        # If parse fails, return 0
        return 0.0

    cand_types = ast_node_bfs_types(cand_root)
    ref_types  = ast_node_bfs_types(ref_root)

    # Score the "type tokens" with the local from-scratch BLEU. NLTK's
    # `sentence_bleu` does not accept `max_n` / `smooth_eps`, so calling it
    # here raised TypeError for every pair of parseable snippets.
    return sentence_bleu_self(cand_types, ref_types, max_n=4, smooth_eps=smooth_eps)

##############################################################################
# 5) Data-Flow Match (naive variable "def" lines)
##############################################################################
class DefUseVisitor(ast.NodeVisitor):
    """
    AST visitor that records where variables are first assigned.

    Every `Name` node in Store context (e.g. the target of `x = ...` or the
    loop variable of `for i in ...`) is recorded in `self.defs` as
    varname -> line number, keeping only the first one met during traversal.
    """
    def __init__(self):
        # varname -> lineno of the first Store-context Name node visited
        self.defs = {}

    def visit_Name(self, node):
        is_assignment_target = isinstance(node.ctx, ast.Store)
        if is_assignment_target:
            # setdefault keeps an already-recorded line untouched
            self.defs.setdefault(node.id, node.lineno)
        self.generic_visit(node)

def dataflow_match_score(candidate_code, reference_code):
    """
    Naive data-flow similarity: Jaccard overlap of variable definitions.

    Each snippet is reduced to the set of (varname, first_def_line) pairs
    collected by `DefUseVisitor`; the score is |intersection| / |union|.

    A snippet that fails to parse contributes an empty set, so two snippets
    without any recorded definitions (including two unparseable ones) score
    1.0.

    Returns:
        A float in [0, 1].
    """
    def get_var_defs(code):
        try:
            root = ast.parse(code)
        except Exception:
            # Best effort: unparseable code has no definitions. `Exception`
            # (not a bare except) lets KeyboardInterrupt/SystemExit through.
            return set()
        v = DefUseVisitor()
        v.visit(root)
        # Return set of (var, line)
        return set(v.defs.items())

    cand_defs = get_var_defs(candidate_code)
    ref_defs  = get_var_defs(reference_code)

    union = cand_defs | ref_defs
    if not union:
        return 1.0  # trivially no variables in both
    # The union is non-empty here, so no epsilon is needed in the denominator
    # and identical definition sets score exactly 1.0.
    return len(cand_defs & ref_defs) / len(union)

##############################################################################
# 6) Combine the 4 sub-scores into a final CodeBLEU
##############################################################################
def calc_codebleu(candidate_code, reference_code, alpha=0.25, beta=0.25, gamma=0.25, theta=0.25):
    """
    Compute the four sub-scores for one candidate/reference pair and their
    weighted sum.

    `alpha`, `beta`, `gamma` and `theta` weight the n-gram, weighted n-gram,
    syntax and data-flow scores respectively. Returns a dict holding each
    sub-score plus the combined value under "code_bleu".
    """
    cand_tokens = tokenize_code(candidate_code)
    ref_tokens = tokenize_code(reference_code)

    # Token-based scores use the token lists; structural scores use raw code.
    scores = {
        "ngram_match_score": sentence_bleu_self(cand_tokens, ref_tokens, max_n=4),
        "weighted_ngram_match_score": weighted_ngram_bleu(cand_tokens, ref_tokens, max_n=4),
        "syntax_match_score": syntax_match_score(candidate_code, reference_code),
        "dataflow_match_score": dataflow_match_score(candidate_code, reference_code),
    }
    scores["code_bleu"] = (
        alpha * scores["ngram_match_score"]
        + beta * scores["weighted_ngram_match_score"]
        + gamma * scores["syntax_match_score"]
        + theta * scores["dataflow_match_score"]
    )
    return scores

"""
# Group level corpus BLEU score
from nltk.translate.bleu_score import corpus_bleu
from tqdm import tqdm

all_candidates = []  # each will be a tokenized candidate code snippet
all_references = []  # each will be a list containing one tokenized reference code snippet
for i in tqdm(range(instances_num), desc="BLEU scoring..."):
    # Find index of highest raw score
    best_index = np.argmax(scores_raw_all[i])

    candidate_str = codes_generated_all[i]
    reference_str = codes_ground_truth_all[i]

    # Tokenize; this is a simplistic approach!
    candidate_tokens = candidate_str.split()
    reference_tokens = reference_str.split()

    all_candidates.append(candidate_tokens)
    # NLTK corpus_bleu expects each reference to be a list-of-lists of tokens
    all_references.append([reference_tokens])

# Calculate corpus-level BLEU
bleu_score = corpus_bleu(all_references, all_candidates)
print("Corpus-level BLEU score:", bleu_score)
"""

In [None]:
# Retrieval settings to evaluate; one result file is read per entry.
k_all = [0, 5, 10, 20, 40]
# This cell loads the generated code from SCODE_G with the codexglue dataset
top_k = 1
instances_num = 200
# Per-instance scores: one row per entry of k_all, one column per instance.
# The width follows instances_num so the two cannot drift apart.
EM_k_all = np.zeros((len(k_all), instances_num))
BLEU_corpus_k_all = np.zeros((len(k_all), instances_num))
BLEU_sentence_k_all = np.zeros((len(k_all), instances_num))
CodeBLEU_k_all = np.zeros((len(k_all), instances_num))
for k, retrieved_k in enumerate(k_all):
    scores_raw_all = np.zeros((instances_num, top_k))
    codes_generated_all = [] # one "generated_code" entry per instance
    codes_ground_truth_all = [] # list of instances_num code strings
    with open("/Users/dunhan/Desktop/AdvancedNLP/retriever/fancy_retriever/eval/cxg200_Qwen1M_top" + str(retrieved_k) + ".json") as f:
        data = json.load(f)
    # Only the first instances_num records are scored.
    for row, record in enumerate(data[:instances_num]):
        codes_ground_truth_all.append(record["ground_truth_code"])
        scores_raw_all[row] = np.array([100.0])  # constant raw score stored for every instance
        codes_generated_all.append(record["generated_code"])
    if len(codes_generated_all) < instances_num:
        # Fail with a clear message instead of an IndexError further down.
        raise ValueError(
            "expected at least " + str(instances_num) + " records for top"
            + str(retrieved_k) + ", found " + str(len(codes_generated_all))
        )
    EM_all = []
    BLEU_corpus_all = []
    BLEU_sentence_all = []
    CodeBLEU_all = []
    for generated, ground_truth in zip(codes_generated_all, codes_ground_truth_all):
        EM_all.append(EM(generated, ground_truth))
        BLEU_corpus_all.append(BLEU_corpus(generated, ground_truth))
        BLEU_sentence_all.append(BLEU_sentence(generated, ground_truth))
        CodeBLEU_all.append(calc_codebleu(generated, ground_truth)["code_bleu"])
    EM_k_all[k, :] = np.array(EM_all)
    BLEU_corpus_k_all[k, :] = np.array(BLEU_corpus_all)
    BLEU_sentence_k_all[k, :] = np.array(BLEU_sentence_all)
    CodeBLEU_k_all[k, :] = np.array(CodeBLEU_all)
    # Mean scores for this retrieval setting, in percent.
    print("EM: ", 100 * np.mean(EM_all))
    print("BLEU_corpus: ", 100 * np.mean(BLEU_corpus_all))
    print("BLEU_sentence: ", 100 * np.mean(BLEU_sentence_all))
    print("CodeBLEU: ", 100 * np.mean(CodeBLEU_all))

In [None]:
def _first_converged_index(scores, tolerance=0.98):
    """
    Return the first index whose score reaches `tolerance * max(scores)`;
    the last index if none does.
    """
    threshold = tolerance * scores.max()  # loop-invariant, computed once
    for idx in range(len(scores)):
        if scores[idx] >= threshold:
            break
    return idx

# Dimensions come from the score arrays instead of hard-coded 5 / 200.
num_settings, num_instances = BLEU_corpus_k_all.shape
# converge[i] is the (rounded) mean, over the three metrics, of the first
# row index at which instance i gets within 2% of its best score.
converge = np.zeros(num_instances)
for i in range(num_instances):
    a = _first_converged_index(BLEU_corpus_k_all[:, i])
    b = _first_converged_index(BLEU_sentence_k_all[:, i])
    c = _first_converged_index(CodeBLEU_k_all[:, i])
    converge[i] = np.round(np.mean([a, b, c]))
# Share of instances at each convergence level, in percent.
for level, retrieved_k in enumerate(k_all):
    print("converge in top_" + str(retrieved_k) + ":", 100 * np.sum(converge == level) / num_instances, "%")
np.save("/Users/dunhan/Desktop/AdvancedNLP/retriever/fancy_retriever/eval/converge.npy", converge)

In [None]:
# Read the prompts once. The previous outer `for i in range(200)` loop
# re-opened the file 200 times and appended every prompt on each pass, so
# prompt_all ended up 200 times longer than the data.
with open("/Users/dunhan/Desktop/AdvancedNLP/retriever/fancy_retriever/eval/cxg200_Qwen1M_top0.json") as f:
    data = json.load(f)
# prompt_all[i] is the "text_prompt" of the i-th record in the file.
prompt_all = [record["text_prompt"] for record in data]

In [None]:
# List index and prompt of every instance whose convergence level is 0 or 1.
print("easy questions:")
for i in range(200):
    converged_early = converge[i] <= 1  # converge in top_0 or top_5
    if not converged_early:
        continue
    print(i)
    print(prompt_all[i])

In [None]:
# List index and prompt of every instance whose convergence level is 2 or 3.
print("medium questions:")
for i in range(200):
    level = converge[i]
    converged_midway = (level == 2) or (level == 3)  # converge in top_10 or top_20
    if not converged_midway:
        continue
    print(i)
    print(prompt_all[i])

In [None]:
# List index and prompt of every instance whose convergence level is 4.
print("hard questions:")
for i in range(200):
    converged_last = converge[i] == 4  # converge in top_40
    if not converged_last:
        continue
    print(i)
    print(prompt_all[i])

In [None]:
# Data for visualization
x_labels = ['top0', 'top5', 'top10', 'top20', 'top40']
x = range(len(x_labels))

# Mean score per retrieval setting, in percent (same scale as the values
# printed by the scoring cell). Taken from the per-instance arrays instead of
# hand-pasted lists: the empty placeholders made plt.plot fail on mismatched
# x/y lengths. The EM series is named EM_scores so it does not rebind the
# EM() function, which would break a re-run of the scoring cell.
EM_scores = 100 * EM_k_all.mean(axis=1)
Corpus_BLEU = 100 * BLEU_corpus_k_all.mean(axis=1)
Sentence_BLEU = 100 * BLEU_sentence_k_all.mean(axis=1)
CodeBLEU = 100 * CodeBLEU_k_all.mean(axis=1)
# Plotting
plt.figure(figsize=(8, 5))
plt.plot(x, EM_scores, label='EM', color='blue', marker='o')
plt.plot(x, Corpus_BLEU, label='Corpus_BLEU', color='green', marker='s')
plt.plot(x, Sentence_BLEU, label='Sentence_BLEU', color='red', marker='^')
plt.plot(x, CodeBLEU, label='CodeBLEU', color='orange', marker='d')
# Formatting
plt.xticks(x, x_labels)
plt.xlabel('Top-k Retrievals')
plt.ylabel('Score')
plt.title('SCODE_G Top-200 on CodeXGLUE')
plt.legend()
plt.grid(True)

# Display the chart
plt.show()