In [30]:
import os
import random
import math
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from sklearn.model_selection import train_test_split
from tqdm import tqdm


import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from PIL import Image

In [22]:
# Cell 2 — Imports & GPU check (UPDATED)
import os
import random
import math
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from sklearn.model_selection import train_test_split
from tqdm import tqdm

import tensorflow as tf
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from PIL import Image

# Report the runtime, then pin TensorFlow to the first GPU (if any) with
# on-demand memory allocation instead of grabbing all GPU memory up front.
print('TensorFlow:', tf.__version__)
gpus = tf.config.list_physical_devices('GPU')
print('GPUs:', gpus)

if not gpus:
    print('No GPU found, using CPU')
else:
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
        # Only the first physical GPU stays visible to TensorFlow.
        primary_gpu = gpus[0]
        tf.config.set_visible_devices(primary_gpu, 'GPU')
        print(f'Using GPU: {primary_gpu}')
        print('GPU memory growth enabled')
    except RuntimeError as e:
        # TensorFlow raises RuntimeError when device settings are changed after
        # the runtime has already been initialized (e.g. on a cell re-run).
        print('GPU configuration error:', e)

TensorFlow: 2.10.1
GPUs: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Using GPU: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')
GPU memory growth enabled


In [23]:
# --- Paths -------------------------------------------------------------------
# Dataset root containing one sub-folder per person. The default is a
# machine-specific location; set the SIAMESE_DATASET_PATH environment variable
# to point elsewhere without editing the notebook.
DATASET_PATH = os.environ.get(
    "SIAMESE_DATASET_PATH",
    r"D:/Final_Semester_Project/AI_Attendance_System/AI_And_ML_Model/DataSets/RecognizeAgumented",
)
OUT_DIR = Path("runs/siamese_tf")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# --- Reproducibility ---------------------------------------------------------
# Triplet sampling and verification-pair generation draw from Python's `random`;
# the GPU smoke test draws from NumPy. Seed both so re-runs sample the same data.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# --- Model / training hyper-parameters ----------------------------------------
IMG_SIZE = (224, 224) # HxW
# The model-building, data-loading and recognition cells refer to the image
# height/width individually, so expose them alongside IMG_SIZE.
IMG_H, IMG_W = IMG_SIZE
EMBED_DIM = 128
BATCH_SIZE = 16
EPOCHS = 80
STEPS_PER_EPOCH = 100
VAL_STEPS = 60
LEARNING_RATE = 2e-4
MARGIN = 0.3


# identity-level split ratios
TEST_RATIO = 0.15
VAL_RATIO = 0.15


# Output files for the saved models.
MODEL_BEST = str(OUT_DIR / 'best_embedding_model.h5')
MODEL_FINAL = str(OUT_DIR / 'final_embedding_model.h5')
print('Config loaded')

Config loaded


In [24]:
# Scan the dataset root: every sub-directory is treated as one person, and only
# persons with at least two image files are kept.
root = Path(DATASET_PATH)
if not root.exists():
    raise SystemExit(f"Dataset path not found: {DATASET_PATH}")


persons = [p for p in sorted(root.iterdir()) if p.is_dir()]
print('Found total folders:', len(persons))


person_to_images = {}
for p in persons:
    # Sorted so the per-person list order does not depend on the order in which
    # the filesystem happens to enumerate files (keeps random sampling
    # reproducible across machines for a fixed seed).
    imgs = sorted(str(x) for x in p.glob('*') if x.suffix.lower() in {'.jpg','.jpeg','.png'})
    if len(imgs) >= 2: # need >=2 images for triplet/pair training
        person_to_images[p.name] = imgs


print('Usable persons (>=2 images):', len(person_to_images))
if len(person_to_images) < 2:
    raise SystemExit('Need at least 2 persons with >=2 images each')

Found total folders: 58
Usable persons (>=2 images): 58


