In [None]:
%load_ext autoreload
%autoreload 2

import os
from typing import Tuple, List, Optional
import torch

import matplotlib.pyplot as plt

from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torch import nn
from torchvision import transforms
from torchsummary import summary

from tqdm import tqdm

# import tensorflow as tf
# from tensorflow import keras
# from tensorflow.keras.datasets import mnist
# from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense,Input,BatchNormalization,Layer
# from tensorflow.keras import regularizers
# import  tensorflow.keras.backend  as K
# import tensorflow as tf
# from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Flatten, Dense
# from tensorflow.keras.models import Model
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.callbacks import ModelCheckpoint
# import math
# import plotly.express as px

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

device

In [11]:
class FoodDataset(Dataset):
    def __init__(self,image_paths: List[str],classes: List[str],root_dir:str,transforms:Optional[transforms.Compose]=None):

        self.image_paths = [os.path.join(root_dir, image_path)+".jpg" for image_path in image_paths]
        self.labels = [image_path.split("/")[0] for image_path in image_paths]
        self.classes = classes
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(self.classes)}
        self.transforms = transforms

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):

        image = Image.open(self.image_paths[idx]).convert('RGB')
        label = self.labels[idx]

        if self.transforms:
            image = self.transforms(image)
        
        return image, self.class_to_idx[label]


transforms = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

In [None]:
#

root_dir = "data/food-101/images"
number_of_classes = 3
batch_size = 2

classes = [i.strip() for i in open("data/food-101/meta/classes.txt","r").readlines()]
print(classes[:2])

train_image_paths = [i.strip() for i in open("data/food-101/meta/train.txt","r").readlines()]
print(train_image_paths[:2])

test_image_paths = [i.strip() for i in open("data/food-101/meta/test.txt","r").readlines()]

print(f"train image count: {len(train_image_paths)} test image count: {len(test_image_paths)}")


In [None]:
cl = [i.split("/")[0] for i in train_image_paths]

chosen_class = list(set(cl))[:number_of_classes]
classes = chosen_class
train_image_paths = [i for i in train_image_paths if i.split("/")[0] in chosen_class]
test_image_paths = [i for i in test_image_paths if i.split("/")[0] in chosen_class]

len(train_image_paths), len(test_image_paths)


In [14]:
train_dataset = FoodDataset(image_paths=train_image_paths,classes=classes,root_dir=root_dir,transforms=transforms)
test_dataset = FoodDataset(image_paths=test_image_paths,classes=classes,root_dir=root_dir,transforms=transforms)

train_dataloader = DataLoader(train_dataset,batch_size=batch_size,shuffle=True,num_workers=0)
test_dataloader = DataLoader(test_dataset,batch_size=batch_size,shuffle=True,num_workers=0)


In [None]:
train_dataset.class_to_idx

In [None]:
img = train_dataset[0][0].permute(1, 2, 0)
plt.imshow(img)

In [None]:
class VisionModel(nn.Module):
    
    def __init__(self, num_classes=3):
        super().__init__()
        
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.conv4 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256,512),#(128 * 28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        
       

    def forward(self, x):
        # Apply convolutional blocks
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.global_pool(x)
        
        # Flatten the output for the fully connected layer
     
        
        # Apply fully connected layers with ReLU and dropout
        x = self.fc(x) 
        return x

model = VisionModel(num_classes=number_of_classes)
model.to(device)
#out = model(torch.randn(1,3,224,224))
#out.shape

summary(
    model,input_size=(3,224,224),batch_size=1
)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:

train_loss =0
train_acc = 0

for epoch in range(10):
    # train step
    model.train()
    
    train_loss = 0
    train_acc = 0
    for step, (X, y) in  tqdm(enumerate(train_dataloader),total=len(train_dataloader)):
        X = X.to(device)
        y = y.to(device)
        #break

        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        optimizer.zero_grad()       # 2
        loss.backward()             # 3
        optimizer.step()            # 4
        train_loss += loss.item()
        train_acc += (y_pred.argmax(1) == y).sum().item()

    
    # val step
    model.eval()
    val_loss = 0 
    val_acc = 0
    for step, (X, y) in  tqdm(enumerate(test_dataloader),total=len(test_dataloader)):
        X = X.to(device)
        y = y.to(device)
        with torch.no_grad():
            y_pred = model(X)
            loss = loss_fn(y_pred, y)
            val_loss += loss.item()
            val_acc += (y_pred.argmax(1) == y).sum().item()

    train_loss /= len(train_dataloader)
    train_acc /= len(train_dataset)
    val_loss /= len(test_dataloader)
    val_acc /= len(test_dataset)

    print(f"epoch: {epoch}, train_loss: {train_loss:.3f}, train_acc: {train_acc:.3f},val_loss: {val_loss:.3f}, val_acc: {val_acc:.3f}")
    
    
    #    break
    #break    

