# Anti-Spoofing

In [1]:
import dlib
import cv2
import numpy as np
import imutils
from imutils import face_utils
from scipy.spatial import distance as dist
from imutils.video import VideoStream
import time
import os

ModuleNotFoundError: No module named 'cv2'

In [2]:
# Blink-detection tuning knobs: the eye aspect ratio (EAR) below which the
# eyes count as closed, and how many consecutive closed frames make one blink.
EYE_AR_CONSEC_FRAMES = 3
EYE_AR_THRESH = 0.23  # baseline

# Running state: consecutive closed-eye frames so far and blinks counted.
COUNTER, TOTAL = 0, 0

# File name of the dlib facial landmark model handed to the shape predictor.
eye_landmarks = "shape_predictor_68_face_landmarks.dat"

## 1.1 eye blink detector class

In [3]:
class eye_blink_detector():
    """Count eye blinks from the eye aspect ratio (EAR) of facial landmarks."""

    def __init__(self):
        # Model for frontal face detection.
        self.detector_faces = dlib.get_frontal_face_detector()
        # Model that predicts the landmark points (including the eyes).
        self.predictor_eyes = dlib.shape_predictor(eye_landmarks)

    def eye_blink(self,gray,rect,COUNTER,TOTAL):
        """Update the blink counters with the face found at ``rect``.

        Args:
            gray: grayscale frame passed to the landmark predictor.
            rect: face rectangle passed to the landmark predictor.
            COUNTER: consecutive frames, so far, with the eyes below the
                EAR threshold.
            TOTAL: blinks counted so far.

        Returns:
            The updated ``(COUNTER, TOTAL)`` pair.
        """
        # Slice bounds of each eye inside the landmark array.
        left_lo, left_hi = face_utils.FACIAL_LANDMARKS_IDXS["left_eye"]
        right_lo, right_hi = face_utils.FACIAL_LANDMARKS_IDXS["right_eye"]

        # Predict the landmarks and turn them into a NumPy array of points.
        landmarks = face_utils.shape_to_np(self.predictor_eyes(gray, rect))

        # Average the aspect ratio of both eyes.
        ear_left = self.eye_aspect_ratio(landmarks[left_lo:left_hi])
        ear_right = self.eye_aspect_ratio(landmarks[right_lo:right_hi])
        mean_ear = (ear_left + ear_right) / 2.0

        # Eyes closed on this frame: extend the current run of closed frames.
        if mean_ear < EYE_AR_THRESH:
            return COUNTER + 1, TOTAL

        # Eyes open again: a long enough closed run counts as one blink, and
        # the run counter starts over either way.
        if COUNTER >= EYE_AR_CONSEC_FRAMES:
            TOTAL += 1
        return 0, TOTAL

    def eye_aspect_ratio(self,eye):
        """Return the eye aspect ratio of six eye landmark points.

        The ratio is the sum of the two vertical landmark distances
        (points 1-5 and 2-4) divided by twice the horizontal distance
        (points 0-3).
        """
        vertical = dist.euclidean(eye[1], eye[5]) + dist.euclidean(eye[2], eye[4])
        horizontal = dist.euclidean(eye[0], eye[3])
        return vertical / (2.0 * horizontal)

## 1.2 eye blink detection function

In [4]:
def convert_rectangles2array(rectangles,image):
    """Convert face rectangles into an array of boxes clipped to the image.

    Each rectangle must expose ``left()``, ``top()``, ``right()`` and
    ``bottom()``. Coordinates are clamped to ``[0, width]`` / ``[0, height]``
    taken from ``image.shape``.

    Returns:
        An array with one ``[x0, y0, x1, y1]`` row per rectangle, or an empty
        1-D array when ``rectangles`` is empty.
    """
    clipped = [
        [
            max(0, rect.left()),
            max(0, rect.top()),
            min(rect.right(), image.shape[1]),
            min(rect.bottom(), image.shape[0]),
        ]
        for rect in rectangles
    ]
    if not clipped:
        return np.array([])
    return np.array(clipped)

In [5]:
def get_areas(boxes):
    """Return the area of every ``(x0, y0, x1, y1)`` box, in input order."""
    return [(y1 - y0) * (x1 - x0) for x0, y0, x1, y1 in boxes]

