## Installing and importing all libraries for our project

In [None]:
%%capture
%pip install ultralytics;
%pip install -U ipywidgets
%pip install gdown

In [None]:
import os
import sys

from ultralytics import YOLO
import xml.etree.ElementTree as ET
from os import listdir, getcwd
from os.path import join
import shutil
import random
import matplotlib.pyplot as plt
import cv2
import numpy as np

In [None]:
class SuppressOutput:
    def __enter__(self):
        self._stdout = sys.stdout
        self._stderr = sys.stderr
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stderr.close()
        sys.stdout = self._stdout
        sys.stderr = self._stderr

## Downloading and preparing data for training ML-models

### 1. Dataset for human detection

In [None]:
# Downloading dataset for human detection training
!gdown 'https://drive.google.com/u/0/uc?id=1--0QuKMwj31K-CSvD8oq5fceFweiFPuN&export=download'

In [None]:
with SuppressOutput():
    # Unpackaging
    !unzip /kaggle/working/human_detection_dataset.zip
print("Unpacking complete")

In [None]:
!rm /kaggle/working/human_detection_dataset.zip
print("The origin zip-file has been deleted")

### 2. Dataset for mask detection

In [None]:
# Классы объектов (перечислите все классы, которые используются)
classes = ['without_mask', 'mask_weared_incorrect', 'with_mask']

# Функция для конвертации координат
def convert_bbox_to_yolo(size, box):
    if size[0] == 0:
        dw = 1./(size[0]+0.00001)
    else:
        dw = 1./(size[0])
        
    if size[0] == 0:
        dh = 1./(size[1]+0.00001)
    else:
        dh = 1./(size[1])
    x_center = (box[0] + box[1]) / 2.0 - 1
    y_center = (box[2] + box[3]) / 2.0 - 1
    width = box[1] - box[0]
    height = box[3] - box[2]
    return (x_center * dw, y_center * dh, width * dw, height * dh)

# Функция для конвертации XML-аннотации в YOLO-формат
def convert_xml_to_yolo(xml_file, output_dir):
    tree = ET.parse(xml_file)
    root = tree.getroot()

    # Получение размера изображения
    size = root.find('size')
    width = int(size.find('width').text)
    height = int(size.find('height').text)

    # Имя изображения
    filename = root.find('filename').text
    output_file = os.path.join(output_dir, os.path.splitext(filename)[0] + '.txt')

    with open(output_file, 'w') as out_file:
        for obj in root.iter('object'):
            cls = obj.find('name').text
            if cls not in classes:
                continue
            cls_id = classes.index(cls)
            xmlbox = obj.find('bndbox')
            bbox = (
                float(xmlbox.find('xmin').text), 
                float(xmlbox.find('xmax').text),
                float(xmlbox.find('ymin').text), 
                float(xmlbox.find('ymax').text)
            )
            bbox_yolo = convert_bbox_to_yolo((width, height), bbox)
            out_file.write(f"{cls_id} {' '.join(map(str, bbox_yolo))}\n")


            
xml_dir = '/kaggle/input/face-mask-detection/annotations'  # The original path to XML-files
image_dir = '/kaggle/input/face-mask-detection/images'  # The original path to XML-files

output_train_dir = '/kaggle/working/face-mask-detection/train'
output_val_dir = '/kaggle/working/face-mask-detection/val'
train_ratio = 0.8  # Соотношение тренировочных данных

os.makedirs(os.path.join(output_train_dir, 'images'), exist_ok=True)
os.makedirs(os.path.join(output_train_dir, 'labels'), exist_ok=True)
os.makedirs(os.path.join(output_val_dir, 'images'), exist_ok=True)
os.makedirs(os.path.join(output_val_dir, 'labels'), exist_ok=True)

# Получение всех XML файлов
xml_files = [f for f in os.listdir(xml_dir) if f.endswith('.xml')]
random.shuffle(xml_files)
# Разделение на тренировочные и валидационные наборы
train_size = int(len(xml_files) * train_ratio)
train_files = xml_files[:train_size]
val_files = xml_files[train_size:]

def move_files(xml_files, dest_dir):
    for xml_file in xml_files:
        image_file = xml_file.replace('.xml', '.png')
        if not os.path.exists(os.path.join(image_dir, image_file)):
            continue
        convert_xml_to_yolo(os.path.join(xml_dir, xml_file), os.path.join(dest_dir, 'labels'))
        shutil.copy(os.path.join(image_dir, image_file), os.path.join(dest_dir, 'images', image_file))