In [25]:
# Identity-level split: a person appears in exactly one of train / val / test.
all_persons = sorted(person_to_images)

# First carve off the test identities, then split the remainder so that the
# validation share is VAL_RATIO of the *original* population.
train_and_val, test_persons = train_test_split(
    all_persons, test_size=TEST_RATIO, random_state=42
)
train_persons, val_persons = train_test_split(
    train_and_val, test_size=VAL_RATIO/(1-TEST_RATIO), random_state=42
)

# Per-split lookup: person name -> list of image paths.
train_dict, val_dict, test_dict = (
    {name: person_to_images[name] for name in group}
    for group in (train_persons, val_persons, test_persons)
)

print('Split sizes — train persons:', len(train_dict), 'val persons:', len(val_dict), 'test persons:', len(test_dict))

Split sizes — train persons: 40 val persons: 9 test persons: 9


In [26]:
from tensorflow.keras.utils import Sequence

class TripletSequence(Sequence):
    """Keras ``Sequence`` yielding randomly sampled (anchor, positive, negative) triplets.

    Every batch is sampled afresh on each call, so ``idx`` does not select a
    fixed set of samples.

    Args:
        person_dict: maps person name -> list of image paths. At least two
            persons are required (the negative comes from a different person)
            and every person needs at least two images (anchor and positive
            are two distinct paths of the same person).
        batch_size: number of triplets per batch.
        augment: when True, randomly flip anchor and positive horizontally
            (together) and randomly scale the anchor's brightness.

    Raises:
        ValueError: if ``person_dict`` is empty, has fewer than two persons, or
            contains a person with fewer than two images.

    Images are read through the notebook-level ``safe_load`` helper, which is
    presumably expected to return an (IMG_H, IMG_W, 3) float array in [0, 1]
    or ``None`` on failure — TODO confirm against its definition.
    """

    # Upper bound on how many times a triplet is re-sampled when an image
    # fails to load, so a badly corrupted dataset cannot loop forever.
    MAX_LOAD_RETRIES = 10

    def __init__(self, person_dict, batch_size=32, augment=False):
        self.persons = list(person_dict.keys())
        self.person_dict = person_dict
        self.batch_size = batch_size
        self.augment = augment
        if len(self.persons) == 0:
            raise ValueError('No persons available in provided dict')
        # Fail fast on inputs that would otherwise crash mid-epoch during sampling.
        if len(self.persons) < 2:
            raise ValueError('Need at least 2 persons to sample negatives')
        too_few = [name for name in self.persons if len(person_dict[name]) < 2]
        if too_few:
            raise ValueError(f'Persons with fewer than 2 images: {too_few}')

    def __len__(self):
        # Never report fewer than 100 batches per epoch.
        return max(100, math.ceil((len(self.persons) * 10) / self.batch_size))

    def _sample_triplet(self):
        """Sample one (anchor, positive, negative) image triple, re-sampling on load failure."""
        for _ in range(self.MAX_LOAD_RETRIES):
            anchor_person = random.choice(self.persons)
            a_path, p_path = random.sample(self.person_dict[anchor_person], 2)
            neg_person = random.choice([x for x in self.persons if x != anchor_person])
            n_path = random.choice(self.person_dict[neg_person])

            a = safe_load(a_path)
            p = safe_load(p_path)
            n = safe_load(n_path)
            if a is not None and p is not None and n is not None:
                return a, p, n
        raise RuntimeError(
            f'Could not load a complete triplet after {self.MAX_LOAD_RETRIES} attempts'
        )

    def __getitem__(self, idx):
        A = np.zeros((self.batch_size, IMG_H, IMG_W, 3), dtype=np.float32)
        P = np.zeros_like(A)
        N = np.zeros_like(A)
        for i in range(self.batch_size):
            a, p, n = self._sample_triplet()

            # simple augmentation
            if self.augment:
                if random.random() < 0.5:
                    # Flip anchor and positive together; the negative is left as loaded.
                    a = np.fliplr(a).copy(); p = np.fliplr(p).copy()
                if random.random() < 0.3:
                    # Brightness factor in [0.9, 1.1); result clipped back to [0, 1].
                    a = np.clip(a * (0.9 + 0.2 * random.random()), 0, 1)
            A[i] = a; P[i] = p; N[i] = n
        # Keras expects (x, y). y is dummy because loss uses embeddings only.
        return [A, P, N], np.zeros((self.batch_size, 1), dtype=np.float32)