In [6]:
def bounding_box(img,box,match_name=None):
    """Draw a green rectangle, and optionally a label, for every box.

    Args:
        img: image handed to ``cv2.rectangle`` / ``cv2.putText``.
        box: sequence of ``(x0, y0, x1, y1)`` boxes.
        match_name: optional sequence of labels indexed like ``box``.
            ``None`` or an empty sequence draws no labels.

    Returns:
        The image returned by the last ``cv2.rectangle`` call, or ``img``
        itself when ``box`` is empty.
    """
    for i, (x0, y0, x1, y1) in enumerate(box):
        img = cv2.rectangle(img, (x0, y0), (x1, y1), (0, 255, 0), 3)
        if match_name:
            # Put the label just above the top-left corner of the box.
            cv2.putText(img, match_name[i], (x0, y0-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,255,0), 2)
    return img

## 1.3 Get User's Verification images & Eye blinking Detection

In [7]:
# Instantiate the blink detector.
detector = eye_blink_detector()
# Blink-detector state: no closed-eye frames and no blinks counted yet.
COUNTER, TOTAL = 0, 0
# Locations of the recorded login video, of the frames extracted from it and
# of the user's registration (verification) data.
video_path = "data/videos/video.mp4" #/content/video.mp4
image_path = "data/images/"
verification_path = "data/verification/"

In [8]:
def get_verification_images(camera_index=0, max_seconds=10, fps=10):
    """Record a short webcam clip for user registration.

    Frames are shown in a window and written to
    ``<verification_path>video/video.mp4`` until the camera stops delivering
    frames, the user presses ESC, or ``max_seconds`` have elapsed.

    Args:
        camera_index: index of the camera to open (0, 1, 2, ... when several
            cameras are attached).
        max_seconds: maximum recording time in seconds.
        fps: frame rate declared for the written video file.

    Returns:
        The path of the written video, or ``None`` when the camera could not
        be opened.
    """
    video_dir = verification_path + "video"
    verification_video_path = video_dir + "/video.mp4"
    os.makedirs(video_dir, exist_ok=True)

    cap = cv2.VideoCapture(camera_index)
    # Without a readable camera there is nothing to record: release the
    # handle and report the failure to the caller.
    if not cap.isOpened():
        cap.release()
        print("Unable to read camera")
        return None

    out = None
    try:
        # Frame size of the camera, needed to configure the writer.
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print("원을 그리듯이 고개를 돌려주세요")

        # Writer that stores the incoming camera frames as a video file.
        out = cv2.VideoWriter(verification_video_path,
                            cv2.VideoWriter_fourcc('D', 'I', 'V', 'X'),
                            fps,
                            (frame_width, frame_height) )

        # A video is a sequence of still images, so frames are read in a loop.
        start_time = time.time()
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)
            cv2.imshow('frame', frame)
            recording_time = time.time() - start_time
            # ESC stops the recording early.
            if cv2.waitKey(1) & 0xFF == 27:
                break
            # Otherwise stop once the time budget is used up.
            if recording_time > max_seconds:
                print("time over")
                break
    finally:
        # Always close the camera, the output file and the preview window,
        # even when recording fails half-way.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()
    return verification_video_path

In [15]:
mode = input("Do you want User Registration? [y or n]: ")
# Register when the user asks for it, or when no registration data exists yet.
if mode in ('Y', 'y') or not(os.path.isdir("data/verification")):
    # Create the registration folders up front (no-op when they exist).
    os.makedirs("data/verification/images", exist_ok=True)
    verification_video_path = get_verification_images()
    verification_image_path = verification_path + "images/"
    # No video path means the camera could not be used; there is nothing to
    # extract frames from in that case.
    if verification_video_path is not None:
        vidcap = cv2.VideoCapture(verification_video_path)
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        all_frame = vidcap.get(cv2.CAP_PROP_FRAME_COUNT)  # total frames in the video
        # Sample about 50 frames whatever the video length. Clamping to 1
        # keeps "save every frame" for short clips and avoids a modulo by
        # zero when the frame count is reported as 0.
        multiplier = max(all_frame / 50, 1)
        count = 1
        try:
            while True:
                frameId = int(round(vidcap.get(cv2.CAP_PROP_POS_FRAMES)))
                success,image = vidcap.read()
                # Stop at the end of the video instead of passing an empty
                # frame to cv2.imwrite.
                if not success:
                    break
                if frameId % multiplier < 1 :
                    cv2.imwrite(verification_image_path + "image%d.jpg" %count, image)
                    print("saved image image%d.jpg" % count)
                    count += 1
        finally:
            vidcap.release()

