In [39]:
import glob
import os
import pickle
from urllib.parse import quote_plus

import cv2
import numpy as np
import tensorflow as tf
from pymongo import MongoClient
from sklearn.model_selection import train_test_split

# Reset the global state Keras keeps between runs, so re-executing the
# notebook in the same kernel starts from a clean slate.
from tensorflow.keras import backend as K
K.clear_session()


# Storing into MongoDB

In [5]:
# MongoDB connection settings.
# Credentials are read from the environment so that no secret is stored in the
# notebook. NOTE(review): any password that was ever committed with this
# notebook should be considered leaked and rotated.
user = os.environ.get("MONGODB_USER", "admin")
password = os.environ.get("MONGODB_PASSWORD", "")
if not password:
    print("Warning: MONGODB_PASSWORD is not set; authentication is expected to fail.")

# PyMongo requires the user name and password embedded in a URI to be
# percent-escaped, otherwise characters such as '@' or ':' break URI parsing.
uri = f"mongodb+srv://{quote_plus(user)}:{quote_plus(password)}@rpm.spzvwtw.mongodb.net"
db_name = 'RecognitivePretrainedModels'
collection = '300W'

In [11]:
# Open the MongoDB connection and grab handles to the database and collection.
try:
    connection = MongoClient(uri)
    # MongoClient connects lazily, so constructing it proves nothing. Issue a
    # cheap command to confirm the server is reachable before reporting success.
    connection.admin.command("ping")
    db = connection[db_name]
    data = db[collection]
    print("Database Connected!")
except Exception as exc:
    # Catch Exception instead of using a bare except so KeyboardInterrupt and
    # SystemExit still propagate, and report why the connection failed.
    print(f"Connection failed: {exc}")

Database Connected!


In [2]:
def serialize_image(image_path):
    """Return the raw bytes of an image file wrapped in a pickle payload.

    Args:
        image_path: Path of the file to read (opened in binary mode).

    Returns:
        bytes: ``pickle.dumps`` of the file's content. The file is not decoded
        as an image; its bytes are pickled as-is.
    """
    with open(image_path, 'rb') as handle:
        raw_bytes = handle.read()
    return pickle.dumps(raw_bytes)

def parse_pts(filename):
    """Read landmark coordinates from a ``.pts`` annotation file.

    The coordinates are the whitespace-separated numbers on the lines between
    the ``{`` line and the next ``}`` line. The number of points is taken from
    the file rather than being fixed in advance.

    Args:
        filename: Path of the ``.pts`` file.

    Returns:
        list[list[float]]: One list of floats per landmark line, in file order.

    Raises:
        ValueError: If the ``{`` or ``}`` line is missing, or a value between
            them is not a number.
    """
    with open(filename) as file:
        # Compare stripped lines so trailing whitespace or a missing final
        # newline cannot hide the braces.
        lines = [line.strip() for line in file]

    if "{" not in lines:
        raise ValueError("No '{' line found in " + str(filename))
    start = lines.index("{") + 1  # First line after the opening brace

    if "}" not in lines[start:]:
        raise ValueError("No closing '}' line found in " + str(filename))
    end = lines.index("}", start)

    # Blank lines inside the braces carry no landmark and are skipped.
    return [[float(value) for value in line.split()] for line in lines[start:end] if line]

In [13]:
def process_and_insert_data(directory, collection):
    """Insert every annotated ``.png`` image of a directory into a collection.

    For each ``*.png`` file that has a ``.pts`` file with the same base name,
    one document ``{"image": <serialized bytes>, "landmarks": <nested lists>}``
    is passed to ``collection.insert_one``. Images without an annotation file
    are skipped. A failure on one file is printed and does not stop the loop.

    Args:
        directory: Directory that holds the ``.png`` / ``.pts`` pairs.
        collection: Object exposing ``insert_one(document)``.
    """
    # Sorted so the insertion order does not depend on the file system.
    for image_path in sorted(glob.glob(os.path.join(directory, "*.png"))):
        # Swap only the extension: str.replace would also rewrite any other
        # ".png" occurring in the directory or file name.
        pts_path = os.path.splitext(image_path)[0] + ".pts"

        if not os.path.exists(pts_path):
            continue

        try:
            serialized_image = serialize_image(image_path)
            landmarks = parse_pts(pts_path)
            # parse_pts may hand back a NumPy array, which cannot be stored in
            # a document; convert anything array-like to plain nested lists.
            if hasattr(landmarks, "tolist"):
                landmarks = landmarks.tolist()

            document = {
                "image": serialized_image,
                "landmarks": landmarks
            }

            collection.insert_one(document)
        except Exception as e:
            print(f"Error processing file {image_path}: {e}")


