# Metaheuristic course - Session 4

> For this session you will need to install the sklearn python package.

In [1]:
from typing import List, Tuple, Union
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import Normalizer, StandardScaler
from sklearn.svm import SVC

## Let's make a pipeline

A pipeline is a concatenation of structures that can help us process a dataset and train a model for a given task. In this session we will be building simple pipelines using a few transformation structures and some classification estimators.

Use the `get_pipeline` function defined below to build a pipeline. The arguments of this function are:

- `pca_pos`: (Integer between 0 and 2, or equals -1) Position of the PCA in the pipeline. If -1 then no PCA is used.
- `pca_n_components`: If PCA is used, this defines the `n_components` PCA hyperparameter.
- `normalizer_pos`: (Integer between 0 and 2, or equals -1) Position of the Normalizer in the pipeline. If -1 then no Normalizer is used.
- `normalizer_norm`: If Normalizer is used, this defines the `norm` Normalizer hyperparameter.
- `standar_scaler_pos`: (Integer between 0 and 2, or equals -1) Position of the StandardScaler in the pipeline. If -1 then no StandardScaler is used.
- `use_rfc`: (Boolean) Defines if the RandomForestClassifier is used.
- `rfc_n_estimators`: If RFC is used, this defines the `n_estimators` hyperparameter.
- `rfc_max_depth`: If RFC is used, this defines the `max_depth` hyperparameter.
- `use_knc`: (Boolean) Defines if the KNeighborsClassifier is used.
- `knc_n_neighbors`: If KNC is used, this defines the `n_neighbors` hyperparameter.
- `use_svc`: (Boolean) Defines if the SVC is used.
- `svc_c`: If SVC is used, this defines the `C` hyperparameter.
- `svc_degree`: If SVC is used, this defines the `degree` hyperparameter.

> The default values for the hyperparameters are the same as the ones defined by sklearn.

In [2]:
def get_pipeline(
    pca_pos: int = -1,
    pca_n_components=None,
    normalizer_pos: int = -1,
    normalizer_norm: str = "l2",
    standar_scaler_pos: int = -1,
    use_rfc: bool = False,
    rfc_n_estimators: int = 100,
    rfc_max_depth: Union[int, None] = None,
    use_knc: bool = False,
    knc_n_neighbors: int = 5,
    use_svc: bool = False,
    svc_c: float = 1.0,
    svc_degree: int = 3,
) -> Pipeline:
    """Assemble a sklearn ``Pipeline`` of optional transformers plus one classifier.

    Each ``*_pos`` argument is either -1 (transformer not used) or a slot
    index in 0..2. Slots are filled in the order PCA, Normalizer,
    StandardScaler, so when two transformers are given the same slot the one
    assigned later replaces the earlier one. Empty slots are dropped.

    Exactly one of ``use_rfc``, ``use_knc`` and ``use_svc`` must be set; the
    chosen classifier becomes the final step ("rdf", "knc" or "svc").

    Raises:
        AssertionError: if a position is outside {-1, 0, 1, 2} or the number
            of selected classifiers is not exactly one.
    """
    for position in (pca_pos, normalizer_pos, standar_scaler_pos):
        assert position == -1 or 0 <= position <= 2

    slots: List[Union[None, Tuple]] = [None] * 4

    # Factories keep construction lazy: a transformer object is only created
    # when its position asks for it.
    transformer_factories = (
        (pca_pos, "pca", lambda: PCA(n_components=pca_n_components)),
        (normalizer_pos, "normalizer", lambda: Normalizer(norm=normalizer_norm)),
        (standar_scaler_pos, "scaler", lambda: StandardScaler()),
    )
    for position, label, build in transformer_factories:
        if position >= 0:
            slots[position] = (label, build())

    steps = [step for step in slots if step is not None]

    assert sum((use_knc, use_rfc, use_svc)) == 1, "Exactly one classifier must be defined"

    if use_rfc:
        forest = RandomForestClassifier(
            n_estimators=rfc_n_estimators,
            max_depth=rfc_max_depth,
        )
        steps.append(("rdf", forest))
    if use_knc:
        neighbours = KNeighborsClassifier(n_neighbors=knc_n_neighbors)
        steps.append(("knc", neighbours))
    if use_svc:
        support_vectors = SVC(C=svc_c, degree=svc_degree)
        steps.append(("svc", support_vectors))

    return Pipeline(steps)

For example, let's define some pipelines:

In [3]:
# Smallest possible pipeline: every transformer position keeps its default of -1,
# so the only step is the k-nearest-neighbours classifier.
get_pipeline(use_knc=True)

Pipeline(steps=[('knc', KNeighborsClassifier())])

In [4]:
# Normalizer in slot 0, PCA in slot 1, followed by a random forest with 10 trees.
get_pipeline(pca_pos=1, normalizer_pos=0, use_rfc=True, rfc_n_estimators=10)

Pipeline(steps=[('normalizer', Normalizer()), ('pca', PCA()),
                ('rdf', RandomForestClassifier(n_estimators=10))])

Once you have built your pipeline you can train and test the estimator against a dataset. For example, let's use the iris sklearn dataset:

