In [5]:
from load import Dataset
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [6]:
# Load the "cellcycle" dataset used by every experiment in this notebook.
# NOTE(review): nan_strategy="none" presumably leaves missing values in place so
# that imputation happens inside the pipelines — confirm against load.Dataset.
d = Dataset("cellcycle", nan_strategy="none")

In [7]:
from typing import Union, Literal
from feature_selection import ModSelectKBest, IterativeSelect

# Type alias covering both feature-selector classes imported from feature_selection.
FeatureSelector = Union[ModSelectKBest, IterativeSelect]
# Names accepted as the `strategy` argument when building a NumericImputer.
# Declared locally because importing the alias failed (cause not yet
# investigated).
ImputerStrategy = Literal["drop",
                          "knn",
                          "mean",
                          "median",
                          "most_frequent",
                          "constant"]

In [8]:
from typing import Any, Dict, Optional
from sklearn.pipeline import Pipeline
from hiclass import MultiLabelHierarchicalClassifier
from hiclass.metrics import f1
from handle_nan import NumericImputer
from feature_selection import fill_reshape

def baseline(dataset: Dataset,
             hiclass_model: MultiLabelHierarchicalClassifier,
             imputer_strategy: ImputerStrategy,
             imputer_kwargs: Optional[Dict[str, Any]] = None,
             verbose: bool = False):
    """Fit an imputer -> model pipeline (no feature selection) and score it.

    Args:
        dataset: Source of the train and test splits (``x_train``, ``y_train``,
            ``x_test``, ``y_test``).
        hiclass_model: Estimator used as the final pipeline step.
        imputer_strategy: Forwarded to ``NumericImputer`` as ``strategy``.
        imputer_kwargs: Extra keyword arguments for ``NumericImputer``;
            ``None`` (or an empty dict) means no extra arguments.
        verbose: Forwarded to ``Pipeline``.

    Returns:
        The tuple ``("baseline", {}, score)``, where ``score`` is the value of
        ``f1`` computed on the test split after both the true and predicted
        labels have gone through ``fill_reshape``.
    """
    train_features = dataset.x_train()
    train_labels = dataset.y_train()

    test_features = dataset.x_test()
    test_labels = dataset.y_test()
    expected = fill_reshape(test_labels)

    extra_imputer_args = imputer_kwargs or {}
    steps = [
        ("imputer", NumericImputer(strategy=imputer_strategy, **extra_imputer_args)),
        ("model", hiclass_model),
    ]
    # memory="cache" is handed to Pipeline unchanged (a caching location).
    pipeline = Pipeline(steps, verbose=verbose, memory="cache")

    pipeline.fit(train_features, train_labels)
    predicted = pipeline.predict(test_features)
    return "baseline", {}, f1(expected, fill_reshape(predicted))
    

In [10]:
from hiclass import MultiLabelLocalClassifierPerNode
from sklearn.tree import DecisionTreeClassifier

# Reference run: mean imputation feeding a per-node decision-tree classifier,
# with no feature selection. The middle (kwargs) element of the result is unused.
base_name, _, base_f1_score = baseline(
    dataset=d,
    hiclass_model=MultiLabelLocalClassifierPerNode(DecisionTreeClassifier()),
    imputer_strategy="mean",
    verbose=True,
)
print(f"{base_name:<10}: {base_f1_score:.4f}", flush=True)


[Pipeline] ............. (step 2 of 2) Processing model, total=  34.8s
baseline  : 0.4620


In [11]:
from typing import Any, Dict, Iterable, Literal, Optional, Union
from feature_selection import ModSelectKBest, IterativeSelect, fill_reshape
from handle_nan import NumericImputer
from hiclass import MultiLabelHierarchicalClassifier
from itertools import product
from pprint import PrettyPrinter
from sklearn.pipeline import Pipeline
from hiclass.metrics import f1


# Shared pretty-printer; feature_selection() uses it to dump the kwargs of a
# selector whose fit/predict raised.
pp = PrettyPrinter(indent=4)


