In [None]:
import os
import cv2
import numpy as np
from tqdm import tqdm
from pygam import LinearGAM
from scipy.stats import entropy
from matplotlib import pyplot as plt

In [None]:
# Geometry used by the strip-wise comparison below: each strip is
# WIDTH // SPLIT pixel columns wide.
WIDTH, HEIGHT = 512, 512
SPLIT = 64

# Directory holding the original (ground-truth) images.
origin_path = "/nas/Public/experiment_result/FACE-image/img-vanGogh_est-diffusionV2/origin/"

# One result directory per model under evaluation.  The trailing slash matters:
# later cells build file names by plain string concatenation and derive the
# model name with `path.split('/')[-2]`.
generated_paths = [
    "/nas/Public/experiment_result/FACE-image/img-vanGogh_est-diffusionV2/v2-1_768-ema-pruned/",
    "/nas/Public/experiment_result/FACE-image/img-vanGogh_est-diffusionV2/vanGoghDiffusion_v1/",
    "/nas/Public/experiment_result/FACE-image/img-vanGogh_est-diffusionV2/juggernautXL_juggernautX/",
]

In [None]:
# Load every PNG below `origin_path` into memory, keyed by file name without
# the extension.  Images are stored in RGB channel order (OpenCV reads BGR).
origin_dict = {}

for root, _, files in os.walk(origin_path):
    for file in files:
        if not file.endswith(".png"):
            continue
        image_path = os.path.join(root, file)
        image_bgr = cv2.imread(image_path)
        # cv2.imread signals failure by returning None rather than raising;
        # fail here with the offending path instead of an opaque TypeError.
        if image_bgr is None:
            raise OSError(f"cv2.imread could not read {image_path!r}")
        # splitext keeps dots inside the stem ("a.b.png" -> "a.b"), so two
        # files that only differ after the first dot no longer share a key.
        origin_dict[os.path.splitext(file)[0]] = image_bgr[:, :, ::-1]

In [None]:
def read_rgb(path):
    """Read the image at `path` with OpenCV and return it in RGB channel order.

    Raises:
        OSError: if OpenCV cannot read the file (``cv2.imread`` returns
            ``None`` in that case instead of raising).
    """
    image_bgr = cv2.imread(path)
    if image_bgr is None:
        raise OSError(f"cv2.imread could not read {path!r}")
    return image_bgr[:, :, ::-1]


def strip_mse_sequence(reference_img, generated_dir, prefix):
    """Return the per-strip MSE between `reference_img` and its masked re-generations.

    For every ``mask_idx`` in ``range(SPLIT - 1)`` the file
    ``<generated_dir>/<prefix>_mask-<mask_idx>.png`` is read and the strip of
    ``WIDTH // SPLIT`` columns starting at ``(mask_idx + 1) * (WIDTH // SPLIT)``
    is compared with the same strip of `reference_img`.

    Returns:
        list[float]: one mean squared error per mask index.
    """
    strip_width = WIDTH // SPLIT
    sequence = []
    for mask_idx in range(SPLIT - 1):
        generated_img = read_rgb(os.path.join(generated_dir, f"{prefix}_mask-{mask_idx}.png"))
        start = strip_width * (mask_idx + 1)
        # int32, not int16: a squared 8-bit difference reaches 255**2 = 65025,
        # which wraps around in int16 and silently corrupts the MSE.
        token = generated_img[:, start:start + strip_width].astype(np.int32)
        ground_truth = reference_img[:, start:start + strip_width].astype(np.int32)
        mse = ((token - ground_truth) ** 2).mean()
        # Store plain Python floats: NumPy >= 2 formats scalars inside a list
        # as "np.float64(...)", which the reader cell cannot parse with float().
        sequence.append(float(mse))
    return sequence


# each subject model
for generated_path in generated_paths:
    print("listing path:", generated_path)

    # To regenerate image_entropies.txt (generated strips compared against the
    # ORIGINAL images instead of the model's own subject image), run:
    # image_sequences = {
    #     name: strip_mse_sequence(origin, generated_path, name)
    #     for name, origin in tqdm(origin_dict.items())
    # }
    # with open(generated_path + "image_entropies.txt", "w") as f:
    #     for name, sequence in image_sequences.items():
    #         f.write(f"{name} {sequence}\n")
    # NOTE(review): an image_entropies.txt produced before the int16 fix above
    # may contain wrapped values - regenerate it before comparing.

    sequences = {}
    # each subject image
    for idx in tqdm(range(len(origin_dict))):
        subject_name = f"subject-{idx}"
        subject_img = read_rgb(os.path.join(generated_path, subject_name + ".png"))
        sequences[subject_name] = strip_mse_sequence(subject_img, generated_path, subject_name)

    # save entropies, one "<name> [v0, v1, ...]" line per subject
    with open(generated_path + "subject_entropies.txt", "w") as f:
        for name, sequence in sequences.items():
            f.write(f"{name} {sequence}\n")

