## Prepare tools

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import torch
from torch.autograd import Variable
import torchvision.datasets as dsets
import torchvision.transforms as transforms
import torch.nn.init
from scipy.io import loadmat
import numpy as np
import pandas as pd
from pathlib import Path
from torch.utils.data import Dataset, DataLoader

In [3]:
torch.cuda.is_available()

n_gpu = torch.cuda.device_count()

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

t = torch.cuda.get_device_properties(0).total_memory
r = torch.cuda.memory_reserved(0)
a = torch.cuda.memory_allocated(0)
f = r-a  # free inside reserved

print("Number of GPU: ", n_gpu, type(device))
print("total GPU memory: ", t, " memory reserved: ", r, "memory allocated: ", a)

Number of GPU:  1 <class 'torch.device'>
total GPU memory:  15835660288  memory reserved:  0 memory allocated:  0


## Full system implementation

### Data loader

In [16]:
class AudioFaceDataset(Dataset):
    def __init__(self, data_dir, split='train', transform=None, target_transform=None):
        self.data_dir = data_dir
        self.split = split
        self.transform = transform
        self.target_transform = target_transform
        self.all_labels = self.get_all_label_df()  # Get all labels without splitting
        self.labels = self.split_labels()  # Split the labels according to the specified split

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        row = self.labels.iloc[idx]
        label = row["label"]
        path = row["path"]
        data = self.read_mat_cnn(path)
        if self.transform:
            data = self.transform(data)
        if self.target_transform:
            label = self.target_transform(label)

        identifier = path

        return data, label, identifier

    @staticmethod
    def read_mat_cnn(file):
        data = loadmat(file)["mat_concat"]
        data_tmp = np.expand_dims(data, axis=0)
        return data_tmp.astype(np.float32)

    def list_all_mat_files(self):
        all_files = [str(x.absolute()) for x in Path(self.data_dir).glob("**/*.mat")]
        if len(all_files) < 45000:
            raise ValueError('Dataset has not been fully synced!')
        else:
            print(f"Found {len(all_files)} .mat files in {self.data_dir}")
        return all_files

    def convert_path_to_label(self, path_str):
        label_start_idx = path_str.rfind('.mat')
        face_label = path_str[label_start_idx-3]
        mask_label = path_str[label_start_idx-2]
        dist_label = path_str[label_start_idx-1]
        return "_".join([face_label, dist_label, mask_label])

    def get_all_label_df(self):
        label_dict = {}
        for file in self.list_all_mat_files():
            label = self.convert_path_to_label(file)
            label_dict[file] = label

        label_df = pd.DataFrame.from_dict(label_dict, orient="index").reset_index().rename(columns={"index": "path", 0: "label"})
        return label_df

    def split_labels(self):
        all_labels_shuffled = self.all_labels.sample(frac=1).reset_index(drop=True)  # Ensure reproducibility with random_state
        if self.split == 'train':
            return all_labels_shuffled.sample(frac=0.8)  # Use all data for training
        elif self.split == 'test':
            return all_labels_shuffled.sample(frac=0.05)  # Use 20% of the data for testing
        else:
            raise ValueError("Split must be 'train' or 'test'.")

data_dir = './drive/MyDrive/AcFace_AE/RD-Net/Dataset/samples_all'
data_test = AudioFaceDataset(data_dir, split='test')

batch_size = 64  # Specify your batch size
data_test_loader = DataLoader(dataset=data_test,
                              batch_size=batch_size,
                              shuffle=True,  # Typically, we don't need to shuffle the test data
                              num_workers=8)

print("Data loader setup complete.")

Found 45000 .mat files in ./drive/MyDrive/AcFace_AE/RD-Net/Dataset/samples_all
Data loader setup complete.




### Setup model

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        residual = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

