In [1]:
# Root directory that holds assets/: instruction files are read from assets/instructions and
# results are written under assets/results/<runname>. Point this at your local copy before running.
workdir, runname = "/YOUR/LOCAL/DIR", "trials"


def model_name_normalize(model_name: str) -> str:
    """Turn a model identifier into a filesystem-friendly file stem.

    Every "/", " " and "." is replaced by "-", e.g. "org/model v1.5" -> "org-model-v1-5".
    """
    return model_name.translate(str.maketrans({"/": "-", " ": "-", ".": "-"}))

## Chain-of-thought (CoT) elicitation

In [None]:
from src.llm import query_llm_with_instructions_parallel
import os
import json

directory = os.path.join(workdir, "assets/instructions")


def _collect_instructions(instructions_dir: str) -> set:
    """Gather the distinct, non-empty "instruction" strings from every *.json file in a directory.

    Files are expected to look like {"instructions": [{"instruction": "..."}, ...]}. Files that
    cannot be decoded are reported and skipped; entries that are not objects or whose
    "instruction" is not a non-blank string are ignored. Returned strings are stripped.
    """
    found = set()
    # Sorted so warnings are printed in a stable order.
    for filename in sorted(os.listdir(instructions_dir)):
        if not filename.endswith(".json"):
            continue
        file_path = os.path.join(instructions_dir, filename)
        with open(file_path, "r", encoding="utf-8") as f:
            try:
                data = json.load(f)
            except ValueError:  # JSONDecodeError, or UnicodeDecodeError from a non-UTF-8 file
                print(f"File {file_path} could not be parsed as JSON, skipped.")
                continue
        if not isinstance(data, dict):
            print(f"File {file_path} does not contain a JSON object, skipped.")
            continue
        for item in data.get("instructions") or []:
            instruction = item.get("instruction", "") if isinstance(item, dict) else ""
            if isinstance(instruction, str) and instruction.strip():
                found.add(instruction.strip())
    return found


instruction_set = _collect_instructions(directory)

# Sort so the order (and therefore any slice taken below) is reproducible: iteration order of a
# set of strings changes between interpreter runs because str hashing is randomized.
instructions = sorted(instruction_set)

models = ["gpt-4o"]

# Sampling settings forwarded to query_llm_with_instructions_parallel.
gen_kwargs = {
    "temperature": 0.6,
    "top_p": 0.95,
}

# Cap on how many instructions are sent per model (keeps trial runs small); None sends all.
MAX_INSTRUCTIONS = 10

output_dir_base = os.path.join(workdir, "assets/results", runname, "responses")
os.makedirs(output_dir_base, exist_ok=True)

# All models share the responses directory. The habit-extraction cell reads
# <responses>/<model_name_normalize(model)>.jsonl, so the helper presumably writes one file
# per model there — confirm against src.llm.
results_by_model = {}
for model in models:
    results = query_llm_with_instructions_parallel(
        model,
        instructions[:MAX_INSTRUCTIONS],
        num_workers=32,
        output_dir_base=output_dir_base,
        **gen_kwargs,
    )
    results_by_model[model] = results

## Habit Extraction

In [None]:
import json
import os
import re
import concurrent.futures
from collections import Counter, defaultdict
from src.tools import judge_habit_occurrence_by_showing_evidence_with_format_enforcement_system, habit_example_mapping

models = ["gpt-4o"]  # models whose saved responses are annotated below

# Run layout: <workdir>/assets/results/<runname>/{responses, annotations, habit_counts}
output_base_path = os.path.join(workdir, "assets/results", runname)
response_output_path = os.path.join(output_base_path, "responses")
annotation_output_path = os.path.join(output_base_path, "annotations")
habit_count_dir = os.path.join(output_base_path, "habit_counts")

os.makedirs(output_base_path, exist_ok=True)
# Responses are produced by the elicitation cell and must already exist.
if not os.path.exists(response_output_path):
    raise ValueError(f"Response output path {response_output_path} does not exist")
os.makedirs(annotation_output_path, exist_ok=True)
os.makedirs(habit_count_dir, exist_ok=True)

# Directory holding one <clean habit name>.json instruction file per habit.
instructions_base = os.path.join(workdir, "assets/instructions")


def clean_habit_name(habit_key: str) -> str:
    """Map a habit name to the file stem of its instruction file.

    Spaces become underscores, then every character outside ASCII [a-zA-Z0-9_-] is dropped.
    """
    underscored = "_".join(habit_key.split(" "))
    return re.sub(r"[^a-zA-Z0-9_-]", "", underscored)

