# 1. Setup

#### Bibliotecas principais
- A biblioteca `os` é um **sistema operacional** que vai ser utilizado na criação de estrutura de pastas
- Numpy é utlizado para manipular **arrays**
- Matplotlib para plotar os gráficos

## 1.1 Install Dependencies

In [None]:
 # %pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python matplotlib

## 1.2 Import Dependencies

In [None]:
import cv2
import os
import random
import numpy as np
from matplotlib import pyplot as plt

### O que são Redes Neurais Siamesas?

Redes Neurais Siamesas (RNS) foram introduzidas por Bromley em colaboração, no ano de 1994, onde foram utilizadas para realizara veriﬁcação de assinaturas. A arquitetura de uma RNS, consiste em duas redes neurais que compartilham pesos idênticos e que são ligadas por uma ou mais camadas. Na maioria dos casos, uma RNS executa uma codiﬁcação não linear dos dados de entrada com o objetivo de atingir um espaço semanticamente signiﬁcativo onde padrões relacionados sejam próximos uns dos outros (tais como faces de pessoas, assinaturas,entre outros) e os não relacionados sejam distantes unsdos dos outros (Harandi et al., 2017).

- `from tensorflow.keras.models import Model` importa Modelos e agrupa camadas em um objeto com recursos de treinamento e inferência. Ele recebe a imagem de `input` a retorna **0** se a imagem for diferente e **1** se for igual no `output`

- `from tensorflow.keras.layers` importa as camadas possibilitando a criação de uma camada personalizada chamada `L1Dist`
  - Conv2D: utilizada para criar uma rede neural convolucional;
  - Dense: camada conectada;
  - MaxPooling2D: permite juntar as camadas e escolhe as informações que realmente são necessárias. Semelhante a uma média;
  - Input: a entrada do algoritmo;
  - Flatten: leva as informações de uma camada anterior, "achata" em uma única dimensão, que permite enviar os dados de uma rede convolucionoal para uma camada densa.

In [None]:
# tensorflow 
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer, Conv2D, Dense, MaxPooling2D, Input, Flatten
import tensorflow as tf

## 1.3 Set GPU Growth

### Configuração da GPU

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: 
    tf.config.experimental.set_memory_growth(gpu, True)

## 1.4 Create Folder Structures

O algoritmo utiliza 3 pastas.
- Âncora
- Positivo
- Negativo

Na pasta âncora está a imagem que define quem nós somos. Na pasta positivo, está a imagem que confirmará se a o arquivo da pasta âncora é correto. Negativo são os que não se assemelham com o arquivo da pasta âncora.

In [None]:
# configurando os caminhos
POS_PATH = os.path.join('data', 'positive')
NEG_PATH = os.path.join('data', 'negative')
ANC_PATH = os.path.join('data', 'anchor')

In [None]:
# Make the directories
#os.makedirs(POS_PATH)
#os.makedirs(NEG_PATH)
#os.makedirs(ANC_PATH)

# 2. Collect Positives and Anchors

## 2.1 Untar Labelled Faces in the Wild Dataset

In [None]:
#http://vis-www.cs.umass.edu/lfw/

In [None]:
# descompacta o arquivo `lfw.tgz`
!tar -xf lfw.tgz

In [None]:
# Mova as imagens LFW para os seguintes dados/negativos do repositório
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(NEG_PATH, file)
        os.replace(EX_PATH, NEW_PATH)

## 2.2 Collect Positive and Anchor Classes

In [None]:
# Importe a biblioteca uuid para gerar nomes de imagem exclusivos
import uuid

In [None]:
os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))

### Cria o s arquivos âncora, que serão utilizados no treino para comparar com os positivos

In [None]:
# Conecta webcam
cap = cv2.VideoCapture(0)
while cap.isOpened(): 
    ret, frame = cap.read()
   
    frame = frame[120:120+250,200:200+250, :]
    
    # ancoras 
    if cv2.waitKey(1) & 0XFF == ord('a'):
        # Crie o caminho de arquivo exclusivo
        imgname = os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Escreva a imagem da âncora
        cv2.imwrite(imgname, frame)
    
    # positivos
    if cv2.waitKey(1) & 0XFF == ord('p'):
        # Crie o caminho de arquivo exclusivo
        imgname = os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1()))
        # Escreva a imagem da positiva
        cv2.imwrite(imgname, frame)
    
    # Mostrar imagem de volta à tela
    cv2.imshow('Image Collection', frame)
    
    if cv2.waitKey(1) & 0XFF == ord('q'):
        break
        