Do you want User Registration? [y or n]: y
time over
saved image image1.jpg
saved image image2.jpg
saved image image3.jpg
saved image image4.jpg
saved image image5.jpg
saved image image6.jpg
saved image image7.jpg
saved image image8.jpg
saved image image9.jpg
saved image image10.jpg
saved image image11.jpg
saved image image12.jpg
saved image image13.jpg
saved image image14.jpg
saved image image15.jpg
saved image image16.jpg
saved image image17.jpg
saved image image18.jpg
saved image image19.jpg
saved image image20.jpg
saved image image21.jpg
saved image image22.jpg
saved image image23.jpg
saved image image24.jpg
saved image image25.jpg
saved image image26.jpg
saved image image27.jpg
saved image image28.jpg
saved image image29.jpg
saved image image30.jpg
saved image image31.jpg
saved image image32.jpg
saved image image33.jpg
saved image image34.jpg
saved image image35.jpg
saved image image36.jpg
saved image image37.jpg
saved image image38.jpg
saved image image39.jpg
saved image image40.jpg

In [10]:
# ----------------------------- video -----------------------------
# Eye blinking detection & Login

vs = cv2.VideoCapture(0) #VideoStream(src=0).start()
antiSpoofing = True
# Start every login attempt from a clean blink state, so re-running this cell
# cannot pass on blinks counted during an earlier attempt.
COUNTER = 0
TOTAL = 0

frame_width = int(vs.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(vs.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Store the raw camera frames as a video file; make sure its folder exists.
os.makedirs(os.path.dirname(video_path) or ".", exist_ok=True)
out = cv2.VideoWriter(video_path,
                    cv2.VideoWriter_fourcc('D', 'I', 'V', 'X'),
                    20,
                    (frame_width, frame_height) )

start_time = time.time()
print("눈을 깜박여주세요\n")
try:
    while True:
        frame_start = time.time()
        ret, im = vs.read()
        if not ret:
            # No frame means no liveness evidence: fail the check instead of
            # spinning forever.
            antiSpoofing = False
            print("Unable to read camera")
            break
        out.write(im)
        cv2.imshow('frame', im)
        im = cv2.flip(im, 1)
        im = imutils.resize(im, width=720)
        gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
        # Detect faces.
        rectangles = detector.detector_faces(gray, 0)
        boxes_face = convert_rectangles2array(rectangles,im)
        if len(boxes_face)!=0:
            # Keep only the face with the largest area.
            areas = get_areas(boxes_face)
            index = np.argmax(areas)
            rectangles = rectangles[index]
            boxes_face = np.expand_dims(boxes_face[index],axis=0)
            # Update the blink counters.
            COUNTER,TOTAL = detector.eye_blink(gray,rectangles,COUNTER,TOTAL)
            # Draw the bounding box with the blink count.
            img_post = bounding_box(im,boxes_face,['blinks: {}'.format(TOTAL)])
            # Pass as soon as more than one blink has been counted.
            # NOTE(review): the original comment said "at least one blink",
            # but this condition needs two - confirm the intended threshold.
            if TOTAL > 1 :
                break
        else:
            img_post = im
        # Visualisation; guard against a zero elapsed time on coarse timers.
        elapsed = time.time() - frame_start
        FPS = 1/elapsed if elapsed > 0 else 0.0
        cv2.putText(img_post,f"FPS: {round(FPS,3)}",(10,50),cv2.FONT_HERSHEY_COMPLEX,1,(0,0,255),2)
        cv2.imshow('blink_detection',img_post)
        # imshow only repaints its windows while the GUI event loop runs.
        cv2.waitKey(1)
        recording_time = time.time() - start_time
        # No blinks detected within about 7 seconds: treat as spoofing.
        if recording_time > 7:
            antiSpoofing = False
            print("time over")
            break
finally:
    # Always close the camera, the output file and the windows.
    vs.release()
    out.release()
    cv2.destroyAllWindows()

In [14]:
if antiSpoofing:
    print("PASS")
    # cv2.imwrite fails silently when the target folder is missing.
    os.makedirs(image_path, exist_ok=True)
    vidcap = cv2.VideoCapture(video_path)
    fps = vidcap.get(cv2.CAP_PROP_FPS)
    # Keep one frame per 20 seconds of video. Clamping to 1 avoids a modulo
    # by zero when the frame rate is reported as 0.
    multiplier = max(fps*20, 1)
    count = 1
    try:
        while True:
            frameId = int(round(vidcap.get(cv2.CAP_PROP_POS_FRAMES)))
            success,image = vidcap.read()
            # Stop at the end of the video instead of passing an empty frame
            # to cv2.imwrite.
            if not success:
                break
            if frameId % multiplier < 1 :
                cv2.imwrite(image_path + "image%d.jpg" %count, image)
                print("saved image image%d.jpg" % count)
                count += 1
    finally:
        vidcap.release()
else:
    print("Spoofing")

PASS
saved image image1.jpg


In [None]:
# Check that the first extracted frame exists; frames are written with the
# "image%d.jpg" naming scheme, so the first one is image1.jpg.
print(os.path.isfile(image_path + "image1.jpg"))

# Facial Verification

# 1. Setup

In [None]:
# Mount Google Drive into the Colab runtime at /content/gdrive.
from google.colab import drive

drive.mount('/content/gdrive')

## 1.1 Install Dependencies

In [None]:
# Install the pinned TensorFlow 2.4.1 packages plus OpenCV and matplotlib.
!pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python matplotlib

## 1.2 Import Dependencies

In [None]:
# Import standard dependencies
#import cv2
#import os
import random
#import numpy as np
from matplotlib import pyplot as plt

In [24]:
# Import tensorflow dependencies - Functional API
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

## 1.3 Set GPU Growth

In [None]:
# Let TensorFlow grow GPU memory on demand, which avoids out-of-memory errors
# caused by claiming the whole device up front.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

## 1.4 Create Folder Structures

In [None]:
# Work from the shared-drive project folder so that relative paths resolve
# against it.
PATH = '/content/gdrive/Shareddrives/Face_Recognition'
os.chdir(PATH)

In [None]:
# Relative folders that hold the positive, negative and anchor images.
POS_PATH, NEG_PATH, ANC_PATH = (
    os.path.join('data', folder) for folder in ('positive', 'negative2', 'anchor')
)

In [None]:
# Make the directories
# os.makedirs(POS_PATH)
# os.makedirs(NEG_PATH)
# os.makedirs(ANC_PATH)

In [None]:
# Count the entries currently present in the negative-sample folder.
file_list = os.listdir(NEG_PATH) 

print (len(file_list))

# 2. Collect Positives and Anchors

## 2.1 Untar Labelled Faces in the Wild Dataset

In [None]:
# http://vis-www.cs.umass.edu/lfw/

In [None]:
# Uncompress Tar GZ Labelled Faces in the Wild Dataset
!tar -xf lfw.tgz

In [None]:
# Move LFW Images to the following repository data/negative
# for directory in os.listdir('lfw'):
#     for file in os.listdir(os.path.join('lfw', directory)):
#         EX_PATH = os.path.join('lfw', directory, file)
#         NEW_PATH = os.path.join(NEG_PATH, file)
#         os.replace(EX_PATH, NEW_PATH)

## 2.2 Collect Positive and Anchor Classes

In [None]:
# Import uuid library to generate unique image names
import uuid

In [None]:
# Example of a unique anchor file path built from a UUID.
os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))

