# River

To use river in Google Colab, run the installation cell below and then restart the runtime environment.

Runtime > Restart runtime (Spanish UI: Entorno de ejecución > Reiniciar entorno de ejecución)

The Google Colab environment already includes most of the libraries used here, but river has to be installed manually.

In [None]:
!pip install river

# Libraries

The src folder can be imported, or the code for each class can be copied into the notebook, as is done in the next cell.

In [None]:
import numpy as np


class OnlineBagging:
    """Ensemble trained with online bagging.

    Each incoming sample is shown to every base classifier ``k`` times, where
    ``k`` is drawn independently per classifier from a Poisson distribution
    with rate ``lambda_diversity``.
    """

    def __init__(
        self,
        base_classifier_class,
        n_classifiers: int = 25,
        lambda_diversity: float = 1,
        p_classifiers: dict = None,
        class_list: list = None,
    ):
        """Build the ensemble and instantiate all its base classifiers.

        base_classifier_class: callable returning a new base classifier; it is
            called as ``base_classifier_class(**p_classifiers)``
        n_classifiers: number of classifiers
        lambda_diversity: parameter for forcing diversity when learning (higher means lower diversity)
        p_classifiers: dictionary with parameters for the classifiers initialization
            (defaults to an empty dictionary)
        class_list: list of classes to predict (defaults to ``[0, 1]``)
        """
        self.base_classifier_class = base_classifier_class
        self.lambda_diversity = lambda_diversity
        self.n_classifiers = n_classifiers
        # Fresh containers per instance: literal defaults in the signature
        # would be shared (and mutated) across every ensemble created.
        self.p_classifiers = {} if p_classifiers is None else p_classifiers
        self.class_list = [0, 1] if class_list is None else class_list
        self.classifier_list = self._build_classifiers()

    def _build_classifiers(self) -> list:
        """Return ``n_classifiers`` newly constructed base classifiers."""
        return [self.base_classifier_class(**self.p_classifiers) for _ in range(self.n_classifiers)]

    def predict_one(self, x: dict):
        """Return the label predicted by most base classifiers for ``x``.

        ``None`` predictions are ignored; if no classifier gives a label, the
        first entry of ``class_list`` is returned.
        """
        # Get all base classifiers predictions at once
        votes = np.array([clf.predict_one(x) for clf in self.classifier_list])
        # Remove None predictions as they will crash numpy
        votes = votes[votes != np.array(None)]
        # If all are None, return the first class
        if votes.size == 0:
            return self.class_list[0]
        # Return most common value (np.unique sorts, so ties go to the smallest label)
        values, counts = np.unique(votes, return_counts=True)
        return values[counts.argmax()]

    def predict_proba_one(self, x: dict) -> dict:
        """Return the per-label probabilities averaged over all base classifiers."""
        y_pred_proba = {}
        for clf in self.classifier_list:
            for label, proba in clf.predict_proba_one(x).items():
                # Dividing each contribution by n_classifiers yields the mean
                y_pred_proba[label] = y_pred_proba.get(label, 0) + proba / self.n_classifiers
        return y_pred_proba

    def learn_one(self, x: dict, y):
        """Train every base classifier on ``(x, y)`` a Poisson-distributed number of times."""
        # For each base classifier draw k from the poisson distribution and learn k times
        for clf in self.classifier_list:
            k = np.random.poisson(self.lambda_diversity)
            for _ in range(k):
                clf.learn_one(x, y)

    def reset(self):
        """Discard all base classifiers and replace them with new ones."""
        self.classifier_list = self._build_classifiers()


