# **Run General Setup**

In [None]:
# Install the third-party packages used by this notebook (web server, ngrok
# tunnel, pretrained face model, OpenCV, plotting).
# NOTE(review): versions are not pinned, so a fresh runtime may pull newer releases.
!pip install flask
!pip install pyngrok
!pip install facenet-pytorch
!pip install opencv-python
!pip install seaborn

# Mount Google Drive; every model, template, database and image path used
# below lives under /content/drive/MyDrive/triplet_loss.
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyngrok
  Downloading pyngrok-6.0.0.tar.gz (681 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m681.2/681.2 kB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyngrok
  Building wheel for pyngrok (setup.py) ... [?25l[?25hdone
  Created wheel for pyngrok: filename=pyngrok-6.0.0-py3-none-any.whl size=19867 sha256=b08232ec69000b096fa274b8bd7b443706881a7517826b5d7bd56c3b5bfa4f55
  Stored in directory: /root/.cache/pip/wheels/5c/42/78/0c3d438d7f5730451a25f7ac6cbf4391759d22a67576ed7c2c
Successfully built pyngrok
Installing collected packages: pyngrok
Successfully installed pyngrok-6.0.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/publ

In [None]:
from flask import Flask, render_template, request
import sqlite3
from google.colab import output
from pyngrok import ngrok
import cv2
import io
import csv
import numpy as np
import os
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import PIL.Image
import pickle
from facenet_pytorch import InceptionResnetV1, fixed_image_standardization
from torch.utils.data import Dataset, DataLoader
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from flask import jsonify, flash, get_flashed_messages, request, send_file, redirect, session, url_for

# Haar cascade (stored on the mounted Drive) used for frontal-face detection.
face_cascade = cv2.CascadeClassifier('/content/drive/MyDrive/triplet_loss/haarcascade_frontalface_default.xml')

# Training pipeline: random geometric and colour augmentation, then tensor
# conversion and per-channel normalisation.
_training_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0), ratio=(0.75, 1.3333333333333333), antialias=True),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
training_transform = transforms.Compose(_training_steps)

# Validation/inference pipeline: deterministic resize to 224x224 followed by
# the same tensor conversion and normalisation as training.
_validation_steps = [
    transforms.Resize((224, 224), antialias=True),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
validation_transform = transforms.Compose(_validation_steps)

In [None]:
class TripletFaceDataset(Dataset):
    """Dataset that yields random (anchor, positive, negative) image triplets.

    Images are discovered by walking ``data_dir``; every image is labelled with
    the name of the directory that directly contains it.

    Args:
        data_dir: Root directory walked recursively for image files.
        transform: Callable applied to each loaded ``PIL.Image`` (RGB).
    """

    # File extensions (matched case-insensitively) that count as images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg')

    def __init__(self, data_dir, transform):
        self.data_dir = data_dir
        self.transform = transform
        self.labels_to_images = self._get_labels_to_images()

    def _get_labels_to_images(self):
        """Return a dict mapping each label to the list of its image paths."""
        labels_to_images = {}
        for root, _dirs, files in os.walk(self.data_dir):
            label = os.path.basename(root)
            for file in files:
                if file.lower().endswith(self.IMAGE_EXTENSIONS):
                    labels_to_images.setdefault(label, []).append(os.path.join(root, file))
        return labels_to_images

    def __len__(self):
        # The length is the number of labels, not images: one epoch draws one
        # random triplet per label.
        return len(self.labels_to_images)

    def _sample_triplet_paths(self):
        """Pick random (anchor, positive, negative) image paths.

        Anchor and positive share a label; the negative comes from a different
        label. When the anchor's label has a single image, that image is used
        as the positive as well.

        Raises:
            ValueError: if fewer than two labels are available, because no
                negative example can be drawn.
        """
        labels = list(self.labels_to_images)
        if len(labels) < 2:
            raise ValueError(
                f"TripletFaceDataset needs at least two labels under {self.data_dir!r}, "
                f"found {len(labels)}"
            )
        # Two distinct labels in one draw; avoids a retry loop.
        anchor_label, negative_label = random.sample(labels, 2)

        anchor_images = self.labels_to_images[anchor_label]
        anchor_path = random.choice(anchor_images)
        positive_candidates = [path for path in anchor_images if path != anchor_path]
        positive_path = random.choice(positive_candidates) if positive_candidates else anchor_path
        negative_path = random.choice(self.labels_to_images[negative_label])
        return anchor_path, positive_path, negative_path

    def _load_image(self, path):
        """Open ``path``, force 3-channel RGB and apply the transform."""
        with PIL.Image.open(path) as img:
            # Grayscale/RGBA files would not match the 3-channel normalisation.
            return self.transform(img.convert('RGB'))

    def __getitem__(self, idx):
        """Return a freshly sampled triplet of transformed images.

        ``idx`` is ignored: every call draws a new random triplet.
        """
        anchor_path, positive_path, negative_path = self._sample_triplet_paths()
        return (
            self._load_image(anchor_path),
            self._load_image(positive_path),
            self._load_image(negative_path),
        )

In [None]:
class InceptionResnetV1Model(nn.Module):
    """Frozen InceptionResnetV1 backbone followed by a trainable projection head.

    The backbone output (512 features) goes through dropout, batch
    normalisation and a linear layer that produces the final embedding.

    Args:
        embedding_size: Dimensionality of the output embedding.
        pretrained: When true (default) the backbone loads the ``'vggface2'``
            weights; when false it is created without pretrained weights.
        dropout_prob: Dropout probability applied to the backbone output.
    """

    def __init__(self, embedding_size, pretrained=True, dropout_prob=0.6):
        super().__init__()

        # Honour the `pretrained` flag instead of always loading the weights.
        self.model = InceptionResnetV1(pretrained='vggface2' if pretrained else None)

        # The backbone is used as a fixed feature extractor; only the head trains.
        for param in self.model.parameters():
            param.requires_grad = False

        self.dropout = nn.Dropout(p=dropout_prob)
        self.batchnorm = nn.BatchNorm1d(512)
        self.fc = nn.Linear(512, embedding_size)

    def forward(self, x):
        """Map a batch of images to embeddings of size ``embedding_size``."""
        x = self.model(x)
        x = self.dropout(x)
        x = self.batchnorm(x)
        x = self.fc(x)
        return x

In [None]:
model = InceptionResnetV1Model(embedding_size=128)

# Xavier-initialise only the new projection layer. Looping over every
# nn.Linear in the module tree would also overwrite linear layers that belong
# to the frozen, pretrained backbone.
init.xavier_uniform_(model.fc.weight)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# nn.Module.cpu() moves the module in place and returns the same object, so
# after this line `model` and `model_cpu` are one module living on the CPU.
model_cpu = model.cpu()

weight_decay = 0.00001
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=weight_decay)

# Checkpoint file shared by the training, serving and evaluation cells.
model_path = '/content/drive/MyDrive/triplet_loss/model_save.pt'

  0%|          | 0.00/107M [00:00<?, ?B/s]

In [None]:
def latest_model_state():
  """Restore `model` and `optimizer` from the checkpoint stored at `model_path`.

  Returns:
    Tuple ``(epoch, loss)`` with the values stored in the checkpoint.
  """
  # Without CUDA, tensors saved from a GPU run must be remapped onto the CPU;
  # with CUDA the default placement is kept.
  map_location = None if torch.cuda.is_available() else torch.device('cpu')
  checkpoint = torch.load(model_path, map_location=map_location)

  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  return checkpoint['epoch'], checkpoint['loss']

# Load the reference embeddings database from Drive. The embedding-creation
# cell near the end of this notebook writes this file as a dict that maps each
# label to a tensor of that label's embeddings.
# NOTE(review): pickle.load can execute arbitrary code; only load files you
# produced yourself.
with open('/content/drive/MyDrive/triplet_loss/embeddings_db.pkl', 'rb') as f:
  known_embeddings = pickle.load(f)

# **Run Model**

In [None]:
def crop_face(image_path):
    """Detect a face in the image at ``image_path`` and return it as an RGB PIL image.

    Args:
        image_path: Path of the image file to read.

    Returns:
        ``PIL.Image`` holding the first detected face, or ``None`` when the
        file cannot be read or no face is detected.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None (it does not raise) for unreadable files.
        return None

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

    if len(faces) == 0:
        return None

    # Only the first detection is used.
    (x, y, w, h) = faces[0]
    cropped_bgr = image[y:y+h, x:x+w]

    # OpenCV stores pixels as BGR while PIL expects RGB; convert so the colour
    # channels match images that are opened directly with PIL.
    cropped_rgb = cv2.cvtColor(cropped_bgr, cv2.COLOR_BGR2RGB)

    return PIL.Image.fromarray(cropped_rgb)

In [None]:
def find_match():
    """Identify the person shown in ``/content/image.png``.

    The face is cropped, embedded with the model and compared with every
    label in ``known_embeddings`` by mean Euclidean distance.

    Returns:
        Tuple ``(label, confidence)`` for the closest label, or
        ``("No match found", 0)`` when no face is detected, the embeddings
        database is empty, or the confidence is below the threshold.
    """
    no_match = ("No match found", 0)

    query_img = crop_face('/content/image.png')
    if query_img is None:
        return no_match

    model.eval()

    # Feed the input on whichever device the weights currently live on and
    # bring the result back to the CPU for the distance computation.
    model_device = next(model_cpu.parameters()).device
    query_tensor = validation_transform(query_img).unsqueeze(0).to(model_device)

    with torch.no_grad():
        query_embedding = model_cpu(query_tensor).cpu()  # one row per query image

    # Mean distance from the query to each label's reference embeddings.
    mean_distances = {}
    for name, embeddings in known_embeddings.items():
        if len(embeddings) == 0:
            continue  # a label without reference embeddings cannot be matched
        mean_distances[name] = torch.cdist(query_embedding, embeddings).mean().item()

    if not mean_distances:
        return no_match

    # min() returns the first minimal entry, i.e. ties go to the earliest label.
    matching_name = min(mean_distances, key=mean_distances.get)
    min_distance = mean_distances[matching_name]

    # Normalize the distances between 0 and 1
    min_distance_all = min(mean_distances.values())
    max_distance_all = max(mean_distances.values())
    spread = max_distance_all - min_distance_all
    # Guard against a zero spread (single label, or all labels equally far),
    # which would otherwise produce a NaN confidence.
    normalized_distance = (min_distance - min_distance_all) / spread if spread > 0 else 0.0

    # NOTE(review): min_distance is by construction equal to min_distance_all,
    # so normalized_distance is always 0 and the confidence always 1.0; the
    # threshold below therefore never rejects a match. A calibrated metric
    # (for example an absolute distance threshold) is needed to reject
    # unknown faces.
    confidence_score = 1 - normalized_distance
    confidence_threshold = 0.6

    if confidence_score < confidence_threshold:
        return no_match
    return matching_name, confidence_score

In [None]:
def process_image(save_path='/content/image.png'):
  """Decode the image in the current request body and save it to disk.

  Args:
    save_path: Destination file. The default is the absolute path that
      ``find_match`` reads, so the two stay in sync even when the working
      directory is not ``/content``.
  """
  img_data = request.get_data()
  img_data_stream = io.BytesIO(img_data)
  img = PIL.Image.open(img_data_stream)
  img.save(save_path)

In [None]:
def check_id(name):
  """Return the worker ID for ``name``, allocating the next free ID if needed.

  Args:
    name: Label of the worker to look up in the ``timesheet`` table.

  Returns:
    The ID stored for an existing label; otherwise the highest numeric ID
    plus one, or 1 when the table holds no IDs yet.
  """
  row = cursor.execute('select Darbuotojo_ID from timesheet where label=?;', (name,)).fetchone()
  if row is not None:
    return row[0]

  # Darbuotojo_ID is declared varchar, so a plain max() compares the values as
  # text ('9' > '10') and would hand out duplicate IDs from 10 onwards.
  # Casting to integer makes the comparison numeric.
  next_id = cursor.execute('select max(cast(Darbuotojo_ID as integer)) + 1 from timesheet;').fetchone()[0]
  return 1 if next_id is None else next_id

In [None]:
def insert_values(match_name, workers):
  """Record an arrival or a departure for ``match_name`` in the timesheet.

  Args:
    match_name: Label of the recognised worker.
    workers: Collection of labels that have already arrived.

  Returns:
    ". Sveiki atvykę." when a new arrival row was inserted, or
    ". Viso gero." when the departure time of the latest row was set.
  """
  if match_name not in workers:
    worker_id = check_id(match_name)
    # Bound parameters instead of string interpolation: labels containing
    # quotes no longer break (or inject into) the statement.
    cursor.execute(
      "insert into timesheet(Iraso_NR, Darbuotojo_ID ,label, Atvykimas, Isvykimas) "
      "values (NULL, ?, ?, datetime('now', '+3 hour'), NULL);",
      (worker_id, match_name),
    )
    message = ". Sveiki atvykę."
  else:
    # Only the most recent row of this label receives the departure time.
    cursor.execute(
      "update timesheet set Isvykimas = datetime('now', '+3 hour') "
      "where label = ? and Iraso_NR = (select max(Iraso_NR) from timesheet where label=?);",
      (match_name, match_name),
    )
    message = ". Viso gero."
  connection.commit()
  return message

In [None]:
def create_timesheet(name, output_dir='/content/drive/MyDrive/triplet_loss/downloadable'):
  """Export timesheet rows to a CSV file and return the file path.

  Args:
    name: Label whose rows are exported; the special value ``'bendras'``
      exports every row of the table.
    output_dir: Directory that receives ``<name>_timesheet.csv``.

  Returns:
    Path of the CSV file that was written.
  """
  # `name` may come straight from a request parameter: keep only its final
  # path component so the file can never be written outside `output_dir`.
  safe_name = os.path.basename(str(name))
  file_path = os.path.join(output_dir, f'{safe_name}_timesheet.csv')

  if name == 'bendras':
    cursor.execute("select * from timesheet")
  else:
    cursor.execute("select * from timesheet where label=?", (name,))
  columns = [description[0] for description in cursor.description]
  rows = cursor.fetchall()

  with open(file_path, 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(columns)
    writer.writerows(rows)
  return file_path

In [None]:
def get_labels(directory='/content/drive/MyDrive/triplet_loss/faces'):
  """List the entries of the faces directory (one entry per label).

  Args:
    directory: Directory whose entries are listed.

  Returns:
    List of entry names, or an empty list when ``directory`` does not exist
    (previously this case raised ``UnboundLocalError``).
  """
  if not os.path.isdir(directory):
    return []
  return os.listdir(directory)

In [None]:
from getpass import getpass

# Start from the latest checkpoint when one exists.
if os.path.exists(model_path):
  latest_model_state()

app = Flask(__name__, template_folder='/content/drive/MyDrive/triplet_loss/templates')

# Secrets are never stored in the notebook: the ngrok token comes from the
# NGROK_AUTH_TOKEN environment variable or is prompted for interactively.
# Any token that was ever committed to a shared notebook should be revoked.
ngrok.set_auth_token(os.environ.get("NGROK_AUTH_TOKEN") or getpass("ngrok auth token: "))
# Session-signing key: taken from FLASK_SECRET_KEY when set, otherwise a fresh
# random key per run (existing sessions are then invalidated on restart).
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(32)
port_number = 4500
public_url = ngrok.connect(port_number).public_url

# check_same_thread=False lets the Flask request handlers reuse this connection.
connection = sqlite3.connect('/content/drive/MyDrive/triplet_loss/database/timesheet.db', check_same_thread=False)
cursor = connection.cursor()
# In-memory state for this server run: labels that have arrived, and labels
# that have already departed.
worker_set = set()
depart_set = set()

@app.route('/process_data', methods=['POST'])
def process_data():
  """Handle a posted camera frame: identify the person and log the visit.

  Saves the posted image, runs face matching, flashes a status message for
  the front end and renders ``index.html``.
  """
  process_image()
  match_name, confidence = find_match()
  no_match = match_name == "No match found"

  if no_match:
    flash("Neužfiksuotas veidas arba nerastas atitikmuo. Prašome bandyti iš naujo")

  if match_name in depart_set:
    # Arrival and departure were both recorded already for this label.
    flash(f"Jau užfiksuotas šio asmens ({match_name}) atvykimo ir išvykimo laikas. Viso gero")
  elif not no_match:
    status_suffix = insert_values(match_name, worker_set)
    if status_suffix == ". Viso gero.":
      depart_set.add(match_name)
    id_row = cursor.execute("select Darbuotojo_ID from timesheet where label = ?", (match_name,)).fetchone()
    worker_id = id_row[0]
    flash(f"Atitikmuo rastas. Įsitikinimas: {confidence:.2f}. Darbuotojo ID: {worker_id}, etiketė: {match_name}" + status_suffix)
    worker_set.add(match_name)

  return render_template('index.html')

@app.route('/flash_messages')
def get_flash_messages():
  """Return the pending flash messages as a JSON array."""
  return jsonify(get_flashed_messages())

@app.route('/login', methods=['GET', 'POST'])
def admin_login():
  """Show the login form and, on a valid POST, open an admin session.

  Credentials are read from the ADMIN_USERNAME / ADMIN_PASSWORD environment
  variables.
  NOTE(review): both fall back to 'admin' when unset; set them before
  exposing the server publicly.
  """
  expected_username = os.environ.get('ADMIN_USERNAME', 'admin')
  expected_password = os.environ.get('ADMIN_PASSWORD', 'admin')

  if request.method == 'POST' and 'username' in request.form and 'password' in request.form:
    username = request.form['username']
    password = request.form['password']
    if username == expected_username and password == expected_password:
      session["logged_in"] = True
      # url_for resolves the admin route by endpoint name instead of relying
      # on a relative URL.
      return redirect(url_for('admin_page'))
  return render_template("login.html")

@app.before_request
def check_logged_in():
    """Redirect anonymous visitors away from the admin-only endpoints.

    Both the admin page and the timesheet download expose employee records,
    so both require a logged-in session.
    """
    protected_endpoints = ('admin_page', 'download_file')
    if 'logged_in' not in session and request.endpoint in protected_endpoints:
        return redirect(url_for('admin_login'))

@app.route('/admin')
def admin_page():
  """Render the admin view with the 15 most recent timesheet rows and all labels."""
  recent_rows = cursor.execute("select * from timesheet order by rowid desc limit 15").fetchall()
  return render_template('admin.html', rows=recent_rows, labels=get_labels())

@app.route('/download', methods=['GET'])
def download_file():
  """Send the timesheet CSV for the label given in the ``name`` query parameter.

  Responds with HTTP 400 when ``name`` is missing or empty instead of
  generating a file for a non-existent label.
  """
  name = request.args.get('name')
  if not name:
    return "Missing required query parameter: name", 400
  file_path = create_timesheet(name)
  return send_file(file_path, as_attachment=True)

@app.route('/')
def index():
  """Serve the landing page (``index.html``)."""
  landing_page = render_template('index.html')
  return landing_page

print(f"To access the system, please use this link: {public_url}")

# app.run blocks until the server stops. The finally clause guarantees the
# SQLite connection is closed even when the cell is interrupted or app.run
# raises.
try:
  app.run(port=port_number)
finally:
  connection.close()

# **Train Model**

In [None]:
batch_size = 64
# Both loaders share the same batching options.
loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=2)

data_dir = '/content/drive/MyDrive/triplet_loss/faces'
validation_dir = '/content/drive/MyDrive/triplet_loss/validation_set'

# Training triplets are augmented; validation triplets are only resized and normalised.
training_dataset = TripletFaceDataset(data_dir=data_dir, transform=training_transform)
training_dataloader = DataLoader(training_dataset, **loader_options)

validation_dataset = TripletFaceDataset(data_dir=validation_dir, transform=validation_transform)
validation_dataloader = DataLoader(validation_dataset, **loader_options)

# Triplet margin loss on Euclidean (p=2) distances, averaged over the batch.
criterion = nn.TripletMarginLoss(margin=1.0, p=2.0, reduction='mean')

In [None]:
# The batches below are moved to `device`, so the model must live there too.
# Move it BEFORE restoring the checkpoint so the optimizer state is loaded
# onto the same device as the parameters it belongs to.
model.to(device)

if os.path.exists(model_path):
  latest_model_state()

num_epochs = 50
patience = 10           # epochs without validation improvement before stopping
counter = 0             # epochs since the last improvement
average_loss = 0
completed_epochs = 0
best_val_loss = float('inf')

for epoch in range(num_epochs):
    running_loss = 0.0
    # NOTE(review): train() also switches the frozen backbone's submodules to
    # training mode; confirm that is intended.
    model.train()
    for anchor, positive, negative in training_dataloader:

        anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)

        optimizer.zero_grad()

        embedding_anchor, embedding_positive, embedding_negative = model(anchor), model(positive), model(negative)
        loss = criterion(embedding_anchor, embedding_positive, embedding_negative)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    epoch_train_loss = running_loss / len(training_dataloader)

    # validation loop
    running_loss = 0.0
    model.eval()
    with torch.no_grad():
        for anchor, positive, negative in validation_dataloader:

            anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)

            embedding_anchor, embedding_positive, embedding_negative = model(anchor), model(positive), model(negative)
            loss = criterion(embedding_anchor, embedding_positive, embedding_negative)

            running_loss += loss.item()

        epoch_val_loss = running_loss / len(validation_dataloader)

        if epoch_val_loss < best_val_loss and epoch_val_loss != 0:
            best_val_loss = epoch_val_loss
            counter = 0
            # Store the epoch's mean validation loss as a plain float rather
            # than the last batch's loss tensor.
            torch.save({
                        'epoch': epoch,
                        'model_state_dict': model.state_dict(),
                        'optimizer_state_dict': optimizer.state_dict(),
                        'loss': epoch_val_loss
                        }, model_path)
        else:
            counter += 1

    average_loss += epoch_train_loss
    completed_epochs += 1
    print('Epoch [%d/%d], Training Loss: %.4f, Validation Loss: %.4f' % (epoch+1, num_epochs, epoch_train_loss, epoch_val_loss))

    if counter >= patience:
        print("Early stopping at epoch", epoch + 1)
        break

# Average over the epochs that actually ran; early stopping can end the loop
# before num_epochs is reached.
average_loss /= max(completed_epochs, 1)
print('Average training loss: %.4f' % average_loss)

# The serving and embedding cells run inference on the CPU, so hand the model
# back in that state.
model.cpu();

# **Test Model**

In [None]:
class TestDataset(Dataset):
    """Flat directory of test images labelled by file-name prefix.

    The label of a file is the part of its name before the first ``"-"``;
    labels are integer-encoded with a ``LabelEncoder`` fitted on all files.

    Args:
        data_dir: Directory that contains the test images.
        transform: Optional callable applied to each loaded ``PIL.Image``.
    """

    def __init__(self, data_dir, transform=validation_transform):
        self.transform = transform
        self.data_dir = data_dir
        self.file_names = os.listdir(data_dir)
        self.labels = [name.split("-")[0] for name in self.file_names]
        self.label_encoder = LabelEncoder()
        self.label_encoder.fit(self.labels)

    def __len__(self):
        """Number of files found in ``data_dir``."""
        return len(self.file_names)

    def __getitem__(self, index):
        """Return ``(image, encoded_label)`` for the file at ``index``."""
        img_path = os.path.join(self.data_dir, self.file_names[index])
        # OpenCV reads BGR; convert to RGB before handing the pixels to PIL.
        rgb_pixels = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(rgb_pixels)
        if self.transform:
            img = self.transform(img)
        encoded_label = self.label_encoder.transform([self.labels[index]])[0]
        return img, encoded_label

In [None]:
def get_closest_class(embedding, known_embeddings):
    """Return the class whose embeddings are, on average, closest to ``embedding``.

    Args:
        embedding: 1-D tensor holding the query embedding.
        known_embeddings: Mapping of class name to that class's embeddings,
            either one tensor with a row per embedding or a sequence of tensors.

    Returns:
        Name of the class with the smallest mean Euclidean distance, or
        ``None`` when no class has any embeddings.
    """
    min_distance = float('inf')
    closest_class = None
    for class_name, class_embeddings in known_embeddings.items():
        if not torch.is_tensor(class_embeddings):
            rows = [torch.as_tensor(row) for row in class_embeddings]
            if not rows:
                continue
            class_embeddings = torch.stack(rows)
        if class_embeddings.numel() == 0:
            continue  # nothing to compare against

        # Compare on the reference embeddings' device/dtype so a query coming
        # from a GPU model still works against a CPU database.
        query = embedding.to(device=class_embeddings.device, dtype=class_embeddings.dtype)
        # One vectorised norm over the rows replaces a Python loop of torch.dist calls.
        avg_distance = (class_embeddings - query).norm(dim=-1).mean().item()
        if avg_distance < min_distance:
            min_distance = avg_distance
            closest_class = class_name
    return closest_class

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt

batch_size = 64
test_dir = '/content/drive/MyDrive/triplet_loss/testing_set'
test_dataset = TestDataset(data_dir=test_dir)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

correct = 0
total = 0
model.eval()
# Feed batches on whichever device the model currently lives on, without
# moving the shared model as a side effect.
model_device = next(model.parameters()).device

true_labels = []
predicted_labels = []

with torch.no_grad():
    for images, labels in test_dataloader:
        # Embeddings go back to the CPU for comparison with the reference database.
        embeddings = model(images.to(model_device)).cpu()

        for i, embedding in enumerate(embeddings):
            predicted = get_closest_class(embedding, known_embeddings)
            total += 1
            true_label = test_dataset.label_encoder.inverse_transform([labels[i].item()])[0]
            true_labels.append(true_label)
            predicted_labels.append(predicted)
            if predicted == true_label:
                correct += 1

# Guard against an empty test directory.
accuracy = 100 * correct / total if total else 0.0
print('Accuracy on test images: {:.2f}%'.format(accuracy))

# Use one explicit class list for both the matrix and the tick labels, so the
# axes stay aligned even when a predicted class never occurs in the test set.
class_names = sorted(set(true_labels) | set(predicted_labels))
conf_mat = confusion_matrix(true_labels, predicted_labels, labels=class_names)
row_totals = conf_mat.sum(axis=1, keepdims=True)
# Rows without true samples stay at zero instead of dividing by zero.
conf_mat_normalized = np.divide(conf_mat, row_totals, out=np.zeros(conf_mat.shape, dtype=float), where=row_totals != 0)

plt.figure(figsize=(10, 10))
sns.heatmap(conf_mat_normalized, annot=True, cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# **Crop Faces, Create Embeddings & SQLite Database**

In [None]:
# Name of the sub-folder (under .../temporary/) whose images should be cropped.
directory= ''
image_dir = f'/content/drive/MyDrive/triplet_loss/temporary/{directory}'

output_dir = f'/content/drive/MyDrive/triplet_loss/temporary/{directory}_cropped'

os.makedirs(output_dir, exist_ok=True)

for filename in os.listdir(image_dir):

  image_path = os.path.join(image_dir, filename)
  image = cv2.imread(image_path)
  if image is None:
    # cv2.imread returns None for sub-folders and non-image files; skip them.
    continue
  gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  faces = face_cascade.detectMultiScale(gray, 1.3, 5)

  # splitext keeps dots inside the base name, so "a.b.jpg" and "a.c.jpg" do not
  # collide on the same output file.
  stem = os.path.splitext(filename)[0]
  for i, (x, y, w, h) in enumerate(faces):
    face = image[y:y+h, x:x+w]
    output_path = os.path.join(output_dir, f'{stem}_face{i}.jpg')
    cv2.imwrite(output_path, face)

In [None]:
model.eval()
# Run on whichever device the model currently lives on.
model_device = next(model.parameters()).device
data_dir='/content/drive/MyDrive/triplet_loss/faces'
known_embeddings = {}

# One sub-folder per person; every file inside it contributes one embedding.
for name in os.listdir(data_dir):
    name_path = os.path.join(data_dir, name)
    if os.path.isdir(name_path):
        embeddings = []
        for img_name in os.listdir(name_path):
            img_path = os.path.join(name_path, img_name)
            if not os.path.isfile(img_path):
                continue
            with PIL.Image.open(img_path) as img:
                # Force 3-channel RGB so grayscale/RGBA files match the normalisation.
                img_tensor = validation_transform(img.convert('RGB')).unsqueeze(0).to(model_device)
            with torch.no_grad():
                embeddings.append(model(img_tensor).squeeze(0).cpu())
        # Skip people without images: an empty entry cannot be compared against.
        if embeddings:
            known_embeddings[name] = torch.stack(embeddings)

with open('/content/drive/MyDrive/triplet_loss/embeddings_db.pkl', 'wb') as f:
    pickle.dump(known_embeddings, f)

In [None]:
# Load the `sql` IPython extension so the %sql / %%sql magics used below are available.
%load_ext sql

In [None]:
# Point the %sql magic at the timesheet SQLite database stored on Drive.
%sql sqlite:////content/drive/MyDrive/triplet_loss/database/timesheet.db

In [None]:
%%sql

-- WARNING: running this cell deletes every existing timesheet row.
drop table if exists timesheet;

-- Darbuotojo_ID is an integer so that max() and ordering compare IDs numerically.
create table timesheet(Iraso_NR integer primary key, Darbuotojo_ID integer, label varchar, Atvykimas datetime, Isvykimas datetime);