Overview
===

- 目標
    - 利用 Contrastive Representation Learning 訓練 Encoder，將品牌向量與名人向量映射至同一語意空間，藉此找出適合品牌的代言人
- 模型特徵
    - 品牌描述向量 (以LLM生成品牌描述，再透過 Voyage AI 的embedding model 將品牌描述轉換為向量): 1024 dimensions
    - 品牌人格 (brand personality)：採用 Jennifer Aaker 品牌人格五大構面
    - 品牌類別: 13 dimensions
    - 代言人性別: 1 dimension
    - 代言人年齡區間: 8 dimensions
    - 代言人人格特質向量  (以LLM生成代言人人格特質描述，再透過 Voyage AI 的embedding model 將代言人的人格特質描述轉換為向量): 1024 dimensions
- 模型評估
    - 對於測試集中的每個品牌，brand encoder 會輸出該品牌在共同語意空間中的向量
    - 計算該品牌向量與所有候選名人向量（經 projection 映射至同一空間後）的 cosine similarity
    - 依相似度由高到低排序，取前 K 名（K = 1 或 10）不重複的名人，作為「預測名人」
      - 若該品牌實際的代言人出現在這前 K 名之中，則視為預測正確
    - 最後計算Top-1 Accuracy, Top-10 Accuracy

# Prepare data for modeling

In [5]:
from sklearn.model_selection import train_test_split


# Hold out 20% of the joined rows for testing; the fixed seed keeps the split reproducible.
train_df, test_df = train_test_split(
    df_joined,
    test_size=0.2,
    random_state=42,
)

# Column order of the model input: brand_cols, then demo_col, then product_cat.
feature_cols = brand_cols + demo_col + product_cat

# Input features as float32 matrices (noted in the notebook as 1046 dimensions).
X_train = train_df[feature_cols].to_numpy().astype(np.float32)
X_test = test_df[feature_cols].to_numpy().astype(np.float32)

# Target celebrity vectors as float32 matrices (noted in the notebook as 1024 dimensions).
y_train = train_df[celeb_cols].to_numpy().astype(np.float32)
y_test = test_df[celeb_cols].to_numpy().astype(np.float32)

# Celebrity id of every test row, aligned row-for-row with X_test / y_test.
test_celeb_ids = test_df[celeb_id_col].to_numpy()

# Define loss, model architecture, evaluation metrics

In [6]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, regularizers


def build_brand_encoder(input_dim, embed_dim=128):
    """Build the brand tower as a Keras functional model named ``brand_encoder``.

    The stack is Dense(64, ReLU, L2 kernel penalty 5e-4) -> Dropout(0.5) ->
    Dense(embed_dim) -> L2 normalisation along the last axis.

    Args:
        input_dim: Length of one brand feature vector.
        embed_dim: Length of the embedding the encoder emits.

    Returns:
        A Keras ``Model`` mapping inputs of shape ``(input_dim,)`` to
        L2-normalised vectors of shape ``(embed_dim,)``.
    """
    brand_features = layers.Input(shape=(input_dim,))

    # Layers are instantiated in the order they are applied.
    hidden_layer = layers.Dense(
        64,
        activation="relu",
        kernel_regularizer=regularizers.l2(5e-4),
    )
    dropout_layer = layers.Dropout(0.5)
    projection_layer = layers.Dense(embed_dim)
    unit_norm_layer = layers.Lambda(lambda t: tf.nn.l2_normalize(t, axis=-1))

    embedding = brand_features
    for layer in (hidden_layer, dropout_layer, projection_layer, unit_norm_layer):
        embedding = layer(embedding)

    return models.Model(brand_features, embedding, name="brand_encoder")


def triplet_loss(margin=0.2):
    """Create a cosine-similarity triplet loss for concatenated embeddings.

    The returned function expects ``y_pred`` to hold three equally wide
    blocks side by side along axis 1: anchor, positive, negative. Per row
    it computes ``max(0, cos(anchor, negative) - cos(anchor, positive) + margin)``
    and returns the mean over the batch.

    Args:
        margin: Amount by which the positive similarity must exceed the
            negative similarity before a row stops contributing loss.

    Returns:
        A ``loss_fn(y_true, y_pred)`` callable. ``y_true`` is accepted for
        Keras compatibility but never read.
    """
    def _cosine(u, v):
        # Normalise both operands so the row-wise dot product is a cosine.
        u_hat = tf.nn.l2_normalize(u, axis=-1)
        v_hat = tf.nn.l2_normalize(v, axis=-1)
        return tf.reduce_sum(u_hat * v_hat, axis=-1)

    def loss_fn(y_true, y_pred):
        # Width of one embedding block: a third of the concatenated width.
        width = tf.shape(y_pred)[1] // 3
        anchor, positive, negative = (
            y_pred[:, part * width:(part + 1) * width] for part in range(3)
        )

        pos_sim = _cosine(anchor, positive)
        neg_sim = _cosine(anchor, negative)

        hinge = tf.maximum(0.0, neg_sim - pos_sim + margin)
        return tf.reduce_mean(hinge)

    return loss_fn


