In [None]:
import subprocess
import os
import re
from tqdm.auto import tqdm
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import librosa
import librosa.display

import torch
import torch.nn as nn
import torch.nn.functional as F

from skimage.transform import resize
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans

from pytorch_msssim import ssim

In [None]:
def get_resized_ssm(audio_path, target_size=128, threshold=0.8):
    """Build a fixed-size self-similarity matrix (SSM) for one audio file.

    The audio is loaded with librosa's default settings, converted to CQT
    chroma features, and every pair of frames is compared with cosine
    similarity. The resulting square matrix is then resampled to
    ``target_size`` x ``target_size``.

    Parameters
    ----------
    audio_path : path of the audio file to load.
    target_size : side length of the returned square matrix.
    threshold : currently not used by this function; kept so existing calls
        that pass it keep working.

    Returns
    -------
    Array of shape (target_size, target_size).
    """
    waveform, sample_rate = librosa.load(audio_path)

    # chroma_cqt yields (12, T); transpose so that each row is one time frame.
    frame_chroma = librosa.feature.chroma_cqt(y=waveform, sr=sample_rate).T
    similarity = cosine_similarity(frame_chroma)  # (T, T)

    # preserve_range=True keeps the similarity values as they are instead of
    # letting skimage rescale them.
    output_shape = (target_size, target_size)
    return resize(
        similarity,
        output_shape,
        mode='reflect',
        anti_aliasing=True,
        preserve_range=True,
    )

# Where the audio comes from: (directory, label stored for its files, extension kept).
# Folders "0".."3" under temp_wavs are the emotional match + mismatch clips;
# the chorus folder contributes mp3 files that are all labelled "chorus".
chorus_dir = "chorus"
audio_sources = [
    (os.path.join("temp_wavs", label), label, ".wav")
    for label in ["0", "1", "2", "3"]
]
audio_sources.append((chorus_dir, "chorus", ".mp3"))

ssm_imgs = []
labels = []
filenames = []

for dir_path, label, extension in audio_sources:
    # os.listdir returns names in arbitrary order; sorting makes the row order of
    # ssm_imgs / labels / filenames identical on every run and every machine.
    for fname in tqdm(sorted(os.listdir(dir_path)), desc=label):
        if not fname.endswith(extension):
            continue
        path = os.path.join(dir_path, fname)
        ssm = get_resized_ssm(path)  # (128, 128) with the default target_size
        ssm_imgs.append(ssm[None, :, :])  # add a channel axis -> (1, 128, 128)
        labels.append(label)
        filenames.append(fname)

ssm_imgs = np.array(ssm_imgs)  # (B, 1, 128, 128)

In [None]:
# Presumably the archive was produced once with the call below (kept for reference):
# np.savez("ssm_data.npz", imgs=ssm_imgs, labels=labels, filenames=filenames)

# NOTE(security): allow_pickle=True permits unpickling of object arrays, which can
# execute arbitrary code. Only load archives that you created yourself.
# The context manager closes the underlying file handle once the arrays are read;
# indexing the archive materialises each array in memory, so they stay valid afterwards.
with np.load("ssm_data.npz", allow_pickle=True) as loaded:
    ssm_imgs = loaded["imgs"]
    labels = loaded["labels"]
    filenames = loaded["filenames"]

