In [None]:
def widget_get(name, default):
    try:
        if value := dbutils.widgets.get(name):
            return value
    except Exception:
        pass
    dbutils.widgets.text(name, default)
    return default


In [None]:
import mlflow

CATALOG_NAME = widget_get("CATALOG_NAME", "iot_dev")
SCHEMA_NAME = widget_get("SCHEMA_NAME", "iot_ingest")
MODEL_ID = widget_get("MODEL_ID", "yolo_object_detect")
ALIAS = widget_get("ALIAS", "champion")
TEST_IMAGES_PATH = widget_get("TEST_IMAGES_PATH", "test_images")
REGISTERED_MODEL_NAME = f"{CATALOG_NAME}.{SCHEMA_NAME}.{MODEL_ID}"
EXPERIMENT_PATH = f"/Shared/{REGISTERED_MODEL_NAME}"

mlflow.set_experiment(EXPERIMENT_PATH)

In [None]:
from pathlib import Path
import re
from PIL import Image
import io

def image_to_base64(input, verify:bool = False) -> str:
    raw=image_bytes(input, verify)
    return base64.b64encode(raw).decode("utf-8")

def image_bytes(input, verify:bool = False) -> bytes:
    if isinstance(input, bytes):
        raw=input
    elif isinstance(input, Path):
        raw= input.read_bytes()
    else:
        input_str=str(input)
        input_path=Path(input_str)
        if input_path.is_file():
            raw= image_bytes(input_path, verify=False)
        else:
            raw = base64.b64decode(re.sub(r"data:.*?,", "", input_str))
    if(verify):
        Image.open(io.BytesIO(raw)).verify()
    return raw

In [None]:
from pathlib import Path
from typing import Iterable

def test_images() -> Iterable[Path]:
    default_files = []
    other_files = []

    for p in Path(TEST_IMAGES_PATH).rglob("*"):
        if not p.is_file():
            continue

        if p.stem == "default":
            default_files.append(p)
        else:
            other_files.append(p)

    for p in default_files:
        yield p

    for p in other_files:
        yield p

In [None]:
from pathlib import Path
import base64
import json
import numpy as np
import pandas as pd

def annotate_image(image, predictions, target_height:int|None =480) -> bytes:
    import cv2
    raw = image_bytes(image)
    img = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)

   

    for box in predictions:
        x1 = int(box["x1"])
        y1 = int(box["y1"])
        x2 = int(box["x2"])
        y2 = int(box["y2"])
        label = box["label"]
        conf = box["confidence"]

        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(
            img,
            f"{label} {conf:.2f}",
            (x1, max(y1 - 5, 10)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.6,
            (0, 255, 0),
            2,
        )
    if target_height:
        # resize annotated image to 480p height
        h, w = img.shape[:2]
        scale = target_height / h
        new_w = int(w * scale)
        img = cv2.resize(img, (new_w, target_height))

    _, buf = cv2.imencode(".png", img)
    return buf.tobytes()