In [1]:
from ipywidgets import Button, HBox, VBox, Output
from IPython.display import display, clear_output
import json
import os
import textwrap

# Tool for Manual Tagging of Experiments

In [2]:
# Indexes of questions to tag.
# Half-open range [start, end): within every samples file, the lines whose
# 0-based index i satisfies start <= i < end are offered for tagging.
# Keep exactly one assignment active; the trailing name is the assigned tagger.
TAG_RANGE = [0,10] # Ramiro RC
# TAG_RANGE = [10,20] # Ramiro RC
# TAG_RANGE = [20,30] # Joaquin
# TAG_RANGE = [30,40] # Mauro
# TAG_RANGE = [40,50] # Ramiro Caso
# TAG_RANGE = [50,60] # Nicolas

# Select a model's responses to tag.
# The name is interpolated into the data directory path, so it must match the
# folder name of that model's results. Keep exactly one assignment active.
# MODEL_NAME = None
# MODEL_NAME = "Meta-Llama-3-3-70B-Instruct-AWQ-INT4"
# MODEL_NAME = "gpt-oss-20b"
MODEL_NAME = "Qwen3-30B-A3B-GPTQ-Int4"

# True: print the question wrapped at 120 columns, keeping its line breaks.
# False: print the question on one line with its newline characters removed.
SHOW_FULL_QUESTION = False

# Fail early when no model was selected (the message reads "Select a model!").
if MODEL_NAME is None:
    raise ValueError("Selecciona un modelo!")

### Set-up

In [3]:
# Set paths
DATA_PATH = f'../data/implemented/{MODEL_NAME}/'
OUTPUT_FILE = os.path.join(DATA_PATH, "human_labels.jsonl")   # Where results are saved

# Load previously labeled entries (if any) as (dataset, dataset_task, line) keys.
labeled_set = set()
if os.path.exists(OUTPUT_FILE):
    with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
        for line in f:
            try:
                entry = json.loads(line)
                key = (entry["dataset"], entry["dataset_task"], entry["line"])
                labeled_set.add(key)
            except (json.JSONDecodeError, KeyError, TypeError):
                # Ignore corrupted lines: invalid JSON, a record that is not an
                # object, or a record missing one of the three key fields.
                pass

# Collect every .jsonl file found one level below DATA_PATH.
# os.listdir returns names in arbitrary order, so both listings are sorted to
# make the tagging order identical from one run to the next.
jsonl_files = []
for dataset in sorted(os.listdir(DATA_PATH)):
    dataset_dir = os.path.join(DATA_PATH, dataset)
    if os.path.isdir(dataset_dir):
        print(dataset)
        jsonl_files += [
            [os.path.join(dataset_dir, f), dataset, f]
            for f in sorted(os.listdir(dataset_dir))
            if f.endswith(".jsonl")
        ]

# Prepare all iterations: one tuple per line that is inside TAG_RANGE and has
# not been labeled yet -> (dataset, file name, line index, parsed json, position).
all_entries = []
running_count = 0
process_already = 0
for fpath, fdataset, fname in jsonl_files:
    # Task name = file name without the 'samples_' prefix and the '_2025...' suffix;
    # this is the same value stored under "dataset_task" in the labels file.
    fname_strip = fname.split('_2025')[0].split('samples_')[-1]
    with open(fpath, 'r', encoding="utf-8") as f:
        for i, line in enumerate(f):
            if i < TAG_RANGE[0]:
                # Out of scope in this run
                continue
            if i >= TAG_RANGE[1]:
                # Past the end of the range: nothing else in this file is in scope
                break
            if (fdataset, fname_strip, i) in labeled_set:
                # already processed
                process_already += 1
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                print(f"Skipping invalid JSON in dataset {fdataset}, file {fname}, line {i}")
                continue
            all_entries.append((fdataset, fname, i, obj, running_count))
            running_count += 1
total_run = len(all_entries)

print(f"{process_already} are already tagged")

mmlu_chat
mmlu_pro_categories
babi_tasks
910 are already tagged


In [4]:
# Session state shared by the button handlers
results = []          # label records produced during this session, in click order
iter_idx = 0          # index into all_entries of the entry being shown
current_entry = None  # the all_entries tuple currently on screen, if any

# Widgets: one button per label plus an undo button, laid out above the output area
btn_correct, btn_wrong, btn_refusal, btn_qst_mistake, btn_undo = (
    Button(description=caption, button_style=style)
    for caption, style in (
        ("Correct", "success"),
        ("Wrong", "danger"),
        ("Refusal", "danger"),
        ("Question Mistake", "danger"),
        ("Undo", "warning"),
    )
)
out = Output()
buttons_box = HBox(children=[btn_correct, btn_wrong, btn_refusal, btn_qst_mistake, btn_undo])
main_box = VBox(children=[buttons_box, out])

def save_entry_to_file(entry):
    """Append `entry` to OUTPUT_FILE as one JSON document followed by a newline."""
    with open(OUTPUT_FILE, "a") as sink:
        print(json.dumps(entry), file=sink)

def _expanded_answer(doc):
    """Return the option text the answer index points at, or None.

    Two layouts are tried in order: `answer` indexing into `choices`, then
    `answer_index` indexing into `options`. A layout is used only when both
    of its fields are present and not None.
    """
    for index_key, options_key in (("answer", "choices"), ("answer_index", "options")):
        index = doc.get(index_key)
        options = doc.get(options_key)
        if index is not None and options is not None:
            text = options[index]
            if text is not None:
                return text
    return None