In [None]:
# # Establish a connection to the webcam
# cap = cv2.VideoCapture(0)
# while cap.isOpened(): 
#     ret, frame = cap.read()
   
#     # Cut down frame to 250x250px
#     frame = frame[120:120+250,200:200+250, :]
    
#     # Collect anchors 
#     if cv2.waitKey(1) & 0XFF == ord('a'):
#         # Create the unique file path 
#         imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
#         # Write out anchor image
#         cv2.imwrite(imgname, frame)
    
#     # Collect positives
#     if cv2.waitKey(1) & 0XFF == ord('p'):
#         # Create the unique file path 
#         imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
#         # Write out positive image
#         cv2.imwrite(imgname, frame)
    
#     # Show image back to screen
#     cv2.imshow('Image Collection', frame)
    
#     # Breaking gracefully
#     if cv2.waitKey(1) & 0XFF == ord('q'):
#         break
        
# # Release the webcam
# cap.release()
# # Close the image show frame
# cv2.destroyAllWindows()

In [None]:
#plt.imshow(frame[120:120+250,200:200+250, :])

# 2.x NEW - Data Augmentation

In [None]:
# def data_aug(img):
#     data = []
#     for i in range(9):
#         img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
#         img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
#         # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
#         img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
#         img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
#         img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
#         data.append(img)
    
#     return data

In [None]:
# import os
# import uuid

In [None]:
# img_path = os.path.join(ANC_PATH, 'a386326c-59f3-11ed-bb8e-0242ac1c0002.jpg')
# img = cv2.imread(img_path)
# augmented_images = data_aug(img)