class ConvexMetrics:
    """Mixing weight of a two-output convex combination and its update rule.

    ``B`` weights the first output and ``1 - B`` the second one. ``B`` is the
    sigmoid of ``a``, and ``a`` is adapted from the errors passed to `update`,
    normalised by ``p``, an exponentially smoothed squared error difference.
    """

    # Floor for the divisor in `update`. ``p`` decays geometrically while both
    # errors are equal and can reach exactly 0, which would turn ``a`` into NaN.
    _MIN_P = float(np.finfo(float).tiny)

    def __init__(self, gamma: float = 0.9, mu: float = 1, lambda_error: float = 0.05, store_metrics: bool = False):
        """Store the hyperparameters and set the initial state.

        gamma: forgetting factor used to smooth ``p``
        mu: step size of the ``a`` update
        lambda_error: extra multiplicative factor applied to the ``a`` update
        store_metrics: if True, keep the history of ``B``, ``a`` and ``p`` in
            ``B_list``, ``a_list`` and ``p_list``
        """
        # For the first use, first combine and then update
        # By default 1 is low div (fast), 2 is high div (slow)
        # Convex Combination parameters
        self.gamma = gamma  # forgetting factor
        self.mu = mu  # step size
        self.lambda_error = lambda_error
        self.store_metrics = store_metrics

        # Initial values
        self.B = 0.5
        self.a = 0.0
        self.p = 0.5

        if self.store_metrics:
            # Store metrics
            self.B_list = []
            self.a_list = []
            self.p_list = []

    def combine(self, y1: float, y2: float):
        """Return ``B * y1 + (1 - B) * y2`` using the current weight ``B``."""
        return self.B * y1 + (1 - self.B) * y2

    def update(self, e: float, e1: float, e2: float):
        """Update ``p``, ``a`` and ``B`` from the latest errors.

        e: error of the combined output
        e1: error of the first output
        e2: error of the second output
        """
        error_gap = e2 - e1
        self.p = self.gamma * self.p + (1 - self.gamma) * np.power(error_gap, 2)
        # Get new a from updated p and e
        # Multiplying by 0.99 (or gamma) was added later to avoid a scaling to infinity
        # The floor only matters when p has underflowed; otherwise p is used as is
        divisor = max(self.p, self._MIN_P)
        self.a = 0.99 * self.a + (self.mu / divisor) * e * error_gap * self.lambda_error
        # Use new a to get new B which will be used for next prediction
        self.B = 1 / (1 + np.exp(-self.a))

        if self.store_metrics:
            # Store metrics
            self.B_list.append(self.B)
            self.a_list.append(self.a)
            self.p_list.append(self.p)


class ConvexCombination:
    """Wrapper for two learners (fast and slow) whose probabilities are mixed by a convex combination.

    Note there is no drift detection.
    """

    def __init__(
        self,
        fast_learner,
        slow_learner,
        p_convex: dict = None,
        use_binary_error: bool = False,
        class_list: list = None,
    ):
        """
        fast_learner: fast learner
        slow_learner: slow learner
        p_convex: parameters for convex combination (defaults to an empty dictionary)
        use_binary_error: if True the combination is updated with 0/1 errors of
            the predicted labels, otherwise with ``1 - P(y_true)``
        class_list: labels for which probabilities are produced (defaults to ``[0, 1]``)
        """
        # By default 1 is low div (fast), 2 is high div (slow)
        self.fast_learner = fast_learner
        self.slow_learner = slow_learner
        # Fresh containers per instance: literal defaults in the signature
        # would be shared across every object created.
        self.p_convex = {} if p_convex is None else p_convex
        self.use_binary_error = use_binary_error  # Maybe update every 10 samples?
        self.class_list = [0, 1] if class_list is None else class_list

        self.convex = ConvexMetrics(**self.p_convex)

    @staticmethod
    def _most_likely(proba: dict):
        """Return the label with the highest probability (first one wins ties)."""
        return max(proba, key=lambda k: proba[k])

    def predict_one(self, x: dict, include_individual: bool = False):
        """Return the predicted label for ``x``.

        With ``include_individual=True`` a tuple ``(combined, fast, slow)`` of
        labels is returned instead.
        """
        if include_individual:
            y_pred_proba, y_fast_proba, y_slow_proba = self.predict_proba_one(x, include_individual=True)
            return (
                self._most_likely(y_pred_proba),
                self._most_likely(y_fast_proba),
                self._most_likely(y_slow_proba),
            )
        return self._most_likely(self.predict_proba_one(x))

    def predict_proba_one(self, x: dict, include_individual: bool = False) -> dict:
        """Return the combined probability of every label in ``class_list``.

        Labels missing from a learner's output count as probability 0. With
        ``include_individual=True`` a tuple ``(combined, fast, slow)`` of
        dictionaries, all restricted to ``class_list``, is returned instead.
        """
        # Obtain the predictions
        y_fast_proba = self.fast_learner.predict_proba_one(x)
        y_slow_proba = self.slow_learner.predict_proba_one(x)
        # Restrict both outputs to the known labels
        y_fast_parsed = {label: y_fast_proba.get(label, 0) for label in self.class_list}
        y_slow_parsed = {label: y_slow_proba.get(label, 0) for label in self.class_list}
        # Obtain the weighted probability of every label
        y_pred_proba = {
            label: self.convex.combine(y_fast_parsed[label], y_slow_parsed[label]) for label in self.class_list
        }

        if include_individual:
            return y_pred_proba, y_fast_parsed, y_slow_parsed
        return y_pred_proba

    def learn_one(self, x: dict, y_true):
        """Update the convex combination with the errors on ``x``, then train both learners."""
        y_pred, y_fast, y_slow = self.predict_proba_one(x, include_individual=True)
        # Update convex combination
        if self.use_binary_error:
            # Compare predicted *labels* with the truth; comparing the
            # probability dictionaries themselves would never match.
            e = 0 if self._most_likely(y_pred) == y_true else 1
            e1 = 0 if self._most_likely(y_fast) == y_true else 1
            e2 = 0 if self._most_likely(y_slow) == y_true else 1
        else:
            e = 1 - y_pred.get(y_true, 0)
            e1 = 1 - y_fast.get(y_true, 0)
            e2 = 1 - y_slow.get(y_true, 0)
        self.convex.update(e, e1, e2)

        # Update ensembles
        self.fast_learner.learn_one(x, y_true)
        self.slow_learner.learn_one(x, y_true)