In [7]:
# Load the iris toy dataset and separate features from labels.
iris_ds = load_iris()
X, y = iris_ds["data"], iris_ds["target"]
# Hold out 35% of the samples for testing. No random_state is passed, so the
# split (and therefore the score below) can differ from run to run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.35)

# Normalizer -> PCA -> random forest (10 trees), fitted on the training part
# and scored on the held-out part.
p2 = get_pipeline(pca_pos=1, normalizer_pos=0, use_rfc=True, rfc_n_estimators=10)
p2.fit(X_train, y_train)
p2.score(X_test, y_test)

0.9622641509433962

As you can see, this sequence of steps (pipeline) can predict the data from the iris dataset very well. However, this is a small and very simple dataset. You can find more information about the sklearn toy datasets [here](https://scikit-learn.org/stable/datasets/toy_dataset.html).

So, the question is: given a dataset, what is the best pipeline you can build for a given task (e.g. classification)?

Your task today is to build a heuristic that finds that pipeline using the tools of this notebook :)

In [13]:
from random import uniform

class Estimator:
    """Independent categorical distributions over the four genes of an individual.

    An individual is ``[pca_pos, normalizer_pos, standard_scaler_pos, model]``.
    Each ``p_*`` attribute maps a gene value to its probability; all of them
    start out uniform.
    """

    def __init__(self) -> None:
        # Transformer positions: -1 means "not used", 0..2 is the pipeline slot.
        self.p_pca_pos = {-1: 1/4, 0: 1/4, 1: 1/4, 2: 1/4}
        self.p_normalizer_pos = {-1: 1/4, 0: 1/4, 1: 1/4, 2: 1/4}
        self.p_standard_scaler_pos = {-1: 1/4, 0: 1/4, 1: 1/4, 2: 1/4}
        # Classifier codes 1..3.
        self.p_model = {1: 1/3, 2: 1/3, 3: 1/3}

    def sample_individual(self):
        """Draw one individual by sampling every gene from its own distribution.

        Returns:
            ``[pca_pos, normalizer_pos, standard_scaler_pos, model]``.
        """
        # Draw all uniforms up front, in gene order, so the sequence of random
        # numbers consumed per individual is fixed.
        draws = [uniform(0, 1) for _ in range(4)]
        tables = (
            self.p_pca_pos,
            self.p_normalizer_pos,
            self.p_standard_scaler_pos,
            self.p_model,
        )

        individual = []
        for table, u in zip(tables, draws):
            keys = list(table.keys())
            # Cumulative distribution in key order.
            cumulative = []
            running = 0
            for key in keys:
                running = table[key] + running
                cumulative.append(running)
            individual.append(keys[self.estimate_discrete_uniform(cumulative, u)])
        return individual

    def estimate_discrete_uniform(self, distribution, u):
        """Map ``u`` in [0, 1] to a category index via inverse-CDF lookup.

        Args:
            distribution: non-decreasing cumulative probabilities, one per category.
            u: a number in [0, 1].

        Returns:
            The first index whose cumulative value exceeds ``u``. The result is
            always a valid index into ``distribution`` (0 for an empty one).

        Raises:
            AssertionError: if ``u`` is outside [0, 1].
        """
        assert 0 <= u <= 1
        cumulative = list(distribution)
        for index, value in enumerate(cumulative):
            if u < value:
                return index
        if not cumulative:
            return 0
        # u was not below the final cumulative value (u == 1.0, or the float
        # sum ended slightly under 1). Returning len(cumulative) would index
        # past the end, so fall back to the last category that actually
        # carries mass: the first position where the total is reached.
        return cumulative.index(cumulative[-1])

