<a href="https://colab.research.google.com/github/raj1149078/Iris-Liveness-Detection-/blob/main/Liveness%20Detection%20II.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from joblib import load, dump
from scipy.signal import butter, filtfilt, welch
from scipy.stats import entropy, skew, kurtosis, moment

<h2 align=center>CONSTANTS</h2>

In [2]:
PATH = '/content/drive/MyDrive/ETPAD.v2/'
LIVE = 'LIVE EYE MOVEMENTS/'
SAS_I = 'SAS_I EYE MOVEMENTS/'
SAS_II = 'SAS_II EYE MOVEMENTS/'

<h2 align=center>FEATURE EXTRACTION</h2>

In [3]:
class FeatureExtraction:

    """
                        ----------- FEATURE EXTRACTION CLASS ------------

            Required params:
                - path      : Path to the dataset folder
                - folder    : Name of the data folder. e.g. 'LIVE EYE MOVEMENTS/',
                             'SAS_I EYE MOVEMENTS/' etc.
                - label     : For live data label = 1 and for spoof label = 0
            Oprional params:
                - unit_size : size of a single unit. Must be able to divide 15000 
                              with no remainder. 
                - take_size : 1500 (FIXED FOR THIS DATASET)
    """

    def __init__(self, path, folder, label, unit_size = 1500, take_size=15000):
        self.path = path
        self.unit_size = unit_size
        self.folder = folder
        self.label = label
        self.take_size = take_size

    @staticmethod
    def get_local_unit_centroid(u, M):
        return np.sum(u, axis=1) / M

    @staticmethod
    def get_local_unit_power(u, M):
        return np.sum(u ** 2, axis=1) / M

    @staticmethod
    def get_local_unit_variance(u, LUC, M):
        return np.sum(np.subtract(u, LUC.reshape(-1, 1)) ** 2, axis=1) / M

    @staticmethod
    def get_local_unit_snr(LUC, LUV):
        LUV[LUV == 0] = 0.0000001
        return np.divide(LUC, np.sqrt(LUV))

    @staticmethod
    def get_local_unit_invalidity(validity, M):
        return np.sum(validity, axis=1) / M

    @staticmethod
    def get_entropy(u):
        u[u == 0] = 0.0000001
        temp = np.multiply(u, np.log(np.abs(u)))
        return np.sum(temp, axis=1)

    @staticmethod
    def get_max_spectral_power(u, fs=1000):
        f, p = welch(u, fs=fs, axis=1)
        return np.max(p, axis=1)
    
    #@staticmethod
    #def get_skew (u):
    #return pskew(axis = 1, bias = True) 

    @staticmethod
    def get_filtered_data(data, cut_off=5, fs=1000):
        b, a = butter(N=4, Wn=cut_off, btype='low', fs=fs)
        return filtfilt(b, a, data)

    def apply_temporal_filter(self, data):
        x = data['x'].to_numpy()
        y = data['y'].to_numpy()
        validity = data['validity'].to_numpy()
        x_velocity = np.gradient(x)
        y_velocity = np.gradient(y)
        filter_condition = (x_velocity < 5) & (y_velocity < 5)
        x_filt = x[~filter_condition]
        y_filt = y[~filter_condition]
        validity_filt = validity[~filter_condition]
        extra_elements = x_filt.shape[0] % self.unit_size
        reshaped_x = x_filt[:(x_filt.shape[0] - extra_elements)]
        reshaped_y = y_filt[:(y_filt.shape[0] - extra_elements)]
        reshaped_v = validity_filt[:(validity_filt.shape[0] - extra_elements)]
        return reshaped_x.reshape(-1, self.unit_size), \
        reshaped_y.reshape(-1, self.unit_size), \
        reshaped_v.reshape(-1, self.unit_size)

    def load_file(self, filename):
        data = pd.read_csv(self.path + self.folder + filename, 
                           delim_whitespace=True, skiprows=1, header=None)
        data.columns = ['sample', 'x', 'y', 'validity']
        return data

    def extract(self):
        features = np.empty((1, 39))

        # ------ GET ALL FILENAMES IN THE FOLDER ------
        file_names = os.listdir(self.path + self.folder)

        # -------- ITERATE THROUGH ALL THE FILES ----------- 
        # for file in file_names:
        for file in tqdm(file_names, desc="EXTRACTING... "):
            data = self.load_file(file)

            # ------------ APPLY FILTER ----------
            # x = self.get_filtered_data(data['x'].to_numpy()) \
            #         .reshape(-1, self.unit_size)
            # y = self.get_filtered_data(data['y'].to_numpy()) \
            #         .reshape(-1, self.unit_size)

            # x = data['x'].to_numpy().reshape(-1, self.unit_size)
            # y = data['y'].to_numpy().reshape(-1, self.unit_size)
            # validity = data['validity'].to_numpy().reshape(-1, self.unit_size)

            x, y, validity = self.apply_temporal_filter(data)

            # --------------- UNIT FEATURES --------------
            LUCx = self.get_local_unit_centroid(x, self.unit_size)
            LUCy = self.get_local_unit_centroid(y, self.unit_size)
            LUPx = self.get_local_unit_power(x, self.unit_size)
            LUPy = self.get_local_unit_power(y, self.unit_size)
            LUVx = self.get_local_unit_variance(x, LUCx, self.unit_size)
            LUVy = self.get_local_unit_variance(y, LUCy, self.unit_size)
            LUSx = self.get_local_unit_snr(LUCx, LUVx)
            LUSy = self.get_local_unit_snr(LUCy, LUVy)
            LUI = self.get_local_unit_invalidity(validity, self.unit_size)
            ENTx = self.get_entropy(x)
            ENTy = self.get_entropy(y)
            MSPx = self.get_max_spectral_power(x)
            MSPy = self.get_max_spectral_power(y)
            skewx = skew(x, axis=1)
            skewy = skew(y, axis=1)
            kurx = kurtosis(x, axis=1)
            kury = kurtosis(y, axis=1)
            Mntx = moment (x, axis=1)
            Mnty = moment (y, axis=1)

            

            # --------- COMBINE EVERYTHING --------------
            temp = np.stack([LUCx, LUCy, LUPx, LUPy, LUVx, 
                             LUVy, LUSx, LUSy, LUI, MSPx, MSPy, ENTx, ENTy, skewx, skewy, kurx, kury, Mntx, Mnty], axis=1)
            
            # ----------- AVERAGE AND STD -------------
            f_avg = np.mean(temp, axis=0).reshape(1, -1)
            f_std = np.std(temp, axis=0).reshape(1, -1)

            # ------ FINAL FEATURE VECTOR INCLUDING LABEL (400x19) ----------
            f_final = np.concatenate([f_avg, f_std, self.label], axis=1)
            features = np.append(features, f_final, axis=0)
        return features[1:, :]