class RDNet(nn.Module):
    def __init__(self, num_face=2, num_dist=2, num_mask=2):
        super(RDNet, self).__init__()

        self.in_channels = 64
        self.conv1 = nn.Conv2d(1, self.in_channels, kernel_size=3, stride=2, padding=1)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace=True)

        # Adding more depth with Residual Blocks
        self.layer1 = self._make_layer(128, stride=2)
        self.layer2 = self._make_layer(256, stride=2)
        self.layer3 = self._make_layer(512, stride=2)
        self.drop = nn.Dropout(p=0.3)

        self.adaptivePool = nn.AdaptiveAvgPool2d((1, 1))

        # Increase model capacity in fully connected layers
        self.face_fc1 = nn.Linear(512, 2048)
        self.face_fc2 = nn.Linear(2048, 2048)
        self.face_fc3 = nn.Linear(2048, 1024)
        self.face_fc4 = nn.Linear(1024, 1024)
        self.face_fc5 = nn.Linear(1024, 1024)
        self.face_fc6 = nn.Linear(1024, 1024)
        self.face_fc7 = nn.Linear(1024, 1024)
        self.face_fc8 = nn.Linear(1024, 512)
        self.face_fc9 = nn.Linear(512, 512)
        self.face_fc10 = nn.Linear(512, num_face)

        self.dist_fc1 = nn.Linear(512 + num_face, 256)
        self.dist_fc2 = nn.Linear(256, 256)
        self.dist_fc3 = nn.Linear(256, 256)
        self.dist_fc4 = nn.Linear(256, 128)
        self.dist_fc5 = nn.Linear(128, num_dist)

        self.mask_fc1 = nn.Linear(512 + num_face, 256)
        self.mask_fc2 = nn.Linear(256, 256)
        self.mask_fc3 = nn.Linear(256, 256)
        self.mask_fc4 = nn.Linear(256, 128)
        self.mask_fc5 = nn.Linear(128, num_mask)

    def _make_layer(self, out_channels, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        layer = ResidualBlock(self.in_channels, out_channels, stride, downsample)
        self.in_channels = out_channels
        return layer

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.drop(x)
        x = self.adaptivePool(x)
        x_cnn_output = x.view(x.size(0), -1)

        x_face = F.relu(self.face_fc1(x_cnn_output))
        x_face = F.relu(self.face_fc2(x_face))
        x_face = F.relu(self.face_fc3(x_face))
        x_face = F.relu(self.face_fc4(x_face))
        x_face = F.relu(self.face_fc5(x_face))
        x_face = F.relu(self.face_fc6(x_face))
        x_face = F.relu(self.face_fc7(x_face))
        x_face = F.relu(self.face_fc8(x_face))
        x_face = F.relu(self.face_fc9(x_face))
        x_face_output = torch.sigmoid(self.face_fc10(x_face))

        x_dist_input = torch.cat((x_cnn_output, x_face_output), 1)
        x_dist = F.relu(self.dist_fc1(x_dist_input))
        x_dist = F.relu(self.dist_fc2(x_dist))
        x_dist = F.relu(self.dist_fc3(x_dist))
        x_dist = F.relu(self.dist_fc4(x_dist))
        x_dist_output = torch.sigmoid(self.dist_fc5(x_dist))

        x_mask_input = torch.cat((x_cnn_output, x_face_output), 1)
        x_mask = F.relu(self.mask_fc1(x_mask_input))
        x_mask = F.relu(self.mask_fc2(x_mask))
        x_mask = F.relu(self.mask_fc3(x_mask))
        x_mask = F.relu(self.mask_fc4(x_mask))
        x_mask_output = torch.sigmoid(self.mask_fc5(x_mask))

        return [x_face_output, x_dist_output, x_mask_output]

model = RDNet().to(device)

# Calculate total parameters and model size in bytes
param_size = sum(p.numel() * p.element_size() for p in model.parameters())
buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())
total_size = param_size + buffer_size
size_in_mb = total_size / (1024 ** 2)

# Print parameters
print(f'Total parameters: {sum(p.numel() for p in model.parameters())}')
print(f'Model size: {size_in_mb:.3f} MB')

# load model
model_load_path = './drive/MyDrive/AcFace_AE/RD-Net/Model/model_pretrained.pth'  # The path where your model is saved
model.load_state_dict(torch.load(model_load_path))

Total parameters: 17748230
Model size: 67.725 MB


<All keys matched successfully>

### Inference test




In [22]:
import torch
import numpy as np
import time

criterion = torch.nn.CrossEntropyLoss()    # Softmax is internally computed.
model.eval()

acc_list = []
cost_list = []
predictions = []
true_labels = []
inference_times = []

for i, (test_X, test_Y, sample_ids) in enumerate(data_test_loader):
    face_Y, dist_Y, mask_Y = [], [], []
    for Y_i in test_Y:
        underline_idx = Y_i.find("_")
        face_Y.append(int(Y_i[underline_idx-1]))
        dist_Y.append(int(Y_i[underline_idx+1]))
        mask_Y.append(int(Y_i[underline_idx+3]))

    X = test_X.to(device)
    face_Y = torch.LongTensor(face_Y).to(device)
    dist_Y = torch.LongTensor(dist_Y).to(device)
    mask_Y = torch.LongTensor(mask_Y).to(device)

    for j in range(len(X)):
        with torch.no_grad():
            data_in = X[j].unsqueeze(0)
            start_time = time.time()
            output = model(data_in)
            end_time = time.time()  # End time after inference
            inference_times.append(end_time - start_time)  # Calculate inference time for this sample
            # print(end_time - start_time)

            cost_face = criterion(output[0], face_Y[j].unsqueeze(0))
            cost_dist = criterion(output[1], dist_Y[j].unsqueeze(0))
            cost_mask = criterion(output[2], mask_Y[j].unsqueeze(0))
            cost = cost_face - 0.015 * cost_dist - 0.01 * cost_mask

            accuracy = (torch.max(output[0], 1)[1] == face_Y[j]).float().mean().item()

            acc_list.append(accuracy)
            cost_list.append(cost.item())

            predictions.append(torch.max(output[0], 1)[1].cpu().item())
            true_labels.append(face_Y[j].cpu().item())

            # print(f'Sample {sample_ids[j]} inference time: {time.time() - start_time:.4f} seconds')