In [None]:

batch_size = 8
img_size = (224,224)
class_size = len(os.listdir("data/min_celeb_face"))
class_size

In [None]:
# Prepare the data
# Example: (uncomment and replace with actual data loading)
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train[..., tf.newaxis].astype('float32') / 255.0
x_test = x_test[..., tf.newaxis].astype('float32') / 255.0
y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)


In [None]:
# import tensorflow as tf
# from tensorflow import keras



# # Load datasets
# train_ds = keras.utils.image_dataset_from_directory(
#     "data/min_celeb_face",
#     validation_split=0.2,
#     subset="training",
#     seed=123,
#     image_size=img_size,
#     batch_size=batch_size
# )

# val_ds = keras.utils.image_dataset_from_directory(
#     "data/min_celeb_face",
#     validation_split=0.2,
#     subset="validation",
#     seed=123,
#     image_size=img_size,
#     batch_size=batch_size
# )

# # Preprocess datasets
# def preprocess_dataset(dataset):
#     def preprocess(images, labels):
#         labels = tf.one_hot(labels, depth=class_size)
#         return ((images,labels), labels)
#     return dataset.map(preprocess)

# train_ds = preprocess_dataset(train_ds)
# val_ds = preprocess_dataset(val_ds)

# # Example to iterate through the dataset and check shapes
# for inputs, labels in train_ds.take(1):
#     print(inputs[0].shape)
#     print(inputs[1].shape)
#     print(labels.shape)
""

In [None]:
class ArcFace(Layer):
    def __init__(self, n_classes=10, s=64.0, m=0.5, regularizer=None, **kwargs):
        super(ArcFace, self).__init__(**kwargs)
        self.n_classes = n_classes
        self.s = s
        self.m = m
        self.regularizer = regularizers.get(regularizer)
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def build(self, input_shape):
        super(ArcFace, self).build(input_shape[0])
        self.W = self.add_weight(name='W',
                                 shape=(input_shape[0][-1], self.n_classes),
                                 initializer='glorot_uniform',
                                 trainable=True,
                                 regularizer=self.regularizer)

    def call(self, inputs):
        x, y = inputs
        # Normalize feature
        x = tf.nn.l2_normalize(x, axis=1)
        # Normalize weights
        W = tf.nn.l2_normalize(self.W, axis=0)
        # Dot product
        logits = tf.matmul(x, W)
        # Add margin
        theta = tf.acos(K.clip(logits, -1.0 + K.epsilon(), 1.0 - K.epsilon()))
        target_logits = tf.cos(theta + self.m)
        
        # Replace logits for the target classes with the margin-adjusted logits
        logits = y * target_logits + (1 - y) * logits

        # Feature re-scale
        logits *= self.s
        return tf.nn.softmax(logits)

    def compute_output_shape(self, input_shape):
        return (input_shape[0][0], self.n_classes)

    def get_config(self):
        config = super(ArcFace, self).get_config()
        config.update({
            'n_classes': self.n_classes,
            's': self.s,
            'm': self.m,
            'regularizer': self.regularizer
        })
        return config


class CosFace(Layer):
    def __init__(self, n_classes=10, s=64.0, m=0.35, regularizer=None, **kwargs):
        super(CosFace, self).__init__(**kwargs)
        self.n_classes = n_classes
        self.s = s
        self.m = m
        self.regularizer = regularizers.get(regularizer)
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def build(self, input_shape):
        super(CosFace, self).build(input_shape[0])
        self.W = self.add_weight(name='W',
                                 shape=(input_shape[0][-1], self.n_classes),
                                 initializer='glorot_uniform',
                                 trainable=True,
                                 regularizer=self.regularizer)

    def call(self, inputs):
        x, y = inputs
        # Normalize feature
        x = tf.nn.l2_normalize(x, axis=1)
        # Normalize weights
        W = tf.nn.l2_normalize(self.W, axis=0)
        # Dot product
        logits = tf.matmul(x, W)
        # Add margin
        target_logits = logits * (1 - y) + (self.th * y + self.mm) * y
        # Feature re-scale
        logits *= self.s
        out = tf.nn.softmax(logits)

        return out

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, BatchNormalization, Dropout, Flatten, Dense, Activation
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers
from tensorflow.keras import backend as K
import numpy as np


