In [1]:
from google.colab import drive
drive.mount('/content/drive')
!pip install transformers
!pip install datasets
!huggingface-cli login

KeyboardInterrupt: 

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from datasets import load_dataset
import pandas as pd
import os
from tqdm import tqdm
import traceback

# Run on the GPU when CUDA is available; otherwise stay on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Tokenizer and causal-LM weights for the Llama 3.2 1B Instruct checkpoint,
# with the model moved onto the selected device.
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B-Instruct")
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-1B-Instruct")
model.to(device)

# MBPP benchmark, "full" configuration; only its test split is used below.
ds = load_dataset("google-research-datasets/mbpp", "full")
test_data = ds["test"]

# Debugging: testing on a single row

In [None]:
# Pull one record out of the MBPP test split for manual inspection.
# Change the index to look at a different task.
sample = test_data[1]

test_list = sample.get("test_list", [])  # assert statements for the task; [] when the key is absent
reference = sample["code"]               # reference solution source
prompt = sample["text"]                  # natural-language task description

# Echo the selected fields so the cell output records which task was loaded.
print("=== Defined Variables ===")
print("Task ID:", sample["task_id"])
print("Prompt:\n", prompt)
print("\nReference Code:\n", reference)
print("\nTest List:", test_list)

=== Defined Variables ===
Task ID: 12
Prompt:
 Write a function to sort a given matrix in ascending order according to the sum of its rows.

Reference Code:
 def sort_matrix(M):
    result = sorted(M, key=sum)
    return result

Test List: ['assert sort_matrix([[1, 2, 3], [2, 4, 5], [1, 1, 1]])==[[1, 1, 1], [1, 2, 3], [2, 4, 5]]', 'assert sort_matrix([[1, 2, 3], [-2, 4, -5], [1, -1, 1]])==[[-2, 4, -5], [1, -1, 1], [1, 2, 3]]', 'assert sort_matrix([[5,8,9],[6,4,3],[2,1,4]])==[[2, 1, 4], [6, 4, 3], [5, 8, 9]]']


# Improved Iteration Testing

# With Pass@k Interpretation

In [None]:
import ast
import json
import math
import os
import re
import signal
import textwrap
import time
import traceback
from contextlib import contextmanager

import pandas as pd
import torch
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM

@contextmanager
def time_limit(seconds):
    """
    Abort the enclosed block with TimeoutError once `seconds` have elapsed.

    Args:
        seconds: Wall-clock budget for the block. May be an int or a float
            (fractional budgets are supported); 0 disables the timer.

    Raises:
        TimeoutError: raised inside the block when the timer fires.

    The SIGALRM handler that was installed before entering the context is
    restored on exit, so using this helper does not leave its own handler
    behind. It relies on SIGALRM, which per the `signal` module documentation
    is Unix-only and can only be configured from the main thread.
    """
    def signal_handler(signum, frame):
        raise TimeoutError("Code execution timed out.")

    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    # setitimer (unlike alarm) accepts fractional seconds.
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Disarm first so the timer cannot fire while the handler is being swapped back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        # A previous handler of None means it was not installed from Python and
        # cannot be passed back to signal.signal(); fall back to the default action.
        signal.signal(
            signal.SIGALRM,
            previous_handler if previous_handler is not None else signal.SIG_DFL,
        )


def _indent_width(line: str) -> int:
    """Width of a line's leading spaces/tabs, counting every tab as four spaces."""
    prefix = line[: len(line) - len(line.lstrip(" \t"))]
    return len(prefix.replace("\t", "    "))


def truncate_to_function_body(snippet: str, func_name: str) -> str:
    """
    Isolate the definition of `func_name` from a code snippet.

    The snippet is scanned line by line for `def <func_name>(...):` (an optional
    `-> annotation` is accepted). That line is kept together with every following
    line that is blank or indented deeper than the `def`; the first non-blank line
    at the same or a shallower indentation ends the function. If the `def` itself
    is indented, the result is dedented so that it parses as top-level code.

    Args:
        snippet: Source text that may contain trailing or surrounding junk.
        func_name: Name of the function to extract.

    Returns:
        The source of the function definition and its body.

    Raises:
        ValueError: if no matching `def` line exists, or if the extracted text
            still fails `ast.parse`.
    """
    # `def <func_name>(...)` with an optional return annotation before the colon.
    pattern = rf"^(\s*)def\s+{re.escape(func_name)}\s*\(.*?\)\s*(?:->.*?)?:"
    lines = snippet.splitlines()

    func_start_idx = None
    for idx, line in enumerate(lines):
        if re.match(pattern, line):
            func_start_idx = idx
            break

    if func_start_idx is None:
        raise ValueError(f"Could not find 'def {func_name}(...):' in snippet.")

    base_indent = _indent_width(lines[func_start_idx])

    # Indentation is measured the same way (tabs expanded) for the `def` line and
    # for body lines, so tab-indented bodies are kept. Any non-blank line that is
    # not deeper than the `def` (another top-level def, a class, or loose code)
    # terminates the function.
    collected = [lines[func_start_idx]]
    for line in lines[func_start_idx + 1:]:
        if line.strip() and _indent_width(line) <= base_indent:
            break
        collected.append(line)

    truncated_snippet = "\n".join(collected)
    if base_indent > 0:
        # An indented `def` on its own is an "unexpected indent" for the parser.
        truncated_snippet = textwrap.dedent(truncated_snippet)

    # Test parse
    try:
        ast.parse(truncated_snippet)
    except SyntaxError as e:
        raise ValueError(f"Truncated function body still not parseable: {e}")

    return truncated_snippet