In [4]:
# ------------- EXTRACT FEATURE FOR ALL 3 CLASSES --------------
FE = FeatureExtraction(PATH, LIVE, np.array([[1]]))
live_features = FE.extract()
FE = FeatureExtraction(PATH, SAS_I, np.array([[0]]))
sasi_features = FE.extract()
FE = FeatureExtraction(PATH, SAS_II, np.array([[0]]))
sasii_features = FE.extract()

EXTRACTING... : 100%|██████████| 400/400 [00:07<00:00, 52.31it/s]
EXTRACTING... : 100%|██████████| 400/400 [00:07<00:00, 51.35it/s]
EXTRACTING... : 100%|██████████| 400/400 [00:08<00:00, 45.08it/s]


In [5]:
# ----------- WRITE FEATURE VECTOR TO DRIVE -----------
dump(live_features, PATH + 'LIVE_FEATURES.joblib')
dump(sasi_features, PATH + 'SAS_I_FEATURES.joblib')
dump(sasii_features, PATH + 'SAS_II_FEATURES.joblib')

['/content/drive/MyDrive/ETPAD.v2/SAS_II_FEATURES.joblib']

<h2 align=center>CLASSIFICATION</h2>

In [43]:
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score, ShuffleSplit
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.decomposition import PCA


In [7]:
# ------------- READ FEATURES FROM FILE --------------
# live_features = load(PATH + 'LIVE_FEATURES.joblib')
# sasi_features = load(PATH + 'SAS_I_FEATURES.joblib')
# sasii_features = load(PATH + 'SAS_II_FEATURES.joblib')

In [44]:
all_features_i = np.concatenate([live_features, sasi_features], axis=0)
all_features_i = all_features_i[~np.isnan(all_features_i).any(axis=1)]
X = all_features_i[:, :-1]
y = all_features_i[:, -1].astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=10)

In [45]:
pca = PCA()
X_train = pca.fit_transform(X_train)
X_test = pca.transform(X_test)
explained_variance = pca.explained_variance_ratio_
#print (explained_variance)

In [46]:
#clf = make_pipeline(StandardScaler(), SVC(gamma=1, C=100))

clf = RandomForestClassifier(random_state=0)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
#print("Confusion Matrix:")
cm = confusion_matrix(y_test, y_pred)
print(cm)

#clf = GradientBoostingClassifier()
cv = ShuffleSplit(n_splits=100, test_size=0.5, random_state=0)
scores = cross_val_score(clf, X, y, cv=cv) * 100
print("AVERAGE ACCURACY: %0.1f%% (+/- %0.2f%%)" % (scores.mean(), scores.std() * 2))
print("MAXIMUM ACCURACY: %0.1f%%" %(scores.max()))

[[186  11]
 [ 18 185]]
AVERAGE ACCURACY: 94.4% (+/- 2.10%)
MAXIMUM ACCURACY: 96.8%


In [47]:
all_features_ii = np.concatenate([live_features, sasii_features], axis=0)
all_features_ii = all_features_ii[~np.isnan(all_features_ii).any(axis=1)]
X = all_features_ii[:, :-1]
y = all_features_ii[:, -1].astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=10)

In [48]:
#clf = make_pipeline(StandardScaler(), SVC(gamma=2, C=100))

clf = RandomForestClassifier(random_state=0)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)

#Confusion Matrix
print("Confusion Matrix:")
cm = confusion_matrix(y_test, y_pred)
print(cm)

cv = ShuffleSplit(n_splits=100, test_size=0.5, random_state=0)
scores = cross_val_score(clf, X, y, cv=cv) * 100
print("AVERAGE ACCURACY: %0.1f%% (+/- %0.2f%%)" % (scores.mean(), scores.std() * 2))
print("MAXIMUM ACCURACY: %0.1f%%" %(scores.max()))

Confusion Matrix:
[[190   7]
 [  8 195]]
AVERAGE ACCURACY: 97.0% (+/- 1.76%)
MAXIMUM ACCURACY: 99.5%
