In [None]:
import numpy as np
from datasets import DatasetDict, load_dataset
from skimage.feature import hog
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from config import Config
import time
from sklearn.metrics import accuracy_score, classification_report
from joblib import dump
# Module-level configuration instance; get_models() reads it as a global, while the
# other helpers take an explicit ``config`` argument and fall back to a fresh Config().
config = Config()

# 1. Preprocessing

In [None]:
def load_hf_dataset(name, *, split=None, cache_dir=None):
    """Load a dataset by name through ``datasets.load_dataset``.

    Args:
        name: Dataset identifier forwarded as the first positional argument.
        split: Forwarded as ``split``; ``None`` leaves the choice to the library.
        cache_dir: Forwarded as ``cache_dir``.

    Returns:
        Whatever ``load_dataset`` returns for these arguments.
    """
    options = {"split": split, "cache_dir": cache_dir}
    return load_dataset(name, **options)


def ensure_splits(dataset, *, config=None, seed=42):
    """Return a ``DatasetDict`` holding the splits the configuration asks for.

    A ``DatasetDict`` that already contains every required split is returned
    unchanged. The "val" split is only required when ``config.val_size`` is
    positive, so a ready-made train/test pair is kept as-is when no validation
    split is requested. A ``DatasetDict`` without a "train" split is also
    returned unchanged. In every other case the "train" split (or the bare
    dataset) is re-split with ``train_test_split`` and any other splits of the
    input are dropped.

    Args:
        dataset: A ``DatasetDict`` or a single dataset exposing ``train_test_split``.
        config: Object providing ``test_size`` and ``val_size``; defaults to ``Config()``.
        seed: Seed forwarded to both ``train_test_split`` calls.

    Returns:
        A ``DatasetDict`` with "train" and "test", plus "val" when
        ``config.val_size`` is positive (or the input itself, as described above).
    """
    if isinstance(dataset, DatasetDict):
        if "train" not in dataset:
            return dataset
        has_test = "test" in dataset
        if has_test and "val" in dataset:
            return dataset
        config = config or Config()
        if has_test and config.val_size <= 0:
            # No validation split is wanted, so train/test is already complete;
            # re-splitting "train" here would throw away the provided test split.
            return dataset
        return ensure_splits(dataset["train"], config=config, seed=seed)

    config = config or Config()
    holdout = config.test_size + config.val_size

    # First cut: everything that is not training data.
    split = dataset.train_test_split(test_size=holdout, seed=seed)
    if config.val_size <= 0:
        return DatasetDict(train=split["train"], test=split["test"])

    # Second cut: the "test" side of this split has the validation share of the
    # holdout, so it becomes "val" and the remainder becomes "test".
    val_ratio = config.val_size / holdout
    test_val = split["test"].train_test_split(test_size=val_ratio, seed=seed)
    return DatasetDict(train=split["train"], val=test_val["test"], test=test_val["train"])


def _image_to_array(image, *, resize=None, normalize=True):
    if resize is not None:
        image = image.resize(resize)
    arr = np.asarray(image)
    if arr.ndim == 2:
        arr = np.stack([arr] * 3, axis=-1)
    if normalize:
        arr = arr.astype("float32") / 255.0
    return arr


def _image_to_hog(image, *, resize=None, normalize=True, hog_params=None):
    if resize is not None:
        image = image.resize(resize)
    if getattr(image, "mode", None) != "L":
        image = image.convert("L")
    arr = np.asarray(image)
    if normalize:
        arr = arr.astype("float32") / 255.0
    return hog(arr, **hog_params)


def flatten_images(
    dataset,
    *,
    image_col="image",
    label_col="label",
    resize=None,
    normalize=True,
    data_format="flatten",
    hog_params=None,
):
    """Map every image of a dataset to a feature vector.

    Args:
        dataset: A ``DatasetDict`` or a single dataset exposing ``map`` and
            ``column_names``.
        image_col: Name of the column holding the images.
        label_col: Name of the column holding the labels.
        resize: Size forwarded to the per-image converter.
        normalize: Forwarded to the per-image converter.
        data_format: "flatten" for raveled pixel arrays or "hog" for HOG
            descriptors; any other value raises ``KeyError``.
        hog_params: Keyword arguments for HOG extraction; a falsy value selects
            the defaults defined below.

    Returns:
        The mapped dataset with the original columns removed and "features" /
        "labels" columns added.
    """
    if not hog_params:
        hog_params = {
            "orientations": 9,
            "pixels_per_cell": (8, 8),
            "cells_per_block": (2, 2),
            "block_norm": "L2-Hys",
        }

    def _as_flat_pixels(img):
        pixels = _image_to_array(img, resize=resize, normalize=normalize)
        return pixels.reshape(-1)

    def _as_hog(img):
        return _image_to_hog(
            img, resize=resize, normalize=normalize, hog_params=hog_params
        )

    convert = {"flatten": _as_flat_pixels, "hog": _as_hog}[data_format]

    def _prepare_batch(batch):
        images, labels = batch[image_col], batch[label_col]
        return {"features": list(map(convert, images)), "labels": labels}

    # For a DatasetDict the columns to drop are taken from its "train" split.
    column_source = dataset["train"] if isinstance(dataset, DatasetDict) else dataset
    return dataset.map(
        _prepare_batch, batched=True, remove_columns=column_source.column_names
    )