def build_triplet_model(input_dim=27, celeb_dim=1024, embed_dim=128):
    """Build the three-input triplet training model.

    The model takes a brand feature vector, a positive celebrity vector and
    a negative celebrity vector. The brand goes through the ``brand_encoder``
    sub-model; both celebrity inputs go through one shared projection head
    (Dense(embed_dim) followed by L2 normalisation). The three embeddings
    are concatenated along the last axis in the order anchor, positive,
    negative, which is the layout ``triplet_loss`` slices apart.

    The projection head carries the explicit name ``celeb_proj`` so it can
    be fetched with ``model.get_layer("celeb_proj")``, the same way the
    brand tower is fetched by name. Positional lookup keeps working.

    Args:
        input_dim: Length of one brand feature vector.
        celeb_dim: Length of one raw celebrity vector.
        embed_dim: Length of each embedding in the shared space.

    Returns:
        A Keras ``Model`` with inputs ``[brand, celeb_pos, celeb_neg]`` and
        one output of width ``3 * embed_dim``.
    """
    brand_encoder = build_brand_encoder(input_dim, embed_dim)

    # Inputs
    brand_in = layers.Input(shape=(input_dim,))
    celeb_pos_in = layers.Input(shape=(celeb_dim,))
    celeb_neg_in = layers.Input(shape=(celeb_dim,))

    # Brand -> embedding
    anchor = brand_encoder(brand_in)

    # One projection head shared by positive and negative celebrities, so
    # both are mapped into the embedding space with the same weights.
    celeb_proj = models.Sequential(
        [
            layers.Dense(embed_dim),
            layers.Lambda(lambda t: tf.nn.l2_normalize(t, axis=-1)),
        ],
        name="celeb_proj",
    )
    positive = celeb_proj(celeb_pos_in)
    negative = celeb_proj(celeb_neg_in)

    merged = layers.Concatenate(axis=-1)([anchor, positive, negative])

    model = models.Model([brand_in, celeb_pos_in, celeb_neg_in], merged)
    return model


def create_triplets(X, y, celeb_ids, num_triplets=10000):
    """Sample (anchor brand, positive celebrity, negative celebrity) triplets.

    For each triplet a row ``i`` is drawn uniformly at random; ``X[i]`` is
    the anchor and ``y[i]`` the positive. The negative is ``y[k]`` for a
    uniformly drawn row ``k`` whose celebrity id differs from row ``i``'s
    (rejection sampling). Draws come from NumPy's global random state.

    Args:
        X: Brand feature rows, indexable by row.
        y: Celebrity vector rows, aligned with ``X``.
        celeb_ids: Celebrity id for each row, aligned with ``X``.
        num_triplets: Number of triplets to sample.

    Returns:
        A list ``[brand_arr, pos_arr, neg_arr]`` of arrays, each with
        ``num_triplets`` rows.

    Raises:
        ValueError: If triplets are requested but ``X`` is empty, or every
            row shares one celebrity id (no valid negative exists, so
            rejection sampling would never terminate).
    """
    n = len(X)
    ids = np.asarray(celeb_ids)

    if num_triplets > 0:
        if n == 0:
            raise ValueError("cannot sample triplets from an empty dataset")
        # Same inequality the sampling loop relies on: if no id differs from
        # the first one, no anchor has a valid negative.
        if not np.any(ids != ids[0]):
            raise ValueError(
                "celeb_ids must contain at least two distinct ids to sample negatives"
            )

    anchor_idx = []
    negative_idx = []
    for _ in range(num_triplets):
        i = np.random.randint(0, n)  # anchor and positive share this row
        k = np.random.randint(0, n)  # candidate negative row
        while ids[k] == ids[i]:  # redraw until the celebrity differs
            k = np.random.randint(0, n)
        anchor_idx.append(i)
        negative_idx.append(k)

    anchor_idx = np.asarray(anchor_idx, dtype=np.intp)
    negative_idx = np.asarray(negative_idx, dtype=np.intp)

    # Gathering by index keeps dtype and feature width even for zero triplets.
    X_arr = np.asarray(X)
    y_arr = np.asarray(y)
    return [X_arr[anchor_idx], y_arr[anchor_idx], y_arr[negative_idx]]