In [None]:
def load_entropy_file(path):
    """Parse an entropy file and return ``(sequences, spectra)``.

    Each non-blank line is expected to look like ``<name> [v0, v1, ...]``.
    ``sequences`` maps the name to the list of floats; ``spectra`` maps it to
    the magnitude of the FFT of that list.

    Raises:
        ValueError: if a non-blank line does not contain exactly one ``[`` or
            holds a value that ``float()`` cannot parse.
    """
    sequences = {}
    spectra = {}
    with open(path, "r") as f:
        for line in f:
            # A blank (e.g. trailing) line would otherwise break the unpacking below.
            if not line.strip():
                continue
            name, values = line.split('[')
            name = name.strip()
            # Files written under NumPy >= 2 contain "np.float64(1.5)" items;
            # unwrap them so float() can parse the number.
            values = values.replace("np.float64(", "").replace(")", "")
            sequence = [float(x) for x in values.strip().strip(']').split(', ') if x]
            sequences[name] = sequence
            spectra[name] = np.abs(np.fft.fft(sequence))
    return sequences, spectra


# read entropies
image_sequences_dict = {}
image_spectra_dict = {}
subject_sequences_dict = {}
subject_spectra_dict = {}
path_names = set()

for generated_path in generated_paths:
    # Directory name of the model; relies on the trailing slash of the path.
    path_name = generated_path.split('/')[-2]

    image_sequences, image_spectra = load_entropy_file(generated_path + "image_entropies.txt")
    image_sequences_dict[path_name] = image_sequences
    image_spectra_dict[path_name] = image_spectra

    subject_sequences, subject_spectra = load_entropy_file(generated_path + "subject_entropies.txt")
    subject_sequences_dict[path_name] = subject_sequences
    subject_spectra_dict[path_name] = subject_spectra

    path_names.add(path_name)

In [None]:
def mean_normalized_spectrum(spectra):
    """Return the element-wise mean of the L2-normalised spectra in `spectra`.

    `spectra` maps a sample name to its magnitude spectrum; only the values
    are used.
    """
    return np.mean([spectrum / np.linalg.norm(spectrum) for spectrum in spectra.values()], axis=0)


def plot_gam_trend(ax, spectrum, label):
    """Fit a LinearGAM to `spectrum` and draw its smoothed trend on `ax`.

    The x values are ``len(spectrum)`` evenly spaced points in [0, 0.5].  The
    curve is the absolute partial dependence of the single term; the shaded
    band is the 95% interval folded to non-negative values (its lower edge is
    0 wherever the interval straddles zero).
    """
    X = np.linspace(0, 0.5, len(spectrum))
    gam = LinearGAM().fit(X, spectrum)
    XX = gam.generate_X_grid(term=0, n=len(spectrum))
    pdep, confi = gam.partial_dependence(term=0, X=XX, width=0.95)
    lower, upper = confi[:, 0], confi[:, 1]
    ax.plot(XX[:, 0], np.abs(pdep), label=label)
    ax.fill_between(
        XX[:, 0],
        np.where(lower * upper > 0, np.minimum(np.abs(lower), np.abs(upper)), 0),
        np.maximum(np.abs(lower), np.abs(upper)),
        alpha=0.25,
    )


# plot mean spectra
plt.style.use("default")
# savefig does not create missing directories.
os.makedirs("./img/spectra", exist_ok=True)

# sorted(): a set of strings has no stable iteration order across kernel restarts.
for path_name in sorted(path_names):
    # normalized mean spectrum, with the DC bin (index 0) dropped
    # NOTE(review): the spectra come from a full np.fft.fft, so the remaining
    # bins still include the mirrored upper half; plot_gam_trend maps all of
    # them onto [0, 0.5] - confirm that this frequency axis is intended.
    mean_image_spectrum = mean_normalized_spectrum(image_spectra_dict[path_name])[1:]
    mean_subject_spectrum = mean_normalized_spectrum(subject_spectra_dict[path_name])[1:]

    fig, ax = plt.subplots()
    ax.plot(mean_image_spectrum, label="Mean Spectrum ground truth")
    ax.plot(mean_subject_spectrum, label="Mean Spectrum {legend}".format(legend=path_name))
    ax.legend()
    fig.savefig(f"./img/spectra/{path_name}_raw.pdf", bbox_inches="tight")

    fig, ax = plt.subplots()
    plot_gam_trend(ax, mean_image_spectrum, "Partial Dependence ground truth")
    plot_gam_trend(ax, mean_subject_spectrum, "Partial Dependence {legend}".format(legend=path_name))
    ax.legend()
    fig.savefig(f"./img/spectra/{path_name}.pdf", bbox_inches="tight")

