In [1]:
import glob
import math
import random

import albumentations as A
import numpy as np
import pandas as pd
import scipy.linalg
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from albumentations.pytorch import ToTensorV2
from sklearn import metrics
from sklearn.model_selection import StratifiedKFold
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [2]:

# print("Total number of subjects:", len(all_data_files))
# print("Total number of subjects:", len(all_generated_data_files))
# print(all_generated_data_files)

In [3]:
class KeystrokesDataset(Dataset):
    """Map-style dataset pairing keystroke samples with their labels.

    Args:
        samples: Indexable collection; each item must support
            ``transpose((1, 2, 0))``, i.e. its first axis is moved last
            before the transform runs.
        labels: Indexable collection whose items are NumPy arrays
            (``torch.from_numpy`` is applied to ``labels[idx]``).
        transform: Callable invoked as ``transform(image=...)`` that returns
            a mapping with an ``'image'`` entry (albumentations-style).
    """

    def __init__(self, samples, labels, transform):
        self.samples, self.labels, self.transform = samples, labels, transform

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(transformed_sample, float_label)`` for position ``idx``."""
        # Samplers may hand over a tensor index; plain Python values index safely.
        index = idx.tolist() if torch.is_tensor(idx) else idx

        # Move the leading axis to the end before handing the sample to the transform.
        channels_last = self.samples[index].transpose((1, 2, 0))
        image = self.transform(image=channels_last)['image']

        target = torch.from_numpy(self.labels[index]).float()
        return image, target


In [4]:
class KeystrokeImageNetwork(nn.Module):
    """Small VGG-style binary classifier for 5-channel keystroke images.

    Two convolutional stages (64 and 128 filters, each followed by 2x2 max
    pooling) feed three fully connected layers that end in a single sigmoid
    unit.  ``fc6`` expects ``10 * 10 * 128`` features, so the spatial size
    after the two pooling steps must be 10x10 (presumably 42x42 inputs, as
    hinted by earlier experiments in this notebook -- confirm against the
    feature-engineering code).

    Earlier experiments (a third 256-filter stage, average pooling, and
    full-row / full-column "bigraph" convolutions concatenated into ``fc6``)
    are intentionally not part of this network.
    """

    def __init__(self):
        super(KeystrokeImageNetwork, self).__init__()

        # Stage 1: 5 input channels -> 64 feature maps.
        self.conv1_1 = nn.Conv2d(5, 64, kernel_size=3, padding=1)
        self.conv1_2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

        # Stage 2: 64 -> 128 feature maps.
        self.conv2_1 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv2_2 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        # Max pooling (kernel_size, stride); halves each spatial dimension.
        self.pool = nn.MaxPool2d(2, 2)

        # Fully connected head ending in one logit.
        self.fc6 = nn.Linear(10*10*128, 512)
        self.fc7 = nn.Linear(512, 64)
        self.fc8 = nn.Linear(64, 1)

    def forward(self, x, training=None):
        """Return the sigmoid score for every sample in ``x``.

        Args:
            x: Batch of images with 5 channels (``conv1_1`` requires it).
            training: Whether dropout is applied.  ``None`` (the default)
                follows the module's own mode (``self.training``), so
                ``model.eval()`` disables dropout as usual.  Passing ``True``
                or ``False`` explicitly overrides the module mode.

        Returns:
            Tensor of shape ``(batch, 1)`` with values in ``[0, 1]``.
        """
        if training is None:
            # Previously the default was a hard-coded True, which kept dropout
            # active even after model.eval() when forward() was called without
            # the flag (e.g. through model(x)).
            training = self.training

        x = F.relu(self.conv1_1(x))
        x = F.relu(self.conv1_2(x))
        x = self.pool(x)

        x = F.relu(self.conv2_1(x))
        x = F.relu(self.conv2_2(x))
        x = self.pool(x)

        # Keep the batch axis fixed; a wrong input size then fails loudly in
        # fc6 instead of being folded into a different batch size.
        x = torch.flatten(x, 1)

        x = F.relu(self.fc6(x))
        x = F.dropout(x, 0.5, training=training)
        x = F.relu(self.fc7(x))
        x = self.fc8(x)

        x = torch.sigmoid(x)
        return x

In [5]:

def _load_shuffled_samples(files, index):
    """Load the ``index``-th ``.npy`` file and return its rows in random order.

    Returns an empty list when ``index`` does not address an entry of
    ``files`` (negative indices are treated as out of range, they do not
    wrap around).  Shuffling uses NumPy's global random state.
    """
    if not 0 <= index < len(files):
        return list()
    dataset = np.load(files[index])
    order = list(range(len(dataset)))
    np.random.shuffle(order)
    return dataset[order]


#EXTRACTS REAL AND GENERATED SAMPLES
def extractRealAndGeneratedData(user_num, all_data_files,all_generated_data_files, positive_only = False, gan_only = False):
    """Load the shuffled real and/or generated samples of one user.

    Args:
        user_num: Integer position of the user's file inside both file lists.
        all_data_files: Sequence of ``.npy`` paths holding real samples.
        all_generated_data_files: Sequence of ``.npy`` paths holding
            generated samples.
        positive_only: Return only the real samples.  Takes precedence over
            ``gan_only``.
        gan_only: Return only the generated samples.

    Returns:
        The shuffled real array when ``positive_only`` is set, the shuffled
        generated array when ``gan_only`` is set, otherwise the tuple
        ``(real, generated)``.  A requested array is an empty list when
        ``user_num`` is outside the corresponding file list.
    """
    if positive_only:
        return _load_shuffled_samples(all_data_files, user_num)

    if gan_only:
        return _load_shuffled_samples(all_generated_data_files, user_num)

    # Real data is loaded (and shuffled) first so the random stream is
    # consumed in a fixed order.
    positive_data = _load_shuffled_samples(all_data_files, user_num)
    negative_data = _load_shuffled_samples(all_generated_data_files, user_num)
    return positive_data, negative_data


In [6]:
def getNegative(target,all_data_files, positive_length=100, negative_length=35000):
    """Collect impostor samples from every user file except ``target``.

    Each non-target file contributes its first
    ``len(file) / negative_length * positive_length`` rows, with the
    fractional count rounded down or up at random (one ``random.randint``
    draw per file).

    NOTE(review): rows are taken from the start of each file, not sampled at
    random -- confirm that this is intended.

    Args:
        target: Position in ``all_data_files`` of the genuine user to skip.
        all_data_files: Iterable of ``.npy`` paths, one per user.
        positive_length: Scale factor applied to each file's share
            (default 100).
        negative_length: Row count that corresponds to ``positive_length``
            contributed rows (default 35000).

    Returns:
        One array with the selected rows concatenated along axis 0, or
        ``None`` when there is no file other than ``target``.
    """
    chunks = []
    for i, filename in enumerate(all_data_files):
        if i == target:
            continue

        negative_dataset = np.load(filename)
        share = len(negative_dataset) / negative_length * positive_length

        # Randomised rounding of the fractional share.
        if random.randint(0, 1) == 0:
            sample_len = math.floor(share)
        else:
            sample_len = math.ceil(share)

        chunks.append(negative_dataset[:sample_len])

    if not chunks:
        return None
    # Single concatenation instead of growing the array inside the loop.
    return np.concatenate(chunks, axis=0)


In [7]:
def evaluate(model,test_dataloader, threshold=0.3):
    """Return the classification accuracy (in percent) of ``model``.

    Every batch is moved to the device that holds the model's parameters,
    scored with ``model.forward(inputs, training=False)`` and thresholded:
    a score strictly greater than ``threshold`` counts as class 1.

    Args:
        model: Network exposing ``forward(inputs, training=...)``.
        test_dataloader: Iterable yielding ``(inputs, labels)`` batches.
        threshold: Decision threshold applied to the scores (default 0.3).

    Returns:
        ``100 * correct / total`` as a float.

    Raises:
        ValueError: If the dataloader yields no samples.
    """
    # Use the model's own device instead of relying on a notebook-level
    # global; models without parameters leave the tensors where they are.
    parameters = getattr(model, "parameters", None)
    first_parameter = next(iter(parameters()), None) if callable(parameters) else None
    model_device = None if first_parameter is None else first_parameter.device

    correct = 0.0
    total = 0.0

    with torch.no_grad():
        for inputs, labels in test_dataloader:
            if model_device is not None:
                inputs = inputs.to(model_device)
                labels = labels.to(model_device)

            # Dropout must stay off while scoring.
            outputs = model.forward(inputs, training=False)
            total += labels.size(0)

            predicted = (outputs > threshold).float()
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("evaluate() received a dataloader that yielded no samples")

    return (100.0 * correct) / total

def getTransform(total_dataset):
    """Build the normalise-then-tensor pipeline for ``total_dataset``.

    The dataset is viewed as ``(n_samples, 5, -1)``; for each of the 5
    channels the per-sample mean and standard deviation are averaged over all
    samples.  Note that the resulting std is the mean of per-sample stds, not
    the std of the pooled values.

    Args:
        total_dataset: NumPy array whose rows can be reshaped to 5 channels.

    Returns:
        An ``A.Compose`` pipeline applying ``A.Normalize`` with the computed
        statistics (``max_pixel_value=1.0``) followed by ``ToTensorV2``.
    """
    sample_count = len(total_dataset)

    # One flat vector per (sample, channel) pair.
    per_channel = total_dataset.reshape(sample_count, 5, -1)

    channel_mean = per_channel.mean(2).sum(0) / sample_count
    channel_std = per_channel.std(2).sum(0) / sample_count

    steps = [
        A.Normalize(mean=channel_mean.tolist(), std=channel_std.tolist(), max_pixel_value=1.0, p=1.0),
        ToTensorV2(p=1.0),
    ]
    return A.Compose(steps, p=1)

    

In [None]:
## EVALUATING GENERATED DATA
# Single network instance shared by the evaluation code below.
model = KeystrokeImageNetwork()
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
knum = 0

# Per-GAN result store: testCNN() appends one entry per keystroke count, each
# entry holding the per-user accuracy lists of all evaluated scenarios.
cnn_test_data = {gan_name: [] for gan_name in ('WGAN', 'DCGAN', 'CGAN')}

# Module-level scenario lists (testCNN() works on its own local copies).
cnn_pos_data = []                 # real samples only, labelled 1
cnn_gan_data = []                 # generated samples only, labelled 1
cnn_pos_and_gan_data = []         # real + generated, both labelled 1
cnn_pos_vs_gan_data = []          # real labelled 1 vs generated labelled 0
cnn_pos_vs_neg_data = []          # real labelled 1 vs other users labelled 0
cnn_gan_vs_neg_data = []          # generated labelled 1 vs other users labelled 0
cnn_pos_and_gan_vs_neg_data = []  # real + generated labelled 1 vs other users labelled 0

def _scenario_accuracy(network, samples, labels):
    """Return the accuracy (%) of ``network`` on one labelled sample set.

    The samples are normalised with statistics computed from ``samples``
    itself (see ``getTransform``) and scored in batches of 64.
    """
    transform = getTransform(samples)
    dataset = KeystrokesDataset(samples, labels, transform)
    # Accuracy is a plain count over all samples, so batch order is irrelevant
    # and the loader does not need to shuffle.
    loader = DataLoader(dataset, batch_size=64, shuffle=False)
    return evaluate(network, loader)


def testCNN(gans=('WGAN', 'DCGAN'), keystroke_nums=('50', '75', '100'), num_users=75,
            base_dir="/home/jupyter/src/Thesis_Project"):
    """Score every per-user CNN against real, generated and impostor data.

    For each GAN type and keystroke count, the stored model of every user is
    loaded into the module-level ``model`` and evaluated on seven scenarios.
    One entry per (GAN, keystroke count) is appended to the module-level
    ``cnn_test_data[gan]``; the entry is a list of seven per-user accuracy
    lists in this order: real only, generated only, real + generated,
    real vs generated, real vs impostors, generated vs impostors,
    real + generated vs impostors.

    Args:
        gans: GAN names; ``<base_dir>/<gan>_data/<keystroke_num>/`` must hold
            the generated samples.
        keystroke_nums: Keystroke counts as strings, used in directory names.
        num_users: Number of users (and stored models) to evaluate.
        base_dir: Project root containing ``Data/``, ``<gan>_data/`` and
            ``CNN_<keystroke_num>_models/``.

    Raises:
        FileNotFoundError: If a data directory holds fewer than ``num_users``
            files.
    """
    for gan in gans:
        for keystroke_num in keystroke_nums:
            cnn_pos_data = []
            cnn_gan_data = []
            cnn_pos_and_gan_data = []
            cnn_pos_vs_gan_data = []
            cnn_pos_vs_neg_data = []
            cnn_gan_vs_neg_data = []
            cnn_pos_and_gan_vs_neg_data = []

            all_data_files = sorted(glob.glob(base_dir + "/Data/" + keystroke_num + "/*"))
            all_generated_data_files = sorted(
                glob.glob(base_dir + "/" + gan + "_data/" + keystroke_num + "/*"))

            # Fail early with a readable message instead of an obscure
            # reshape/concatenate error deep inside the loop.
            available = min(len(all_data_files), len(all_generated_data_files))
            if available < num_users:
                raise FileNotFoundError(
                    "Expected at least %d real and generated files for %s / %s keystrokes, found %d and %d"
                    % (num_users, gan, keystroke_num,
                       len(all_data_files), len(all_generated_data_files)))

            for k in range(num_users):
                positive_data, gan_data = extractRealAndGeneratedData(
                    k, all_data_files, all_generated_data_files)

                # Load the CNN trained for user k; map_location lets weights
                # saved on a GPU load on a CPU-only machine.
                weights_path = base_dir + "/CNN_" + keystroke_num + "_models/model_" + str(k) + ".pth"
                model.load_state_dict(torch.load(weights_path, map_location=device))
                model.to(device)
                model.eval()

                # Samples of all other users serve as impostor data.
                negative_data = getNegative(k, all_data_files)

                positive_ones = np.ones((len(positive_data), 1))
                gan_ones = np.ones((len(gan_data), 1))
                gan_zeros = np.zeros((len(gan_data), 1))
                negative_zeros = np.zeros((len(negative_data), 1))

                # POS ONLY
                cnn_pos_data.append(_scenario_accuracy(model, positive_data, positive_ones))

                # GAN ONLY
                cnn_gan_data.append(_scenario_accuracy(model, gan_data, gan_ones))

                # POS + GAN (both labelled 1) and POS vs GAN share the samples.
                positive_and_gan = np.concatenate((positive_data, gan_data), axis=0)
                cnn_pos_and_gan_data.append(_scenario_accuracy(
                    model, positive_and_gan,
                    np.concatenate((positive_ones, gan_ones), axis=0)))
                cnn_pos_vs_gan_data.append(_scenario_accuracy(
                    model, positive_and_gan,
                    np.concatenate((positive_ones, gan_zeros), axis=0)))

                # POS vs NEG
                cnn_pos_vs_neg_data.append(_scenario_accuracy(
                    model,
                    np.concatenate((positive_data, negative_data), axis=0),
                    np.concatenate((positive_ones, negative_zeros), axis=0)))

                # GAN vs NEG
                cnn_gan_vs_neg_data.append(_scenario_accuracy(
                    model,
                    np.concatenate((gan_data, negative_data), axis=0),
                    np.concatenate((gan_ones, negative_zeros), axis=0)))

                # POS + GAN vs NEG
                cnn_pos_and_gan_vs_neg_data.append(_scenario_accuracy(
                    model,
                    np.concatenate((positive_data, gan_data, negative_data), axis=0),
                    np.concatenate((positive_ones, gan_ones, negative_zeros), axis=0)))

            cnn_test_data.setdefault(gan, []).append([cnn_pos_data,cnn_gan_data,cnn_pos_and_gan_data,cnn_pos_vs_gan_data,cnn_pos_vs_neg_data,cnn_gan_vs_neg_data, cnn_pos_and_gan_vs_neg_data])
        

# Run the whole evaluation sweep. Each call appends to the module-level
# cnn_test_data lists, so re-running this cell without re-creating that dict
# accumulates duplicate entries.
testCNN()


In [None]:
# Column labels, in the order testCNN() stores the per-scenario lists.
test_name = ["cnn_pos_data\t\t\t","cnn_gan_data\t\t\t","cnn_pos_and_gan_data\t\t","cnn_pos_vs_gan_data\t\t","cnn_pos_vs_neg_data\t\t","cnn_gan_vs_neg_data\t\t", "cnn_pos_and_gan_vs_neg_data\t"]
# Keystroke counts in the order testCNN() evaluates them; the third run uses
# 100 keystrokes (it was previously mislabelled as 75).
knum = ['50','75','100']
for g in ['WGAN','DCGAN']:
    for k, per_keystroke_results in enumerate(cnn_test_data[g]):
        for label, tests in zip(test_name, per_keystroke_results):
            # Mean accuracy over all users for this scenario.
            print(g +'\t',knum[k]+" keystrokes\t",label + "AVERAGE:",round(sum(tests)/len(tests),3))
        print("\n\n")
    

# Scratch check of the shuffle-by-index idiom used in extractRealAndGeneratedData.
positive_indices = list(range(3))
# Must be an ndarray: a plain Python list cannot be indexed with a list of indices.
positive = np.array([[1,2,3],[1,1,1],[2,2,2]])
np.random.shuffle(positive_indices)
positive_data = positive[positive_indices]

In [70]:
def calculate_activation_statistics(images, model):
    """Return the mean vector and covariance matrix of ``model``'s activations.

    ``images`` is converted to a float tensor on the notebook-level
    ``device`` and rescaled with ``(x - 0.5) * 2`` before being passed through
    ``model`` without gradient tracking.  The covariance is the biased
    estimate ``E[a a^T] - mu mu^T`` (division by the batch size).

    Args:
        images: Array-like batch accepted by ``torch.tensor``.
        model: Callable mapping the batch to activations; everything after
            the first axis is flattened into one feature vector per sample.

    Returns:
        ``(mean, covariance)`` as NumPy arrays.
    """
    # Float tensor on the compute device, rescaled x -> 2x - 1.
    batch = (torch.tensor(images).float().to(device) - 0.5) * 2

    with torch.no_grad():
        features = model(batch)
        features = features.view(features.size(0), -1)

    mu = torch.mean(features, dim=0)
    second_moment = (features.T @ features) / features.shape[0]
    sigma = second_moment - mu.unsqueeze(1) @ mu.unsqueeze(0)

    return mu.cpu().numpy(), sigma.cpu().numpy()

def calculate_fid_score(images1, images2, model, eps=1e-6):
    """Return the Frechet distance between the activations of two image sets.

    Computes ``||mu1 - mu2||^2 + Tr(S1 + S2 - 2 * sqrt(S1 @ S2))`` from the
    statistics returned by ``calculate_activation_statistics``.  All
    arithmetic is done with float64 NumPy arrays.

    Args:
        images1: First batch of images.
        images2: Second batch of images.
        model: Feature extractor passed to ``calculate_activation_statistics``.
        eps: Value added to the diagonal of both covariance matrices when the
            matrix square root of their product is not finite.

    Returns:
        The score as a Python float.
    """
    # Calculate activations and statistics for each dataset.
    mu1, sigma1 = calculate_activation_statistics(images1, model)
    mu2, sigma2 = calculate_activation_statistics(images2, model)

    mu1 = np.atleast_1d(np.asarray(mu1, dtype=np.float64))
    mu2 = np.atleast_1d(np.asarray(mu2, dtype=np.float64))
    sigma1 = np.atleast_2d(np.asarray(sigma1, dtype=np.float64))
    sigma2 = np.atleast_2d(np.asarray(sigma2, dtype=np.float64))

    # Squared distance between the means.
    diff = mu1 - mu2
    diff_squared = diff.dot(diff)

    # Matrix square root of the covariance product.
    covmean = scipy.linalg.sqrtm(sigma1.dot(sigma2))
    if not np.isfinite(covmean).all():
        # A (near-)singular product can yield inf/nan; regularise the
        # diagonals and retry instead of substituting an unrelated matrix.
        offset = np.eye(sigma1.shape[0]) * eps
        covmean = scipy.linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))

    # sqrtm may return a complex array whose imaginary part is numerical noise.
    if np.iscomplexobj(covmean):
        covmean = covmean.real

    fid = diff_squared + np.trace(sigma1) + np.trace(sigma2) - 2.0 * np.trace(covmean)
    return float(fid)