In [None]:
class SSM_AutoEncoder_Skip(nn.Module):
    """Convolutional autoencoder for single-channel SSM images with additive skips.

    Three stride-2 convolution stages reduce a (B, 1, 128, 128) input to
    (B, 32, 16, 16); this is flattened and projected to ``bottleneck_dim``
    values, projected back, and upsampled by three stride-2 transposed
    convolutions. Encoder activations are added to the decoder inputs at the
    matching resolution. The last layer is a sigmoid, so reconstructions lie
    in (0, 1).

    The linear layers are sized for a 16x16 feature map, i.e. the input must
    be 128x128.
    """

    def __init__(self, bottleneck_dim=64):
        super().__init__()

        def down(c_in, c_out):
            # Halves the spatial size: conv(k=3, s=2, p=1) -> BatchNorm -> LeakyReLU.
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1),
                nn.BatchNorm2d(c_out),
                nn.LeakyReLU(0.2),
            )

        def up(c_in, c_out):
            # Doubles the spatial size: convT(k=4, s=2, p=1) -> BatchNorm -> LeakyReLU.
            return nn.Sequential(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=4, stride=2, padding=1),
                nn.BatchNorm2d(c_out),
                nn.LeakyReLU(0.2),
            )

        # Attribute names and creation order are kept stable so that saved
        # state_dicts keep loading and seeded initialisation is unchanged.
        self.enc1 = down(1, 8)
        self.enc2 = down(8, 16)
        self.enc3 = down(16, 32)

        flat_features = 32 * 16 * 16
        self.dropout = nn.Dropout(0.3)
        self.bottleneck = nn.Linear(flat_features, bottleneck_dim)
        self.unbottleneck = nn.Linear(bottleneck_dim, flat_features)

        self.dec1 = up(32, 16)
        self.dec2 = up(16, 8)
        self.dec3 = nn.Sequential(
            nn.ConvTranspose2d(8, 1, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return ``(reconstruction, z)`` for a batch ``x`` of shape (B, 1, 128, 128).

        ``z`` is the bottleneck vector of shape (B, bottleneck_dim), taken
        after dropout (dropout is the identity in eval mode).
        """
        batch_size = x.size(0)

        skip1 = self.enc1(x)      # (B, 8, 64, 64)
        skip2 = self.enc2(skip1)  # (B, 16, 32, 32)
        skip3 = self.enc3(skip2)  # (B, 32, 16, 16)

        flattened = skip3.view(batch_size, -1)
        z = self.dropout(self.bottleneck(flattened))

        features = self.unbottleneck(z).view(-1, 32, 16, 16)
        features = features + skip3

        # NOTE(review): skip3 is added a second time here, so the deepest skip
        # is effectively weighted twice. This looks unintentional, but existing
        # checkpoints were trained with it, so it is kept as is.
        up1 = self.dec1(features + skip3)  # (B, 16, 32, 32)
        up2 = self.dec2(up1 + skip2)       # (B, 8, 64, 64)
        recon = self.dec3(up2 + skip1)     # (B, 1, 128, 128)

        return recon, z

In [None]:
def show_reconstruction(model, ssm_tensor, n_samples=5):
    """
    Plot original SSMs side by side with the autoencoder's reconstructions.

    model: autoencoder; calling it must return a (reconstruction, latent) pair
    ssm_tensor: (B, 1, 128, 128) torch.Tensor, expected to be normalized (0~1)
    n_samples: how many images to compare; values larger than B are clamped
        to B, and values <= 0 show nothing

    Side effect: the model is switched to eval mode and left in it.
    """
    model.eval()

    # Never index past the end of the batch.
    n_show = max(0, min(n_samples, ssm_tensor.shape[0]))
    if n_show == 0:
        return

    # Only the samples that are displayed are pushed through the model, which
    # avoids a forward pass over the whole dataset.
    shown = ssm_tensor[:n_show]
    with torch.no_grad():
        decoded, _ = model(shown)  # (n_show, 1, 128, 128)

    decoded = decoded.cpu().numpy()
    originals = shown.cpu().numpy()

    for original, reconstruction in zip(originals, decoded):
        fig, (ax_original, ax_recon) = plt.subplots(1, 2, figsize=(6, 3))

        # Original (channel 0 of the sample)
        ax_original.imshow(original[0], cmap='magma', origin='lower', aspect='auto')
        ax_original.set_title("Original SSM")
        ax_original.axis('off')

        # Reconstruction
        ax_recon.imshow(reconstruction[0], cmap='magma', origin='lower', aspect='auto')
        ax_recon.set_title("Reconstructed SSM")
        ax_recon.axis('off')

        fig.tight_layout()
        plt.show()

In [None]:
# Rebuild the architecture with a 64-dimensional bottleneck, then restore the
# saved parameters onto the CPU.
# NOTE(security): depending on the PyTorch version and settings, torch.load may
# unpickle arbitrary objects — only load checkpoints from a trusted source.
model = SSM_AutoEncoder_Skip(bottleneck_dim=64)
trained_state = torch.load("ssm_autoencoder_sc_150.pt", map_location="cpu")
model.load_state_dict(trained_state)
# Inference mode: dropout is disabled and BatchNorm uses its running statistics.
model.eval()

In [None]:
# Build the input tensor here so this cell also runs on a fresh kernel
# (Restart & Run All); ssm_tensor is otherwise first defined in the training
# cell further down, which would make this call fail with a NameError.
ssm_tensor = torch.tensor(ssm_imgs, dtype=torch.float32)
show_reconstruction(model, ssm_tensor, n_samples=5)

In [None]:
def mixed_loss(recon, target, mse_weight=0.7, ssim_weight=0.3):
    """
    Weighted sum of pixel-wise MSE and an SSIM-based dissimilarity (1 - SSIM).

    recon, target: (B, 1, 128, 128); SSIM is evaluated with data_range=1.0,
        so both are expected to hold values in [0, 1]
    mse_weight: factor applied to the MSE term (default 0.7)
    ssim_weight: factor applied to the (1 - SSIM) term (default 0.3)

    Returns the combined loss; with the default weights this is
    0.7 * MSE + 0.3 * (1 - SSIM).
    """
    mse = F.mse_loss(recon, target)
    # SSIM grows with similarity, so 1 - SSIM is the quantity to minimise.
    ssim_loss = 1 - ssim(recon, target, data_range=1.0, size_average=True)
    return mse_weight * mse + ssim_weight * ssim_loss

In [None]:
# Seed PyTorch so that weight initialisation and dropout masks repeat across runs.
torch.manual_seed(42)

ssm_tensor = torch.tensor(ssm_imgs, dtype=torch.float32)

model = SSM_AutoEncoder_Skip(bottleneck_dim=64)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# NOTE: loss_fn is not used in this cell (training optimises mixed_loss); it is
# kept only so that any other cell referring to it keeps working.
loss_fn = nn.MSELoss()

# Full-batch training: each epoch is a single optimiser step on all SSMs.
model.train()
for epoch in range(150):
    output, z = model(ssm_tensor)  # reconstruction (B, 1, 128, 128), bottleneck (B, 64)
    loss = mixed_loss(output, ssm_tensor)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1} - Loss: {loss.item():.4f}")

# Extract the embeddings in eval mode so dropout does not perturb them.
model.eval()
with torch.no_grad():
    _, z = model(ssm_tensor)  # (B, 64)
    z_np = z.detach().cpu().numpy()

# Optional persistence of the trained model and embeddings:
# torch.save(model.state_dict(), "ssm_autoencoder_all.pt")
# np.save("z_np_all.npy", z_np)

# 2-D projection of all embeddings (perplexity=30 needs more than 30 samples).
tsne = TSNE(n_components=2, random_state=42, perplexity=30)
tsne_result = tsne.fit_transform(z_np)

In [None]:
# Filter valid labels (0,1,2,3 only); rows with any other label (e.g. "chorus")
# are left out of this plot.
valid_labels = ['0', '1', '2', '3']
valid_indices = [i for i, label in enumerate(labels) if label in valid_labels]

labels_filtered = np.array([int(labels[i]) for i in valid_indices])
# Index the NumPy copy of the embeddings so scikit-learn receives an ndarray
# rather than a torch tensor.
z_filtered = z_np[valid_indices]

# t-SNE, seeded like the projection of the full embedding set so the layout
# is the same on every run.
tsne = TSNE(n_components=2, random_state=42)
z_2d = tsne.fit_transform(z_filtered)

# Legend text per integer label.
label_map = {
    0: "PN",
    1: "NP",
    2: "PP",
    3: "NN",
}

# Marker colour per integer label.
color_map = {
    0: "red",
    1: "orange",
    2: "blue",
    3: "green",
}

def normalize_z2d(z_2d):
    """Min-max scale every column of ``z_2d`` independently to [0, 1].

    z_2d: array-like of shape (N, D).
    Returns an array of the same shape. A column whose values are all equal
    is mapped to 0 instead of producing NaN from a division by zero.
    """
    z_2d = np.asarray(z_2d)
    z_min = z_2d.min(axis=0)
    span = z_2d.max(axis=0) - z_min
    # Zero span means a constant column: divide by 1 so the result is 0, not NaN.
    safe_span = np.where(span == 0, 1, span)
    return (z_2d - z_min) / safe_span

# Min-max rescale the t-SNE coordinates before plotting.
z_2d_norm = normalize_z2d(z_2d)

# One scatter call per class, in ascending label order, so that every class
# gets its own colour and legend entry.
plt.figure(figsize=(8, 6))
for class_id, class_name in sorted(label_map.items()):
    members = labels_filtered == class_id
    xs = z_2d_norm[members, 0]
    ys = z_2d_norm[members, 1]
    plt.scatter(xs, ys, c=color_map[class_id], label=class_name, alpha=0.7)
plt.title("t-SNE of SSM Bottleneck Embeddings")
plt.legend(title="Emotion Pair")
plt.show()

In [None]:
# Partition the normalised 2-D t-SNE coordinates into six k-means clusters.
kmeans = KMeans(n_clusters=6, random_state=42)
clusters = kmeans.fit_predict(z_2d_norm)

label_map = {0: 'PN', 1: 'NP', 2: 'PP', 3: 'NN'}

# One row per embedded sample: coordinates, integer label, cluster id and the
# label's display name.
df = pd.DataFrame(
    {
        'x': z_2d_norm[:, 0],
        'y': z_2d_norm[:, 1],
        'label': labels_filtered,
        'cluster': clusters,
    }
).assign(emotion=lambda frame: frame['label'].map(label_map))

# normalize='index' turns each cluster's row into fractions that sum to 1.
cluster_dist = pd.crosstab(
    index=df['cluster'],
    columns=df['emotion'],
    normalize='index',
)

plt.figure(figsize=(8, 5))
sns.heatmap(cluster_dist, annot=True, cmap='coolwarm')
plt.title("Cluster-wise Emotion Pair Distribution")
plt.xlabel("Emotion Pair")
plt.ylabel("Cluster ID")
plt.show()