In [None]:
!pip install wandb
!pip install ultralytics
import wandb
import ultralytics

In [None]:
!pip install kagglehub
import os
import pathlib
import shutil
import xml.etree.ElementTree as ET

import kagglehub

# Download latest version
# kagglehub returns the path of its own local copy of the dataset.
path = kagglehub.dataset_download("andrewmvd/face-mask-detection")
# Copy that folder into the working directory instead of renaming it:
# os.rename cannot cross filesystems, cannot move a read-only source, and
# fails when "face_mask_dataset" already exists and is not empty. The guard
# makes this cell safe to re-run once the working copy is in place.
if not os.path.isdir("face_mask_dataset"):
    shutil.copytree(path, "face_mask_dataset")
def parse_xml(xml_file):
    """Parse an XML file and return the root element of its tree.

    Args:
        xml_file: Path (or file object) accepted by ``ET.parse``.

    Returns:
        The root ``Element`` of the parsed document.
    """
    return ET.parse(xml_file).getroot()
def get_yolo_bbox(xml_bbox, width, height) -> str:
    """Turn a corner-style box into a normalized center/size string.

    Args:
        xml_bbox: Indexable holding ``[x_min, y_min, x_max, y_max]`` at
            positions 0-3.
        width: Image width used to normalize the horizontal values.
        height: Image height used to normalize the vertical values.

    Returns:
        ``"x_center y_center w h"`` joined by single spaces, each value
        divided by the matching image dimension.
    """
    x_min, y_min = xml_bbox[0], xml_bbox[1]
    x_max, y_max = xml_bbox[2], xml_bbox[3]
    normalized = (
        (x_min + x_max) / 2 / width,   # horizontal center
        (y_min + y_max) / 2 / height,  # vertical center
        (x_max - x_min) / width,       # box width
        (y_max - y_min) / height,      # box height
    )
    return " ".join(map(str, normalized))
def convert_to_txt(data: str = "face_mask_dataset/annotations", output: str = "datasets/labels") -> None:
    """Convert the XML annotation files into YOLO label text files.

    For every ``.xml`` file in ``data`` a ``.txt`` file is written to
    ``output`` containing one ``"<class_id> <x_center> <y_center> <w> <h>"``
    line per ``<object>`` element; the box values come from ``get_yolo_bbox``.

    Args:
        data: Directory holding the XML annotations.
        output: Directory receiving the label files. It is created when
            missing so the function also works after the folder was removed.

    Raises:
        ValueError: If an object's ``<name>`` is not one of the known classes
            or a size/coordinate value is not an integer.
    """
    # The list position is the class id written to the label file.
    classes = ["with_mask", "mask_weared_incorrect", "without_mask"]

    os.makedirs(output, exist_ok=True)

    for file in os.listdir(data):
        # Skip anything that is not an annotation (README, .DS_Store, ...)
        # instead of crashing inside the XML parser.
        if not file.lower().endswith(".xml"):
            continue

        root = ET.parse(f"{data}/{file}").getroot()
        size = root.find("size")
        width = int(size.find("width").text)
        height = int(size.find("height").text)
        boxes = []

        for obj in root.findall("object"):
            class_id = classes.index(obj.find("name").text)
            # NOTE(review): relies on the <bndbox> children being ordered
            # xmin, ymin, xmax, ymax, as get_yolo_bbox expects - confirm.
            xml_bbox = [int(coords.text) for coords in obj.find("bndbox")]
            boxes.append((class_id, get_yolo_bbox(xml_bbox, width, height)))

        # The stem rule (text before the first dot) must stay in sync with
        # create_training_set, which looks the label files up the same way.
        with open(f"{output}/{file.split('.')[0]}.txt", "w") as f:
            f.writelines(f"{class_id} {yolo_bbox}\n" for class_id, yolo_bbox in boxes)

