In [None]:
from logging import DEBUG, INFO, FileHandler, Formatter, Logger, StreamHandler, getLogger
from pathlib import Path

_LOGGIN_DIR = Path().parent / "logs"

if not _LOGGIN_DIR.exists():
    _LOGGIN_DIR.mkdir()


def create_logger(name: str) -> Logger:
    logger = getLogger(name)
    logger.setLevel(DEBUG)
    stream_handler = StreamHandler()
    file_handler = FileHandler(_LOGGIN_DIR / f"{name}.log")
    formatter = Formatter(fmt="%(asctime)s.%(msecs)03d %(levelname)s: %(message)s", datefmt="%Y-%m-%d,%H:%M:%S")
    stream_handler.setFormatter(formatter)
    file_handler.setFormatter(formatter)
    stream_handler.setLevel(INFO)
    file_handler.setLevel(DEBUG)
    logger.addHandler(stream_handler)
    logger.addHandler(file_handler)
    return logger

In [None]:
from hashlib import sha256
from pathlib import Path
from shutil import rmtree
from typing import Optional
from urllib.request import urlretrieve
from zipfile import ZipFile

from tqdm import tqdm

_DATASETS_DIR = Path().parent / "datasets"

if not _DATASETS_DIR.exists():
    _DATASETS_DIR.mkdir()

logger = create_logger(__name__)


class DownloadProgressBar(tqdm):
    def update_to(self, b: int = 1, bsize: int = 1, tsize: int = None):
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)


def load_dataset(name: str, url: str, hash: Optional[str] = None) -> None:
    directory_name = _DATASETS_DIR / name
    archive_name = directory_name.with_suffix(".zip")
    if not archive_name.exists():
        logger.info(f"Downloading dataset '{url}' into {archive_name}...")
        with DownloadProgressBar(unit="B", unit_scale=True, miniters=1, desc=name) as t:
            urlretrieve(url, archive_name, t.update_to)
    else:
        logger.debug(f"Dataset '{url}' is found in {archive_name}!")
    if hash is not None:
        logger.debug(f"Verifying dataset {name} archive {archive_name} SHA256 checksum...")
        if sha256(archive_name.read_bytes()).hexdigest() == hash:
            logger.info(f"Dataset {name} SHA256 verification successful!")
        else:
            raise ValueError(f"Error verifying dataset {name} archive {archive_name} SHA256 checksum!")
    if directory_name.is_dir():
        logger.debug(f"Removing previous dataset {name} directory...")
        rmtree(directory_name)
    with ZipFile(archive_name, "r") as zipfile:
        logger.debug(f"Unpacking dataset {name} into {_DATASETS_DIR}...")
        zipfile.extractall(directory_name)
    logger.info(f"Dataset {name} available in {directory_name}!")


def verify_dataset(name: str) -> None:
    directory_name = _DATASETS_DIR / name
    if directory_name.is_dir():
        logger.info(f"Dataset {name} found in {directory_name}!")
    else:
        raise RuntimeError(f"Dataset {name} does not exist!")

In [None]:
load_dataset(
    "allergen30",
    "https://prod-dcd-datasets-cache-zipfiles.s3.eu-west-1.amazonaws.com/9ygs9vhnpw-1.zip",
    "ab6e19d32f7490988ca77d600fc6f3df2e8648365c4c92ced8c1b462c01d9d9f"
)

In [None]:
load_dataset(
    "kaggle_calories",
    "https://www.kaggle.com/api/v1/datasets/download/kkhandekar/calories-in-food-items-per-100-grams"
)

In [None]:
from ultralytics import YOLO

YOLO_VERSION = "yolo11n.pt"


model = YOLO(YOLO_VERSION)
model.model_name = "find_allergens"

print("YOLO network parameters:")
for k, v in model.named_parameters():
  print(k)

In [None]:
BATCH_SIZE = 16
IMAGE_SIZE = 416

In [None]:
FREEZE_LAYERS = 10
EPOCHS_NUMBER = 15
LEARNING_RATE = 0.001


model.train(
    data="datasets/allergen30.yaml",
    epochs=EPOCHS_NUMBER,
    batch=BATCH_SIZE,
    freeze=FREEZE_LAYERS,
    imgsz=IMAGE_SIZE,
    lr0=LEARNING_RATE
)