class DriftDetectorWrapper:
    """Classifier paired with a drift detector that resets it when drift is signalled."""

    def __init__(self, classifier, drift_detector, train_in_background: bool = True):
        """Wrapper for an OnlineBagging classifier with a drift detector (DDM or EDDM).

        classifier: classifier instance that implements the BaseClassifier interface and has a reset method. Usually OnlineBagging.
        drift_detector: drift detector instance that implements the DriftDetector interface.
        train_in_background: if True, samples seen while the detector signals a
            warning are buffered and replayed into the classifier after a reset.
        """
        self.classifier = classifier
        self.drift_detector = drift_detector
        self.train_in_background = train_in_background

        self.in_warning = False
        self.X_warning = []
        self.y_warning = []

    def predict_one(self, x: dict):
        """Return the wrapped classifier's predicted label for ``x``."""
        return self.classifier.predict_one(x)

    def predict_proba_one(self, x: dict):
        """Return the wrapped classifier's label probabilities for ``x``."""
        return self.classifier.predict_proba_one(x)

    def learn_one(self, x: dict, y_true, y_pred=None):
        """Feed the detector with the prediction error on ``x`` and train the classifier.

        x: features of the sample
        y_true: true label of the sample
        y_pred: label already predicted for ``x``; computed here when omitted

        Returns a string describing what happened on this sample: ``"drift"``,
        ``"warning"`` (warning zone entered), ``"exited warning"`` (warning
        zone left) or ``""`` when nothing changed.
        """
        if y_pred is None:
            y_pred = self.predict_one(x)

        # The detector receives 0 for a correct prediction and 1 for a mistake
        error = 0 if y_true == y_pred else 1
        self.drift_detector.update(error)

        # Check if drift was detected
        detected = ""
        if self.drift_detector.drift_detected:
            self.reset()
            detected = "drift"

        if self.train_in_background:
            if self.drift_detector.warning_detected:
                # If entering warning zone, trigger warning flag
                if not self.in_warning:
                    self.in_warning = True
                    detected = "warning"
                # Buffer every sample seen inside the warning zone
                self.X_warning.append(x)
                self.y_warning.append(y_true)
            elif self.in_warning:
                # We have exited the warning zone: drop the buffer and clear the
                # flag so the exit is reported once and a later warning is
                # reported as a new one.
                self.in_warning = False
                self.X_warning = []
                self.y_warning = []
                detected = "exited warning"

        # Learn instance and return the detected event (empty string if none)
        self.classifier.learn_one(x, y_true)
        return detected

    def reset(self):
        """Reset the classifier, replay the buffered warning samples into it and clear the buffer."""
        self.in_warning = False
        self.classifier.reset()
        for x_buffered, y_buffered in zip(self.X_warning, self.y_warning):
            self.classifier.learn_one(x_buffered, y_buffered)
        self.X_warning = []
        self.y_warning = []


# Preparation

## Dataset

In [None]:
# Sine high speed high severity
from river.datasets import synth
from itertools import chain

# Labels produced by the stream and total number of samples drawn below
class_list = [0, 1]
n_samples = 10000

# Options shared by the three Sine generators; only the classification function changes
sine_options = {"balance_classes": True, "has_noise": True}
stream1 = synth.Sine(classification_function=2, **sine_options)
stream2 = synth.Sine(classification_function=3, **sine_options)
stream3 = synth.Sine(classification_function=2, **sine_options)

# Concatenate 3000 + 3000 + 4000 samples: function 2, then function 3, then function 2 again
stream_segments = [stream1.take(3000), stream2.take(3000), stream3.take(4000)]
dataset = chain(*stream_segments)

## Models

In [None]:
# We create a function as a constructor for the logistic regression model
from river import preprocessing, optim, linear_model