def fallback_extract_definition(generated_text: str, func_name: str) -> str:
    """
    Last-resort extraction of `func_name` from the raw model output.

    Used when no triple-backtick block yields usable code. The raw text is searched
    for `def <func_name>(...):` (an optional `-> annotation` is accepted), and
    everything from that `def` onward is handed to `truncate_to_function_body`,
    which keeps only the function's own lines.

    Args:
        generated_text: Complete text produced by the model.
        func_name: Name of the function to look for.

    Returns:
        Source code of the isolated function definition.

    Raises:
        ValueError: if no such `def` is present, or if the isolated text cannot
            be parsed (propagated from `truncate_to_function_body`).
    """
    pattern = rf"def\s+{re.escape(func_name)}\s*\(.*?\)\s*(?:->.*?)?:"
    match = re.search(pattern, generated_text)
    if match is None:
        raise ValueError(f"No 'def {func_name}(...)' line found in fallback search.")

    # Start exactly at the `def` keyword so the definition sits at column 0 of the
    # text handed to the truncation helper; trailing text that would break the
    # parse is trimmed there.
    return truncate_to_function_body(generated_text[match.start():], func_name)


# Info strings that may follow an opening fence and must not be treated as code.
_FENCE_LANGUAGE_TAGS = frozenset({"python", "python3", "py", "bash", "shell", "sql", "sh"})


def extract_valid_code_block(
    generated_text: str,
    fallback_func_name: str = ""
) -> str:
    """
    Pick the first usable Python snippet out of a model response.

    Every triple-backtick block is examined in order:
      1) A leading language tag (e.g. "python", "py") is removed and the block is
         dedented, so uniformly indented code still parses.
      2) If the block parses with `ast.parse`, it is returned.
      3) Otherwise, when `fallback_func_name` is given, the function with that name
         is isolated from the block via `truncate_to_function_body`; if that works
         the isolated function is returned, else the next block is tried.

    If no block is usable and `fallback_func_name` is given, the raw text is searched
    for `def <fallback_func_name>(...):` via `fallback_extract_definition`.

    Args:
        generated_text: Full text produced by the model.
        fallback_func_name: Expected function name; empty string disables steps
            that need it.

    Returns:
        Source code that parses as Python.

    Raises:
        ValueError: if neither a fenced block nor the raw-text fallback yields
            parseable code.
    """
    blocks = re.findall(r"```(.*?)```", generated_text, flags=re.DOTALL)

    for block in blocks:
        # The language tag, if any, is the first non-blank line of the block.
        first_line, _, remainder = block.strip().partition("\n")
        body = remainder if first_line.strip().lower() in _FENCE_LANGUAGE_TAGS else block

        # Dedent before stripping: stripping alone would remove only the FIRST
        # line's indentation and leave the rest inconsistent.
        candidate = textwrap.dedent(body).strip()

        # Skip empty or pure language name blocks
        if not candidate or candidate.lower() in _FENCE_LANGUAGE_TAGS:
            continue

        # 1) Try parse as-is
        try:
            ast.parse(candidate)
        except SyntaxError:
            # 2) Try to isolate the expected function from the broken snippet.
            if fallback_func_name:
                try:
                    return truncate_to_function_body(candidate, fallback_func_name)
                except ValueError:
                    # Deliberate best-effort: this block is unusable, try the next one.
                    pass
            continue
        return candidate

    # No code block parsed successfully: fall back to the *full* generated_text.
    if not fallback_func_name:
        raise ValueError("No parseable triple-backtick block found, no fallback function name provided.")

    try:
        return fallback_extract_definition(generated_text, fallback_func_name)
    except ValueError as e:
        raise ValueError(
            f"No parseable triple-backtick block, fallback also failed. Reason: {e}"
        ) from e