# Build the folder layout: datasets/{train,val,test}/{images,labels} plus the
# temporary datasets/labels staging folder. exist_ok=True creates whatever is
# missing, so a partially built (or previously cleaned-up) tree no longer
# leaves directories absent the way a single "datasets exists" check did.
for _split_name in ("train", "val", "test"):
    for _content in ("images", "labels"):
        os.makedirs(f"datasets/{_split_name}/{_content}", exist_ok=True)
os.makedirs("datasets/labels", exist_ok=True)

def create_training_set(images) -> None:
    """Move images and their label files into the train/val/test folders.

    Files are assigned by position in ``images``: positions below 70% of the
    list go to ``train``, positions below 85% go to ``val`` and the rest go to
    ``test``. Each image is moved out of ``face_mask_dataset/images`` and its
    label (same name up to the first dot, ``.txt`` suffix) out of
    ``datasets/labels``.

    Args:
        images: Sequence of image file names.
    """
    total = len(images)
    for position, image in enumerate(images):
        # Pick the destination split from the position in the list.
        if position < 0.7 * total:
            subset = "train"
        elif position < 0.85 * total:
            subset = "val"
        else:
            subset = "test"

        stem = image.split('.')[0]
        os.rename(f"face_mask_dataset/images/{image}", f"datasets/{subset}/images/{image}")
        os.rename(f"datasets/labels/{stem}.txt", f"datasets/{subset}/labels/{stem}.txt")

# Absolute, forward-slash path of the dataset folder; both split entries of
# the YOLO config are built on top of it.
dataset_root = pathlib.Path("datasets").resolve().as_posix()

# Dataset description: image folders for training and validation, the number
# of classes, and the class names in class-id order.
yolo_yaml = (
    f"train: {dataset_root}/train/images\n"
    f"val: {dataset_root}/val/images\n"
    "nc: 3\n"
    "names: ['with_mask', 'mask_weared_incorrect', 'without_mask']\n"
)

with open("yolo.yaml", "w") as f:
    f.write(yolo_yaml)
# os.listdir returns names in arbitrary order; sorting makes the
# train/val/test assignment (which is positional) reproducible between runs.
images = sorted(os.listdir("face_mask_dataset/images"))
if images:
    convert_to_txt()
    create_training_set(images)
    # Every label has been moved next to its image, so the staging folder
    # is empty and can go.
    os.rmdir("datasets/labels")
else:
    # All images were already moved by an earlier run; converting again would
    # only refill datasets/labels and make the rmdir above fail.
    print("No images left in face_mask_dataset/images - dataset split skipped.")

In [None]:
# wandb.init(project='SeoulTechML', name='yolo11m')

In [None]:
# Switch on the Weights & Biases integration. In the Ultralytics settings the
# "wandb" entry is a boolean flag (and "loggers" is not a settings key);
# storing a dict there leaves the integration disabled because it only
# activates when the flag is exactly True.
ultralytics.SETTINGS.update({'wandb': True})

# The W&B project and run name are not settings entries.
# NOTE(review): per the Ultralytics W&B integration docs they are taken from
# the `project` / `name` training arguments - pass these to model.train(...).
WANDB_PROJECT = 'SeoulTechML'
WANDB_RUN_NAME = 'yolo11m'

In [None]:
# model = ultralytics.YOLO('yolo11m.pt')

In [None]:
# results = model.train(data="yolo.yaml", epochs=10, save=True, device=0)

In [None]:
from dataclasses import dataclass
import json

# Container mirroring the layout of the JSON results file
@dataclass
class Data:
    """Lists of image file names grouped by category.

    The three fields carry the same names as the keys stored in the JSON
    file written by this notebook.
    """

    # File names recorded under the "single_person_files" key
    single_person_files: list[str]
    # File names recorded under the "with_mask" key
    with_mask: list[str]
    # File names recorded under the "without_mask" key
    without_mask: list[str]

# Where the per-category file-name lists are stored
json_path = 'single_files.json'