# for image in augmented_images:
#     cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [None]:
# for file_name in os.listdir(os.path.join(POS_PATH)):
#     img_path = os.path.join(POS_PATH, file_name)
#     img = cv2.imread(img_path)
#     augmented_images = data_aug(img) 
    
#     for image in augmented_images:
#         cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

# 3. Load and Preprocess Images

## 3.1 Get Image Directories

In [4]:
# List at most 300 JPEG paths from each of the positive, negative and anchor
# folders.
positive = tf.data.Dataset.list_files('/content/gdrive/Shareddrives/Face_Recognition/data/positive/*.jpg').take(300)
negative = tf.data.Dataset.list_files('/content/gdrive/Shareddrives/Face_Recognition/data/negative2/*.jpg').take(300)
anchor = tf.data.Dataset.list_files('/content/gdrive/Shareddrives/Face_Recognition/data/anchor/*.jpg').take(300)

NameError: name 'tf' is not defined

In [5]:
# Iterator that yields the anchor file paths as NumPy values.
dir_test = anchor.as_numpy_iterator()

NameError: name 'anchor' is not defined

In [6]:
# Show the first anchor file path.
print(next(dir_test))

NameError: name 'dir_test' is not defined

## 3.2 Preprocessing - Scale and Resize

In [7]:
def preprocess(file_path):
    """Load a JPEG file as a 100x100 three-channel image scaled to [0, 1].

    Args:
        file_path: path of the JPEG file to read.

    Returns:
        The resized image tensor, divided by 255.
    """
    # Read the raw bytes of the file.
    byte_img = tf.io.read_file(file_path)
    # Decode with three channels forced, so a grayscale JPEG still produces
    # the (100, 100, 3) input shape the embedding network declares.
    img = tf.io.decode_jpeg(byte_img, channels=3)

    # Resize to 100x100 and scale the pixel values to the range 0..1.
    img = tf.image.resize(img, (100,100))
    img = img / 255.0

    return img

In [8]:
#img = preprocess('/content/gdrive/Shareddrives/Face_Recognition/data/anchor/1a18c9bc-64d8-11ed-904a-ac9084d4c809.jpg')

In [9]:
#plt.imshow(img)

In [10]:
#img.numpy().max() 

In [11]:
# dataset.map(preprocess)

## 3.3 Create Labelled Dataset

In [12]:
# (anchor, positive) => 1,1,1,1,1
# (anchor, negative) => 0,0,0,0,0

In [13]:
# Pair every anchor path with a positive path and label 1, and with a
# negative path and label 0, then join both sets into a single dataset.
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

NameError: name 'tf' is not defined

In [14]:
# Iterator over the labelled (anchor path, other path, label) samples.
samples = data.as_numpy_iterator()

NameError: name 'data' is not defined

In [15]:
# Draw one labelled sample from the iterator.
exampple = next(samples)

NameError: name 'samples' is not defined

In [16]:
# Display the drawn (anchor path, other path, label) sample.
exampple

NameError: name 'exampple' is not defined

## 3.4 Build Train and Test Partition

In [17]:
def preprocess_twin(input_img, validation_img, label):
    """Preprocess both images of a pair and pass the label through.

    Returns:
        ``(preprocessed input image, preprocessed validation image, label)``.
    """
    first = preprocess(input_img)
    second = preprocess(validation_img)
    return first, second, label

In [18]:
# Run the twin preprocessing on the drawn sample.
res = preprocess_twin(*exampple)

NameError: name 'exampple' is not defined

In [19]:
# Show the first preprocessed image of the pair.
plt.imshow(res[0])

NameError: name 'plt' is not defined

In [20]:
# Inspect the tensor of the first preprocessed image.
res[0]

NameError: name 'res' is not defined

In [21]:
# Build dataloader pipeline: load and scale both images of every pair, then
# cache the decoded images.
data = data.map(preprocess_twin)
data = data.cache()
# Shuffle once and keep that order. With the default per-iteration reshuffle,
# a later take()/skip() split would select different samples on every pass,
# so pairs used for training would also show up in the test partition.
data = data.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

NameError: name 'data' is not defined

In [22]:
# Training partition: the first 70% of the samples, in batches of 16, with
# up to 8 batches prefetched.
train_data = data.take(round(len(data)*.7)).batch(16).prefetch(8)

NameError: name 'data' is not defined