# Перемещаем тренировочные и валидационные файлы
move_files(train_files, output_train_dir)
move_files(val_files, output_val_dir)

print("Convertation and split are completed.")

## Trainig models (both are YOLOv8)

In [None]:
human_detection_model = YOLO('yolov8m.pt')
mask_detection_model = YOLO('yolov8m.pt')

In [None]:
def process_video_with_masks(model, video_path, output_path):
    cap = cv2.VideoCapture(video_path)
    
    # Getting video parameters
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Creating VideoWriter object to write new video with predictions
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    class_colors = {
        0: (0, 0, 255),
        1: (0, 0, 255),
        2: (0, 255, 0),
    }
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Predicting
        results = model(frame, iou = 0.4)

        # Visualizing results        
        for result in results:
            for bbox in result.boxes:
                class_id = int(bbox.cls)  # Ensure class_id is an integer
                color = class_colors.get(class_id, (255, 255, 255))  # Default to white if class not in dictionary
                
                # Convert bbox.xyxy tensor to list
                bbox_coords = bbox.xyxy.tolist()
                x1, y1, x2, y2 = map(int, bbox_coords[0])
                label = f'Class {class_id}'
                
                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                # Draw label
#                 cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        
        # Writing annotated fram to new videofile
        out.write(frame)
    
    # Resources release
    cap.release()
    out.release()

### 1. Training mask detection model

In [None]:
os.environ['WANDB_MODE'] = 'disabled'

result = mask_detection_model.train(
    data='/kaggle/input/mask-data/mask_data.yaml',  # Path to data.yaml
    epochs=35,  # Epoch number
    imgsz=640,  # Image size
    batch=16,    # Batch size
)
print("Training of mask detection model complete")

In [None]:
output_video_path = "output_mask_video.avi"
# Применение модели к видео
process_video_with_masks(mask_detection_model, "/kaggle/input/test-dataset/crowd_1280_720_30fps.mp4", output_video_path)

### 2. Training human detection model

In [None]:
def process_video(model, video_path, output_path):
    cap = cv2.VideoCapture(video_path)
    
    # Getting video parameters
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    # Creating VideoWriter object to write new video with predictions
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Predicting
        results = model(frame, iou = 0.4, classes = [0])

        # Visualizing results        
        for result in results:
            annotated_frame = result.plot()
        
        # Writing annotated fram to new videofile
        out.write(annotated_frame)
    
    # Resources release
    cap.release()
    out.release()

In [None]:
# result = human_detection_model.train(
#     data='/kaggle/working/human_detection_dataset/data.yaml',  # Path data.yaml
#     epochs=100,  # Epoch number
#     imgsz=640,  # Image size
#     batch=16,    # Batch size
# )

In [None]:
output_video_path = "output_human_video.avi"
# Applying model to video
process_video(human_detection_model, "/kaggle/input/test-dataset/crowd_1280_720_30fps.mp4", output_video_path)

In [None]:
output_video_path = "output_human_mask_video.avi"
# Applying model to video
process_video(human_detection_model, "/kaggle/input/test-dataset/crowd_1280_720_30fps.mp4", output_video_path)
process_video_with_masks(mask_detection_model, "/kaggle/working/output_human_mask_video.avi", output_video_path)


## Results

### 1. Result for mask detection model

In [None]:
result_dir = '/kaggle/working/runs/detect/train'
files = os.listdir(result_dir)

if files:
    for file in files:
        img_path = os.path.join(result_dir, file)
        # Image uploading via OpenCV
        img = cv2.imread(img_path)

        if img is not None:
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            plt.figure(figsize=(10, 10))
            plt.imshow(img_rgb)
            plt.axis('off')
            plt.title(file)
            plt.show()
        else:
            print(f"Can't load the image: {file}")
else:
    print("Results haven't been saved. Check the path.")

In [None]:
results = mask_detection_model.predict(source='/kaggle/input/mask-data/test.jpeg', save=True)

In [None]:
result_dir = '/kaggle/working/runs/detect/train3'
files = os.listdir(result_dir)

if files:
    for file in files:
        img_path = os.path.join(result_dir, file)
        # Image uploading via OpenCV
        img = cv2.imread(img_path)

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # Отображение изображения
        plt.figure(figsize=(10, 10))
        plt.imshow(img_rgb)
        plt.axis('off')
        plt.title(file)
        plt.show()
else:
    print("Результаты не были сохранены. Проверьте, правильно ли указан путь.")