In [27]:
# Cell 8 — Build embedding model (UPDATED for GPU optimization)
from tensorflow.keras.applications import MobileNetV2

def build_embedding_model(embed_dim=EMBED_DIM, trainable_backbone=False):
    """Build the single-image embedding network (MobileNetV2 backbone + projection head).

    Args:
        embed_dim: size of the output embedding vector.
        trainable_backbone: value assigned to ``backbone.trainable``. The
            backbone is always *called* with ``training=False`` regardless.

    Returns:
        A Keras ``Model`` named ``'EmbeddingModel'`` mapping an
        (IMG_H, IMG_W, 3) image to an L2-normalized ``embed_dim`` vector.

    Side effects:
        Sets the global Keras mixed-precision policy to ``'mixed_float16'``.
    """
    # NOTE(review): MobileNetV2's ImageNet weights are documented to expect
    # inputs scaled to [-1, 1] (see `preprocess_input`); no such scaling is
    # applied inside this function — confirm what range the image loader emits.
    tf.keras.mixed_precision.set_global_policy(
        tf.keras.mixed_precision.Policy('mixed_float16')
    )

    image_shape = (IMG_H, IMG_W, 3)
    device_name = '/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'
    with tf.device(device_name):
        backbone = MobileNetV2(
            weights='imagenet',
            include_top=False,
            pooling='avg',
            input_shape=image_shape,
        )
        backbone.trainable = trainable_backbone

        inp = Input(shape=image_shape, name='input')
        features = backbone(inp, training=False)

        # The projection head is pinned to float32 even though the global
        # policy is mixed_float16.
        hidden = layers.Dense(512, activation='relu', dtype='float32')(features)
        raw_embedding = layers.Dense(embed_dim, dtype='float32')(hidden)
        unit_embedding = layers.Lambda(
            lambda t: tf.math.l2_normalize(t, axis=1), dtype='float32'
        )(raw_embedding)

        return Model(inp, unit_embedding, name='EmbeddingModel')

def build_siamese_triplet_model(embedding_model):
    """Wrap a shared embedding network into a three-input triplet model.

    Args:
        embedding_model: Keras model applied (with shared weights) to each of
            the anchor, positive and negative inputs.

    Returns:
        A Keras ``Model`` named ``'siamese_triplet'`` with inputs
        ``[anchor, positive, negative]`` and a single output in which the three
        embeddings are stacked along axis 1 (anchor, positive, negative order).
    """
    device_name = '/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'
    with tf.device(device_name):
        branch_inputs = [
            Input(shape=(IMG_H, IMG_W, 3), name=role)
            for role in ('anchor', 'positive', 'negative')
        ]
        branch_embeddings = [embedding_model(tensor) for tensor in branch_inputs]

        # Give each embedding a length-1 axis, then join along it.
        stacked = layers.Concatenate(axis=1)(
            [tf.expand_dims(embedding, 1) for embedding in branch_embeddings]
        )
        return Model(branch_inputs, stacked, name='siamese_triplet')

# Instantiate the shared embedding network with a frozen backbone, then wrap it
# in the three-input triplet model used for training.
print('Building models on GPU...')
embedding_model = build_embedding_model(EMBED_DIM, trainable_backbone=False)
siamese_model = build_siamese_triplet_model(embedding_model=embedding_model)

