In [None]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.decomposition import PCA
from datetime import datetime
import matplotlib
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset

In [None]:
# Identifier for this run: fixed prefix plus a month/day_hour/minute/second stamp.
# Used as the sub-directory name for saved plots and checkpoints.
run_name = "PerfumeEmbedding_" + datetime.now().strftime('%m%d_%H%M%S')

In [None]:
# Plot configuration: use a CJK-capable sans-serif font so the legend title
# renders, and draw minus signs with the ASCII hyphen instead of U+2212.
matplotlib.rcParams.update({
    'font.sans-serif': ['Noto Sans CJK JP'],
    'axes.unicode_minus': False,
})

In [None]:
class PerfumeDataset(Dataset):
    """Dataset of perfumes described by three sets of notes and a class label.

    Each item is a dict with the perfume name, one tensor of note embeddings
    per layer (top / middle / base) and the fragrance label. Notes that are
    not present in ``note2vec`` are skipped.

    Args:
        names: Sequence of perfume names; its length defines the dataset size.
        top_notes_list: Per-perfume iterable of top-note names.
        middle_notes_list: Per-perfume iterable of middle-note names.
        base_notes_list: Per-perfume iterable of base-note names.
        fragrance_labels: Per-perfume class label, indexable by position.
        note2vec: Mapping from note name to its embedding vector.
        note_dim: Width of the empty ``(0, note_dim)`` tensor returned when
            none of a perfume's notes are known. When omitted it is inferred
            from the first vector in ``note2vec``; 768 is used if
            ``note2vec`` is empty.
    """

    def __init__(self, names, top_notes_list, middle_notes_list, base_notes_list,
                 fragrance_labels, note2vec, note_dim=None):
        self.names = names
        self.top_notes_list = top_notes_list
        self.middle_notes_list = middle_notes_list
        self.base_notes_list = base_notes_list
        self.fragrance_labels = fragrance_labels
        self.note2vec = note2vec

        if note_dim is None:
            # Infer the embedding width from the data so the empty-set
            # fallback always matches the width of real note vectors.
            first_vec = next(iter(note2vec.values()), None)
            note_dim = 768 if first_vec is None else int(np.shape(first_vec)[-1])
        self.note_dim = note_dim

    def __len__(self):
        """Return the number of perfumes."""
        return len(self.names)

    def __getitem__(self, idx):
        """Return the name, the three note-embedding tensors and the label at ``idx``."""
        name = self.names[idx]
        top_notes = self.notes_to_vector(self.top_notes_list[idx])
        middle_notes = self.notes_to_vector(self.middle_notes_list[idx])
        base_notes = self.notes_to_vector(self.base_notes_list[idx])
        fragrance = self.fragrance_labels[idx]
        return {
            'name': name,
            'top_notes': top_notes,
            'middle_notes': middle_notes,
            'base_notes': base_notes,
            'fragrance': fragrance
        }

    def notes_to_vector(self, notes):
        """Stack the embeddings of the known notes into a float32 tensor.

        Args:
            notes: Iterable of note names.

        Returns:
            Tensor of shape ``(k, note_dim)`` where ``k`` is the number of
            names found in ``note2vec``; ``(0, note_dim)`` when none match.
        """
        vectors = [self.note2vec[note] for note in notes if note in self.note2vec]
        if not vectors:
            return torch.zeros(0, self.note_dim)
        return torch.tensor(np.stack(vectors), dtype=torch.float32)