# Build mapping from (stripped) instruction text to the habits whose instruction file lists it.
instruction_to_habits = defaultdict(list)
for habit_key in habit_example_mapping.keys():
    clean_name = clean_habit_name(habit_key)
    file_path = os.path.join(instructions_base, f"{clean_name}.json")
    if not os.path.exists(file_path):
        print(f"Warning: Missing habit file {file_path}")
        continue
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSONDecodeError and UnicodeDecodeError
        print(f"Error processing {file_path}: {str(e)}")
        continue
    if not isinstance(data, dict):
        print(f"Error processing {file_path}: expected a JSON object")
        continue
    for instruction_entry in data.get("instructions") or []:
        instruction = instruction_entry.get("instruction") if isinstance(instruction_entry, dict) else None
        if not isinstance(instruction, str) or not instruction.strip():
            # Skip only this malformed entry instead of abandoning the rest of the file.
            continue
        habits = instruction_to_habits[instruction.strip()]
        # Avoid listing the same habit twice when a file repeats an instruction.
        if habit_key not in habits:
            habits.append(habit_key)

import threading

# Guards processed_keys and global_counter, which are shared by the ThreadPoolExecutor
# workers that call process_habit_for_model concurrently.
_HABIT_STATE_LOCK = threading.Lock()


def process_habit_for_model(r, model, instruction_to_habits, processed_keys, fout, global_counter):
    """Judge every habit associated with one response record.

    Args:
        r: Response record with an "instruction" string and a "thinking" dict keyed by model name.
        model: Key selecting which entry of r["thinking"] is judged.
        instruction_to_habits: Maps stripped instruction text to the habit names it targets.
        processed_keys: Shared set of (instruction, habit) pairs already judged; pairs judged
            successfully here are added to it.
        fout: Unused; kept for call-site compatibility (the caller writes the returned records).
        global_counter: Shared Counter of habits judged as reflected; incremented here.

    Returns:
        None when the instruction has no associated habits or the record has no thinking text
        for ``model``; otherwise a list (possibly empty) of annotation records for the habits
        newly judged in this call. An already-processed habit, or one whose judgment raised,
        is skipped without discarding the other habits' results.
    """
    instr = r["instruction"].strip()
    related_habits = instruction_to_habits.get(instr, [])
    if not related_habits:
        return None

    thinking_by_model = r.get("thinking")
    thinking = thinking_by_model.get(model) if isinstance(thinking_by_model, dict) else None
    if not isinstance(thinking, str):
        print(f"No thinking text for model {model} on instruction {instr[:60]!r}, skipped.")
        return None
    thinking = thinking.strip()

    results = []
    for habit in related_habits:
        uniq_key = (instr, habit)
        with _HABIT_STATE_LOCK:
            if uniq_key in processed_keys:
                # Already judged (earlier run or duplicate record): skip just this habit.
                continue
            # Claim the pair so a concurrent duplicate record does not judge it too.
            processed_keys.add(uniq_key)

        try:
            llm_result = judge_habit_occurrence_by_showing_evidence_with_format_enforcement_system(thinking=thinking, habit=habit)
            is_refl = bool(llm_result.get("is_reflected", False))
        except Exception as e:
            # Release the claim: no record is produced, so the pair must not count as processed.
            with _HABIT_STATE_LOCK:
                processed_keys.discard(uniq_key)
            print(f"Error judging habit {habit} for model {model}: {str(e)}")
            continue

        if is_refl:
            with _HABIT_STATE_LOCK:
                global_counter[habit] += 1

        results.append({
            "instruction": instr,
            "thinking": r["thinking"],
            "llm_judge": llm_result,
            "judge": 1 if is_refl else 0,
            "habit": habit,
        })
    return results

def _load_response_records(input_file_path, model):
    """Load one model's response records from a JSON-lines file.

    Blank lines are ignored. Lines that are not valid JSON, or that decode to something other
    than a JSON object, are reported and skipped. When a record's "thinking" dict holds a string
    for ``model``, that string has its backslashes doubled before the record is returned.
    """
    records = []
    with open(input_file_path, "r", encoding="utf-8") as f:
        for line in f:
            stripped = line.strip()
            if not stripped:
                continue
            try:
                record = json.loads(stripped)
            except json.JSONDecodeError as e:
                print(f"JSON decode error in {model_name_normalize(model)}: {e}")
                continue
            if not isinstance(record, dict):
                print(f"[WARN] Skipping non-object record in {model_name_normalize(model)}")
                continue
            thinking_by_model = record.get("thinking")
            if isinstance(thinking_by_model, dict) and isinstance(thinking_by_model.get(model), str):
                # NOTE(review): backslashes are doubled before judging, and this same dict is copied
                # into the annotation records. The intent is not visible here — confirm it is wanted.
                thinking_by_model[model] = thinking_by_model[model].replace("\\", "\\\\")
            records.append(record)
    return records