# Write a skeleton with an empty list for every category
# (this replaces whatever the file held before)
default_data = {
    category: []
    for category in ("single_person_files", "with_mask", "without_mask")
}
with open(json_path, 'w') as file:
    file.write(json.dumps(default_data, indent=2))
print(f"File '{json_path}' has been created with default data.")

# Read the file back into memory
with open(json_path, 'r') as file:
    raw_data = json.loads(file.read())



In [None]:
# Load the YOLO model used for the predictions below from the checkpoint
# file 'best_50.pt' (relative path).
# NOTE(review): presumably the best weights of a 50-epoch training run - confirm.
final_model = ultralytics.YOLO('best_50.pt')

In [None]:
import os
import numpy as np

# Seed the aggregate with whatever the JSON file currently contains
with open(json_path, 'r') as file:
    aggregated_data = json.loads(file.read())

# Generator that cuts a sequence into consecutive slices
def split_list(a, n):
    """Yield consecutive slices of ``a`` holding at most ``n`` items each.

    Args:
        a: Sliceable sequence to cut up.
        n: Slice length; the last slice may be shorter.

    Yields:
        ``a[start:start + n]`` for ``start`` = 0, n, 2n, ... below ``len(a)``.
    """
    yield from (a[start:start + n] for start in range(0, len(a), n))

# Define the number of iterations
# NOTE(review): every pass runs the same model over the same files and the
# results are de-duplicated afterwards - presumably one pass is enough; confirm.
num_iterations = 5

# Predicted class id -> result list. The ids follow the order of the class
# names written to yolo.yaml (0 = with_mask, 1 = mask_weared_incorrect,
# 2 = without_mask); class 1 only counts towards "single_person_files".
# NOTE(review): assumes final_model was trained with that class order - confirm.
category_by_class = {0: "with_mask", 2: "without_mask"}

# Perform the loop x times
for iteration in range(num_iterations):
    print(f"Starting Iteration {iteration + 1}/{num_iterations}")

    # Temporary data for the current iteration
    iteration_data = {
        "single_person_files": [],
        "with_mask": [],
        "without_mask": []
    }

    # Loop through the sets
    for set_type in ["train", "val", "test"]:
        images = [f"datasets/{set_type}/images/{i}" for i in os.listdir(f"datasets/{set_type}/images")]
        for chunk in split_list(images, 50):  # Process images in batches of 50
            predict = final_model.predict(chunk)  # Predict using the model
            for p in predict:
                # basename copes with whatever separator the platform puts in
                # the result path; splitting on "/" only works on POSIX paths.
                filename = os.path.basename(p.path)
                if len(p.boxes.cls) != 1:  # Keep only images with exactly one detected box
                    continue
                iteration_data["single_person_files"].append(filename)
                category = category_by_class.get(int(p.boxes.cls[0]))
                if category is not None:
                    iteration_data[category].append(filename)

    print(f"Finished Iteration {iteration + 1}/{num_iterations}")

    # Aggregate results into the JSON structure
    for key, filenames in iteration_data.items():
        aggregated_data[key].extend(filenames)

# Collapse repeated file names: np.unique keeps one copy of each value
final_data = dict(
    (key, list(np.unique(values)))
    for key, values in aggregated_data.items()
)

# Overwrite the JSON file with the de-duplicated lists
with open(json_path, "w") as file:
    file.write(json.dumps(final_data, indent=2))

# Report how many file names ended up in each list
summary_lines = [
    f"Final Single person files: {len(final_data['single_person_files'])}",
    f"Final With mask: {len(final_data['with_mask'])}",
    f"Final Without mask: {len(final_data['without_mask'])}",
    f"Aggregated data saved to {json_path}",
]
print("\n".join(summary_lines))


In [None]:
#import json

#with open("single_files.json", "w") as f:
 # json.dump(data, f)

In [16]:
import torch as pt
import torchvision

#code here yipeee