def extract_expected_signature(text):
    """
    Extract the name and parameter list of the first function defined in `text`.

    Signatures with a return annotation (`def f(x) -> int:`) are supported; the
    annotation itself is not part of the result.

    Args:
        text: Python source code.

    Returns:
        A `(name, params)` tuple of strings, where `params` is the raw text between
        the parentheses, e.g. "def first_repeated_char(str1): ..." gives
        ("first_repeated_char", "str1"). Returns (None, None) when no single-line
        `def` signature is found.
    """
    pattern = r"\bdef\s+(\w+)\s*\((.*?)\)\s*(?:->.*?)?:"
    match = re.search(pattern, text)
    if match is None:
        return None, None
    return match.group(1), match.group(2)


def run_tests_on_extracted_code(expected_func_name, generated_text, test_list):
    """
    Executes the extracted code and runs test cases.
    """
    local_namespace = {}
    test_details = []

    # 1) Extract
    try:
        code_to_execute = extract_valid_code_block(generated_text, fallback_func_name=expected_func_name)
    except ValueError as e:
        error_msg = f"No valid code block found: {str(e)}"
        return False, [False]*len(test_list), error_msg, []

    print("=== DEBUG: Extracted/Chosen code block (or fallback) ===")
    print(code_to_execute)
    print("=========================================================")

    # 2) We already know it likely parses, but do final parse check:
    try:
        ast.parse(code_to_execute)
    except SyntaxError as e:
        error_msg = f"Code execution error (syntax invalid): {e}\n{traceback.format_exc()}"
        return False, [False]*len(test_list), error_msg, []

    # 3) Execute with time limit
    try:
        with time_limit(5):
            exec(code_to_execute, {}, local_namespace)
    except TimeoutError as e:
        error_msg = f"Code execution error (timeout): {e}"
        return False, [False]*len(test_list), error_msg, []
    except Exception as e:
        error_msg = f"Code execution error: {e}\n{traceback.format_exc()}"
        return False, [False]*len(test_list), error_msg, []

    # 4) If the function name is missing, try to rename from the single callable
    if expected_func_name and expected_func_name not in local_namespace:
        callables = [(n, obj) for n, obj in local_namespace.items() if callable(obj)]
        if len(callables) == 1:
            candidate_name, candidate_obj = callables[0]
            local_namespace[expected_func_name] = candidate_obj
        else:
            # If multiple callables or zero, no easy fix
            for n, obj in callables:
                if expected_func_name.lower() in n.lower() or n.lower() in expected_func_name.lower():
                    local_namespace[expected_func_name] = obj
                    break

    # 5) Evaluate each test
    test_pass_list = []
    for test_code in test_list:
        info = {
            "test": test_code,
            "passed": False,
            "expected": None,
            "actual": None,
            "error": None
        }

        # If it's `assert X == Y`
        m = re.match(r'assert\s+(.+?)\s*==\s*(.+)', test_code)
        if m:
            lhs_expr = m.group(1).strip()
            rhs_expr = m.group(2).strip()
            try:
                with time_limit(5):
                    lhs_val = eval(lhs_expr, {}, local_namespace)
                    rhs_val = eval(rhs_expr, {}, local_namespace)
                info["actual"] = lhs_val
                info["expected"] = rhs_val
                if lhs_val == rhs_val:
                    info["passed"] = True
                else:
                    info["error"] = (
                        f"AssertionError: {lhs_expr} = {lhs_val} != {rhs_expr} = {rhs_val}"
                    )
            except TimeoutError:
                info["error"] = "Test execution timed out."
            except Exception as e:
                info["error"] = f"Error evaluating expressions: {e}"
        else:
            # Possibly "assert condition" or some snippet
            try:
                with time_limit(5):
                    exec(test_code, {}, local_namespace)
                info["passed"] = True
            except TimeoutError:
                info["error"] = "Test execution timed out."
            except AssertionError as e:
                info["error"] = f"AssertionError: {str(e)}"
            except Exception as e:
                info["error"] = f"Exception during test: {e}"

        test_pass_list.append(info["passed"])
        test_details.append(info)

    all_passed = all(test_pass_list)
    return all_passed, test_pass_list, "", test_details


# ---------------------- Main code + model inference ---------------------------