# Custom triplet loss with GPU optimization
@tf.function
def triplet_loss(y_true, y_pred):
    """Triplet margin loss on stacked embeddings.

    Args:
        y_true: unused; present only because Keras losses take (y_true, y_pred).
        y_pred: tensor indexed as [batch, role, feature] where role 0/1/2 is
            anchor/positive/negative.

    Returns:
        Scalar: mean over the batch of
        ``max(||a - p||^2 - ||a - n||^2 + MARGIN, 0)``.
    """
    anchor, positive, negative = y_pred[:, 0], y_pred[:, 1], y_pred[:, 2]

    # Squared Euclidean distances per sample.
    d_pos = tf.reduce_sum(tf.square(anchor - positive), axis=-1)
    d_neg = tf.reduce_sum(tf.square(anchor - negative), axis=-1)

    hinge = tf.maximum(d_pos - d_neg + MARGIN, 0.0)
    return tf.reduce_mean(hinge)

# Compile the triplet network. The loss ignores y_true and works directly on
# the stacked (anchor, positive, negative) embeddings.
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)
siamese_model.compile(optimizer=optimizer, loss=triplet_loss)

# Verify GPU is being used properly
print("GPU devices available:", tf.config.list_physical_devices('GPU'))

# Smoke test: push a tiny random batch through the model.
print("\nTesting GPU placement with sample prediction...")
test_input = [np.random.rand(2, IMG_H, IMG_W, 3).astype(np.float32) for _ in range(3)]

with tf.device('/GPU:0' if tf.config.list_physical_devices('GPU') else '/CPU:0'):
    test_output = siamese_model.predict(test_input, verbose=0)
    print("Sample prediction completed successfully!")

    # predict() returns a NumPy array, which carries no information about
    # where the computation ran. Run the same inputs eagerly and read the
    # placement from the resulting tf.Tensor instead of assuming it.
    placement_probe = siamese_model([tf.constant(x) for x in test_input], training=False)
    print(f"Output tensor device: {placement_probe.device}")

print(f"Output shape: {test_output.shape}")

# Report logical devices and allocator statistics for the first GPU.
print("\nChecking GPU memory usage...")
if tf.config.list_physical_devices('GPU'):
    gpu_devices = tf.config.experimental.list_logical_devices('GPU')
    print(f"Logical GPU devices: {gpu_devices}")

    # Memory statistics are best-effort; report why if they are unavailable.
    try:
        memory_info = tf.config.experimental.get_memory_info('GPU:0')
        print(f"GPU memory - Current: {memory_info['current'] / 1e9:.2f}GB, Peak: {memory_info['peak'] / 1e9:.2f}GB")
    except Exception as e:
        print(f"Memory info not available: {e}")

siamese_model.summary()

Building models on GPU...
GPU devices available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

Testing GPU placement with sample prediction...
Sample prediction completed successfully!
Output generated - GPU usage confirmed by performance
Output shape: (2, 3, 128)