cap.release()
cv2.destroyAllWindows()

In [None]:
plt.imshow(frame)

# 2.x NEW - Data Augmentation

In [None]:
def data_aug(img):
    data = []
    for i in range(9):
        img = tf.image.stateless_random_brightness(img, max_delta=0.02, seed=(1,2))
        img = tf.image.stateless_random_contrast(img, lower=0.6, upper=1, seed=(1,3))
        # img = tf.image.stateless_random_crop(img, size=(20,20,3), seed=(1,2))
        img = tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_jpeg_quality(img, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100),np.random.randint(100)))
        img = tf.image.stateless_random_saturation(img, lower=0.9,upper=1, seed=(np.random.randint(100),np.random.randint(100)))
            
        data.append(img)
    
    return data

In [None]:
import os
import uuid

In [None]:
img_path = os.path.join(ANC_PATH, 'f2bfd8c3-7184-11ed-8dc8-6432a87b22d4.jpg')
img = cv2.imread(img_path)
augmented_images = data_aug(img)

for image in augmented_images:
    cv2.imwrite(os.path.join(ANC_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

### Essa parte do código eu comentei, pq ele criam uitas imagens na pasta positivo e minha máquina não estava conseguindo executar, travando muito. Então peguei uma quantidade suficiente pra funcionar e comentei esse código pra não travar o note.

Na verdade a celula está como markdown

for file_name in os.listdir(os.path.join(POS_PATH)):
    img_path = os.path.join(POS_PATH, file_name)
    img = cv2.imread(img_path)
    augmented_images = data_aug(img) 
    
    for image in augmented_images:
       cv2.imwrite(os.path.join(POS_PATH, '{}.jpg'.format(uuid.uuid1())), image.numpy())

# 3. Load and Preprocess Images

## 3.1 Get Image Directories

In [None]:
anchor = tf.data.Dataset.list_files(ANC_PATH+'\*.jpg').take(3000)
positive = tf.data.Dataset.list_files(POS_PATH+'\*.jpg').take(3000)
negative = tf.data.Dataset.list_files(NEG_PATH+'\*.jpg').take(3000)

In [None]:
dir_test = anchor.as_numpy_iterator()

In [None]:
print(dir_test.next())

## 3.2 Preprocessing - Scale and Resize

In [None]:
def preprocess(file_path):
    
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    
    img = tf.image.resize(img, (100,100))
    img = img / 255.0

    return img

In [None]:
img = preprocess('data\\anchor\\f2bfd8c3-7184-11ed-8dc8-6432a87b22d4.jpg')

In [None]:
img.numpy().max() 

## 3.3 Create Labelled Dataset

In [None]:
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor)))))
data = positives.concatenate(negatives)

In [None]:
samples = data.as_numpy_iterator()

In [None]:
exampple = samples.next()

In [None]:
exampple

## 3.4 Build Train and Test Partition

In [None]:
def preprocess_twin(input_img, validation_img, label):
    return(preprocess(input_img), preprocess(validation_img), label)

In [None]:
res = preprocess_twin(*exampple)

In [None]:
plt.imshow(res[1])

In [None]:
res[2]

In [None]:
# Carregando pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

In [None]:
# Partição de treinamento
train_data = data.take(round(len(data)*.7))
train_data = train_data.batch(16)
train_data = train_data.prefetch(8)

In [None]:
# Partição de teste
test_data = data.skip(round(len(data)*.7))
test_data = test_data.take(round(len(data)*.3))
test_data = test_data.batch(16)
test_data = test_data.prefetch(8)

# 4. Model Engineering

## 4.1 Build Embedding Layer

In [None]:
inp = Input(shape=(100,100,3), name='input_image')

In [None]:
c1 = Conv2D(64, (10,10), activation='relu')(inp)

In [None]:
m1 = MaxPooling2D(64, (2,2), padding='same')(c1)

In [None]:
c2 = Conv2D(128, (7,7), activation='relu')(m1)
m2 = MaxPooling2D(64, (2,2), padding='same')(c2)

In [None]:
c3 = Conv2D(128, (4,4), activation='relu')(m2)
m3 = MaxPooling2D(64, (2,2), padding='same')(c3)

In [None]:
c4 = Conv2D(256, (4,4), activation='relu')(m3)
f1 = Flatten()(c4)
d1 = Dense(4096, activation='sigmoid')(f1)