def get_logis_reg():
    """Return a new pipeline: standard scaling followed by logistic regression trained with SGD(0.1)."""
    scaler = preprocessing.StandardScaler()
    classifier = linear_model.LogisticRegression(optimizer=optim.SGD(0.1))
    return scaler | classifier

In [None]:
from river import tree, drift

# Parameters of the Hoeffding trees, the drift detector and the convex combination
p_ht = dict(delta=1e-07, max_depth=25)
p_drift_detector = dict(alpha=0.90, beta=0.85)
p_convex = dict(gamma=0.9, mu=1, lambda_error=0.95, store_metrics=True)

# Both ensembles have the same size and Poisson rate; only the base learner differs
p_slow_ensemble = dict(
    base_classifier_class=tree.HoeffdingTreeClassifier,
    n_classifiers=10,
    lambda_diversity=1,
    p_classifiers=p_ht,
    class_list=class_list,
)
p_fast_ensemble = dict(
    base_classifier_class=get_logis_reg,
    n_classifiers=10,
    lambda_diversity=1,
    p_classifiers={},
    class_list=class_list,
)

# Slow learner: tree ensemble reset by an EDDM detector, without warning-zone buffering
slow_learner = DriftDetectorWrapper(
    classifier=OnlineBagging(**p_slow_ensemble),
    drift_detector=drift.binary.EDDM(**p_drift_detector),
    train_in_background=False,
)
# Fast learner: logistic regression ensemble without drift detection
fast_learner = OnlineBagging(**p_fast_ensemble)
convex_model = ConvexCombination(
    fast_learner=fast_learner,
    slow_learner=slow_learner,
    p_convex=p_convex,
    class_list=class_list,
)

# Number of prediction series tracked: convex, fast and slow
n_models = 3

# Execution

## Main loop

In [None]:
# True labels, in arrival order
y_true_list = []
# One list of predictions per tracked model
y_pred_list = [[] for _ in range(n_models)]

In [None]:
# Test-then-train loop: predict each sample first, then learn from it, storing every prediction
for idx, (x, y) in enumerate(dataset):
    y_convex, y_fast, y_slow = convex_model.predict_one(x, include_individual=True)
    convex_model.learn_one(x, y)

    y_true_list.append(y)
    # Slot 0 is the convex combination, slot 1 the fast learner, slot 2 the slow learner
    for slot, label in enumerate((y_convex, y_fast, y_slow)):
        y_pred_list[slot].append(label)

## Metrics

In [None]:
# Metrics
from river import metrics, utils


# Per model: the rolling metric objects and, for each of them, the history of its values
metrics_list = []
metrics_cum_list = []

for i in range(n_models):
    # Rolling accuracy with window sizes 250 and 1000
    model_metrics = [utils.Rolling(metrics.Accuracy(), window_size) for window_size in (250, 1000)]
    n_metrics = len(model_metrics)

    metrics_list += [model_metrics]
    metrics_cum_list += [[[] for _ in range(n_metrics)]]

In [None]:
# Convert predictions to metrics: replay every stored prediction through each
# rolling metric of its model and record the metric value after each sample.
for i in range(n_models):
    for y, y_pred in zip(y_true_list, y_pred_list[i]):
        # Iterate the metrics the model actually owns instead of relying on a
        # counter left over from another cell.
        for j, metric in enumerate(metrics_list[i]):
            try:
                metric.update(y, y_pred)
            except Exception as err:
                # `except Exception` keeps KeyboardInterrupt/SystemExit working.
                # Fall back to the first class so the series keeps its length,
                # and show the cause instead of hiding it.
                metric.update(y, class_list[0])
                print(f"ERROR: {err!r}")
            metrics_cum_list[i][j].append(metric.get())

# Plotting

In [None]:
from datetime import datetime
import matplotlib.pyplot as plt

# Index of the rolling metric to plot
metric_n = 1

fig, ax = plt.subplots()
time_steps = range(1, n_samples + 1)
# (index into metrics_cum_list, line style, colour, legend label), in drawing order
plot_specs = (
    (1, "-", "tab:orange", "Fast"),
    (2, "-", "tab:blue", "Slow"),
    (0, ":", "k", "Convex"),
)
for model_idx, line_style, line_color, legend_label in plot_specs:
    ax.plot(time_steps, metrics_cum_list[model_idx][metric_n], line_style, color=line_color, label=legend_label)
ax.set_xlim(0, n_samples)
ax.set_ylim(0.5, 1)
ax.set_xlabel("Time step")
ax.set_ylabel("Accuracy")
# Optional title: ax.set_title("Graph title")
ax.legend(loc="lower left")
ax.grid()
print(f"Executed {datetime.now()}")