def show_next_entry():
    """Render the entry at `iter_idx` in the output widget.

    Updates the global `current_entry` to the entry being shown. When
    `iter_idx` is past the last entry, the final summary is printed instead
    and `current_entry` is reset to None so that no stale entry can be
    labeled again. A negative `iter_idx` is clamped to 0; without the clamp,
    Python's negative indexing would silently display the last entry.

    Raises:
        ValueError: inside the output context, when the entry has neither a
            `target` nor an `answer`, or its `doc` has no `question`.
    """
    global current_entry, iter_idx

    if iter_idx < 0:
        iter_idx = 0

    if iter_idx >= len(all_entries):
        current_entry = None
        with out:
            clear_output()
            print("✅ All lines processed.")
            print("Final results:")
            for r in results:
                print(r)
        return

    current_entry = all_entries[iter_idx]
    _, filename, line_number, obj, position = current_entry

    with out:
        clear_output(wait=True)
        print(f"Element {position+1} of {total_run}")
        print(f"File: {filename}\tQuestion: {line_number}")

        # The expected answer is stored under 'target', with 'answer' as fallback.
        target = obj.get('target')
        if target is None:
            target = obj.get('answer')
        if target is None:
            raise ValueError(f"Target not found in: File: {filename}\tQuestion: {line_number}")

        doc = obj['doc']
        extend_answer = _expanded_answer(doc)

        question = doc.get('question')
        if question is None:
            raise ValueError(f"Question not found in: File: {filename}\tQuestion: {line_number}")

        print("------------------------------------------------------")
        if SHOW_FULL_QUESTION:
            # Wrap each original line separately so the line breaks survive.
            wrapped = "\n".join(textwrap.fill(line, width=120) for line in question.splitlines())
            print(f"Question: {wrapped}")
        else:
            question = question.replace('\n', "")
            print(f"Question: {question}")
        print("------------------------------------------------------")
        if extend_answer is None:
            print(f"Target: {target}")
        else:
            print(f"Target: \"{target}\" {extend_answer}")

        # Only the first response of the first group is displayed.
        response_lines = obj['resps'][0][0].splitlines()
        wrapped = "\n".join(textwrap.fill(line, width=120) for line in response_lines)
        print("------------------------------------------------------")
        print("Response:\n", wrapped)


def remove_last_line_from_file(path):
    """Drop the final non-empty line of `path`, editing the file in place.

    Newline bytes trailing the last line are discarded with it. A file that
    is empty is left untouched; a file holding only newlines, or a single
    line, ends up empty. The newline that terminates the preceding line is
    kept.
    """
    with open(path, "rb+") as fh:
        fh.seek(0, os.SEEK_END)
        size = fh.tell()
        if size == 0:
            return  # nothing to remove

        def byte_at(offset):
            fh.seek(offset)
            return fh.read(1)

        cursor = size - 1

        # Move left over the newline bytes that close the file.
        while cursor >= 0 and byte_at(cursor) == b"\n":
            cursor -= 1

        if cursor < 0:
            # Every byte was a newline.
            fh.truncate(0)
            return

        # Move left over the last line itself, stopping on the newline before it.
        while cursor >= 0 and byte_at(cursor) != b"\n":
            cursor -= 1

        # Cut right after that newline (or at offset 0 when there is none).
        fh.truncate(max(cursor + 1, 0))


def undo_last(_):
    """Undo the most recent label recorded in this session.

    Removes the record from `results` and from OUTPUT_FILE, then shows the
    entry again so it can be re-labeled. Does nothing when no label has been
    recorded in this session: the last line of OUTPUT_FILE would then belong
    to an earlier session and must not be deleted, and the index must not go
    negative.

    Args:
        _: the clicked button (unused).
    """
    global iter_idx
    if not results:
        return
    results.pop()
    remove_last_line_from_file(OUTPUT_FILE)
    # Entries are labeled in order starting at index 0 and every recorded
    # label is appended to `results`, so the number of remaining records is
    # the index of the entry whose label was just removed.
    iter_idx = len(results)
    show_next_entry()

def on_click(label):
    """Build a button callback that records `label` for the entry on screen.

    Args:
        label: text stored under "label" in the saved record.

    Returns:
        A one-argument handler suitable for `Button.on_click`. The handler
        appends a record (dataset, dataset_task, line, label) to OUTPUT_FILE
        and to `results`, advances `iter_idx`, and displays the following
        entry. When there is no entry to label (index out of range or no
        current entry) it records nothing and leaves the index unchanged, so
        clicks after the last entry cannot write duplicate records.
    """
    def handler(_):
        global iter_idx
        if current_entry and 0 <= iter_idx < len(all_entries):
            dataset_name, filename, line_number, _obj, _pos = current_entry
            result = {
                "dataset": dataset_name,
                # Task name = file name without 'samples_' prefix and '_2025...' suffix.
                "dataset_task": filename.split('_2025')[0].split('samples_')[-1],
                "line": line_number,
                "label": label
            }
            # Persist first: if writing fails, neither the in-memory list nor
            # the index has moved, so state and file stay consistent.
            save_entry_to_file(result)
            results.append(result)
            iter_idx += 1
        show_next_entry()
    return handler

# Attach a labeling handler to each label button; the undo button gets its own callback.
for _button, _label in (
    (btn_correct, "Correct"),
    (btn_wrong, "Wrong"),
    (btn_refusal, "Refusal"),
    (btn_qst_mistake, "Question Mistake"),
):
    _button.on_click(on_click(_label))
btn_undo.on_click(undo_last)

# Run the UI

In [5]:
# Display interface once: render the button row with the output area below it,
# then fill the output area with the entry at the current index.
display(main_box)
show_next_entry()


VBox(children=(HBox(children=(Button(button_style='success', description='Correct', style=ButtonStyle()), Butt…