if acc_list:  # Check if acc_list is not empty
    averaged_delay = np.mean(inference_times)
    print(f'Avg Inference Delay: {averaged_delay * 1000:.2f} ms')
    print('\nAvg Accuracy: {:2.2f} %'.format(np.mean(acc_list) * 100))
else:
    raise Exception("\nNo valid accuracy computations were performed.")

Avg Inference Delay: 5.07 ms

Avg Accuracy: 96.62 %


## Light-weight implementation

### Data loader

In [8]:
import numpy as np
import pandas as pd

from pathlib import Path
from scipy.io import loadmat
from torch.utils.data import Dataset

from sklearn.model_selection import train_test_split

class AudioFaceDataset_train(Dataset):
    def __init__(self, data_dir, transform=None, target_transform=None):
        self.classes = {
            "accept": 0,
            "reject": 1
        }
        self.data_dir = data_dir
        self.labels = self.get_label_df()
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        row = self.labels.iloc[idx]
        label = row["label"]
        path = row["path"]
        data = AudioFaceDataset_train.read_mat_cnn(path)
        if self.transform:
            data = self.transform(data)
        if self.target_transform:
            label = self.target_transform(label)
        return data, label

    @staticmethod
    def read_mat(file):
        data = loadmat(file)["fig_mat_flatten"]
        return data.transpose().ravel().astype(np.float32)

    @staticmethod
    def read_mat_cnn(file):
        data = loadmat(file)["face_sample"]
        data_tmp = np.expand_dims(data,axis=0)
        return data_tmp.astype(np.float32)

    def list_all_mat_files(self):
        return [str(x.absolute()) for x in Path(self.data_dir).glob("**/*.mat")]

    def convert_path_to_label(self, path_str):
        label_start_idx = path_str.find('.mat')
        face_label = path_str[label_start_idx-3]
        mask_label = path_str[label_start_idx-2]
        dist_label = path_str[label_start_idx-1]
        return face_label + "_" + dist_label + "_" + mask_label

    def get_label_df(self):
        label_dict = {}
        for file in self.list_all_mat_files():
            label_dict[file] = self.convert_path_to_label(file)
        label_df = pd.DataFrame.from_dict(label_dict, orient="index").reset_index().rename(columns={"index": "path",
                                                                                                    0: "label"})

        # Splitting the dataset into training and testing sets
        train_df, test_df = train_test_split(label_df, test_size=0.2, random_state=42)  # Adjust test_size as needed
        return train_df.reset_index(drop=True)  # Return only the training set

class AudioFaceDataset_test(Dataset):
    def __init__(self, data_dir, transform=None, target_transform=None):
        self.classes = {
            "accept": 0,
            "reject": 1
        }
        self.data_dir = data_dir
        self.labels = self.get_label_df()
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        row = self.labels.iloc[idx]
        label = row["label"]
        path = row["path"]
        data = AudioFaceDataset_test.read_mat_cnn(path)
        if self.transform:
            data = self.transform(data)
        if self.target_transform:
            label = self.target_transform(label)
        return data, label

    @staticmethod
    def read_mat(file):
        data = loadmat(file)["fig_mat_flatten"]
        return data.transpose().ravel().astype(np.float32)

    @staticmethod
    def read_mat_cnn(file):
        data = loadmat(file)["face_sample"]
        data_tmp = np.expand_dims(data,axis=0)
        return data_tmp.astype(np.float32)

    def list_all_mat_files(self):
        return [str(x.absolute()) for x in Path(self.data_dir).glob("**/*.mat")]

    def convert_path_to_label(self, path_str):
        label_start_idx = path_str.find('.mat')
        face_label = path_str[label_start_idx-3]
        mask_label = path_str[label_start_idx-2]
        dist_label = path_str[label_start_idx-1]
        return face_label + "_" + dist_label + "_" + mask_label

    def get_label_df(self):
        label_dict = {}
        for file in self.list_all_mat_files():
            label_dict[file] = self.convert_path_to_label(file)
        label_df = pd.DataFrame.from_dict(label_dict, orient="index").reset_index().rename(columns={"index": "path",
                                                                                                    0: "label"})

        # Splitting the dataset into training and testing sets
        train_df, test_df = train_test_split(label_df, test_size=0.2, random_state=42)  # Adjust test_size as needed
        return test_df.reset_index(drop=True)  # Return only the testing set

