In [1]:
import os
import sys


# Make the local `src/` directory importable from this notebook.
# A notebook has no `__file__`; the previous `os.path.dirname("__file__")` took the
# dirname of a string literal, which is always "", so the path has always been
# resolved against the current working directory. This spells that out directly.
_src_dir = os.path.abspath("src")
if _src_dir not in sys.path:  # re-running the cell must not stack duplicate entries
    sys.path.append(_src_dir)


In [2]:
import torch
import numpy as np

# --- Compute device -----------------------------------------------------------
# Only GPU 0 is exposed to CUDA, but the notebook itself runs on the CPU.
# To use the GPU when available instead:
#   device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
device = torch.device("cpu")

# --- Experiment selection -----------------------------------------------------
# Environment names evaluated in this notebook.
env_list = [
    "button-press-topdown-v2",
    "box-close-v2",
    "dial-turn-v2",
    "sweep-v2",
    "button-press-topdown-wall-v2",
    "sweep-into-v2",
    "drawer-open-v2",
    "lever-pull-v2",
]

# Pair-generation variants to compare. Commented-out entries are alternative
# settings that are currently disabled.
pair_algo_list = [
    "ternary-500-aug-10000-conf-0.999",
    # "ternary-500-aug-10000-conf-0.99",
    # "ternary-500-aug-10000-conf-0.8",
    "ternary-500-aug-10000-uncert-3.1",
    # "ternary-500-aug-10000-bucket-20-conf-0.999",
    "ternary-500-aug-10000-bucket-knn-ratio-10-20-conf-0.999",
    "ternary-500-aug-10000-bucket-knn-ratio-10-20-uncert-3.1",
]

# Default pair set and reward-model identifiers.
pair_algo = "ternary-500"
reward_model_algo = "MR-linear"

# Segment length constant (presumably the number of steps per trajectory segment).
TRAJECTORY_LENGTH = 25

In [3]:
from data_generation.data_research import predict_rewards
from data_loading.load_data import load_dataset


def predict_only_rewards(env_name, exp_name, pair_algo, reward_model_algo):
    """Return just the predicted rewards produced by ``predict_rewards``.

    ``predict_rewards`` is called with the four identifiers as keyword
    arguments and is expected to yield 4-element records; the second element
    of each record is collected.

    Args:
        env_name: Environment identifier forwarded to ``predict_rewards``.
        exp_name: Experiment/run identifier forwarded to ``predict_rewards``.
        pair_algo: Pair-set identifier forwarded to ``predict_rewards``.
        reward_model_algo: Reward-model identifier forwarded to ``predict_rewards``.

    Returns:
        ``np.ndarray`` holding the second field of every record, in order.
    """
    records = predict_rewards(
        env_name=env_name,
        exp_name=exp_name,
        pair_algo=pair_algo,
        reward_model_algo=reward_model_algo,
    )

    rewards = []
    for record in records:
        # Each record must unpack into exactly four fields; only the second is kept.
        _, reward, _, _ = record
        rewards.append(reward)

    return np.array(rewards)


def sigmoid(x):
    """Numerically stable logistic function ``1 / (1 + exp(-x))``.

    The naive formula evaluates ``exp(-x)``, which overflows (RuntimeWarning,
    intermediate ``inf``) for large negative ``x``. Here ``exp`` is only ever
    applied to ``-|x|``, whose result lies in (0, 1], and the two algebraically
    equivalent forms are selected per element by the sign of ``x``.

    Args:
        x: Scalar or array-like of real numbers. Non-floating inputs are
            converted to float64; floating inputs keep their dtype.

    Returns:
        A NumPy scalar for scalar input, otherwise an ``np.ndarray`` with the
        same shape as ``x``.
    """
    values = np.asarray(x)
    if not np.issubdtype(values.dtype, np.floating):
        values = values.astype(np.float64)

    decay = np.exp(-np.abs(values))  # in (0, 1]; cannot overflow
    result = np.where(values >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))

    # np.where always returns an array; unwrap 0-d results so scalar in -> scalar out.
    return result[()] if result.ndim == 0 else result