def feature_selection(dataset: Dataset,
                      hiclass_model: MultiLabelHierarchicalClassifier,
                      imputer_strategy: ImputerStrategy,
                      imputer_kwargs: Optional[Dict[str, Any]] = None,
                      n_feature_splits: Optional[int] = None,
                      n_features: Optional[Iterable[int]] = None,
                      n_epochs: Union[int, Iterable[int]] = 100,
                      verbose: bool = False):
    """Evaluate every feature-selector configuration and yield its test score.

    For each configuration an imputer -> selector -> model pipeline is fitted
    on the training split and scored with ``f1`` on the test split. The
    validation split is imputed once up front and only handed to
    ``IterativeSelect`` through its keyword arguments.

    This is a generator: nothing runs (including argument validation) until
    the result is iterated, and each pipeline is fitted only when its item is
    requested. The same ``hiclass_model`` object is placed in every pipeline.

    Args:
        dataset: Source of the train, validation and test splits.
        hiclass_model: Estimator used as the final pipeline step.
        imputer_strategy: Forwarded to ``NumericImputer`` as ``strategy``.
        imputer_kwargs: Extra keyword arguments for ``NumericImputer``.
        n_feature_splits: Used only when ``n_features`` is ``None``: the
            feature counts tried are ``round(i * n_columns / (splits + 1))``
            for ``i = 1..splits``. Defaults to 5.
        n_features: Explicit feature counts to try. Any iterable (including a
            one-shot iterator) is accepted; each value must satisfy
            ``0 < k <= n_columns``.
        n_epochs: One epoch count, or an iterable of them, combined with every
            feature count for ``IterativeSelect``.
        verbose: Forwarded to ``Pipeline`` and ``IterativeSelect``; also
            prints one line per evaluated configuration.

    Yields:
        ``(key, kw, score)`` where ``key`` is ``"k_best"`` or ``"iterative"``,
        ``kw`` is the keyword dict the selector was built with and ``score``
        is the value returned by ``f1``.

    Raises:
        ValueError: If an explicit ``n_features`` value is out of range.
        Exception: Whatever a pipeline's ``fit``/``predict`` raises, re-raised
            after the failing configuration has been printed.
    """
    def get_pipeline(imputer: NumericImputer,
                     selector: FeatureSelector,
                     verbose: bool = False) -> Pipeline:
        """Assemble the three-step pipeline around ``hiclass_model``."""
        return Pipeline([
            ("imputer", imputer),
            ("selector", selector),
            ("model", hiclass_model)
        ], verbose=verbose, memory="cache")

    imputer = NumericImputer(strategy=imputer_strategy, **(imputer_kwargs or {}))

    x_train = dataset.x_train()
    y_train = dataset.y_train()
    # Fitted here only so the validation split can be imputed below.
    imputer.fit(x_train, y_train)

    x_valid = dataset.x_valid()
    y_valid = dataset.y_valid()
    x_valid = imputer.transform(x_valid)

    x_test = dataset.x_test()
    y_test = dataset.y_test()
    y_test_reshaped = fill_reshape(y_test)

    n_columns = x_train.shape[1]
    if n_features is None:
        if n_feature_splits is None:
            n_feature_splits = 5

        # Clamp to 1 so rounding never produces k == 0 (which the explicit
        # path rejects), and drop duplicates while keeping order so the same
        # configuration is not fitted twice on narrow datasets.
        n_features = list(dict.fromkeys(
            max(1, round(k * n_columns / (n_feature_splits + 1)))
            for k in range(1, n_feature_splits + 1)
        ))
    else:
        # Materialise once: the values are iterated several times below, which
        # would silently exhaust a generator/iterator argument.
        n_features = list(n_features)
        if any(not (0 < k <= n_columns) for k in n_features):
            raise ValueError("Invalid number of features")

    if isinstance(n_epochs, int):
        n_epochs = [n_epochs]
    else:
        n_epochs = list(n_epochs)

    selectors = (
        ("k_best",
         [{"k": k} for k in n_features]),
        ("iterative",
         [
             {
                 "k": k,
                 "x_valid": x_valid,
                 "y_valid": y_valid,
                 "epochs": epochs,
                 "verbose": verbose,
             }
             for k, epochs
             in product(n_features, n_epochs)
         ])
    )

    for key, kwargs in selectors:
        for kw in kwargs:
            selector = ModSelectKBest(**kw) if key == "k_best" else IterativeSelect(**kw)
            pipeline = get_pipeline(imputer, selector, verbose=verbose)
            try:
                pipeline.fit(x_train, y_train)
                y_pred = pipeline.predict(x_test)
            except Exception:
                # Report which configuration failed, then propagate the
                # original exception with its traceback intact.
                print(f"Selector: {key} failed\nkwargs: ", flush=True)
                pp.pprint(kw)
                raise
            score = f1(y_test_reshaped, fill_reshape(y_pred))
            if verbose:
                print(f"Selector: {key}, kwargs: {kw}, score: {score}", flush=True)
            yield key, kw, score


In [16]:
from hiclass import MultiLabelLocalClassifierPerNode
from sklearn.tree import DecisionTreeClassifier

# feature_selection() returns a generator, so no pipeline is fitted until the
# loop below starts pulling results.
feat_sel_results = feature_selection(
    dataset=d,
    hiclass_model=MultiLabelLocalClassifierPerNode(DecisionTreeClassifier()),
    imputer_strategy="mean",
    n_feature_splits=1,
    n_epochs=1,
    verbose=False,
)

# Print the baseline first so each selector row can be compared against it.
print(f"{base_name:<16}: {base_f1_score:.4f}", flush=True)
for selector, kwargs, f1_score in feat_sel_results:
    print(f"{selector:<10}({kwargs.get('k', '-'):<4}): {f1_score:.4f}", flush=True)

baseline        : 0.4620