In [23]:
# Testing partition: skip the first 70% of the samples and keep the next 30%,
# in batches of 16, with up to 8 batches prefetched.
test_data = (
    data.skip(round(len(data)*.7))
    .take(round(len(data)*.3))
    .batch(16)
    .prefetch(8)
)

NameError: name 'data' is not defined

# 4. Model Engineering

## 4.1 Build Embedding Layer

In [25]:
# Input placeholder for a 100x100 image with 3 channels.
inp = Input(shape=(100,100,3), name='input_image')

In [26]:
# First convolution: 64 filters, 10x10 kernel, ReLU activation.
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [27]:
# 2x2 max pooling with stride 2. The first positional argument of
# MaxPooling2D is pool_size, so passing 64 there asked for a 64x64 window.
m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

In [28]:
# Second block: 128 filters with a 7x7 kernel, then 2x2 max pooling with
# stride 2. The first positional argument of MaxPooling2D is pool_size, so
# passing 64 there asked for a 64x64 window.
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

In [29]:
# Third block: 128 filters with a 4x4 kernel, then 2x2 max pooling with
# stride 2. The first positional argument of MaxPooling2D is pool_size, so
# passing 64 there asked for a 64x64 window.
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

In [30]:
# Final block: 256 filters with a 4x4 kernel, flattened and projected to a
# 4096-value sigmoid embedding.
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [31]:
# Wrap the input and the dense output into a model named 'embedding'.
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [32]:
# Print the layer-by-layer summary of the model.
mod.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 9, 9, 128)         0 

In [33]:
def make_embedding():
    """Build the convolutional embedding network.

    The network maps a 100x100x3 image to a 4096-value sigmoid embedding
    through three convolution + max-pooling blocks and a final convolution.

    Returns:
        A Keras ``Model`` named ``'embedding'``.
    """
    inp = Input(shape=(100,100,3), name='input_image')

    # NOTE: the first positional argument of MaxPooling2D is pool_size, so
    # the pooling layers spell out a 2x2 window with stride 2 by keyword.
    # With padding='same' the output shapes depend on the stride only.

    # First block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c1)

    # Second block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c2)

    # Third block
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(pool_size=(2,2), strides=(2,2), padding='same')(c3)

    # Final embedding block
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)

    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [34]:
# Build the embedding network.
embedding = make_embedding()

In [35]:
# Print the layer-by-layer summary of the embedding network.
embedding.summary()