In [None]:
mod = Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
mod.summary()

In [None]:
def make_embedding(): 
    inp = Input(shape=(100,100,3), name='input_image')
    
    # primeiro block
    c1 = Conv2D(64, (10,10), activation='relu')(inp)
    m1 = MaxPooling2D(64, (2,2), padding='same')(c1)
    
    # segundo block
    c2 = Conv2D(128, (7,7), activation='relu')(m1)
    m2 = MaxPooling2D(64, (2,2), padding='same')(c2)
    
    # terceiro block 
    c3 = Conv2D(128, (4,4), activation='relu')(m2)
    m3 = MaxPooling2D(64, (2,2), padding='same')(c3)
    
    # Bloco de incorporação final
    c4 = Conv2D(256, (4,4), activation='relu')(m3)
    f1 = Flatten()(c4)
    d1 = Dense(4096, activation='sigmoid')(f1)
    
    
    return Model(inputs=[inp], outputs=[d1], name='embedding')

In [None]:
embedding = make_embedding()

In [None]:
embedding.summary()

## 4.2 Build Distance Layer

In [None]:
# Classificação da distancia siamesa L1
class L1Dist(Layer):
    
    def __init__(self, **kwargs):
        super().__init__()
       
    # calcula similaridade
    def call(self, input_embedding, validation_embedding):
        return tf.math.abs(input_embedding - validation_embedding)

In [None]:
l1 = L1Dist()

In [None]:
# l1(anchor_embedding, validation_embedding)

## 4.3 Make Siamese Model

In [None]:
input_image = Input(name='input_img', shape=(100,100,3))
validation_image = Input(name='validation_img', shape=(100,100,3))

In [None]:
inp_embedding = embedding(input_image)
val_embedding = embedding(validation_image)

In [None]:
siamese_layer = L1Dist()

In [None]:
distances = siamese_layer(inp_embedding, val_embedding)

In [None]:
classifier = Dense(1, activation='sigmoid')(distances)

In [None]:
classifier

In [None]:
siamese_network = Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_network.summary()

In [None]:
def make_siamese_model(): 
    
    # entrada da imagem ancora na rede
    input_image = Input(name='input_img', shape=(100,100,3))
    
    # validaçao da imagem na rede 
    validation_image = Input(name='validation_img', shape=(100,100,3))
    
    # combinação das distancias dos componentes
    siamese_layer = L1Dist()
    siamese_layer._name = 'distance'
    distances = siamese_layer(embedding(input_image), embedding(validation_image))
    
    # camada de classificação 
    classifier = Dense(1, activation='sigmoid')(distances)
    
    return Model(inputs=[input_image, validation_image], outputs=classifier, name='SiameseNetwork')

In [None]:
siamese_model = make_siamese_model()

In [None]:
siamese_model.summary()

# 5. Training

### Treino do modelo

## 5.1 Setup Loss and Optimizer

In [None]:
binary_cross_loss = tf.losses.BinaryCrossentropy()

In [None]:
opt = tf.keras.optimizers.Adam(1e-4) # 0.0001

## 5.2 Establish Checkpoints

In [None]:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(opt=opt, siamese_model=siamese_model)

## 5.3 Build Train Step Function

In [None]:
test_batch = train_data.as_numpy_iterator()

In [None]:
batch_1 = test_batch.next()

In [None]:
X = batch_1[:2]

In [None]:
y = batch_1[2]

In [None]:
y

In [None]:
@tf.function
def train_step(batch):
    
    with tf.GradientTape() as tape:     
        X = batch[:2]
        y = batch[2]
        
        yhat = siamese_model(X, training=True)
        loss = binary_cross_loss(y, yhat)
    print(loss)
        
    grad = tape.gradient(loss, siamese_model.trainable_variables)
    
    opt.apply_gradients(zip(grad, siamese_model.trainable_variables))
        
    return loss

## 5.4 Build Training Loop

In [None]:
from tensorflow.keras.metrics import Precision, Recall

In [None]:
def train(data, EPOCHS):
    for epoch in range(1, EPOCHS+1):
        print('\n Epoch {}/{}'.format(epoch, EPOCHS))
        progbar = tf.keras.utils.Progbar(len(data))
        
        r = Recall()
        p = Precision()
        
        # Percorra cada lote
        for idx, batch in enumerate(data):
            # executa o passo de treino
            loss = train_step(batch)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat) 
            progbar.update(idx+1)
        print(loss.numpy(), r.result().numpy(), p.result().numpy())
        
        # salva checkpoints
        if epoch % 10 == 0: 
            checkpoint.save(file_prefix=checkpoint_prefix)