def get_total_reward(s, e, reward_cumsum):
    """Sum of rewards over the half-open step range ``[s, e)``.

    Args:
        s: Start index (inclusive). Values ``<= 0`` are treated as the
            beginning of the sequence.
        e: End index (exclusive).
        reward_cumsum: Inclusive prefix sums of the per-step rewards, i.e.
            ``reward_cumsum[i]`` is the sum of rewards ``0..i``.

    Returns:
        ``reward_cumsum[e - 1] - reward_cumsum[s - 1]`` (with the second term
        dropped when ``s <= 0``), or ``0.0`` when ``e <= 0``.
    """
    if e <= 0:
        # Nothing lies before index 0. Without this guard ``reward_cumsum[e - 1]``
        # would use a negative index and wrap around to the end of the array,
        # returning (for e == 0) the total reward of the whole sequence.
        return 0.0
    return reward_cumsum[e - 1] - (reward_cumsum[s - 1] if s > 0 else 0)

In [4]:
from matplotlib import gridspec, pyplot as plt
import numpy as np
import matplotlib.pyplot as plt
from data_loading.load_data import load_pair

def eval_feedbacks(
    feedbacks,
    env_name,
    unlabel_pair="unlabel-100000",
    exp_name="CUDA-01-00",
    bins=25,
    reward_range=(0, 250),
):
    """Score preference feedbacks against the dataset's ground-truth rewards.

    Two quantities are computed:

    * **accuracy** - among feedbacks whose label ``mu`` is not 0.5, the fraction
      whose label agrees with the true segment returns. ``mu == 1.0`` counts as
      correct when ``r0 < r1`` and ``mu == 0.0`` when ``r0 > r1``; any other
      label value is counted in the denominator but never as correct.
    * **KL divergence** - ``KL(P || Q)`` where ``P`` is the normalised 2-D
      histogram of ``(r0, r1)`` over the feedbacks (every pair is inserted in
      both orders, so ``P`` is symmetric) and ``Q`` is the outer product of the
      1-D return histogram of the reference pairs with itself.

    Args:
        feedbacks: Iterable of ``((s0, e0), (s1, e1), mu)`` items.
        env_name: Environment identifier passed to ``load_pair`` / ``load_dataset``.
        unlabel_pair: ``pair_algo`` of the reference pair set.
        exp_name: Run from which the reference pair set is loaded.
        bins: Number of histogram bins per axis.
        reward_range: ``(low, high)`` span covered by the bins. Returns outside
            this span are not counted by the histograms. The default upper
            bound is presumably TRAJECTORY_LENGTH (25) times a per-step reward
            cap of 10 - TODO confirm.

    Returns:
        ``(kl_divergence, accuracy)``. ``accuracy`` is ``0.0`` when no feedback
        has a label different from 0.5.
    """
    reference_pairs = load_pair(
        env_name=env_name,
        exp_name=exp_name,
        pair_type="train",
        pair_algo=unlabel_pair,
    )

    dataset = load_dataset(env_name=env_name)
    # Prefix sums (accumulated in float64) let each segment return be read off
    # with two lookups.
    cumsum = np.cumsum(dataset["rewards"], dtype=np.float64)

    # Ground-truth return of every segment that appears in the reference pairs.
    reference_returns = []
    for (s0, e0), (s1, e1), _ in reference_pairs:
        reference_returns.append(get_total_reward(s0, e0, cumsum))
        reference_returns.append(get_total_reward(s1, e1, cumsum))

    feedback_returns = []
    total = correct = 0
    for (s0, e0), (s1, e1), mu in feedbacks:
        r0 = get_total_reward(s0, e0, cumsum)
        r1 = get_total_reward(s1, e1, cumsum)
        if mu != 0.5:
            total += 1
            if (mu == 1.0 and r0 < r1) or (mu == 0.0 and r0 > r1):
                correct += 1
        # Insert both orderings so the joint histogram does not depend on which
        # segment happened to be listed first.
        feedback_returns.extend([(r0, r1), (r1, r0)])

    accuracy = correct / total if total > 0 else 0.0

    low, high = reward_range
    bin_edges = np.linspace(low, high, bins + 1)
    r0_list = [r0 for r0, _ in feedback_returns]
    r1_list = [r1 for _, r1 in feedback_returns]
    observed, _, _ = np.histogram2d(r0_list, r1_list, bins=[bin_edges, bin_edges])
    observed = observed.T

    # Expected joint counts if both segments were drawn independently from the
    # reference return distribution, scaled to the observed total.
    traj_hist, _ = np.histogram(reference_returns, bins=bin_edges)
    traj_prob = traj_hist / np.sum(traj_hist)
    expected = np.outer(traj_prob, traj_prob) * np.sum(observed)

    eps = 1e-8  # guards the normalisations and the log ratio against division by zero
    P = observed / (np.sum(observed) + eps)
    Q = expected / (np.sum(expected) + eps)
    mask = P > 0  # cells with P == 0 contribute nothing to KL(P || Q)
    kl_divergence = np.sum(P[mask] * np.log(P[mask] / (Q[mask] + eps)))

    return kl_divergence, accuracy