def to_numpy(dataset, *, feature_col="features", label_col="labels"):
    """Extract a stacked feature matrix and a label array from a dataset.

    Args:
        dataset: Mapping-like object indexable by column name.
        feature_col: Column whose entries are stacked along a new first axis.
        label_col: Column converted to an array of labels.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays.
    """
    feature_matrix = np.stack(dataset[feature_col])
    return feature_matrix, np.asarray(dataset[label_col])


def prepare_for_classification(
    name,
    *,
    image_col="image",
    label_col="label",
    resize=None,
    normalize=None,
    data_format=None,
    hog_params=None,
    config=None,
    seed=42,
):
    """Load a dataset, make sure it is split, and turn its images into features.

    Arguments left at their falsy / ``None`` defaults are filled in from
    ``config`` (``data_format``, ``hog_params``, ``image_size``, ``normalize``).

    Args:
        name: Dataset identifier passed to ``load_hf_dataset``.
        image_col: Name of the image column.
        label_col: Name of the label column.
        resize: Target size; ``None`` selects ``config.image_size``.
        normalize: Pixel scaling flag; ``None`` selects ``config.normalize``.
        data_format: Feature type; a falsy value selects ``config.data_format``.
        hog_params: HOG keyword arguments; a falsy value selects ``config.hog_params``.
        config: Configuration object; defaults to ``Config()``.
        seed: Seed forwarded to ``ensure_splits``.

    Returns:
        The result of ``flatten_images`` applied to the split dataset.
    """
    splits = ensure_splits(load_hf_dataset(name), config=config, seed=seed)
    settings = config or Config()

    chosen_format = data_format or settings.data_format
    chosen_hog_params = hog_params or settings.hog_params
    target_size = settings.image_size if resize is None else resize
    scale_pixels = settings.normalize if normalize is None else normalize

    return flatten_images(
        splits,
        image_col=image_col,
        label_col=label_col,
        resize=target_size,
        normalize=scale_pixels,
        data_format=chosen_format,
        hog_params=chosen_hog_params,
    )

## 1.1. Data import

In [None]:
def dataset(name=None, config=None):
    """Build NumPy train / validation / test arrays and print their shapes.

    Args:
        name: Dataset identifier; a falsy value selects ``config.dataset_name``.
        config: Configuration object; defaults to ``Config()``.

    Returns:
        Tuple ``(X_train, y_train, X_val, y_val, X_test, y_test)``. The two
        validation entries are ``None`` when the prepared data has no "val" split.
    """
    settings = config or Config()
    source_name = name or settings.dataset_name
    splits = prepare_for_classification(source_name, config=settings, seed=settings.seed)

    X_train, y_train = to_numpy(splits["train"])
    if "val" in splits:
        X_val, y_val = to_numpy(splits["val"])
    else:
        X_val = y_val = None
    X_test, y_test = to_numpy(splits["test"])

    print("train:", X_train.shape, y_train.shape)
    if X_val is not None:
        print("val:", X_val.shape, y_val.shape)
    print("test:", X_test.shape, y_test.shape)

    return X_train, y_train, X_val, y_val, X_test, y_test

# Smoke check: builds the configured dataset end to end and prints the split shapes.
# NOTE(review): as the last expression of the cell, the returned tuple of arrays is
# also echoed as cell output, and train_and_eval() builds the data again on its own.
dataset()

# 2. Implementation

In [None]:
def build_svm(**kwargs):
    """Create a new ``SVC``; all keyword arguments are passed straight through."""
    classifier = SVC(**kwargs)
    return classifier


def build_random_forest(**kwargs):
    """Create a new ``RandomForestClassifier``; keyword arguments are passed through."""
    forest = RandomForestClassifier(**kwargs)
    return forest


def build_boosting(**kwargs):
    """Create a new ``GradientBoostingClassifier``; keyword arguments are passed through."""
    booster = GradientBoostingClassifier(**kwargs)
    return booster


def build_voting(
    *,
    svm_params,
    random_forest_params,
    voting="soft",
    weights=None,
    **kwargs,
):
    """Create a ``VotingClassifier`` over an SVM and a random forest.

    Args:
        svm_params: Keyword arguments for the SVM member. They are copied, so
            the caller's mapping is not modified.
        random_forest_params: Keyword arguments for the random-forest member.
        voting: Voting mode forwarded to ``VotingClassifier``. For "soft",
            ``probability=True`` is added to the SVM arguments unless the
            caller already set ``probability``.
        weights: Forwarded to ``VotingClassifier``.
        **kwargs: Any further ``VotingClassifier`` arguments.

    Returns:
        The configured ``VotingClassifier`` with members named "svm" and
        "random_forest".
    """
    svm_options = dict(svm_params)
    if voting == "soft":
        # Only fills the key in when the caller did not provide one.
        svm_options.setdefault("probability", True)

    members = [
        ("svm", build_svm(**svm_options)),
        ("random_forest", build_random_forest(**random_forest_params)),
    ]
    return VotingClassifier(estimators=members, voting=voting, weights=weights, **kwargs)