Model: "embedding"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_image (InputLayer)     [(None, 100, 100, 3)]     0         
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 91, 91, 64)        19264     
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 46, 46, 64)        0         
_________________________________________________________________
conv2d_5 (Conv2D)            (None, 40, 40, 128)       401536    
_________________________________________________________________
max_pooling2d_4 (MaxPooling2 (None, 20, 20, 128)       0         
_________________________________________________________________
conv2d_6 (Conv2D)            (None, 17, 17, 128)       262272    
_________________________________________________________________
max_pooling2d_5 (MaxPooling2 (None, 9, 9, 128)         0 

## 4.2 Build Distance Layer

In [46]:
# Two 100x100x3 image inputs: the anchor and the image to compare it with.
anchor_image = Input(name='anchor_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [47]:
# Pass both inputs through the same embedding model.
anchor_embedding = embedding(anchor_image)
validation_embedding = embedding(validation_image)

In [48]:
# Siamese L1 Distance class
class L1Dist(Layer):
    """Layer returning the element-wise absolute difference of two embeddings."""

    def __init__(self, **kwargs):
        # Forward the standard layer options (name, dtype, trainable, ...) to
        # the base class instead of dropping them; Keras passes them back in
        # when it rebuilds the layer from a saved configuration.
        super().__init__(**kwargs)

    def call(self, input_embedding, validation_embedding):
        """Return ``|input_embedding - validation_embedding|`` element-wise."""
        return tf.math.abs(input_embedding - validation_embedding)

In [49]:
# Instantiate the distance layer.
l1 = L1Dist()

In [50]:
# Apply the distance layer to the two embeddings.
l1(anchor_embedding, validation_embedding)

<KerasTensor: shape=(None, 4096) dtype=float32 (created by layer 'l1_dist_2')>

## 4.3 Make Siamese Model

In [None]:
# Two 100x100x3 image inputs: the input image and the validation image.
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [None]:
# Pass both inputs through the same embedding model.
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [None]:
# Instantiate the distance layer.
siamese_layer = L1Dist()

In [None]:
# Element-wise absolute difference between the two embeddings.
distances = siamese_layer(inp_embedding, val_embedding)

In [None]:
# Single sigmoid unit that turns the distances into one score.
classifier = Dense(1, activation='sigmoid')(distances)

In [None]:
# Display the classifier output tensor.
classifier

In [None]:
# Assemble the two inputs and the classifier output into one model.
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
# Print the layer-by-layer summary of the network.
siamese_network.summary()

In [None]:
def make_siamese_model(): 
    """Assemble the Siamese network around the module-level ``embedding`` model.

    Both 100x100x3 inputs go through the same embedding model; a distance
    layer named ``'distance'`` compares the two embeddings and a single
    sigmoid unit produces the output.

    Returns:
        A Keras ``Model`` named ``'SiameseNetwork'`` taking
        ``[input_image, validation_image]``.
    """
    # The two images to compare.
    input_image = Input(name='input_img', shape=(100,100,3))
    validation_image = Input(name='validation_img', shape=(100,100,3))

    # Distance layer, renamed so it appears as 'distance' in the model.
    distance_layer = L1Dist()
    distance_layer._name = 'distance'

    # Embed both images with the shared network and compare the embeddings.
    input_features = embedding(input_image)
    validation_features = embedding(validation_image)
    distances = distance_layer(input_features, validation_features)

    # Classification layer.
    classifier = Dense(1, activation='sigmoid')(distances)

    return Model(
        inputs=[input_image, validation_image],
        outputs=classifier,
        name='SiameseNetwork',
    )

In [None]:
# Build the Siamese model.
siamese_model = make_siamese_model()

In [None]:
# Print the layer-by-layer summary of the Siamese model.
siamese_model.summary()

# 5. Training

## 5.1 Setup Loss and Optimizer

In [None]:
# Binary cross-entropy loss object.
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
# Adam optimizer with a learning rate of 1e-4.
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

## 5.2 Establish Checkpoints

In [None]:
# Checkpoints are written under ./training_checkpoints with the file prefix
# 'ckpt' and track both the optimizer and the Siamese model.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## 5.3 Build Train Step Function

In [None]:
# Iterator over the training batches as NumPy values.
test_batch = train_data.as_numpy_iterator()

In [None]:
# Take the first batch from the iterator.
batch_1 = next(test_batch)

In [None]:
# The first two entries of the batch are the two image tensors.
X = batch_1[:2]

In [None]:
# The third entry of the batch holds the labels.
y = batch_1[2]

In [None]:
# Display the labels of the batch.
y

In [None]:
# IPython `??` introspection of the loss class.
tf.losses.BinaryCrossentropy??

In [None]:
@tf.function
def train_step(batch):
    """Run one optimisation step on ``batch`` and return its loss.

    Args:
        batch: sequence whose first two entries are the two image tensors
            fed to ``siamese_model`` and whose third entry holds the labels.

    Returns:
        The binary cross-entropy loss of this batch.
    """
    # Record the forward pass so gradients can be taken afterwards.
    with tf.GradientTape() as tape:
        # Image pair and label.
        X = batch[:2]
        y = batch[2]

        # Forward pass and loss.
        yhat = siamese_model(X, training=True)
        loss = binary_cross_loss(y, yhat)

    # No print(loss) here: inside a tf.function a Python print runs only
    # while the function is traced and shows a symbolic tensor, not the loss
    # value. The loss is returned to the caller instead.

    # Gradients of the loss with respect to the trainable weights.
    grad = tape.gradient(loss, siamese_model.trainable_variables)

    # Apply the weight update.
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))

    return loss

## 5.4 Build Training Loop

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
    """Train ``siamese_model`` on ``data`` for ``EPOCHS`` epochs.

    For every epoch the loss of the last batch and the recall and precision
    accumulated over the epoch are printed; a checkpoint is saved every 10
    epochs.

    Args:
        data: batched dataset; each batch holds two image tensors followed
            by the labels.
        EPOCHS: number of passes over ``data``.

    Raises:
        ValueError: if ``data`` yields no batches.
    """
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))

        # Fresh metric objects for every epoch.
        r = Recall()
        p = Precision()

        # Stays None when the dataset is empty, so that case is reported
        # clearly instead of failing on an unbound name below.
        loss = None
        for idx, batch in enumerate(data):
            # One optimisation step, then update the metrics with the
            # predictions for the same batch.
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        if loss is None:
            raise ValueError('train() received a dataset without any batches')
        print(loss.numpy(), r.result().numpy(), p.result().numpy())

        # Save checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)
