In [1]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model

# =============================
# 1) 数据增强 & 构造正样本对
# =============================
def augment_window(x):
    return x + np.random.normal(0, 0.01, x.shape).astype(np.float32)

def make_contrastive_pairs(X):
    anchors, positives = [], []
    for w in X:
        anchors.append(w)
        positives.append(augment_window(w))
    return np.stack(anchors).astype(np.float32), np.stack(positives).astype(np.float32)

# =============================
# 2) 简单 Encoder 模型
# =============================
def build_encoder(input_dim, feature_dim=64):
    inp = layers.Input(shape=(input_dim,))
    x = layers.Dense(128, activation="relu")(inp)
    x = layers.Dense(feature_dim)(x)       # 输出 embedding 向量
    x = tf.nn.l2_normalize(x, axis=1)      # 单位化，方便算余弦相似度
    return Model(inp, x, name="encoder")

# =============================
# 3) InfoNCE Loss (对比学习常用)
# =============================
def info_nce_loss(z_anchor, z_positive, temperature=0.1):
    batch_size = tf.shape(z_anchor)[0]

    # 拼接所有向量 [2N, d]
    z = tf.concat([z_anchor, z_positive], axis=0)

    # 余弦相似度 [2N, 2N]
    sim = tf.matmul(z, z, transpose_b=True) / temperature

    # 构造标签：正样本对应索引
    labels = tf.concat([
        tf.range(batch_size, batch_size*2),
        tf.range(0, batch_size)
    ], axis=0)

    # 交叉熵 Loss
    loss = tf.reduce_mean(
        tf.keras.losses.sparse_categorical_crossentropy(
            labels, sim, from_logits=True
        )
    )
    return loss

# =============================
# 4) 训练 Demo
# =============================
if __name__ == "__main__":
    np.random.seed(42)
    tf.random.set_seed(42)

    # 伪造数据 (1000 个样本，每个 32 维)
    X = np.random.rand(1000, 32).astype(np.float32)

    # 构造 (anchor, positive) pair
    anchors, positives = make_contrastive_pairs(X)

    # 构建 encoder
    encoder = build_encoder(input_dim=32, feature_dim=64)
    optimizer = tf.keras.optimizers.Adam(1e-3)

    # 简单训练 10 个 epoch
    for epoch in range(10):
        with tf.GradientTape() as tape:
            z_anchor = encoder(anchors, training=True)
            z_positive = encoder(positives, training=True)
            loss = info_nce_loss(z_anchor, z_positive)

        grads = tape.gradient(loss, encoder.trainable_variables)
        optimizer.apply_gradients(zip(grads, encoder.trainable_variables))

        print(f"Epoch {epoch+1}, Loss = {loss.numpy():.4f}")

    # 训练完成后 encoder 就能提取“更有意义”的 embedding 表示
    feats = encoder.predict(X[:5])
    print("Sample embeddings shape:", feats.shape)


2025-08-19 00:48:43.398826: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-08-19 00:48:43.400094: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-08-19 00:48:43.426403: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-08-19 00:48:43.426988: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Epoch 1, Loss = 5.5613
Epoch 2, Loss = 5.1600
Epoch 3, Loss = 4.6934
Epoch 4, Loss = 4.1651
Epoch 5, Loss = 3.5883
Epoch 6, Loss = 2.9919
Epoch 7, Loss = 2.4253
Epoch 8, Loss = 1.9470
Epoch 9, Loss = 1.5970
Epoch 10, Loss = 1.3774
Sample embeddings shape: (5, 64)