test_data_dir = './drive/MyDrive/AcFace_AE/RD-Net/Dataset/Efficiency/'
test_dataset = AudioFaceDataset_test(test_data_dir)

batch_size = 64
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)  # No need to shuffle the test data


### Setup model

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class RDNet(torch.nn.Module):
    def __init__(self, num_face=2, num_dist=2, mask=2):
        super().__init__()

        # Convolutional Layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2)
        self.drop = nn.Dropout(p=0.3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.face_fc1 = nn.Linear(6528, 600)
        self.face_fc2 = nn.Linear(600, 300)
        self.face_fc3 = nn.Linear(300, num_face)

        self.dist_fc1 = nn.Linear(6530, 600)
        self.dist_fc2 = nn.Linear(600, 300)
        self.dist_fc3 = nn.Linear(300, num_dist)

        self.mask_fc1 = nn.Linear(6530, 600)
        self.mask_fc2 = nn.Linear(600, 300)
        self.mask_fc3 = nn.Linear(300, mask)

    def forward(self, x):
        batch_size = x.size(0)

        # Convolutional layers with ReLu activations
        a = torch.relu(self.conv1(x))
        a = torch.relu(self.conv2(a))
        # a = self.drop(a)
        # a = self.pool(a)
        x_cnn_output = a.view((batch_size, -1))

        x_face = F.relu(self.face_fc1(x_cnn_output))
        x_face = F.relu(self.face_fc2(x_face))
        x_face = self.face_fc3(x_face)
        x_face_output = torch.sigmoid(x_face)

        x_dist_input = torch.cat((x_cnn_output,x_face_output),1)
        x_dist = F.relu(self.dist_fc1(x_dist_input))
        x_dist = F.relu(self.dist_fc2(x_dist))
        x_dist = self.dist_fc3(x_dist)
        x_dist_output = torch.sigmoid(x_dist)

        x_mask_input = torch.cat((x_cnn_output,x_face_output),1)
        x_mask = F.relu(self.mask_fc1(x_mask_input))
        x_mask = F.relu(self.mask_fc2(x_mask))
        x_mask = self.mask_fc3(x_mask)
        x_mask_output = torch.sigmoid(x_mask)

        return [x_face_output,x_dist_output,x_mask_output]

model = RDNet()
model.to(device)

param_size = 0
for param in model.parameters():
    param_size += param.nelement() * param.element_size()
buffer_size = 0
for buffer in model.buffers():
    buffer_size += buffer.nelement() * buffer.element_size()

size_all_mb = (param_size + buffer_size) / 1024**2

print(f'Total parameters: {sum(p.numel() for p in model.parameters())}')
print(f'Model size: {size_all_mb:.3f} MB')

model_load_path = './drive/MyDrive/AcFace_AE/RD-Net/Model/efficiency/model_efficiency.pth'  # The path where your model is saved
model.load_state_dict(torch.load(model_load_path))

Total parameters: 12316122
Model size: 46.982 MB


<All keys matched successfully>

### Inference test

In [14]:
import time

criterion = torch.nn.CrossEntropyLoss()

model.eval()

test_count = 0
acc_list = []
cost_list = []
inference_times = []

for i, (test_X, test_Y) in enumerate(test_loader):
    test_count = test_count + 1

    face_Y = []
    dist_Y = []
    mask_Y = []
    for Y_i in test_Y:
      underline_idx = Y_i.find("_")
      face_Y.append(int(Y_i[underline_idx-1]))
      dist_Y.append(int(Y_i[underline_idx+1]))
      mask_Y.append(int(Y_i[underline_idx+3]))

    X = test_X.to(device)
    face_Y = torch.LongTensor(face_Y).to(device)
    dist_Y = torch.LongTensor(dist_Y).to(device)
    mask_Y = torch.LongTensor(mask_Y).to(device)

    # forward propagation
    start_time = time.time()
    output = model(X)
    end_time = time.time()
    inference_times.append(end_time - start_time)
    # print("inference delay: ", end_time - start_time)
    cost_face = criterion(output[0], face_Y)
    cost_dist = criterion(output[1], dist_Y)
    cost_mask = criterion(output[2], mask_Y)
    cost = cost_face - (0.03*cost_dist + 0.02*cost_mask)/2

    acc_list.append((torch.max(output[0],dim=1)[1] == face_Y).float().mean().item())
    cost_list.append(cost.item())

if acc_list:  # Check if acc_list is not empty
    averaged_delay = np.mean(inference_times)
    print(f'Avg Inference Delay: {averaged_delay * 1000:.2f} ms')
    print('\nAvg Accuracy: {:2.2f} %'.format(np.mean(acc_list) * 100))
else:
    raise Exception("\nNo valid accuracy computations were performed.")

Avg Inference Delay: 1.14 ms

Avg Accuracy: 91.67 %