# Define a simple model
def simple_model(input_shape = (224,224,3)):
    input_layer = Input(shape=input_shape)
    label_layer = Input(shape=(class_size,))

    x = Conv2D(32, kernel_size=(3, 3), activation='relu')(input_layer)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Conv2D(64, kernel_size=(3, 3), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    x = Flatten()(x)
    x = Dense(512, kernel_initializer='he_normal')(x)
    x = BatchNormalization()(x)
    output_layer = ArcFace(n_classes=class_size)([x, label_layer])

    model = Model([input_layer, label_layer], output_layer)

    return model

# Define the VGG-like model
def vgg_block(x, num_filters, num_convs):
    for _ in range(num_convs):
        x = Conv2D(num_filters, kernel_size=(3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
    return x

def vgg8_arcface(args,input_shape=(224,224,3)):
    inputs = Input(shape=input_shape)
    y = Input(shape=(class_size,))

    x = vgg_block(inputs, 16, 2)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = vgg_block(x, 64, 2)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = vgg_block(x, 128, 2)
    x = vgg_block(x, 256, 2)
    
    x =  keras.layers.GlobalAveragePooling2D()(x)

    #x = MaxPooling2D(pool_size=(2, 2))(x)

    # x = BatchNormalization()(x)
    #x = Dropout(0.5)(x)
    x = Flatten()(x)
    x = Dense(args.num_features, kernel_initializer='he_normal', kernel_regularizer=regularizers.l2(args.weight_decay))(x)
    x = BatchNormalization()(x)
    output = ArcFace(n_classes=class_size, regularizer=regularizers.l2(args.weight_decay))([x, y])

    return Model([inputs, y], output)

In [None]:
### Arguments
class Args:
    num_features = 3
    weight_decay = 1e-4

args = Args()


tf.keras.backend.clear_session()
# Create the model
model = vgg8_arcface(args)
#model = simple_model()


# Compile the model
model.compile(loss='categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(),
              metrics=['accuracy'])

# Assuming x_train, y_train, x_test, and y_test are your datasets
# Ensure y_train and y_test are one-hot encoded

# Train the model
model.compile(optimizer=Adam(learning_rate=0.01), loss='categorical_crossentropy', metrics=['accuracy'])

# Create a checkpoint callback
checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_loss', mode='min')

model.fit(train_ds, validation_data=val_ds, epochs=20, callbacks=[checkpoint])

# model.fit([x_train, y_train],
#           y_train,
#           batch_size=32,
#           epochs=10,
#           verbose=1,
#           validation_data=([x_test, y_test], y_test),
#           callbacks=[tf.keras.callbacks.ModelCheckpoint('model.hdf5', verbose=1, save_best_only=True)])

In [None]:
feature_extractor = tf.keras.Model(inputs=model.input, outputs=model.get_layer('dense').output)

In [None]:
# Extract features
image_features = []
labels = []

for i,t in val_ds.take(len(val_ds)):

    features = feature_extractor([i[0], i[1]])
    image_features.append(features.numpy())
    labels.extend(np.argmax(t, axis=1))

image_features = np.concatenate(image_features, axis=0)
labels = np.array(labels)

tsne = TSNE(n_components=3, verbose=1, perplexity=40, n_iter=2500)
tsne_results = tsne.fit_transform(image_features)




In [None]:
### Extract Features and Perform t-SNE


# # Extract features
# image_features = []
# labels = []a

# for i in range(len(x_test[:5000])):
#     features = feature_extractor([x_test[i:i+1], y_test[i:i+1]])
#     image_features.append(features.numpy())
#     labels.append(np.argmax(y_test[i]))

# image_features = np.concatenate(image_features, axis=0)
# labels = np.array(labels)

# # Perform t-SNE
# tsne = TSNE(n_components=3, verbose=1, perplexity=40, n_iter=250)
# tsne_results = tsne.fit_transform(image_features)



In [None]:
labels

In [None]:
# Plot t-SNE results
plt.figure(figsize=(10, 8))
scatter = plt.scatter(tsne_results[:, 0], tsne_results[:, 1], c=labels, cmap='jet', alpha=0.5)
plt.legend(*scatter.legend_elements(), title="Classes")
plt.title('t-SNE of image featuresq')
plt.xlabel('t-SNE feature 1')
plt.ylabel('t-SNE feature 2')
plt.show()# Plot t-SNE results


In [None]:
px.scatter_3d(x=tsne_results[:, 0], y=tsne_results[:, 1], z=tsne_results[:, 2], color=labels, title='t-SNE of image features')

In [None]:
fig = px.scatter(x=tsne_results[:, 0], y=tsne_results[:, 1], color=labels)
fig.show()