In [5]:
from data_loading.load_data import load_pair

def sum_pairs(env, pair_algo):
    """Concatenate the training pairs of runs ``CUDA-01-00`` .. ``CUDA-01-09``.

    Args:
        env: Environment identifier passed to ``load_pair`` as ``env_name``.
        pair_algo: Pair-set identifier passed to ``load_pair``.

    Returns:
        A single list holding the ``.tolist()`` contents of every run's pairs,
        in run order.
    """
    combined = []
    for run_idx in range(10):
        run_pairs = load_pair(
            env_name=env,
            exp_name=f"CUDA-01-{run_idx:02d}",
            pair_type="train",
            pair_algo=pair_algo,
        )
        combined.extend(run_pairs.tolist())
    return combined


In [6]:
import pandas as pd

# Result tables: one row per pair-generation variant, one column per environment.
# They are created as float frames on purpose: with the default object dtype,
# `to_csv(float_format=...)` does not format the per-environment columns, so
# the saved files would carry full-precision values there.
kl_table = pd.DataFrame(index=pair_algo_list, columns=env_list, dtype=float)
acc_table = pd.DataFrame(index=pair_algo_list, columns=env_list, dtype=float)

for algo in pair_algo_list:
    for env in env_list:
        print(f"Evaluating {env} with {algo}")
        feedbacks = sum_pairs(env, algo)
        # NOTE(review): `algo` is passed as eval_feedbacks' third argument
        # (`unlabel_pair`), so the reference distribution is built from this
        # variant's own pairs rather than the "unlabel-100000" default -
        # confirm this is intended.
        kl, acc = eval_feedbacks(feedbacks, env, algo)
        kl_table.loc[algo, env] = kl
        acc_table.loc[algo, env] = acc

# Append the per-row mean across environments.
kl_table["mean"] = kl_table.astype(float).mean(axis=1)
acc_table["mean"] = acc_table.astype(float).mean(axis=1)

# Save with four decimal places.
kl_table.to_csv("kl_table.csv", float_format="%.4f")
acc_table.to_csv("acc_table.csv", float_format="%.4f")

Evaluating button-press-topdown-v2 with ternary-500-aug-10000-conf-0.999
Evaluating box-close-v2 with ternary-500-aug-10000-conf-0.999
Evaluating dial-turn-v2 with ternary-500-aug-10000-conf-0.999
Evaluating sweep-v2 with ternary-500-aug-10000-conf-0.999
Evaluating button-press-topdown-wall-v2 with ternary-500-aug-10000-conf-0.999
Evaluating sweep-into-v2 with ternary-500-aug-10000-conf-0.999
Evaluating drawer-open-v2 with ternary-500-aug-10000-conf-0.999
Evaluating lever-pull-v2 with ternary-500-aug-10000-conf-0.999
Evaluating button-press-topdown-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating box-close-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating dial-turn-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating sweep-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating button-press-topdown-wall-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating sweep-into-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating drawer-open-v2 with ternary-500-aug-10000-uncert-3.1
Evaluating lever-

In [7]:
# Round every value to a fixed number of decimal places before exporting.
# The rounding precision and the printed message share this constant; the
# message used to announce 3 decimals while the code rounded to 4.
n_decimals = 4
kl_table_rounded = kl_table.astype(float).round(n_decimals)
acc_table_rounded = acc_table.astype(float).round(n_decimals)

# Save as CSV (row labels included).
kl_table_rounded.to_csv("kl_divergence_table.csv", index=True)
acc_table_rounded.to_csv("accuracy_table.csv", index=True)

print(f"✅ 소수점 {n_decimals}자리로 반올림 후 CSV 저장 완료:")
print("- kl_divergence_table.csv")
print("- accuracy_table.csv")

✅ 소수점 3자리로 반올림 후 CSV 저장 완료:
- kl_divergence_table.csv
- accuracy_table.csv