# Data preprocessing

In [40]:
def load_image(image_path, size=(224, 224)):
    """Load an image as a resized grayscale array with values scaled by 1/255.

    Args:
        image_path: Path of the image file.
        size: Target size passed to ``cv2.resize``, i.e. ``(width, height)``.

    Returns:
        The grayscale image resized to ``size`` and divided by 255.0.

    Raises:
        OSError: If OpenCV cannot read the file.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread reports a missing or undecodable file by returning None
        # instead of raising; fail here with a message that names the file.
        raise OSError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # Convert to grayscale
    image = cv2.resize(image, size)  # Resize image
    image = image / 255.0  # Normalize pixel values to [0, 1]
    return image

In [41]:
def parse_pts(filename):
    """Read landmark coordinates from a ``.pts`` file as an ``(n, 2)`` array.

    The coordinates are the whitespace-separated numbers on the lines between
    the ``{`` line and the next ``}`` line.

    Args:
        filename: Path of the ``.pts`` file.

    Returns:
        numpy.ndarray: The parsed values reshaped to ``(-1, 2)``.

    Raises:
        ValueError: If the ``{`` or ``}`` line is missing, or a value between
            them is not a number.
    """
    with open(filename) as file:
        # Compare stripped lines so trailing whitespace or a file ending in
        # "}" without a final newline is still recognised.
        lines = [line.strip() for line in file]

    if "{" not in lines:
        raise ValueError("No '{' line found in " + str(filename))
    start = lines.index("{") + 1  # Find the start of the landmarks

    if "}" not in lines[start:]:
        raise ValueError("No closing '}' line found in " + str(filename))
    end = lines.index("}", start)  # Find the end of the landmarks

    # Blank lines inside the braces carry no landmark and are skipped.
    landmarks = [[float(value) for value in line.split()] for line in lines[start:end] if line]
    # dtype=float keeps the reshape valid even when no landmark line is present.
    return np.array(landmarks, dtype=float).reshape(-1, 2)

In [42]:
def normalize_landmarks(landmarks, image_size=(224, 224)):
    """Divide landmark coordinates by the given image size.

    Every item obtained by iterating over ``landmarks`` is reshaped to
    ``(-1, 2)``. Each resulting ``(x, y)`` pair becomes
    ``[x / image_size[0], y / image_size[1]]``.

    Args:
        landmarks: Iterable of NumPy arrays (or a NumPy array, iterated along
            its first axis).
        image_size: Divisors for the x and y coordinates respectively.

    Returns:
        numpy.ndarray: One block of normalised pairs per input item. For an
        ``(n, 2)`` input the result has shape ``(n, 1, 2)``, because each row
        is reshaped on its own.
    """
    x_scale, y_scale = image_size[0], image_size[1]
    scaled = [
        [[px / x_scale, py / y_scale] for px, py in points.reshape(-1, 2)]
        for points in landmarks
    ]
    return np.array(scaled)


In [43]:
def preprocess_data(directory):
    """Load every annotated ``.png`` image of a directory with its landmarks.

    For each ``*.png`` file that has a ``.pts`` file with the same base name,
    the image is loaded with ``load_image`` and its landmarks are parsed with
    ``parse_pts`` and passed through ``normalize_landmarks``. Images without
    an annotation file are reported and skipped.

    Args:
        directory: Directory that holds the ``.png`` / ``.pts`` pairs.

    Returns:
        tuple: ``(images, landmarks)`` as NumPy arrays, in sorted file order.
    """
    images = []
    landmarks = []

    # glob returns files in arbitrary order; sorting keeps the sample order,
    # and therefore any seeded train/test split, reproducible.
    for image_path in sorted(glob.glob(os.path.join(directory, "*.png"))):
        # Swap only the extension: str.replace would also rewrite any other
        # ".png" occurring in the directory or file name.
        pts_path = os.path.splitext(image_path)[0] + ".pts"
        if not os.path.exists(pts_path):
            print(f"Skipping {image_path}: no matching .pts file")
            continue

        image = load_image(image_path)

        # load_image returns a resized copy, so the original dimensions are
        # read separately (one extra decode per image).
        # Assumes the .pts coordinates are pixel positions in the un-resized
        # image (TODO confirm): dividing by the original width and height
        # keeps the normalised landmarks aligned with the resized image.
        original_height, original_width = cv2.imread(image_path).shape[:2]

        landmark = parse_pts(pts_path)
        landmark = normalize_landmarks(landmark, image_size=(original_width, original_height))

        images.append(image)
        landmarks.append(landmark)

    return np.array(images), np.array(landmarks)

In [44]:
# Dataset location. Set the DATASET_300W_DIR environment variable to run the
# notebook on another machine; the default is the path it was written with.
dataset_dir = os.environ.get(
    "DATASET_300W_DIR",
    "C:/Users/User/Desktop/Document/UWE/FYP/fyp/codes/Dataset/300W",
)
images, landmark_annotations = preprocess_data(dataset_dir)

In [45]:
def split_data(images, landmarks, test_size=0.2):
    """Split images and landmarks into train and test parts.

    Args:
        images: Samples, passed to ``train_test_split`` as the first array.
        landmarks: Labels, passed as the second array.
        test_size: Forwarded to ``train_test_split``.

    Returns:
        Whatever ``train_test_split`` returns for the two arrays; the split is
        seeded with ``random_state=42``.
    """
    split_options = {"test_size": test_size, "random_state": 42}
    return train_test_split(images, landmarks, **split_options)

In [46]:
# Split images and landmark labels into train and test parts, using
# split_data's default test_size.
X_train, X_test, y_train, y_test = split_data(images, landmark_annotations)

In [27]:
# Shuffled training pipeline in batches of 32, built from the unflattened labels.
# NOTE(review): train_dataset is assigned again from flattened labels in the
# training cell before model.fit is called, so this version looks unused.
# Confirm and remove this cell if so.
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size=32)

In [47]:
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Input, Normalization
from tensorflow.keras.models import Model

def build_landmark_model(input_shape, output_size):
    """Build the convolutional landmark-regression network.

    Layer sequence: Normalization, then four stages of 3x3 ReLU convolutions
    (each followed by batch normalization) that end in 2x2 max pooling, then a
    final 256-filter convolution with batch normalization, Flatten, a
    1024-unit ReLU dense layer with batch normalization, and a dense output
    layer without activation.

    NOTE(review): the Normalization layer is created with default arguments
    and is not adapted to any data in this function. Confirm that is intended.

    Args:
        input_shape: Shape of one input sample, passed to ``Input``.
        output_size: Number of units of the output layer.

    Returns:
        An uncompiled ``Model`` named ``"landmark_model"``.
    """
    def conv_bn(tensor, n_filters):
        """Apply a 3x3 ReLU convolution followed by batch normalization."""
        tensor = Conv2D(filters=n_filters, kernel_size=(3, 3), activation='relu')(tensor)
        return BatchNormalization()(tensor)

    # Filter counts of the convolutions in each stage; every stage is closed
    # by a 2x2 max-pooling layer.
    pooled_stages = ((32,), (64, 64), (64, 64), (128, 128))

    inputs = Input(shape=input_shape)
    x = Normalization()(inputs)

    for stage_filters in pooled_stages:
        for n_filters in stage_filters:
            x = conv_bn(x, n_filters)
        x = MaxPooling2D(pool_size=(2, 2))(x)

    # Last convolution block is not pooled before flattening.
    x = conv_bn(x, 256)

    # Regression head
    x = Flatten()(x)
    x = Dense(units=1024, activation='relu')(x)
    x = BatchNormalization()(x)
    outputs = Dense(units=output_size)(x)

    return Model(inputs=inputs, outputs=outputs, name="landmark_model")

# Example usage
# Single-channel 224x224 input, matching load_image's default size and its
# grayscale conversion.
input_shape = (224, 224, 1)  # Example input shape, adjust as needed
# 136 is the number of label values per sample after flattening for training.
output_size = 136  # Example output size, adjust as needed
model = build_landmark_model(input_shape, output_size)
model.summary()  # Display the model architecture


Model: "landmark_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 1)]     0         
                                                                 
 normalization (Normalizati  (None, 224, 224, 1)       3         
 on)                                                             
                                                                 
 conv2d (Conv2D)             (None, 222, 222, 32)      320       
                                                                 
 batch_normalization (Batch  (None, 222, 222, 32)      128       
 Normalization)                                                  
                                                                 
 max_pooling2d (MaxPooling2  (None, 111, 111, 32)      0         
 D)                                                              
                                                    

In [48]:
# The model regresses continuous landmark coordinates, so track mean absolute
# error next to the MSE loss; classification 'accuracy' is not meaningful here.
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])


In [49]:
# Flatten each sample's landmarks into one label vector (136 values per sample)
y_train_flat = y_train.reshape(len(y_train), -1)
y_test_flat = y_test.reshape(len(y_test), -1)

# Training pipeline over the flattened labels: shuffle, then batch
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train_flat))
    .shuffle(buffer_size=1024)
    .batch(batch_size=32)
)

# Train, validating on the held-out split after every epoch
history = model.fit(train_dataset, epochs=50, validation_data=(X_test, y_test_flat))


Epoch 1/50
Epoch 2/50

KeyboardInterrupt: 