In [None]:
results = mask_detection_model.predict(source='/kaggle/input/mask-data/masktypes.jpg', save=True)

In [None]:
result_dir = '/kaggle/working/runs/detect/train4'
# Список файлов в папке с результатами
files = os.listdir(result_dir)

if files:
    # Вывод первых нескольких файлов для проверки
    print("Файлы в папке с результатами:", files)

    for file in files:
        img_path = os.path.join(result_dir, file)
        # Загрузка изображения с использованием OpenCV
        img = cv2.imread(img_path)
        # Преобразование изображения в формат RGB для отображения с Matplotlib
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # Отображение изображения
        plt.figure(figsize=(10, 10))
        plt.imshow(img_rgb)
        plt.axis('off')
        plt.title(file)
        plt.show()
else:
    print("Результаты не были сохранены. Проверьте, правильно ли указан путь.")

### 2. Results for human detection model

In [None]:
# result_dir = '/kaggle/working/runs/detect/train2'
# # Список файлов в папке с результатами
# files = os.listdir(result_dir)

# if files:
#     print("Файлы в папке с результатами:", files)

#     for file in files:
#         img_path = os.path.join(result_dir, file)
#         # Загрузка изображения с использованием OpenCV
#         img = cv2.imread(img_path)

#         if img is not None:
#             # Преобразование изображения в формат RGB для отображения с Matplotlib
#             img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

#             # Отображение изображения
#             plt.figure(figsize=(10, 10))
#             plt.imshow(img_rgb)
#             plt.axis('off')
#             plt.title(file)
#             plt.show()
#         else:
#             print(f"Не удалось загрузить изображение: {file}")
# else:
#     print("Результаты не были сохранены. Проверьте, правильно ли указан путь.")

In [None]:
# results = human_detection_model.predict(source='/kaggle/input/mask-data/test.jpeg', save=True)

In [None]:
# result_dir = '/kaggle/working/runs/detect/train22'
# # Список файлов в папке с результатами
# files = os.listdir(result_dir)

# if files:
#     # Вывод первых нескольких файлов для проверки
#     print("Файлы в папке с результатами:", files)

#     for file in files:
#         img_path = os.path.join(result_dir, file)
#         # Загрузка изображения с использованием OpenCV
#         img = cv2.imread(img_path)
#         # Преобразование изображения в формат RGB для отображения с Matplotlib
#         img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
#         # Отображение изображения
#         plt.figure(figsize=(10, 10))
#         plt.imshow(img_rgb)
#         plt.axis('off')
#         plt.title(file)
#         plt.show()
# else:
#     print("Результаты не были сохранены. Проверьте, правильно ли указан путь.")

In [None]:
# results = human_detection_model.predict(source='/kaggle/input/mask-data/masktypes.jpg', save=True)

In [None]:
# result_dir = '/kaggle/working/runs/detect/train23'
# # Список файлов в папке с результатами
# files = os.listdir(result_dir)

# if files:
#     # Вывод первых нескольких файлов для проверки
#     print("Файлы в папке с результатами:", files)

#     for file in files:
#         img_path = os.path.join(result_dir, file)
#         # Загрузка изображения с использованием OpenCV
#         img = cv2.imread(img_path)
#         # Преобразование изображения в формат RGB для отображения с Matplotlib
#         img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
#         # Отображение изображения
#         plt.figure(figsize=(10, 10))
#         plt.imshow(img_rgb)
#         plt.axis('off')
#         plt.title(file)
#         plt.show()
# else:
#     print("Результаты не были сохранены. Проверьте, правильно ли указан путь.")

In [None]:
# results = human_detection_model.predict(source='/kaggle/working/runs/detect/train4/masktypes.jpg', save=True)

In [None]:
# result_dir = '/kaggle/working/runs/detect/train24'
# # Список файлов в папке с результатами
# files = os.listdir(result_dir)

# if files:
#     # Вывод первых нескольких файлов для проверки
#     print("Файлы в папке с результатами:", files)

#     for file in files:
#         img_path = os.path.join(result_dir, file)
#         # Загрузка изображения с использованием OpenCV
#         img = cv2.imread(img_path)
#         # Преобразование изображения в формат RGB для отображения с Matplotlib
#         img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
#         # Отображение изображения
#         plt.figure(figsize=(10, 10))
#         plt.imshow(img_rgb)
#         plt.axis('off')
#         plt.title(file)
#         plt.show()
# else:
#     print("Результаты не были сохранены. Проверьте, правильно ли указан путь.")