def goal_function_iris(pipeline):
    """Fitness of ``pipeline``: accuracy on a fresh random hold-out of iris.

    The iris dataset is loaded, split with 35% of the samples held out for
    testing, the pipeline is fitted on the training part and scored on the
    held-out part.

    Args:
        pipeline: an object exposing ``fit(X, y)`` and ``score(X, y)``; it is
            fitted in place.

    Returns:
        Whatever ``pipeline.score`` returns for the held-out samples.
    """
    iris_ds = load_iris()
    X, y = iris_ds["data"], iris_ds["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.35)
    pipeline.fit(X_train, y_train)
    # Score the pipeline that was just fitted. Scoring any other object (such
    # as a pipeline left over from an earlier cell) would give every candidate
    # the same fitness and evaluate on data that object may have trained on.
    return pipeline.score(X_test, y_test)

def umda_classification(max_generations=20, population_size=20):
    """Search for a good pipeline with a univariate estimation-of-distribution loop.

    Every generation selects the better half of the current population,
    re-estimates the gene distributions from it, samples a new population
    and updates the best pipeline seen so far.

    Args:
        max_generations: number of generations to run.
        population_size: number of individuals sampled per generation.

    Returns:
        ``(best_pipeline, best_value)``; ``(None, 0)`` if no pipeline ever
        scored above 0 (for example when ``max_generations`` is 0).
    """
    estimator = Estimator()
    population = generate_population(estimator, population_size)
    # The number of parents must be an integer: a fractional count (odd
    # population sizes with true division) is never matched by the selection
    # loop and silently disables truncation. At least one parent is kept so
    # the distributions can always be re-estimated.
    n_selected = max(1, population_size // 2)
    best_pipeline = None
    best_value = 0
    for _ in range(max_generations):
        best_individuals = get_best_individuals(population, n_selected)
        fine_tune_estimator(estimator, best_individuals)
        population = generate_population(estimator, population_size)
        best_pipeline, best_value = get_best_pipeline(population, best_pipeline, best_value)
    return best_pipeline, best_value

def get_best_pipeline(population, best_pipeline, best_value):
    """Return the best (pipeline, score) pair after evaluating ``population``.

    Each individual is turned into a pipeline and evaluated with
    ``goal_function_iris``. The incumbent ``best_pipeline`` / ``best_value``
    is only replaced by a strictly higher score, so ties keep the earlier
    winner.
    """
    champion, champion_score = best_pipeline, best_value
    # map() is lazy, so each pipeline is built right before it is evaluated.
    for candidate in map(get_pipeline_by_individual, population):
        score = goal_function_iris(candidate)
        if score > champion_score:
            champion, champion_score = candidate, score
    return champion, champion_score

def fine_tune_estimator(estimator: "Estimator", best_individuals):
    """Re-estimate the gene distributions from the selected individuals.

    Every probability in ``estimator`` is replaced by the relative frequency
    of the corresponding gene value among ``best_individuals``.

    Args:
        estimator: object whose ``p_pca_pos``, ``p_normalizer_pos``,
            ``p_standard_scaler_pos`` and ``p_model`` dicts are updated in place.
        best_individuals: sequence of
            ``[pca_pos, normalizer_pos, standard_scaler_pos, model]`` with
            positions in -1..2 and model in 1..3.

    If ``best_individuals`` is empty there is nothing to estimate from, so the
    distributions are left untouched instead of dividing by zero.
    """
    size = len(best_individuals)
    if size == 0:
        return

    positions = range(-1, 3)
    models = range(1, 4)
    pca_counts = {pos: 0 for pos in positions}
    normalizer_counts = {pos: 0 for pos in positions}
    scaler_counts = {pos: 0 for pos in positions}
    model_counts = {model: 0 for model in models}

    for pca_pos, normalizer_pos, standard_scaler_pos, model in best_individuals:
        pca_counts[pca_pos] += 1
        normalizer_counts[normalizer_pos] += 1
        scaler_counts[standard_scaler_pos] += 1
        model_counts[model] += 1

    for pos in positions:
        estimator.p_pca_pos[pos] = pca_counts[pos] / size
        estimator.p_normalizer_pos[pos] = normalizer_counts[pos] / size
        estimator.p_standard_scaler_pos[pos] = scaler_counts[pos] / size

    for model in models:
        estimator.p_model[model] = model_counts[model] / size

def get_best_individuals(population: List[List], size):
    """Select the ``size`` fittest individuals of ``population``.

    Each individual is converted to a pipeline and evaluated with
    ``goal_function_iris``; higher fitness is better. Individuals with equal
    fitness keep their original relative order.

    Args:
        population: list of ``[pca_pos, normalizer_pos, standard_scaler_pos, model]``.
            It is not modified.
        size: how many individuals to keep. Non-integer values are truncated
            and negative values are treated as 0.

    Returns:
        A new list with copies of the selected individuals, best first.
    """
    scored = []
    for individual in population:
        pipeline = get_pipeline_by_individual(individual)
        scored.append((goal_function_iris(pipeline), individual))

    # Fitness is an accuracy, so the best individuals are the ones with the
    # HIGHEST value: sort in descending order before truncating.
    scored.sort(key=lambda pair: pair[0], reverse=True)

    n_keep = max(0, int(size))
    return [list(individual) for _, individual in scored[:n_keep]]

def generate_population(estimator, size=20):
    """Sample ``size`` individuals from ``estimator`` and return them as a list."""
    return [estimator.sample_individual() for _ in range(size)]

def get_pipeline_by_individual(individual: list):
    """Build the pipeline encoded by ``individual``.

    ``individual`` is ``[pca_pos, normalizer_pos, standard_scaler_pos, model]``
    where ``model`` selects the classifier: 1 = k-nearest neighbours,
    2 = random forest, 3 = SVC. Any other model code yields ``None``.
    """
    pca_pos, normalizer_pos, standard_scaler_pos, model = individual
    transformer_args = dict(
        pca_pos=pca_pos,
        normalizer_pos=normalizer_pos,
        standar_scaler_pos=standard_scaler_pos,
    )
    # Model code -> name of the get_pipeline flag that enables that classifier.
    classifier_flags = ((1, "use_knc"), (2, "use_rfc"), (3, "use_svc"))
    for code, flag in classifier_flags:
        if model == code:
            return get_pipeline(**transformer_args, **{flag: True})
    return None

# Run the search with its default settings and print the (pipeline, score) pair it returns.
print(umda_classification())

(Pipeline(steps=[('scaler', StandardScaler()), ('knc', KNeighborsClassifier())]), 1.0)