In [None]:
EPOCHS_NUMBER = 7
LEARNING_RATE = 0.0001


model.train(
    data="datasets/allergen30.yaml",
    epochs=EPOCHS_NUMBER,
    batch=BATCH_SIZE,
    imgsz=IMAGE_SIZE,
    lr0=LEARNING_RATE
)

In [None]:
results = model.predict(source="datasets/allergen30/Allergen30/test/images", show_labels=True, conf=0.25)
print("Test set image predictions:")
for r in results:
    print(r.boxes.data)

In [None]:
model.save("find_allergens.pt")
model.export(format="onnx", imgsz=IMAGE_SIZE, optimize=True, simplify=True, int8=True, data="datasets/allergen30.yaml", nms=True)

In [None]:
from os import PathLike
from typing import List, Tuple, Union, Optional
from pathlib import Path

from cv2 import imread, cvtColor, resize, COLOR_BGR2RGB
from pandas import read_csv
from torch import Tensor, load, device
from torch import max, argmax
from torch.cuda import is_available
from torchvision.transforms import ToTensor


class AllergenCaloriesPredictor:
    def __init__(self, model_file: Union[str, bytes, PathLike], calories_dataset: Union[str, bytes, PathLike]) -> None:
        model_file = Path(model_file)
        if not model_file.exists():
            raise RuntimeError(f"Model file '{model_file}' not found!")
        calories_dataset = Path(calories_dataset)
        if not calories_dataset.exists():
            raise RuntimeError(f"Calories dataset '{calories_dataset}' not found!")

        dev = device("cuda") if is_available() else device("cpu")
        self._model = load(model_file.absolute(), dev, weights_only=False)["model"]
        self._model.eval()
        self._calories = read_csv(calories_dataset)
        labels = [label.capitalize() for label in self._model.names.values()]
        self._calories = self._calories[self._calories["FoodItem"].isin(labels)]

    def _prepare_image(self, image_path: Union[str, bytes, PathLike]) -> Tensor:
        image = imread(image_path)
        image = cvtColor(image, COLOR_BGR2RGB)
        image = resize(image, (IMAGE_SIZE, IMAGE_SIZE))
        image_tensor = ToTensor()(image)
        return image_tensor.unsqueeze(0).half()

    def _parse_yolo_output(self, outputs, conf_threshold: float = 0.5) -> Tuple[Tensor, Tensor]:
        detection = outputs[0].view(1, 34, -1)
        class_confidences = detection[:, 5:, :]
        filter_mask = max(class_confidences, dim=1)[0] > conf_threshold
        filtered_boxes = detection[:, :4, :].permute(0, 2, 1)[filter_mask].view(-1, 4)
        filtered_classes = class_confidences.permute(0, 2, 1)[filter_mask].view(-1, class_confidences.size(1))
        return filtered_boxes, filtered_classes

    def predict(self, image_path: Union[str, bytes, PathLike], confidence: float = 0.5, top: int = 3) -> List[Tuple[str, Optional[str], float, Tuple[float, float, float, float]]]:
        prediction = self._model(self._prepare_image(image_path))
        fboxes, fclasses = self._parse_yolo_output(prediction[0], confidence)
        fargmaxes = [argmax(fc).item() for fc in fclasses]
        result = list()
        for fa, fc, fb in zip(fargmaxes, fclasses, fboxes):
            label = self._model.names[fa + 1]
            calories = self._calories[self._calories["FoodItem"] == label.capitalize()]
            calories = None if len(calories) == 0 else calories.iloc[0]["Cals_per100grams"]
            confidence = fc[fa].item()
            location = tuple(fb.tolist())
            result += [(label, calories, confidence, location)]
        return sorted(result, key=lambda r: r[2], reverse=True)[:top]


In [None]:
avopic = "datasets/allergen30/Allergen30/test/images/00000855_jpg.rf.aae89d3987f6e278cb471a0ad94aeb70.jpg"
acp = AllergenCaloriesPredictor("find_allergens.pt", "datasets/kaggle_calories/calories.csv")
for pclass, cals, pconf, pbox in acp.predict(avopic, 0.75):
    print(f"Object found: {pclass}\n\tconfidence: {pconf}\n\tcalories: {cals if cals is not None else 'unknown :('}\n\tat: {pbox}\n")