In [None]:
def spectral_overlap(spectrum1, spectrum2):
    """Ratio of the summed element-wise minimum to the summed element-wise maximum.

    Both spectra must have the same shape.  The result is 1.0 when they are
    identical and shrinks as they diverge.
    """
    paired = np.stack((spectrum1, spectrum2), axis=0)
    shared_area = paired.min(axis=0).sum()
    envelope_area = paired.max(axis=0).sum()
    return shared_area / envelope_area

def pearson_correlation(spectrum1, spectrum2):
    """Pearson correlation coefficient between the two spectra."""
    correlation_matrix = np.corrcoef(spectrum1, spectrum2)
    # Off-diagonal entry of the 2x2 matrix is the cross-correlation.
    return correlation_matrix[0][1]

def earth_mover_distance(spectrum1, spectrum2):
    """Mean absolute difference between the normalised cumulative sums of two spectra.

    Each spectrum is accumulated along its first axis and divided by its final
    cumulative value, so both curves end at 1; the summed absolute gap between
    the curves is then divided by the length of `spectrum1`.

    Integer-valued spectra are supported: the normalisation uses an
    out-of-place division, whereas an in-place ``/=`` on an integer array
    raises a casting error.
    """
    cdf1 = np.cumsum(spectrum1, axis=0)
    cdf2 = np.cumsum(spectrum2, axis=0)
    cdf1 = cdf1 / cdf1[-1]
    cdf2 = cdf2 / cdf2[-1]
    return np.abs(cdf1 - cdf2).sum() / cdf1.shape[0]

def kl_divergence(spectrum1, spectrum2):
    """Kullback-Leibler divergence of |spectrum1| from |spectrum2|.

    Delegates to ``scipy.stats.entropy`` with the absolute values of both
    spectra as ``pk`` and ``qk``.
    """
    reference = abs(spectrum1)
    approximation = abs(spectrum2)
    return entropy(reference, qk=approximation)

def score(spectrum1, spectrum2):
    """Compare two spectra with all four similarity measures.

    Returns:
        tuple: ``(spectral_overlap, pearson_correlation, earth_mover_distance,
        kl_divergence)``, each evaluated as ``metric(spectrum1, spectrum2)``.
    """
    return (
        spectral_overlap(spectrum1, spectrum2),
        pearson_correlation(spectrum1, spectrum2),
        earth_mover_distance(spectrum1, spectrum2),
        kl_divergence(spectrum1, spectrum2),
    )

def area_under_curve(spectrum):
    """Sum of the absolute values of `spectrum`."""
    magnitudes = np.absolute(spectrum)
    return magnitudes.sum()

In [None]:
# FACE scores
# Alternative kept for reference: score against one fixed estimator spectrum.
# estimator_name = "v2-1_768-ema-pruned"
# estimator_spectrum = np.mean([spectrum / np.linalg.norm(spectrum) for _, spectrum in image_spectra_dict[estimator_name].items()], axis=0)
# estimator_spectrum = estimator_spectrum[1:]

# sorted(): `path_names` is a set of strings, whose iteration order changes
# between kernel restarts; sorting keeps the report order reproducible.
for path_name in sorted(path_names):
    # normalized mean spectrum
    # NOTE(review): the DC bin (index 0) is kept here, while the plotting cell
    # drops it with [1:] - confirm which one is intended.
    mean_image_spectrum = np.mean(
        [spectrum / np.linalg.norm(spectrum) for spectrum in image_spectra_dict[path_name].values()],
        axis=0,
    )
    mean_subject_spectrum = np.mean(
        [spectrum / np.linalg.norm(spectrum) for spectrum in subject_spectra_dict[path_name].values()],
        axis=0,
    )
    so, corr, emd, kl = score(mean_subject_spectrum, mean_image_spectrum)
    auc = area_under_curve(mean_subject_spectrum)
    print('=' * 20)
    print(path_name)
    print("Spectral Overlap:", so)
    print("Pearson Correlation:", corr)
    print("Earth Mover Distance:", emd)
    print("KL Divergence:", kl)
    print("Area Under Curve:", auc)

In [None]:
# Disabled one-off utility (kept for reference; nothing in this cell runs).
# When enabled it re-saves every image in `origin_dict` under `save_path`,
# renamed from "<prefix>-<idx>" to "<prefix>-<idx - 1>", i.e. it shifts the
# numeric suffix down by one.  The [:, :, ::-1] flips the channel order back
# before cv2.imwrite, since origin_dict stores the images channel-reversed.
# NOTE(review): presumably used to align 1-based original names with the
# 0-based "subject-<idx>" files - confirm before re-enabling.
# save_path = "/nas/Public/experiment_result/FACE-image/img-vanGogh_est-diffusionV2/origin/rename/"

# for filename, img in origin_dict.items():
#     idx = int(filename.split('-')[-1])
#     name = filename.split('-')[0] + '-' + str(idx - 1)
#     cv2.imwrite(os.path.join(save_path, name + ".png"), img[:, :, ::-1])