def get_models():
    """Return one instance of every supported model, keyed by name.

    The three standalone models are built without arguments. Only the voting
    ensemble reads the module-level ``config`` (``svm_params``,
    ``random_forest_params`` and the "voting" entry of ``voting_params``,
    defaulting to "soft").
    """
    models = {}
    models["svm"] = build_svm()
    models["random_forest"] = build_random_forest()
    models["boosting"] = build_boosting()
    models["voting"] = build_voting(
        svm_params=config.svm_params,
        random_forest_params=config.random_forest_params,
        voting=config.voting_params.get("voting", "soft"),
    )
    return models

In [None]:
def _with_verbose(params, *, enabled, value):
    if not enabled or "verbose" in params:
        return params
    return {**params, "verbose": value}


def build_model(config, model_name):
    """Build the model called ``model_name`` from the parameters in ``config``.

    Each model's parameter mapping gets a "verbose" entry when
    ``config.training_verbose`` is truthy and the mapping does not already
    define one. Only the requested model's configuration is read.

    Args:
        config: Object providing ``svm_params``, ``random_forest_params``,
            ``boosting_params``, ``voting_params`` and ``training_verbose``.
        model_name: One of "svm", "random_forest", "boosting", "voting".

    Returns:
        The newly constructed estimator.

    Raises:
        KeyError: If ``model_name`` is not one of the supported names.
    """

    def _make_svm():
        return build_svm(
            **_with_verbose(config.svm_params, enabled=config.training_verbose, value=True)
        )

    def _make_random_forest():
        return build_random_forest(
            **_with_verbose(config.random_forest_params, enabled=config.training_verbose, value=1)
        )

    def _make_boosting():
        return build_boosting(
            **_with_verbose(config.boosting_params, enabled=config.training_verbose, value=1)
        )

    def _make_voting():
        return build_voting(
            svm_params=config.svm_params,
            random_forest_params=config.random_forest_params,
            **_with_verbose(config.voting_params, enabled=config.training_verbose, value=1),
        )

    factories = {
        "svm": _make_svm,
        "random_forest": _make_random_forest,
        "boosting": _make_boosting,
        "voting": _make_voting,
    }
    make = factories[model_name]
    return make()


def train_and_eval(config=None):
    """Train every configured model, then evaluate the best one on the test set.

    Each model is fitted on the training data, scored on the validation data
    when there is any, and saved with ``joblib.dump`` to
    ``model_<name>.pth`` in the current directory. The model with the highest
    validation accuracy is evaluated on the test set; without validation data
    the first configured model is used instead.

    Args:
        config: Configuration object; defaults to ``Config()``. ``config.model``
            may be a single model name or a list / tuple of names.

    Returns:
        Dict mapping each model name to its fitted estimator.

    Raises:
        ValueError: If ``config.model`` is an empty list or tuple.
    """
    config = config or Config()
    print("loading data...")
    X_train, y_train, X_val, y_val, X_test, y_test = dataset(config=config)
    print("data loaded")
    print(f"train size: {X_train.shape[0]}")
    if X_val is not None:
        print(f"val size: {X_val.shape[0]}")
    print(f"test size: {X_test.shape[0]}")

    # Accept a tuple of names as well as a list; anything else is one model name.
    if isinstance(config.model, (list, tuple)):
        model_list = list(config.model)
    else:
        model_list = [config.model]
    if not model_list:
        raise ValueError("config.model must name at least one model")

    trained = {}
    val_scores = {}
    total_models = len(model_list)
    for idx, model_name in enumerate(model_list, start=1):
        model = build_model(config, model_name)
        print(f"model {idx}/{total_models}: {model_name}")
        print(f"model params ({model_name}): {model.get_params()}")
        print(f"training starting ({model_name})...")
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        model.fit(X_train, y_train)
        elapsed = time.perf_counter() - start_time
        print("training finished")
        print(f"training time ({model_name}): {elapsed:.2f}s")

        if X_val is not None:
            val_pred = model.predict(X_val)
            val_acc = accuracy_score(y_val, val_pred)
            val_scores[model_name] = val_acc
            print("val accuracy:", val_acc)

        # Written with joblib.dump despite the ".pth" suffix.
        model_path = f"model_{model_name}.pth"
        dump(model, model_path)
        print(f"model saved: {model_path}")
        trained[model_name] = model

    if X_val is not None and val_scores:
        best_model_name = max(val_scores, key=val_scores.get)
        print(f"best model by val accuracy: {best_model_name} ({val_scores[best_model_name]:.4f})")
    else:
        best_model_name = model_list[0]
        print("no validation set available; evaluating test on first model only")

    best_model = trained[best_model_name]
    print(f"evaluating test set ({best_model_name})...")
    test_pred = best_model.predict(X_test)
    print("test accuracy:", accuracy_score(y_test, test_pred))
    print(classification_report(y_test, test_pred))

    return trained