In [3]:
class PerfumeEmbedding(nn.Module):
    """Set encoder over the three note layers of a perfume, with a classifier head.

    Each layer (top / middle / base) is encoded independently: a per-note
    network ``phi`` is applied to every note, the results are mean-pooled over
    the set axis, and a per-layer ``rho`` is applied to the pooled vector. The
    three layer vectors are concatenated and mapped to the embedding ``z``,
    from which class logits are computed.

    Args:
        note_dim: Size of each input note vector.
        hidden: Width of the per-layer hidden representations.
        z_dim: Size of the perfume embedding ``z``.
        num_classes: Number of output classes of the classifier.
    """

    def __init__(self, note_dim=768, hidden=256, z_dim=128, num_classes=8):
        super().__init__()
        # φ for each layer (applied to every note independently)
        self.phi_top = nn.Sequential(nn.Linear(note_dim, hidden), nn.ReLU())
        self.phi_mid = nn.Sequential(nn.Linear(note_dim, hidden), nn.ReLU())
        self.phi_base = nn.Sequential(nn.Linear(note_dim, hidden), nn.ReLU())

        # ρ for each layer (applied to the pooled set representation)
        self.rho_top = nn.Linear(hidden, hidden)
        self.rho_mid = nn.Linear(hidden, hidden)
        self.rho_base = nn.Linear(hidden, hidden)

        # final ρ: fuses the three layer vectors into the embedding z
        self.rho = nn.Sequential(
            nn.Linear(hidden*3, z_dim),
            nn.ReLU()
        )

        # classifier
        self.classifier = nn.Linear(z_dim, num_classes)

    def aggregate(self, phi, rho, notes):
        """Encode one note set: ``rho(mean_over_notes(phi(notes)))``.

        Args:
            phi: Module applied to every note vector.
            rho: Module applied to the pooled representation.
            notes: Tensor of shape ``(batch, n_notes, note_dim)``; ``n_notes``
                may be 0.

        Returns:
            Tensor of shape ``(batch, hidden)``.
        """
        h = phi(notes)
        if h.shape[1] == 0:
            # The mean over an empty set is NaN, which would propagate into
            # the loss and corrupt every parameter on the next optimizer
            # step. Pool an empty set to a zero vector instead.
            agg = h.new_zeros(h.shape[0], h.shape[-1])
        else:
            agg = h.mean(dim=1)
        return rho(agg)

    def forward(self, notes_top, notes_mid, notes_base):
        """Compute class logits and the perfume embedding.

        Args:
            notes_top: Top-note vectors, ``(batch, n_top, note_dim)``.
            notes_mid: Middle-note vectors, ``(batch, n_mid, note_dim)``.
            notes_base: Base-note vectors, ``(batch, n_base, note_dim)``.

        Returns:
            Tuple ``(logits, z)`` with shapes ``(batch, num_classes)`` and
            ``(batch, z_dim)``.
        """
        h_top = self.aggregate(self.phi_top, self.rho_top, notes_top)
        h_mid = self.aggregate(self.phi_mid, self.rho_mid, notes_mid)
        h_base = self.aggregate(self.phi_base, self.rho_base, notes_base)

        h_all = torch.cat([h_top, h_mid, h_base], dim=-1)
        z = self.rho(h_all)
        logits = self.classifier(z)
        return logits, z

In [4]:
# Build note2vec dictionary: note name -> float32 embedding vector.
df_note = pd.read_csv("data/note_embedding.csv")
# Cast all embedding columns in one vectorised step instead of iterating over
# rows. Dropping 'note' by name keeps this correct wherever that column sits.
note_matrix = df_note.drop(columns="note").to_numpy(dtype=np.float32)
# With duplicate note names the last row wins, as in a plain dict assignment loop.
note2vec = dict(zip(df_note["note"], note_matrix))

In [5]:
def perfume_collate_fn(batch):
    """Collate dataset items without padding the variable-length note sets.

    Args:
        batch: List of item dicts with the keys 'name', 'top_notes',
            'middle_notes', 'base_notes' and 'fragrance'.

    Returns:
        Dict with the same keys. The first four map to plain Python lists
        (one entry per item, in batch order); 'fragrance' is a 1-D
        ``torch.long`` tensor of the labels.
    """
    list_keys = ('name', 'top_notes', 'middle_notes', 'base_notes')
    # Note tensors differ in length per perfume, so they stay as lists.
    collated = {key: [sample[key] for sample in batch] for key in list_keys}
    collated['fragrance'] = torch.tensor(
        [sample['fragrance'] for sample in batch], dtype=torch.long
    )
    return collated

In [None]:
# Load perfume data: one row per perfume, notes stored as '、'-separated strings.
df_perfume = pd.read_csv("data/1976_clean.csv")
names = df_perfume['name'].tolist()

# Split each notes column into a list of note names per perfume.
top_notes_list, middle_notes_list, base_notes_list = (
    [cell.split('、') for cell in df_perfume[column]]
    for column in ('top_notes', 'middle_notes', 'base_notes')
)

# Encode the fragrance family as integer class ids; `le` keeps the id -> name mapping.
le = LabelEncoder()
fragrance_labels = le.fit_transform(df_perfume['fragrance'])

dataset = PerfumeDataset(
    names,
    top_notes_list,
    middle_notes_list,
    base_notes_list,
    fragrance_labels,
    note2vec,
)