# Instruction preamble placed in front of every task prompt. One instruction per
# line; the lines are joined with newlines and the text has no trailing newline.
system_prompt = "\n".join(
    [
        "You are a Python code generator.",
        "Return exactly one code block in triple backticks.",
        "No explanation, no extra text, just the function implementation.",
        "DO NOT EVER WRITE CODE PROMPTING FOR AN USER INPUT (LIKE DO NOT USE 'input()'), read in information from the function parameters.",
        "Inside that code block, define the function as requested.",
        "Do not look to test the implementation.",
        "All other text will be discarded.",
    ]
)

def _pass_at_k(n, c, k):
    """
    pass@k estimate for one task from `n` samples of which `c` passed:
    1 - C(n - c, k) / C(n, k). When fewer than `k` samples exist, falls back to
    1.0 if any sample passed, else 0.0.
    """
    if n < k:
        return 1.0 if c > 0 else 0.0
    # math.comb(n - c, k) is already 0 when k > n - c.
    return 1.0 - math.comb(n - c, k) / math.comb(n, k)


def _mean(values):
    """Arithmetic mean of `values`, or 0.0 for an empty sequence."""
    return sum(values) / len(values) if values else 0.0


def main(
    num_tasks=100,
    num_samples=10,
    output_dir="/content/drive/MyDrive/llama_results",
    batch_size=10,
):
    """
    Evaluate Llama-3.2-1B-Instruct on MBPP test tasks and write results to disk.

    For each task, `num_samples` completions are sampled, each is run against the
    task's tests via `run_tests_on_extracted_code`, and pass@1/5/10 plus the best
    per-task test pass rate are recorded.

    Args:
        num_tasks: Number of tasks to take from the start of the test split
            (capped at the split size).
        num_samples: Completions sampled per task.
        output_dir: Directory for the CSV and summary files (created if missing).
        batch_size: A cumulative CSV snapshot is written every `batch_size` tasks.

    Outputs (inside `output_dir`): `results_<i>.csv` snapshots,
    `results_final_<N>.csv`, and `summary_<N>.txt`; the summary is also printed.
    """
    k_values = (1, 5, 10)

    # Load your model and tokenizer
    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B-Instruct")
    model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-1B-Instruct")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    # Load MBPP dataset
    ds = load_dataset("google-research-datasets/mbpp", "full")
    test_data = ds["test"]

    os.makedirs(output_dir, exist_ok=True)

    results = []
    pass_at_k_totals = {k: [] for k in k_values}
    tpr_list = []

    total_gen_time = 0.0
    total_tokens_generated = 0

    # Baseline GPU allocation (taken after the model is loaded), used for the
    # memory-overhead figure in the summary.
    mem_before = torch.cuda.memory_allocated(device) if torch.cuda.is_available() else None

    test_samples = test_data.select(range(min(num_tasks, len(test_data))))
    progress = tqdm(test_samples, desc=f"Processing first {len(test_samples)} test samples")

    for idx, sample in enumerate(progress):
        prompt = sample["text"]
        reference = sample["code"]
        test_list = sample.get("test_list", [])

        # Extract function signature from reference
        expected_func_name, expected_params = extract_expected_signature(reference)
        if expected_func_name:
            expected_signature_str = f"def {expected_func_name}({expected_params}):"
        else:
            expected_signature_str = ""

        # Build user prompt
        user_prompt = (
            f"Task:\n{prompt}\n"
            f"Required function signature:\n{expected_signature_str}\n\n"
            "Remember: only valid Python code in triple backticks."
        )

        # Combine strict system prompt + user prompt
        # NOTE(review): the prompt is sent as plain text; Instruct checkpoints are
        # usually driven through tokenizer.apply_chat_template — confirm whether
        # that is intended here.
        full_prompt = f"{system_prompt}\n\nUser request:\n{user_prompt}"

        # Generate code samples
        inputs = tokenizer(full_prompt, return_tensors="pt").to(device)
        generation_config = dict(
            max_new_tokens=512,
            temperature=0.8,
            top_p=1.0,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
            num_return_sequences=num_samples,
        )

        start_time = time.time()
        with torch.no_grad():
            output_ids = model.generate(**inputs, **generation_config)
        gen_time = time.time() - start_time
        total_gen_time += gen_time

        # The returned sequences start with the prompt tokens. Decode only what
        # was generated: otherwise the signature quoted in the prompt is the first
        # `def` the fallback extractor finds, and prompt tokens inflate the
        # throughput figure.
        prompt_len = inputs["input_ids"].shape[1]
        predictions = [
            tokenizer.decode(ids[prompt_len:], skip_special_tokens=True)
            for ids in output_ids
        ]

        # Count generated tokens once per task.
        num_tokens = sum(len(tokenizer.tokenize(pred)) for pred in predictions)
        total_tokens_generated += num_tokens

        # Evaluate each prediction
        correct_flags = []
        tpr_scores = []
        prediction_details = []

        for pred in predictions:
            all_passed, pass_list, load_error, test_detail = run_tests_on_extracted_code(
                expected_func_name, pred, test_list
            )
            correct_flags.append(all_passed)
            tpr_scores.append(sum(pass_list) / len(pass_list) if pass_list else 0.0)
            prediction_details.append({
                "prediction": pred,
                "all_passed": all_passed,
                "pass_list": pass_list,
                "test_details": test_detail,
                "load_error": load_error
            })

        c = sum(correct_flags)
        n = num_samples

        # Compute pass@k
        task_pass_at_k = {k: _pass_at_k(n, c, k) for k in k_values}
        for k in k_values:
            pass_at_k_totals[k].append(task_pass_at_k[k])

        best_tpr = max(tpr_scores) if tpr_scores else 0.0
        tpr_list.append(best_tpr)

        # The prediction with the highest test pass rate represents the task.
        best_idx = tpr_scores.index(best_tpr) if tpr_scores else 0
        best_pred = prediction_details[best_idx] if prediction_details else {}

        # Gather results
        results.append({
            "task_id": sample["task_id"],
            "prompt": prompt,
            "reference": reference,
            "expected_signature": expected_signature_str,
            "final_prompt": full_prompt,
            "best_prediction": best_pred.get("prediction", ""),
            "all_tests_passed": int(best_pred.get("all_passed", False)),
            "individual_test_results": best_pred.get("pass_list", []),
            # default=repr: test values produced by generated code (sets, custom
            # objects, ...) must not abort the whole run when serialised.
            "test_details": json.dumps(best_pred.get("test_details", []), indent=2, default=repr),
            "load_error": best_pred.get("load_error", ""),
            "correct_count": c,
            "tpr": best_tpr,
            "pass@1": task_pass_at_k[1],
            "pass@5": task_pass_at_k[5],
            "pass@10": task_pass_at_k[10],
            "generation_time": gen_time,
            "num_tokens": num_tokens,
        })

        # Save a cumulative CSV snapshot every batch_size tasks
        if (idx + 1) % batch_size == 0:
            df_batch = pd.DataFrame(results)
            batch_csv_path = os.path.join(output_dir, f"results_{idx + 1}.csv")
            df_batch.to_csv(batch_csv_path, index=False)
            print(f"Saved batch results to {batch_csv_path}")

    # Save final CSV
    df_final = pd.DataFrame(results)
    final_csv_path = os.path.join(output_dir, f"results_final_{len(test_samples)}.csv")
    df_final.to_csv(final_csv_path, index=False)
    print(f"Saved final results to {final_csv_path}")

    # Compute overall metrics
    overall_pass_at_1 = _mean(pass_at_k_totals[1])
    overall_pass_at_5 = _mean(pass_at_k_totals[5])
    overall_pass_at_10 = _mean(pass_at_k_totals[10])
    atpr = _mean(tpr_list)
    throughput = total_tokens_generated / total_gen_time if total_gen_time > 0 else 0.0

    # Memory overhead if CUDA
    if torch.cuda.is_available():
        mem_after = torch.cuda.memory_allocated(device)
        memory_overhead = mem_after - mem_before if mem_before is not None else None
    else:
        memory_overhead = None

    # The same lines are printed and written to the summary file.
    summary_lines = [
        "=== Metrics Summary ===",
        f"Pass@1: {overall_pass_at_1:.2f}",
        f"Pass@5: {overall_pass_at_5:.2f}",
        f"Pass@10: {overall_pass_at_10:.2f}",
        f"Average Test Pass Rate (ATPR): {atpr:.2f}",
        f"Throughput (tokens/sec): {throughput:.2f}",
    ]
    if memory_overhead is not None:
        summary_lines.append(f"Memory Overhead (bytes): {memory_overhead}")

    for line in summary_lines:
        print(line)

    # Save summary
    summary_path = os.path.join(output_dir, f"summary_{len(test_samples)}.txt")
    with open(summary_path, "w") as f:
        f.write("\n".join(summary_lines) + "\n")
    print(f"Summary saved to {summary_path}")


if __name__ == "__main__":
    main()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/46.4k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/4.24M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.5M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/636 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/818 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/24.2k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/3 [00:00<?, ?it/s]

model-00001-of-00003.safetensors:   0%|          | 0.00/4.99G [00:00<?, ?B/s]

model-00002-of-00003.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00003-of-00003.safetensors:   0%|          | 0.00/481M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/168 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/6.52k [00:00<?, ?B/s]

ValueError: BuilderConfig 'full' not found. Available: ['openai_humaneval']