def evaluate_topk_triplet(brand_encoder, celeb_proj, X_test, y_test, celeb_ids, df, K=10, detail=False):
    """Compute Top-1 and Top-K retrieval accuracy over the test rows.

    Every row of ``X_test`` is embedded with ``brand_encoder`` and every row
    of ``y_test`` with ``celeb_proj``. For each brand row the celebrity rows
    are ranked by cosine similarity, duplicate celebrity ids are collapsed
    (first occurrence wins), and the first ``K`` distinct ids form the
    shortlist. A row is a Top-1 hit when its own celebrity id heads the
    shortlist and a Top-K hit when that id appears anywhere in it.

    Args:
        brand_encoder: Object with ``predict(X, verbose=0)`` returning brand embeddings.
        celeb_proj: Object with ``predict(y, verbose=0)`` returning celebrity embeddings.
        X_test: Brand feature rows.
        y_test: Celebrity vector rows; these also form the candidate pool.
        celeb_ids: Celebrity id per row, aligned with ``X_test`` and ``y_test``.
        df: Accepted but not used by this function.
        K: Shortlist length.
        detail: Accepted but not used by this function.

    Returns:
        ``(top1_accuracy, topk_accuracy)`` as fractions of ``len(X_test)``.
    """
    def _unit_rows(vectors):
        # Scale every row to unit length, in place.
        vectors /= np.linalg.norm(vectors, axis=1, keepdims=True)
        return vectors

    brand_vecs = _unit_rows(brand_encoder.predict(X_test, verbose=0))
    celeb_vecs = _unit_rows(celeb_proj.predict(y_test, verbose=0))

    total = len(X_test)
    hits_at_1 = 0
    hits_at_k = 0

    for row in range(total):
        true_id = celeb_ids[row]

        # Cosine similarity of this brand against every candidate celebrity row.
        scores = celeb_vecs @ brand_vecs[row]
        best_first = np.flip(np.argsort(scores))

        # Walk candidates from most to least similar, keeping each celebrity
        # id once, until the shortlist holds K ids.
        shortlist = []
        for candidate in best_first:
            candidate_id = celeb_ids[candidate]
            if candidate_id not in shortlist:
                shortlist.append(candidate_id)
            if len(shortlist) >= K:
                break

        hits_at_1 += int(shortlist[0] == true_id)
        hits_at_k += int(true_id in shortlist)

    return hits_at_1 / total, hits_at_k / total


# Model Training

In [12]:
# Sample 20,000 (brand, positive celebrity, negative celebrity) triplets from the training rows.
train_celeb_ids = train_df[celeb_id_col].to_numpy()
train_triplets = create_triplets(X_train, y_train, train_celeb_ids, num_triplets=20000)

# Size the network from the data so it always matches the feature matrices.
model = build_triplet_model(
    input_dim=X_train.shape[1],
    celeb_dim=y_train.shape[1],
    embed_dim=128,
)
model.compile(
    optimizer=optimizers.Adam(1e-4),
    loss=triplet_loss(margin=0.2),
)

# The triplet loss never reads y_true, so zeros (one per triplet) serve as
# placeholder targets to satisfy the fit() signature.
placeholder_targets = np.zeros(len(train_triplets[0]))
history = model.fit(
    train_triplets,
    placeholder_targets,
    batch_size=64,
    epochs=20,
    verbose=1,
)


Epoch 1/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 4s 6ms/step - loss: 0.1815
Epoch 2/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0933
Epoch 3/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0675
Epoch 4/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 4s 14ms/step - loss: 0.0546
Epoch 5/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 6s 18ms/step - loss: 0.0477
Epoch 6/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 7s 6ms/step - loss: 0.0411
Epoch 7/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 4s 12ms/step - loss: 0.0372
Epoch 8/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 3s 10ms/step - loss: 0.0334
Epoch 9/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step - loss: 0.0325
Epoch 10/20
313/313 ━━━━━━━━━━━━━━━━━━━━ 2s 7ms/step

# Evaluation

In [32]:
# Retrieve the two trained sub-networks from the triplet model.
brand_encoder = model.get_layer("brand_encoder")

# The celebrity projection head is the only Sequential sub-model inside the
# triplet model. Select it by type rather than by position, because a
# positional index depends on how Keras happens to order model.layers.
sequential_layers = [layer for layer in model.layers if isinstance(layer, models.Sequential)]
if len(sequential_layers) != 1:
    raise ValueError(
        f"expected exactly one Sequential projection head, found {len(sequential_layers)}"
    )
celeb_proj = sequential_layers[0]

# Evaluate retrieval accuracy on the held-out test rows.
top1_acc, top10_acc = evaluate_topk_triplet(
    brand_encoder, celeb_proj, X_test, y_test, test_celeb_ids, test_df, K=10
)
print(f"Evaluation with test dataset: Top-1 Accuracy: {top1_acc:.4f}")
print(f"Evaluation with test dataset: Top-10 Accuracy: {top10_acc:.4f}")


Evaluation with test dataset: Top-1 Accuracy: 0.1462
Evaluation with test dataset: Top-10 Accuracy: 0.6923