Checking GPU memory usage...
Logical GPU devices: [LogicalDevice(name='/device:GPU:0', device_type='GPU')]
GPU memory - Current: 0.07GB, Peak: 1.14GB
Model: "siamese_triplet"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 anchor (InputLayer)            [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 positive (InputLayer)          [(None, 22

In [28]:
# Training triplets are augmented; validation triplets are used as loaded.
train_seq, val_seq = (
    TripletSequence(split, batch_size=BATCH_SIZE, augment=use_augmentation)
    for split, use_augmentation in ((train_dict, True), (val_dict, False))
)
print('Train batches:', len(train_seq), 'Val batches:', len(val_seq))

Train batches: 100 Val batches: 100


In [29]:
# Checkpointing, learning-rate decay and early stopping, all driven by the
# validation loss and all reporting verbosely.
_val_loss_opts = dict(monitor='val_loss', verbose=1)
callbacks = [
    ModelCheckpoint(MODEL_BEST, save_best_only=True, save_weights_only=False, **_val_loss_opts),
    ReduceLROnPlateau(factor=0.5, patience=3, min_lr=1e-7, **_val_loss_opts),
    EarlyStopping(patience=6, restore_best_weights=True, **_val_loss_opts),
]


In [20]:
# Cell 11 — Train (UPDATED with proper GPU monitoring)
# Cap the configured step counts at the number of batches each sequence reports.
train_steps = min(len(train_seq), STEPS_PER_EPOCH)
val_steps = min(len(val_seq), VAL_STEPS)

print(f'Train steps: {train_steps}, Val steps: {val_steps}')

# Simple GPU monitoring callback
class GPUMonitor(tf.keras.callbacks.Callback):
    """Print current and peak memory of ``GPU:0`` at the start of every epoch.

    Does nothing when no physical GPU is listed. Failures to read the
    statistics are reported and never interrupt training.
    """

    def on_epoch_begin(self, epoch, logs=None):
        if tf.config.list_physical_devices('GPU'):
            try:
                gpu_stats = tf.config.experimental.get_memory_info('GPU:0')
                print(f"GPU memory - Current: {gpu_stats['current'] / 1e9:.2f}GB, Peak: {gpu_stats['peak'] / 1e9:.2f}GB")
            except Exception as exc:
                # Catch Exception rather than using a bare `except:` so that
                # KeyboardInterrupt / SystemExit still stop the run, and say
                # why the statistics could not be read.
                print(f"GPU memory stats not available: {exc}")

# Callbacks for this run (rebinds `callbacks`, replacing any earlier list).
# Note: ModelCheckpoint saves the model that fit() is called on — here
# siamese_model — so the file at MODEL_BEST holds the three-input triplet network.
callbacks = [
    ModelCheckpoint(
        MODEL_BEST,
        monitor='val_loss',
        save_best_only=True,
        save_weights_only=False,
        verbose=1,
    ),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-7, verbose=1),
    EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True, verbose=1),
    GPUMonitor(),
]

has_gpu = bool(tf.config.list_physical_devices('GPU'))
print("Starting training..." + (" on GPU" if has_gpu else " on CPU"))

# Run training under an explicit device scope: first GPU if present, else CPU.
with tf.device('/GPU:0' if has_gpu else '/CPU:0'):
    history = siamese_model.fit(
        x=train_seq,
        epochs=EPOCHS,
        steps_per_epoch=train_steps,
        validation_data=val_seq,
        validation_steps=val_steps,
        callbacks=callbacks,
        verbose=1,
    )

# Persist the single-image embedding network as it stands after training.
embedding_model.save(MODEL_FINAL)
print('Saved final embedding model to', MODEL_FINAL)

Train steps: 100, Val steps: 60
Starting training... on GPU
GPU memory - Current: 0.06GB, Peak: 1.14GB
Epoch 1/40
Epoch 1: val_loss improved from inf to 0.19115, saving model to runs\siamese_tf\best_embedding_model.h5
GPU memory - Current: 0.07GB, Peak: 1.14GB
Epoch 2/40
Epoch 2: val_loss improved from 0.19115 to 0.18045, saving model to runs\siamese_tf\best_embedding_model.h5
GPU memory - Current: 0.07GB, Peak: 1.14GB
Epoch 3/40
Epoch 3: val_loss improved from 0.18045 to 0.17941, saving model to runs\siamese_tf\best_embedding_model.h5
GPU memory - Current: 0.07GB, Peak: 1.14GB
Epoch 4/40
Epoch 4: val_loss improved from 0.17941 to 0.17254, saving model to runs\siamese_tf\best_embedding_model.h5
GPU memory - Current: 0.07GB, Peak: 1.14GB
Epoch 5/40
Epoch 5: val_loss did not improve from 0.17254
GPU memory - Current: 0.07GB, Peak: 1.14GB
Epoch 6/40
Epoch 6: val_loss did not improve from 0.17254
GPU memory - Current: 0.07GB, Peak: 1.14GB
Epoch 7/40
Epoch 7: val_loss did not improve from 0

In [None]:
# Training vs. validation loss per epoch.
fig, ax = plt.subplots(figsize=(10, 4))
for history_key, curve_label in (('loss', 'train_loss'), ('val_loss', 'val_loss')):
    ax.plot(history.history[history_key], label=curve_label)
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.legend()
ax.set_title('Training Loss')
ax.grid(True)
plt.show()

In [None]:
from tensorflow.keras.models import load_model
# Load the best checkpoint if it can be read; otherwise fall back to the final
# embedding model, reporting why instead of failing silently.
try:
    emb = load_model(MODEL_BEST, compile=False)
except Exception as load_err:
    print(f'Could not load {MODEL_BEST} ({load_err}); falling back to {MODEL_FINAL}')
    emb = load_model(MODEL_FINAL, compile=False)

# MODEL_BEST is written by ModelCheckpoint while siamese_model is being fit, so
# it may contain the three-input triplet network rather than the bare embedding
# network. In that case pull out the nested single-image sub-model (named
# 'EmbeddingModel' by build_embedding_model) so emb.predict() accepts one image batch.
if len(emb.inputs) != 1:
    emb = emb.get_layer('EmbeddingModel')

print('Embedding model loaded for DB creation')

# Reference DB: person name -> unit-length mean embedding over that person's
# readable training images. Persons with no readable image are left out.
ref_db = {}
for person, imgs in train_dict.items():
    embs = []
    for f in imgs:
        a = safe_load(f)
        if a is None:
            continue
        e = emb.predict(np.expand_dims(a, 0), verbose=0)[0]
        embs.append(e)
    if len(embs) == 0:
        continue
    mean = np.mean(np.stack(embs, axis=0), axis=0)
    # Re-normalize: the mean of unit vectors is generally not unit length.
    # The epsilon guards against division by zero.
    mean = mean / (np.linalg.norm(mean) + 1e-10)
    ref_db[person] = mean

print('Reference DB size:', len(ref_db))

In [None]:
# Cell 14 — Recognition helper
import math

def identify_face_from_crop_rgb(crop_rgb, ref_db, threshold=0.7):
    """Identify a face crop by nearest reference embedding.

    Uses the notebook-level ``emb`` model to embed the crop.

    Args:
        crop_rgb: HxWx3 image array. Integer dtypes are interpreted as 0..255.
            Floating dtypes (any precision) are interpreted as [0, 1] unless
            the maximum exceeds 1.0, in which case they are taken as 0..255.
        ref_db: dict mapping person name -> reference embedding vector.
        threshold: largest Euclidean distance still accepted as a match.

    Returns:
        ``(name, distance)`` for the closest reference, or
        ``('Unknown', distance)`` when the closest distance exceeds
        ``threshold`` (distance is 1e9 when ``ref_db`` is empty).

    Raises:
        ValueError: if ``crop_rgb`` contains no pixels.
    """
    crop_rgb = np.asarray(crop_rgb)
    if crop_rgb.size == 0:
        raise ValueError('crop_rgb is empty')

    # Bring every supported input to uint8 0..255 before handing it to PIL.
    # Checking for *any* floating dtype (not just float32) keeps float64 images
    # in [0, 1] from being passed to PIL as if they were 8-bit data.
    if np.issubdtype(crop_rgb.dtype, np.floating):
        scaled = crop_rgb if crop_rgb.max() > 1.0 else crop_rgb * 255.0
    else:
        scaled = crop_rgb
    # Clip so out-of-range values saturate instead of wrapping around in uint8.
    pixels = np.clip(scaled, 0, 255).astype(np.uint8)

    # PIL's resize takes (width, height).
    resized = Image.fromarray(pixels).resize((IMG_W, IMG_H))
    arr = np.asarray(resized, dtype=np.float32) / 255.0

    e = emb.predict(np.expand_dims(arr, 0), verbose=0)[0]
    e = e / (np.linalg.norm(e) + 1e-10)

    # Linear scan for the nearest reference embedding.
    best_name, best_dist = 'Unknown', 1e9
    for name, ref in ref_db.items():
        d = np.linalg.norm(e - ref)
        if d < best_dist:
            best_dist = d
            best_name = name
    if best_dist > threshold:
        return 'Unknown', best_dist
    return best_name, best_dist

In [None]:
# Cell 15 — Quick evaluation on test split (verification accuracy)
# Generate verification pairs from test_dict

def gen_pairs(person_dict, n_pairs=500):
    """Generate random verification pairs with same/different-person labels.

    Each pair is, with probability 0.5, two distinct images of one person
    (label 1), otherwise one image from each of two different persons (label 0).

    Args:
        person_dict: maps person name -> list of image paths.
        n_pairs: number of pairs to generate; values <= 0 yield empty lists.

    Returns:
        ``(pairs, labels)``: a list of ``(path_a, path_b)`` tuples and the
        matching list of 0/1 labels, both of length exactly ``n_pairs``.

    Raises:
        ValueError: if no person has at least two images (no positive pair is
            possible) or fewer than two persons have any image (no negative
            pair is possible).
    """
    pairs, labels = [], []
    if n_pairs <= 0:
        return pairs, labels

    persons = list(person_dict.keys())
    # Draw positives only from persons that can supply two distinct images, so
    # no iteration is skipped and exactly n_pairs pairs are always returned.
    positive_pool = [name for name in persons if len(person_dict[name]) >= 2]
    negative_pool = [name for name in persons if len(person_dict[name]) >= 1]
    if not positive_pool:
        raise ValueError('gen_pairs needs at least one person with >=2 images')
    if len(negative_pool) < 2:
        raise ValueError('gen_pairs needs at least two persons with images')

    for _ in range(n_pairs):
        if random.random() < 0.5:
            p = random.choice(positive_pool)
            a, b = random.sample(person_dict[p], 2)
            pairs.append((a, b))
            labels.append(1)
        else:
            p1, p2 = random.sample(negative_pool, 2)
            a = random.choice(person_dict[p1])
            b = random.choice(person_dict[p2])
            pairs.append((a, b))
            labels.append(0)
    return pairs, labels

# Verification accuracy on held-out identities: a pair is predicted "same
# person" when the Euclidean distance between its embeddings is below 0.7.
pairs, labels = gen_pairs(test_dict, n_pairs=500)
correct = 0
evaluated = 0  # pairs for which both images could actually be loaded
for (a, b), lab in zip(pairs, labels):
    aa = safe_load(a)
    bb = safe_load(b)
    if aa is None or bb is None:
        continue
    ea = emb.predict(np.expand_dims(aa, 0), verbose=0)[0]
    eb = emb.predict(np.expand_dims(bb, 0), verbose=0)[0]
    d = np.linalg.norm(ea - eb)
    pred = 1 if d < 0.7 else 0
    evaluated += 1
    if pred == lab:
        correct += 1

# Divide by the number of pairs actually scored: skipped pairs must not count
# as errors, and an empty evaluation must not raise ZeroDivisionError.
acc = correct / evaluated if evaluated else float('nan')
if evaluated < len(labels):
    print(f'Skipped {len(labels) - evaluated} pairs with unreadable images')
print(f'Verification accuracy (threshold=0.7): {acc:.4f}')

In [None]:
import json
# Serialize the reference DB (name -> embedding as a plain list) to JSON.
db_path = OUT_DIR / 'embedding_db.json'
out_db = {name: vector.tolist() for name, vector in ref_db.items()}
db_path.write_text(json.dumps(out_db))
print('Saved embedding DB to', db_path)