## 5.5 Train the model

In [None]:
EPOCHS = 50

In [None]:
train(train_data, EPOCHS)

# 6. Evaluate Model

## 6.1 Import Metrics

In [None]:
# Importar cálculos de métricas
from tensorflow.keras.metrics import Precision, Recall

## 6.2 Make Predictions

In [None]:
# Obter um lote de dados de teste
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [None]:
y_hat = siamese_model.predict([test_input, test_val])

In [None]:
# Pós-processamento dos resultados
[1 if prediction > 0.5 else 0 for prediction in y_hat ]

In [None]:
y_true

## 6.3 Calculate Metrics

In [None]:
m = Recall()

m.update_state(y_true, y_hat)

m.result().numpy()

In [None]:
m = Precision()

m.update_state(y_true, y_hat)

m.result().numpy()

In [None]:
r = Recall()
p = Precision()

for test_input, test_val, y_true in test_data.as_numpy_iterator():
    yhat = siamese_model.predict([test_input, test_val])
    r.update_state(y_true, yhat)
    p.update_state(y_true,yhat) 

print(r.result().numpy(), p.result().numpy())

## 6.4 Viz Results

### Visualizando os resultados

In [None]:
plt.figure(figsize=(10,8))

plt.subplot(1,2,1)
plt.imshow(test_input[0])

plt.subplot(1,2,2)
plt.imshow(test_val[0])

plt.show()

# 7. Save Model

In [None]:
# Salvar pesos
siamese_model.save('siamesemodelv2.h5')

In [None]:
L1Dist

In [None]:
# Recarregando o modelo
siamese_model = tf.keras.models.load_model('siamesemodelv2.h5', 
                                   custom_objects={'L1Dist':L1Dist, 'BinaryCrossentropy':tf.losses.BinaryCrossentropy})

In [None]:
# previsões com modelo recarregado
siamese_model.predict([test_input, test_val])

In [None]:
# resumo do modelo
siamese_model.summary()

# 8. Real Time Test

## 8.1 Verification Function

In [None]:
# application_data//verification_images

In [None]:
os.listdir(os.path.join('application_data', 'C:/Users/rayan/Documents/machinelearning/facial-recognition-app/FaceIDApp-main/application_data/verification_images'))

In [None]:
os.path.join('application_data', 'input_image', 'input_image.jpg')

In [None]:
for image in os.listdir(os.path.join('application_data', 'C:/Users/rayan/Documents/machinelearning/facial-recognition-app/FaceIDApp-main/application_data/verification_images')):
    validation_img = os.path.join('application_data', 'C:/Users/rayan/Documents/machinelearning/facial-recognition-app/FaceIDApp-main/application_data/verification_images', image)
    print(validation_img)

In [None]:
def verify(model, detection_threshold, verification_threshold):
    results = []
    for image in os.listdir(os.path.join('application_data', 'C:/Users/rayan/Documents/machinelearning/facial-recognition-app/FaceIDApp-main/application_data/verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'C:/Users/rayan/Documents/machinelearning/facial-recognition-app/FaceIDApp-main/application_data/verification_images', image))
        
        # faz predições
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Limiar de Detecção: Métrica acima da qual uma predição é considerada positiva 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Limiar de verificação: Proporção de previsões positivas/total de amostras positivas
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

## 8.2 OpenCV Real Time Verification

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[120:120+250,200:200+250, :]
    
    cv2.imshow('Verification', frame)
    
    # Verification trigger
    if cv2.waitKey(10) & 0xFF == ord('v'):
        # Save input image to application_data/input_image folder 
#         hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
#         h, s, v = cv2.split(hsv)

#         lim = 255 - 10
#         v[v > lim] = 255
#         v[v <= lim] -= 10
        
#         final_hsv = cv2.merge((h, s, v))
#         img = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)

        cv2.imwrite(os.path.join('application_data', 'input_image', 'input_image.jpg'), frame)
        # Run verification
        results, verified = verify(siamese_model, 0.5, 0.5)
        print(verified)
    
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

In [None]:
np.sum(np.squeeze(results) > 0.9)

In [None]:
results