## 5.5 Train the model

In [None]:
# Number of training epochs.
EPOCHS = 50

In [None]:
# Train on the training partition.
train(train_data, EPOCHS)

# 6. Evaluate Model

## 6.1 Import Metrics

In [None]:
# Import metric calculations
from tensorflow.keras.metrics import Precision, Recall

## 6.2 Make Predictions

In [None]:
# Pull a single batch out of the test partition.
test_input, test_val, y_true = next(test_data.as_numpy_iterator())

In [None]:
# Predict a score for every pair in the batch.
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Post processing the results: map every score above 0.5 to 1, else 0
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
# Display the true labels for comparison.
y_true

## 6.3 Calculate Metrics

In [None]:
# Creating a recall metric object 
m = Recall()

# Calculating the recall value 
m.update_state(y_true, y_hat)

# Return Recall Result
m.result().numpy()

In [None]:
# Creating a precision metric object 
m = Precision()

# Calculating the precision value 
m.update_state(y_true, y_hat)

# Return Precision Result
m.result().numpy()

In [None]:
# Accumulate recall and precision over every batch of the test partition.
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

# Print the recall followed by the precision.
print(r.result().numpy(), p.result().numpy())

## 6.4 Viz Results

In [None]:
# Set plot size 
plt.figure(figsize=(10,8))

# Left subplot: first input image of the batch
plt.subplot(1,2,1)
plt.imshow(test_input[0])

# Right subplot: the image it is compared with
plt.subplot(1,2,2)
plt.imshow(test_val[0])

# Render the figure
plt.show()

# 7. Save Model

In [None]:
# Save the model to an HDF5 file
siamese_model.save('siamesemodelv1.h5')

In [None]:
# Confirm the custom layer class is defined (needed to reload the model).
L1Dist

In [None]:
# Reload model; the custom distance layer and the loss class are supplied
# through custom_objects so the saved configuration can be resolved
siamese_model = tf.keras.models.load_model('siamesemodelv1.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# Make predictions with the reloaded model on the last test batch
siamese_model.predict([test_input, test_val])

In [None]:
# View the summary of the reloaded model
siamese_model.summary()

# 8. Real Time Test

## 8.1 Verification Function

In [None]:
# Show the current working directory.
os.getcwd()

In [None]:
# List the stored verification images.
os.listdir(os.path.join('application_data', 'verification_images'))

In [None]:
# Switch the working directory so the relative 'application_data' paths
# resolve against this folder.
PATH = '/content/gdrive/my-drive'
os.chdir(PATH)
# Relative path of the image to verify.
os.path.join('application_data', 'input_image', 'input_image.jpg')

In [None]:
# Print the relative path of every stored verification image.
for image in os.listdir(os.path.join('application_data', 'verification_images')):
    validation_img = os.path.join('application_data', 'verification_images', image)
    print(validation_img)

In [None]:
def verify(model, detection_threshold, verification_threshold):
    """Compare the input image against every stored verification image.

    Args:
        model: model whose ``predict`` takes the input image and one
            verification image and returns a score.
        detection_threshold: score above which a single prediction counts
            as a positive match.
        verification_threshold: fraction of positive matches, out of all
            verification images, that must be exceeded to verify.

    Returns:
        ``(results, verified)``: the list of raw predictions, one per
        verification image, and whether the fraction of positive matches
        exceeds ``verification_threshold``. With no verification images the
        result is ``([], False)``.
    """
    verification_dir = os.path.join('application_data', 'verification_images')
    image_names = os.listdir(verification_dir)
    # Nothing to compare against: not verified (and no 0/0 division below).
    if not image_names:
        return [], False

    # The input image is the same for every comparison, so load it once.
    input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))

    results = []
    for image in image_names:
        validation_img = preprocess(os.path.join(verification_dir, image))

        # Add a leading batch axis to each image and predict on the pair.
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)

    # Detection Threshold: score above which a prediction counts as positive
    detection = np.sum(np.array(results) > detection_threshold)

    # Verification Threshold: positive predictions / all verification images
    verification = detection / len(image_names)
    verified = verification > verification_threshold

    return results, verified

## 8.2 OpenCV Real Time Verification

In [None]:
# Run verification with both thresholds at 0.5 and print the outcome.
results, verified = verify(siamese_model, 0.5, 0.5)
print(verified)

In [None]:
# Count the predictions with a score above 0.9.
np.sum(np.squeeze(results) > 0.9)

In [None]:
# Display the raw predictions.
results