In [None]:
import ast

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import umap
from scipy.spatial.distance import cdist, pdist
from scipy.stats import spearmanr
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, trustworthiness
from sklearn.metrics import pairwise_distances
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

# Load the CSV data.
df = pd.read_csv('./ref_origin_emb.csv')

# Parse every cell of the embedding columns with ast.literal_eval
# (the cells are presumably stringified Python lists -- confirm against the CSV).
df['bert_embeddings'] = df['bert_embeddings'].map(ast.literal_eval)
df['sbert_embeddings'] = df['sbert_embeddings'].map(ast.literal_eval)

# Stack the parsed per-row values into one array per embedding model.
bert_embeddings = np.stack(df['bert_embeddings'].to_numpy())
sbert_embeddings = np.stack(df['sbert_embeddings'].to_numpy())

In [None]:
# Autoencoder definition
class Autoencoder(nn.Module):
    """Fully connected autoencoder with one hidden layer on each side.

    The encoder maps ``input_dim -> hidden_dim -> encoding_dim`` and the
    decoder mirrors it back to ``input_dim``; a ReLU follows the first linear
    layer of each half. No activation is applied to the latent code or to the
    reconstruction.

    Args:
        input_dim: Size of each input vector.
        hidden_dim: Width of the intermediate layer in encoder and decoder.
        encoding_dim: Size of the latent code produced by ``self.encoder``.
    """

    def __init__(self, input_dim, hidden_dim, encoding_dim):
        super().__init__()
        # Encoder layers are instantiated before decoder layers so parameter
        # initialization consumes the RNG in the same order as before.
        encoder_layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, encoding_dim),
        ]
        decoder_layers = [
            nn.Linear(encoding_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Return the reconstruction of ``x`` (not the latent code)."""
        return self.decoder(self.encoder(x))

# Evaluation metric functions
def continuity(X, X_embedded, n_neighbors=5):
    """Continuity of an embedding (Venna & Kaski), in the range [0, 1].

    Continuity penalizes points that are among the ``n_neighbors`` nearest
    neighbors of a sample in the original space but are pushed further away in
    the embedded space:

        C(k) = 1 - 2 / (n k (2n - 3k - 1)) * sum_i sum_j max(0, r_emb(i, j) - k)

    where ``j`` runs over the ``k`` nearest original-space neighbors of ``i``
    and ``r_emb(i, j)`` is the rank of ``j`` among the embedded-space neighbors
    of ``i`` (1 = nearest, the point itself excluded). It is the mirror image of
    trustworthiness with the two spaces swapped, so a neighbor-preserving
    embedding scores exactly 1.0.

    Args:
        X: Array of shape (n_samples, n_features) in the original space.
        X_embedded: Array of shape (n_samples, n_components) in the reduced space.
        n_neighbors: Neighborhood size ``k``; must satisfy ``0 < k < n_samples / 2``
            (the same constraint scikit-learn places on ``trustworthiness``).

    Returns:
        The continuity score as a float.

    Raises:
        ValueError: If the two arrays have different numbers of rows or
            ``n_neighbors`` is outside the valid range.
    """
    X = np.asarray(X, dtype=float)
    X_embedded = np.asarray(X_embedded, dtype=float)
    n_samples = X.shape[0]
    if X_embedded.shape[0] != n_samples:
        raise ValueError(
            f"X and X_embedded must have the same number of rows, "
            f"got {n_samples} and {X_embedded.shape[0]}"
        )
    if not 0 < n_neighbors < n_samples / 2:
        raise ValueError(
            f"n_neighbors ({n_neighbors}) must be positive and less than "
            f"n_samples / 2 ({n_samples / 2})"
        )

    dist_orig = cdist(X, X)
    dist_emb = cdist(X_embedded, X_embedded)
    # Push each point to the very end of its own neighbor ordering so it is
    # never counted as its own neighbor, even when duplicate rows tie at 0.
    np.fill_diagonal(dist_orig, np.inf)
    np.fill_diagonal(dist_emb, np.inf)

    # k nearest neighbors of every sample in the original space.
    orig_neighbors = np.argsort(dist_orig, axis=1, kind="stable")[:, :n_neighbors]

    # emb_rank[i, j] = 1-based rank of j among the embedded neighbors of i
    # (inverse of the argsort permutation).
    emb_order = np.argsort(dist_emb, axis=1, kind="stable")
    rows = np.arange(n_samples)[:, None]
    emb_rank = np.empty_like(emb_order)
    emb_rank[rows, emb_order] = np.arange(1, n_samples + 1)

    # Only original neighbors that fell outside the embedded k-neighborhood
    # (rank > k) contribute, each by how far beyond k it landed.
    excess = emb_rank[rows, orig_neighbors] - n_neighbors
    penalty = np.clip(excess, 0, None).sum()

    normalizer = 2.0 / (
        n_samples * n_neighbors * (2.0 * n_samples - 3.0 * n_neighbors - 1.0)
    )
    return 1.0 - normalizer * penalty

def mrre(X, X_embedded, n_neighbors=5):
    """Mean relative rank error of the embedded-space nearest neighbors.

    For every sample ``i`` and every ``j`` in ``1..n_neighbors``, take the
    ``j``-th nearest neighbor of ``i`` in the embedded space, look up its rank
    in the original space, and accumulate ``|rank_orig - j| / j``. The sum is
    averaged over ``n_samples * n_neighbors``; 0.0 means the embedded
    neighborhoods are ranked exactly as in the original space.

    Ranks are 0-based positions in the distance ordering with the sample
    itself pinned at position 0, so real neighbors start at rank 1.

    Args:
        X: Array of shape (n_samples, n_features) in the original space.
        X_embedded: Array of shape (n_samples, n_components) in the reduced space.
        n_neighbors: Number of embedded-space neighbors to evaluate per sample;
            must satisfy ``1 <= n_neighbors < n_samples``.

    Returns:
        The mean relative rank error as a float.

    Raises:
        ValueError: If the two arrays have different numbers of rows or
            ``n_neighbors`` is outside the valid range.
    """
    X = np.asarray(X, dtype=float)
    X_embedded = np.asarray(X_embedded, dtype=float)
    n_samples = X.shape[0]
    if X_embedded.shape[0] != n_samples:
        raise ValueError(
            f"X and X_embedded must have the same number of rows, "
            f"got {n_samples} and {X_embedded.shape[0]}"
        )
    if not 1 <= n_neighbors < n_samples:
        raise ValueError(
            f"n_neighbors ({n_neighbors}) must be in [1, n_samples - 1] "
            f"with n_samples = {n_samples}"
        )

    orig_dist = cdist(X, X)
    emb_dist = cdist(X_embedded, X_embedded)
    # Distances are non-negative, so -1 on the diagonal guarantees each sample
    # sorts first in its own row even when duplicate rows also sit at distance 0.
    np.fill_diagonal(orig_dist, -1.0)
    np.fill_diagonal(emb_dist, -1.0)
    orig_order = np.argsort(orig_dist, axis=1, kind="stable")
    emb_order = np.argsort(emb_dist, axis=1, kind="stable")

    # Invert the original-space permutation once: orig_rank[i, p] is the
    # position of point p in the ordering of row i. This replaces a linear
    # np.where search per (sample, neighbor) pair.
    rows = np.arange(n_samples)[:, None]
    orig_rank = np.empty_like(orig_order)
    orig_rank[rows, orig_order] = np.arange(n_samples)

    emb_positions = np.arange(1, n_neighbors + 1)
    emb_neighbors = emb_order[:, 1:n_neighbors + 1]
    rank_in_orig = orig_rank[rows, emb_neighbors]
    relative_errors = np.abs(rank_in_orig - emb_positions) / emb_positions
    return relative_errors.sum() / (n_samples * n_neighbors)

def rank_correlation(X, X_embedded):
    """Spearman rank correlation between original and embedded pairwise distances.

    Only the distances of the unique unordered pairs ``(i, j)`` with ``i < j``
    are compared. Flattening the full square distance matrices would add
    ``n_samples`` zero self-distances that are tied at (0, 0) in both spaces
    and bias the coefficient upward, and would store every pair twice.

    Args:
        X: Array of shape (n_samples, n_features) in the original space.
        X_embedded: Array of shape (n_samples, n_components) in the reduced space.

    Returns:
        Spearman's rho of the two condensed distance vectors (1.0 when the
        distance ordering is fully preserved, -1.0 when it is fully reversed).

    Raises:
        ValueError: If the two arrays have different numbers of rows.
    """
    X = np.asarray(X, dtype=float)
    X_embedded = np.asarray(X_embedded, dtype=float)
    if X.shape[0] != X_embedded.shape[0]:
        raise ValueError(
            f"X and X_embedded must have the same number of rows, "
            f"got {X.shape[0]} and {X_embedded.shape[0]}"
        )
    # pdist returns the condensed (upper-triangle, no diagonal) distances.
    rho, _ = spearmanr(pdist(X), pdist(X_embedded))
    return rho

In [None]:
# Prepare the input data: rebuild one stacked array per embedding column of df.
bert_embeddings = np.stack(df['bert_embeddings'].to_numpy())
sbert_embeddings = np.stack(df['sbert_embeddings'].to_numpy())

# Target dimensionality used by every reduction method.
encoding_dim = 64

# Collects one metrics record (dict) per model/method combination.
results_all = list()

# Display name -> embedding matrix; iterated in insertion order (BERT, then SBERT).
embeddings_dict = dict(BERT=bert_embeddings, SBERT=sbert_embeddings)

In [None]:
# Hyperparameters for this cell, gathered in one place instead of being
# scattered through the loop as literals.
SEED = 42  # the value UMAP already used; now also applied to PCA and torch
HIDDEN_DIM = 128
BATCH_SIZE = 64
LEARNING_RATE = 1e-3
N_EPOCHS = 50
LOG_EVERY = 10


def _rows_to_csv_strings(matrix):
    """Serialize each row of a 2-D array as one comma-separated string.

    Storing plain strings keeps every reduced-embedding column in the same
    format when the DataFrame is written to CSV. Raw ndarray cells would be
    written as NumPy's whitespace-separated, line-wrapped array text instead.
    """
    return [",".join(map(str, row)) for row in matrix]


def _build_result(model_name, method, X_orig, X_reduced, explained_variance=None):
    """Build one result record holding the quality metrics of a reduced embedding."""
    return {
        "Model": model_name,
        "Method": method,
        "Dim": encoding_dim,
        "ExplainedVariance": explained_variance,
        "Trustworthiness": trustworthiness(X_orig, X_reduced),
        "Continuity": continuity(X_orig, X_reduced),
        "MRRE": mrre(X_orig, X_reduced),
        "Spearman": rank_correlation(X_orig, X_reduced),
    }


for model_name, embedding_matrix in embeddings_dict.items():
    print(f"\n==== {model_name} 처리 시작 ====")

    # Normalization: MinMaxScaler rescales every feature to [0, 1].
    scaler = MinMaxScaler()
    X = scaler.fit_transform(embedding_matrix)

    # Build the TensorDataset; inputs double as targets for reconstruction.
    tensor_X = torch.tensor(X, dtype=torch.float32)
    dataset = TensorDataset(tensor_X, tensor_X)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

    # PCA
    print(f"{model_name} - PCA")
    pca = PCA(n_components=encoding_dim, random_state=SEED)
    X_pca = pca.fit_transform(X)
    df[f"{model_name}_PCA_{encoding_dim}d"] = _rows_to_csv_strings(X_pca)
    results_all.append(_build_result(
        model_name, "PCA", X, X_pca,
        explained_variance=pca.explained_variance_ratio_.sum(),
    ))

    # UMAP
    print(f"{model_name} - UMAP")
    reducer = umap.UMAP(n_components=encoding_dim, random_state=SEED)
    X_umap = reducer.fit_transform(X)
    # Previously stored as raw ndarrays, which did not match the PCA/AE
    # columns and were hard to parse back from the saved CSV.
    df[f"{model_name}_UMAP_{encoding_dim}d"] = _rows_to_csv_strings(X_umap)
    results_all.append(_build_result(model_name, "UMAP", X, X_umap))

    # t-SNE (disabled). To re-enable, mirror the UMAP section:
    # print(f"{model_name} - t-SNE")
    # tsne = TSNE(n_components=encoding_dim, method='exact', random_state=SEED, perplexity=30)
    # X_tsne = tsne.fit_transform(X)
    # df[f"{model_name}_TSNE_{encoding_dim}d"] = _rows_to_csv_strings(X_tsne)
    # results_all.append(_build_result(model_name, "t-SNE", X, X_tsne))

    # Autoencoder
    print(f"{model_name} - Autoencoder")
    # Seed torch so weight initialization and DataLoader shuffling are
    # reproducible across runs and identical for every embedding model.
    torch.manual_seed(SEED)
    model = Autoencoder(input_dim=X.shape[1], hidden_dim=HIDDEN_DIM, encoding_dim=encoding_dim)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    for epoch in range(N_EPOCHS):
        epoch_loss = 0
        for batch, _ in dataloader:
            optimizer.zero_grad()
            reconstructed = model(batch)
            loss = criterion(reconstructed, batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        # Log the first epoch, every LOG_EVERY-th epoch, and the last one.
        if (epoch + 1) % LOG_EVERY == 0 or epoch == 0 or epoch + 1 == N_EPOCHS:
            print(f"Epoch [{epoch + 1}/{N_EPOCHS}], Loss: {epoch_loss/len(dataloader):.6f}")

    # Encode the full dataset in one pass without tracking gradients.
    with torch.no_grad():
        encoded = model.encoder(tensor_X).numpy()
    df[f"{model_name}_AE_{encoding_dim}d"] = _rows_to_csv_strings(encoded)
    results_all.append(_build_result(model_name, "Autoencoder", X, encoded))

    # Save the trained model weights.
    model_path = f"autoencoder_{model_name}_{encoding_dim}d.pt"
    torch.save(model.state_dict(), model_path)
    print(f"Autoencoder 저장 완료 → {model_path}")

# Save the results: the metrics table first, then the DataFrame that now
# carries the reduced-embedding columns.
results_df = pd.DataFrame(data=results_all)
results_df.to_csv(path_or_buf="./emb_red_result_ref.csv", index=False)
df.to_csv(path_or_buf="./ref_origin_emb_red.csv", index=False)
print("모든 차원 축소 및 저장 완료!")

In [None]:
# Show the metrics table built from results_all as this cell's output.
results_df