In [7]:
# Training setup: device, model, optimizer and loss.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Size the classifier head from the fitted label encoder rather than relying
# on the constructor default, so CrossEntropyLoss never sees a target index
# outside the logits' range.
model = PerfumeEmbedding(num_classes=len(le.classes_)).to(device)
optimize = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
def train_one_epoch():
    """Run one pass over ``dataset``, taking one optimizer step per perfume.

    Uses the notebook-level ``model``, ``dataset``, ``device``, ``optimize``
    and ``criterion``. Samples are visited in dataset order.

    Returns:
        Mean cross-entropy loss over all perfumes in the dataset.
    """
    model.train()
    layer_keys = ('top_notes', 'middle_notes', 'base_notes')
    running_loss = 0.0

    for idx in tqdm(range(len(dataset)), desc="Train"):
        sample = dataset[idx]
        # Move each note set to the device and add a batch axis of size 1.
        note_sets = [sample[key].to(device).unsqueeze(0) for key in layer_keys]
        target = torch.tensor([sample['fragrance']], dtype=torch.long).to(device)

        logits, _ = model(*note_sets)
        loss = criterion(logits, target)

        optimize.zero_grad()
        loss.backward()
        optimize.step()

        running_loss += loss.item()

    return running_loss / len(dataset)

In [None]:
def evaluate(epoch):
    """Embed every perfume, project the embeddings to 2-D with PCA and save a scatter plot.

    Uses the notebook-level ``model``, ``dataset``, ``device``,
    ``fragrance_labels``, ``le`` and ``run_name``.

    Args:
        epoch: Epoch identifier; used in the plot title and as the PNG file name.

    Side effects:
        Puts ``model`` in eval mode and writes ``PCA/{run_name}/{epoch}.png``.
    """
    model.eval()
    embeddings = []
    with torch.no_grad():
        for i in tqdm(range(len(dataset)), desc="Evaluating"):
            perfume = dataset[i]
            top_notes = perfume['top_notes'].to(device)
            middle_notes = perfume['middle_notes'].to(device)
            base_notes = perfume['base_notes'].to(device)
            _, z = model(top_notes.unsqueeze(0), middle_notes.unsqueeze(0), base_notes.unsqueeze(0))
            embeddings.append(z.cpu().numpy())

    # Every z carries a leading batch axis of size 1. Stacking with np.array()
    # would produce (N, 1, z_dim), which PCA rejects as non-2-D input, so join
    # along the batch axis to obtain (N, z_dim).
    embeddings = np.concatenate(embeddings, axis=0)

    # PCA
    pca = PCA(n_components=2)
    embeddings_2d = pca.fit_transform(embeddings)

    os.makedirs(f"PCA/{run_name}", exist_ok=True)
    fig, ax = plt.subplots(figsize=(10, 8))
    scatter = ax.scatter(
        embeddings_2d[:, 0],
        embeddings_2d[:, 1],
        c=fragrance_labels,
        cmap='tab10',
        alpha=0.7
    )
    ax.set_title(f"Epoch {epoch} - PCA")
    ax.set_xlabel('PC1')
    ax.set_ylabel('PC2')

    # num=None yields exactly one handle per distinct label value (in sorted
    # order), so handles line up with le.classes_ for any number of classes.
    handles, _ = scatter.legend_elements(num=None)
    ax.legend(
        handles=handles,
        labels=[str(x) for x in le.classes_[:len(handles)]],
        title='香型',
        bbox_to_anchor=(1.05, 1),
        loc='upper left'
    )
    fig.tight_layout()
    fig.savefig(f"PCA/{run_name}/{epoch}.png")
    # Close explicitly: this runs once per epoch and open figures accumulate.
    plt.close(fig)

In [None]:
def save_model(epoch):
    """Write the model's state dict to ``models/{run_name}/bert_epoch_{epoch}.pth``.

    The run directory is created if it does not exist yet.

    Args:
        epoch: Epoch identifier embedded in the checkpoint file name.
    """
    checkpoint_dir = f"models/{run_name}"
    os.makedirs(checkpoint_dir, exist_ok=True)
    weights = model.state_dict()
    torch.save(weights, f"models/{run_name}/bert_epoch_{epoch}.pth")

In [None]:
# Main loop: each epoch trains on the full dataset, writes a PCA plot of the
# current embeddings, then checkpoints the weights.
epochs = 10
for ep in range(1, epochs + 1):
    print(f"====== Epoch {ep} ======")

    loss = train_one_epoch()  # mean training loss for this epoch
    print(f"Loss: {loss}")

    evaluate(ep)    # PCA scatter plot saved under PCA/{run_name}/
    save_model(ep)  # checkpoint saved under models/{run_name}/