def _load_existing_annotations(annotated_output_file):
    """Rebuild resume state from a previous run's annotation file.

    Returns (processed_keys, counter). processed_keys holds every distinct (instruction, habit)
    pair already written. counter counts, per habit, the distinct pairs whose "judge" is 1.
    A missing file yields an empty set and an empty Counter.
    """
    processed_keys = set()
    counter = Counter()
    if not os.path.exists(annotated_output_file):
        return processed_keys, counter
    with open(annotated_output_file, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                record = json.loads(line.strip())
                instr = record.get("instruction", "").strip()
                habit = record.get("habit", None)
                judge = record.get("judge", None)
                # Count each pair once so duplicated rows do not inflate the totals.
                if habit is not None and (instr, habit) not in processed_keys:
                    processed_keys.add((instr, habit))
                    if judge == 1:
                        counter[habit] += 1
            except Exception as e:
                print(f"[WARN] Failed to load rec from {annotated_output_file}: {e}")
    return processed_keys, counter


def parallel_processing_for_model(model, max_workers=16):
    """Judge habits in one model's saved responses, appending annotations (resumable).

    Steps:
    1. Reads <response_output_path>/<normalized model>.jsonl.
    2. Skips (instruction, habit) pairs already present in
       <annotation_output_path>/<normalized model>.jsonl.
    3. Judges the remaining pairs with process_habit_for_model on a thread pool of
       ``max_workers`` threads.
    4. Appends each new record to the annotation file.
    5. Prints a summary and writes the per-habit count of reflected judgments (previous plus
       new) to <habit_count_dir>/<normalized model>.json.

    Args:
        model: Model name; it keys r["thinking"] and, once normalized, names the files.
        max_workers: Thread-pool size (default 16, the previous hard-coded value).
    """
    normalized = model_name_normalize(model)
    # The responses directory is validated by the setup cell, so it is not created here.
    input_file_path = os.path.join(response_output_path, f"{normalized}.jsonl")
    annotated_output_file = os.path.join(annotation_output_path, f"{normalized}.jsonl")
    os.makedirs(os.path.dirname(annotated_output_file), exist_ok=True)

    results = _load_response_records(input_file_path, model)
    processed_keys, global_counter = _load_existing_annotations(annotated_output_file)

    # Tally the work up front so the summary reports real numbers for this run.
    num_total_to_process = 0
    num_skipped = 0
    for r in results:
        instr = r.get("instruction", "")
        if not isinstance(instr, str):
            continue
        instr = instr.strip()
        for habit in instruction_to_habits.get(instr, []):
            num_total_to_process += 1
            if (instr, habit) in processed_keys:
                num_skipped += 1

    num_processed_now = 0
    # `with` guarantees the annotation file is closed even if the loop raises.
    with open(annotated_output_file, "a", encoding="utf-8") as fout:
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(process_habit_for_model, r, model, instruction_to_habits, processed_keys, fout, global_counter)
                for r in results
            ]
            for future in concurrent.futures.as_completed(futures):
                try:
                    task_results = future.result()
                except Exception as e:
                    # One bad record should not abort the run; nothing from it is written,
                    # so a later run will retry it.
                    print(f"[WARN] Habit task failed for model {model}: {e}")
                    continue
                if not task_results:
                    continue
                num_processed_now += len(task_results)
                for rec in task_results:
                    fout.write(json.dumps(rec, ensure_ascii=False) + "\n")
                    fout.flush()

    print(f"\n[{model}]")
    print(f"Total tasks related to habits: {num_total_to_process}")
    print(f"Skipped (already processed): {num_skipped}")
    print(f"Processed newly this run: {num_processed_now}")
    for habit, count in global_counter.items():
        print(f"{habit}: {count}")

    output_file = os.path.join(habit_count_dir, f"{normalized}.json")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(dict(global_counter), f, ensure_ascii=False, indent=2)

# Main driver to process all models.
# Models are processed one after another; each call runs its own thread pool over that
# model's response records.
for model in models:
    parallel_processing_for_model(model)
