# Import Library

In [25]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# Load MNIST Dataset

In [26]:
# Load MNIST dataset
mnist = fetch_openml('mnist_784', version=1)
X = mnist['data'].values
y = mnist['target'].astype(np.int32).values

# Normalize the data
X = X / 255.0

# Binarize the labels
y = (y == 0).astype(np.int32)

# Split into training and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Ensure the data is in the right shape for MLP (N, D) where N is number of samples, D is number of features
X_train = X_train.reshape(X_train.shape[0], -1)
X_test = X_test.reshape(X_test.shape[0], -1)

  warn(


In [27]:
# Shuffle data
permutation = np.random.permutation(X_train.shape[0])
X_train = X_train[permutation][:1000]
y_train = y_train[permutation][:1000]

X_test = X_test[:100]
y_test = y_test[:100]

# MLP Model

## Triplet Loss

In [28]:
def triplet_loss(anchor, positive, negative, margin=1.0):
    """
    Tính toán Triplet Loss.

    Parameters:
    - anchor: np.ndarray, vector đặc trưng của anchor.
    - positive: np.ndarray, vector đặc trưng của positive.
    - negative: np.ndarray, vector đặc trưng của negative.
    - margin: float, margin để tính toán loss.

    Returns:
    - loss: float, giá trị của triplet loss.
    """
    # Tính toán khoảng cách bình phương giữa anchor và positive
    pos_dist = np.sum(np.square(anchor - positive), axis=-1)

    # Tính toán khoảng cách bình phương giữa anchor và negative
    neg_dist = np.sum(np.square(anchor - negative), axis=-1)

    # Tính toán Triplet Loss
    loss = np.maximum(0, pos_dist - neg_dist + margin)

    return loss

## Architecture

In [29]:
import numpy as np

class MLP:
    def __init__(self, input_size, hidden_size, output_size):
        self.W1 = np.random.randn(input_size, hidden_size) * 0.01
        self.b1 = np.zeros((1, hidden_size))
        self.W2 = np.random.randn(hidden_size, output_size) * 0.01
        self.b2 = np.zeros((1, output_size))

    def relu(self, z):
        return np.maximum(0, z)

    def forward(self, X):
        self.z1 = np.dot(X, self.W1) + self.b1
        self.a1 = self.relu(self.z1)
        self.z2 = np.dot(self.a1, self.W2) + self.b2
        return self.z2

    def compute_loss(self, anchor, positive, negative, alpha=0.2):
        anchor_output = self.forward(anchor)
        positive_output = self.forward(positive)
        negative_output = self.forward(negative)

        # Ensure there are no NaN values in outputs
        if np.isnan(anchor_output).any() or np.isnan(positive_output).any() or np.isnan(negative_output).any():
            print("NaN detected in forward pass outputs")

        loss = triplet_loss(anchor_output, positive_output, negative_output, alpha)
        return loss

    def backward(self, anchor, positive, negative, alpha=0.2, learning_rate=0.01):
        # Forward pass
        anchor_output = self.forward(anchor)
        positive_output = self.forward(positive)
        negative_output = self.forward(negative)

        # Calculate gradients (simple backpropagation)
        pos_dist = 2 * (anchor_output - positive_output)
        neg_dist = 2 * (anchor_output - negative_output)

        dloss_da = pos_dist - neg_dist
        dloss_dp = -pos_dist
        dloss_dn = neg_dist

        # Ensure there are no NaN values in gradients
        if np.isnan(dloss_da).any() or np.isnan(dloss_dp).any() or np.isnan(dloss_dn).any():
            print("NaN detected in gradients")

        # Update weights and biases (simplified gradient descent)
        self.W2 -= learning_rate * np.dot(self.a1.T, dloss_da)
        self.b2 -= learning_rate * np.sum(dloss_da, axis=0, keepdims=True)

        dW1_a = np.dot(anchor.T, np.dot(dloss_da, self.W2.T) * (self.z1 > 0))
        db1_a = np.sum(np.dot(dloss_da, self.W2.T) * (self.z1 > 0), axis=0, keepdims=True)

        dW1_p = np.dot(positive.T, np.dot(dloss_dp, self.W2.T) * (self.z1 > 0))
        db1_p = np.sum(np.dot(dloss_dp, self.W2.T) * (self.z1 > 0), axis=0, keepdims=True)

        dW1_n = np.dot(negative.T, np.dot(dloss_dn, self.W2.T) * (self.z1 > 0))
        db1_n = np.sum(np.dot(dloss_dn, self.W2.T) * (self.z1 > 0), axis=0, keepdims=True)

        self.W1 -= learning_rate * (dW1_a + dW1_p + dW1_n)
        self.b1 -= learning_rate * (db1_a + db1_p + db1_n)

# Train

In [30]:
input_size = 28 * 28
hidden_size = 128
output_size = 64
model = MLP(input_size, hidden_size, output_size)

In [31]:
learning_rate = 0.01
num_epochs = 10
batch_size = 32

In [32]:
# Train
for epoch in range(num_epochs):
    LOSS = []
    for i in range(0, X_train.shape[0], batch_size):
        # Ensure the batch size is consistent
        end = i + batch_size
        if end > X_train.shape[0]:
            break

        anchor_batch = X_train[i:end]
        positive_batch = X_train[i:end]
        negative_batch = X_train[(i+batch_size) % X_train.shape[0]: (i+2*batch_size) % X_train.shape[0]]

        if len(negative_batch) < len(anchor_batch):
            continue

        loss = model.compute_loss(anchor_batch, positive_batch, negative_batch)
        LOSS.append(loss)
        model.backward(anchor_batch, positive_batch, negative_batch, learning_rate=learning_rate)

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {np.mean(LOSS)}')

Epoch 1/10, Loss: 0.19589026263522016
Epoch 2/10, Loss: 0.1958584359238322
Epoch 3/10, Loss: 0.1970650692754378
Epoch 4/10, Loss: 0.19775026682885705
Epoch 5/10, Loss: 0.19811557494455567
Epoch 6/10, Loss: 0.19836189140650695
Epoch 7/10, Loss: 0.19854594905470904
Epoch 8/10, Loss: 0.1986925931264116
Epoch 9/10, Loss: 0.19881436013012715
Epoch 10/10, Loss: 0.19891933055689073


# Inference & Evaluate

In [33]:
anchor_test = X_test[:batch_size]
positive_test = X_test[:batch_size]
negative_test = X_test[batch_size:2*batch_size]

In [34]:
test_loss = model.compute_loss(anchor_test, positive_test, negative_test)