In [1]:
import torch
from torch import nn, optim
import numpy as np
import pandas as pd
import collections
import statistics
import time
import csv
import os
import sys
import argparse
import psutil
from river import base, stats, utils, drift, metrics, preprocessing, datasets
from river.datasets import synth
from deep_river import classification

In [2]:
# --- 1. FUNÇÕES DE SUPORTE ---
def generate_rotation_matrix(n_features):
    """Return a random orthogonal ``n_features x n_features`` matrix.

    A matrix of standard-normal samples is drawn from torch's global RNG and
    the Q factor of its QR decomposition is returned.
    """
    gaussian = torch.randn(n_features, n_features)
    return torch.linalg.qr(gaussian).Q

def log_results_to_csv(filename, data_dict):
    """Append one result row to a CSV file, emitting the header when needed.

    Args:
        filename: Path of the CSV file; it is created if it does not exist.
        data_dict: Mapping of column name -> value. Its keys define the
            column order used for both the header and the row.

    The header is written when the file is missing *or* exists but is empty
    (e.g. it was created by an interrupted run), so the resulting file is
    never left without column names.
    """
    needs_header = (not os.path.isfile(filename)) or os.path.getsize(filename) == 0
    with open(filename, mode='a', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(data_dict.keys()))
        if needs_header:
            writer.writeheader()
        writer.writerow(data_dict)
        
def fast_preprocess(X_batch, oh_encoder, scaler, n_feat):
    """Encode, standardise and pad a batch of feature dicts.

    Args:
        X_batch: List of per-instance feature mappings.
        oh_encoder: Object exposing ``learn_many`` / ``transform_many``; only
            used when the batch has object/category columns.
        scaler: Object exposing ``learn_many`` plus ``means`` and ``vars``
            mappings keyed by column name.
        n_feat: Fixed output width; extra columns are dropped and missing
            ones are zero-filled.

    Returns:
        One dict per instance with keys ``f0 .. f{n_feat-1}``.
    """
    frame = pd.DataFrame(X_batch)

    # Split columns by dtype: strings/categories are one-hot encoded,
    # numeric columns are used as they are.
    categorical = frame.select_dtypes(include=['object', 'category']).columns
    numeric = frame.select_dtypes(include=[np.number]).columns

    merged = frame[numeric]
    if len(categorical) > 0:
        oh_encoder.learn_many(frame[categorical])
        encoded = oh_encoder.transform_many(frame[categorical])
        merged = pd.concat([merged, encoded], axis=1)

    # The scaler is updated first, then its running statistics are applied by
    # hand so a negative accumulated variance can be clamped before the sqrt.
    scaler.learn_many(merged)
    column_names = list(merged.columns)
    mean_vec = np.array([scaler.means.get(name, 0) for name in column_names])
    var_vec = np.array([scaler.vars.get(name, 0) for name in column_names])
    std_vec = np.sqrt(np.maximum(var_vec, 0))

    # The 1e-8 term keeps the division finite for zero-variance columns.
    scaled = (merged.values.astype(np.float64) - mean_vec) / (std_vec + 1e-8)

    # Copy into a fixed-width, zero-initialised buffer (truncate or pad).
    padded = np.zeros((len(X_batch), n_feat))
    width = min(scaled.shape[1], n_feat)
    padded[:, :width] = scaled[:, :width]

    keys = [f"f{j}" for j in range(n_feat)]
    return [dict(zip(keys, values)) for values in padded]

In [3]:
# --- 2. CONFIGURAÇÃO DE DATASETS  ---
def get_dataset(name, seed, n_classes=2):
    """Build the requested stream and return it with its configured dimensions.

    Args:
        name: Dataset identifier, matched case-insensitively.
        seed: Seed forwarded to the synthetic generators.
        n_classes: Number of classes; only used by the ``randomtree`` generator.

    Returns:
        Tuple ``(stream, n_features, n_classes)`` where ``n_features`` is the
        fixed feature width configured for that dataset.

    Raises:
        ValueError: If ``name`` is not one of the configured datasets.
    """
    # Builders are lambdas so only the requested stream is instantiated.
    builders = {
        # 17 is the width configured for Elec2; the original note attributes it
        # to one-hot expansion of 8 raw features — TODO confirm.
        'elec2': lambda: (datasets.Elec2(), 17, 2),
        'agrawal': lambda: (synth.Agrawal(seed=seed), 10, 2),
        'sea': lambda: (synth.SEA(seed=seed), 3, 2),
        'sine': lambda: (synth.Sine(seed=seed), 3, 2),
        'hyperplane': lambda: (synth.Hyperplane(seed=seed, n_features=10), 10, 2),
        # 5 numeric + 5 categorical features, configured width 15.
        'randomtree': lambda: (
            synth.RandomTree(
                seed_tree=seed,
                seed_sample=seed,
                n_classes=n_classes,
                n_num_features=5,
                n_cat_features=5,
            ),
            15,
            n_classes,
        ),
    }
    build = builders.get(name.lower())
    if build is None:
        raise ValueError(f"Dataset {name} não configurado.")
    return build()

In [4]:
# --- 3. CLASSE DAS REDES NEURAIS (Mix Heterogêneo) ---
class FlexibleNeuralNetwork(nn.Module):
    """MLP classifier with an optional fixed projection and optional Conv1d front-end.

    Args:
        n_features: Expected input width; inputs of any other width are
            truncated or zero-padded to it in ``forward``.
        n_classes: Number of output logits.
        hidden_layers: Sizes of the hidden ``Linear`` + ``ReLU`` stages.
        use_cnn: If true, a ``Conv1d(1, 8, 3, padding=1)`` + ``ReLU`` +
            ``Flatten`` block precedes the MLP, whose input width becomes
            ``8 * n_features``.
        projection_matrix: Optional matrix stored as a float32 buffer and
            applied as ``x @ projection`` before the other stages.
    """

    def __init__(self, n_features, n_classes, hidden_layers=[32], use_cnn=False, projection_matrix=None):
        super().__init__()
        self.n_features = n_features
        self.use_cnn = use_cnn

        if projection_matrix is None:
            self.projection = None
        else:
            # A buffer follows the module across devices but is not trained.
            self.register_buffer('projection', projection_matrix.to(torch.float32))

        self.in_dim = 8 * n_features if use_cnn else n_features
        self.cnn_block = (
            nn.Sequential(nn.Conv1d(1, 8, 3, padding=1), nn.ReLU(), nn.Flatten())
            if use_cnn
            else None
        )

        # Consecutive width pairs describe each hidden Linear layer.
        widths = [self.in_dim, *hidden_layers]
        stack = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU()]
        stack.append(nn.Linear(widths[-1], n_classes))
        self.mlp_head = nn.Sequential(*stack)

    def forward(self, x):
        """Return logits for ``x`` (a 1-D sample or a 2-D batch)."""
        batch = x.unsqueeze(0) if x.dim() == 1 else x
        batch = batch.to(torch.float32)

        incoming = batch.shape[1]
        if incoming != self.n_features:
            # Truncate or zero-pad so the width matches the configured one.
            keep = min(incoming, self.n_features)
            resized = torch.zeros((batch.shape[0], self.n_features), device=batch.device)
            resized[:, :keep] = batch[:, :keep]
            batch = resized

        if self.projection is not None:
            batch = torch.matmul(batch, self.projection)
        if self.use_cnn:
            # Conv1d expects a channel axis: (batch, 1, n_features).
            batch = self.cnn_block(batch.unsqueeze(1))
        return self.mlp_head(batch)

In [5]:
class FlexibleNN(nn.Module):
    """Plain feed-forward classifier: ``Linear`` + ``ReLU`` stages, then a linear output layer.

    Args:
        n_features: Width of the input vectors.
        n_classes: Number of output logits.
        hidden_layers: Sizes of the hidden layers, in order.
    """

    def __init__(self, n_features, n_classes, hidden_layers=[10]):
        super().__init__()
        dims = [n_features, *hidden_layers]
        # Each consecutive (d_in, d_out) pair contributes Linear then ReLU.
        stages = [
            layer
            for d_in, d_out in zip(dims, dims[1:])
            for layer in (nn.Linear(d_in, d_out), nn.ReLU())
        ]
        stages.append(nn.Linear(dims[-1], n_classes))
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Return the logits produced by the layer stack for ``x``."""
        return self.net(x)

In [6]:
class ARTELight(base.Ensemble, base.Classifier):
    """Ensemble with one drift detector per member and error-driven Poisson resampling.

    Each member is tested on every incoming instance. A member that
    misclassifies is trained on ``k ~ Poisson(lambda_val)`` copies of that
    instance. A member whose detector flags drift is replaced by
    ``model.clone()`` and gets a fresh detector and accuracy window.
    Predictions average the probabilities of the members whose rolling
    accuracy is at least the ensemble mean.

    Args:
        models: Member classifiers (must expose ``predict_one``,
            ``predict_proba_one``, ``learn_many`` and ``clone``).
        drift_detector: Prototype detector; cloned once per member.
        lambda_val: Rate of the Poisson distribution used for resampling.
        seed: Seed of the NumPy RNG that draws the Poisson counts.
    """

    def __init__(self, models, drift_detector, lambda_val=10.0, seed=42):
        super().__init__(models=models)
        self.lambda_val = lambda_val
        # Stored under the constructor-argument name so that River's parameter
        # introspection (clone()/repr read each __init__ argument back with
        # getattr) does not fail with AttributeError.
        self.seed = seed
        self._rng = np.random.RandomState(seed)
        self.drift_detector = drift_detector
        self._detectors = [drift_detector.clone() for _ in range(len(models))]
        self._acc_windows = [self._new_acc_window() for _ in range(len(models))]
        self._total_drifts = 0

    @staticmethod
    def _new_acc_window():
        """Create an empty rolling-accuracy tracker over the last 100 outcomes."""
        return utils.Rolling(stats.Mean(), window_size=100)

    def learn_one(self, x, y):
        """Test every member on ``(x, y)``, train the ones that erred, reset drifted ones.

        Returns:
            True if at least one member was replaced because of drift.
        """
        any_drift = False
        for i, model in enumerate(self.models):
            y_pred = model.predict_one(x)

            correct = (y == y_pred)
            # The detector tracks the error signal (1 = mistake); the rolling
            # window tracks accuracy (1 = hit).
            self._detectors[i].update(0 if correct else 1)
            self._acc_windows[i].update(1 if correct else 0)

            if not correct:
                k = self._rng.poisson(self.lambda_val)
                if k > 0:
                    # One learn_many call on k copies instead of k learn_one calls.
                    x_df = pd.DataFrame([x] * k)
                    y_df = pd.Series([y] * k)
                    model.learn_many(x_df, y_df)

            if self._detectors[i].drift_detected:
                self._total_drifts += 1
                # NOTE(review): this relies on clone() returning an untrained
                # member; confirm that the wrapped classifier re-initialises
                # its network weights instead of deep-copying the trained module.
                self.models[i] = model.clone()
                self._detectors[i] = self.drift_detector.clone()
                self._acc_windows[i] = self._new_acc_window()
                any_drift = True
        return any_drift

    @torch.inference_mode()
    def predict_proba_one(self, x):
        """Average the class probabilities of the members with above-average rolling accuracy.

        Returns:
            ``collections.Counter`` mapping class label -> averaged probability
            (empty if the selected members return no probabilities).
        """
        accs = [w.get() for w in self._acc_windows]
        avg = sum(accs) / len(accs) if accs else 0
        # Fall back to every member if the filter selects none.
        idx = [i for i, a in enumerate(accs) if a >= avg] or range(len(self.models))
        votes = collections.Counter()
        for i in idx:
            p = self.models[i].predict_proba_one(x)
            for c, v in p.items():
                votes[c] += v / len(idx)
        return votes

    @property
    def total_drifts(self):
        """Number of member replacements triggered by drift so far."""
        return self._total_drifts

In [7]:
# --- WRAPPER PARA DESBALANCEAMENTO (Replicando BinaryImbalancedGenerator.java) ---
class ImbalancedStream:
    """Iterator wrapper that re-samples a binary stream to a fixed class ratio.

    For every emitted instance a target class is drawn (class 1 with
    probability ``ir``, otherwise class 0) and the wrapped stream is consumed,
    discarding instances, until one of that class is found.

    Args:
        stream: Iterable of ``(x, y)`` pairs.
        ir: Probability of requesting class 1 (e.g. 0.05 for 5%).
        seed: Seed of the NumPy RNG that draws the target class.
    """

    def __init__(self, stream, ir=0.1, seed=42):
        self.stream = stream
        self.ir = ir
        self.rng = np.random.RandomState(seed)
        self.iterator = iter(self.stream)

    def __iter__(self):
        return self

    def __next__(self):
        # Draw the class wanted for this step before touching the stream.
        wanted = 1 if self.rng.rand() < self.ir else 0

        # Skip instances until the label matches; labels are emitted as int.
        for features, label in self.iterator:
            label = int(label)
            if label == wanted:
                return features, label

        # Wrapped stream exhausted before a matching instance was found.
        raise StopIteration

In [8]:
# --- 4. EXECUÇÃO DO EXPERIMENTO ---
def main(dataset='elec2', ir=None, seed=123456789, exp_type='binary', 
         n_models=30, n_classes=2, batch_size=32, n_instances=100000, lambda_val=6):
    """Run one test-then-train experiment and append periodic metrics to a CSV file.

    Args:
        dataset: Dataset name understood by ``get_dataset``.
        ir: Probability of the positive class enforced through
            ``ImbalancedStream``; ``None`` keeps the stream unchanged.
        seed: Seed for the dataset, the imbalance wrapper, torch and the ensemble.
        exp_type: Sub-directory of ``results/`` that receives the CSV file.
        n_models: Number of ensemble members (three architecture families).
        n_classes: Requested class count; only effective for ``randomtree``.
        batch_size: Number of instances buffered before preprocessing and
            prequential evaluation.
        n_instances: Maximum number of instances to consume; ``None`` reads
            the whole stream.
        lambda_val: Poisson rate forwarded to ``ARTELight``.

    Notes:
        * Instances are only processed in full batches, so a trailing partial
          batch (fewer than ``batch_size`` instances) is never evaluated.
        * ``T_Pred`` / ``T_Learn`` in the log are the timings of the last
          instance of the most recent batch, not averages.
    """
    import gc  # imported once here instead of on every logging step

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # 1. Stream and its configured dimensions (n_classes only matters for synthetic data).
    stream_raw, n_feat, ds_classes = get_dataset(dataset, seed, n_classes)
    final_n_classes = ds_classes if dataset.lower() != 'randomtree' else n_classes

    # 2. Optional static imbalance wrapper; with ir=None the stream is used as is.
    if ir is not None:
        print(f"Aplicando Static Imbalance Ratio: {ir*100}%")
        stream = ImbalancedStream(stream_raw, ir=ir, seed=seed)
    else:
        stream = stream_raw

    loss_f = nn.CrossEntropyLoss()  # shared by every member

    # 3. Heterogeneous ensemble: one third each of SGD-MLP, Adam-CNN and
    #    Adam-MLP with a fixed random orthogonal projection.
    ensemble_list = []
    torch.manual_seed(seed)
    for i in range(n_models):
        if i < n_models//3:
            cfg = {"cnn": False, "opt": optim.SGD, "lr": 0.05, "layers": [128, 64], "proj": False}
        elif i < 2*n_models//3:
            cfg = {"cnn": True, "opt": optim.Adam, "lr": 0.01, "layers": [64], "proj": False}
        else:
            cfg = {"cnn": False, "opt": optim.Adam, "lr": 0.005, "layers": [256, 128], "proj": True}

        # Reuse the notebook helper instead of repeating the randn + QR code.
        proj = generate_rotation_matrix(n_feat) if cfg["proj"] else None

        m = classification.Classifier(
            module=FlexibleNeuralNetwork(n_feat, final_n_classes, cfg["layers"], cfg["cnn"], proj),
            loss_fn=loss_f, 
            optimizer_fn=cfg["opt"], 
            lr=cfg["lr"], 
            device=device, 
            is_feature_incremental=False
        )
        ensemble_list.append(m)

    model_artelmb = ARTELight(ensemble_list, drift.ADWIN(delta=1e-3), lambda_val=lambda_val, seed=seed)

    metric_acc = metrics.Accuracy()
    metric_kappa = metrics.CohenKappa()
    metric_gmean = metrics.GeometricMean()

    oh_encoder = preprocessing.OneHotEncoder()
    scaler = preprocessing.StandardScaler()
    X_batch, y_batch, latencies = [], [], []

    # Output layout: results/<exp_type>/<dataset>[_ir<ratio>]_lamb<l>_s<seed>_m<n>.csv
    os.makedirs(f"results/{exp_type}", exist_ok=True)
    ir_suffix = f"_ir{str(ir).replace('.','')}" if ir is not None else ""
    output_file = f"results/{exp_type}/{dataset}{ir_suffix}_lamb{lambda_val}_s{seed}_m{n_models}.csv"

    print(f"Executando: {dataset.upper()} | Tipo: {exp_type} | Seed: {seed} | N_Feat: {n_feat}")

    for count, (x, y) in enumerate(stream, 1):

        # Instance limit.
        if n_instances is not None and count > n_instances:
            print(f"Alcançado o limite de {n_instances} instâncias. Finalizando...")
            break

        # Normalise numeric/boolean labels to plain ints and keep the actual
        # class index, so multiclass targets are not collapsed to {0, 1}.
        if isinstance(y, (int, float, bool, np.integer, np.bool_)):
            y_val = int(y)
        else:
            y_val = y
        X_batch.append(x)
        y_batch.append(y_val)

        if len(X_batch) == batch_size:
            start = time.perf_counter()

            X_dicts = fast_preprocess(X_batch, oh_encoder, scaler, n_feat)

            dur_prep = (time.perf_counter() - start)

            # Test-then-train: predict, score, then learn on each instance.
            for i in range(batch_size):
                t_pred = time.perf_counter()
                y_pred = model_artelmb.predict_one(X_dicts[i])
                dur_pred = (time.perf_counter() - t_pred)

                # No prediction is available until the members have produced probabilities.
                if y_pred is not None:
                    metric_acc.update(y_batch[i], y_pred)
                    metric_kappa.update(y_batch[i], y_pred)
                    metric_gmean.update(y_batch[i], y_pred)
                t_learn = time.perf_counter()
                model_artelmb.learn_one(X_dicts[i], y_batch[i])
                dur_learn = (time.perf_counter() - t_learn)

            latencies.append((time.perf_counter() - start) * 1000)
            X_batch, y_batch = [], []

            # Log roughly every 2000 instances (first full batch past each multiple).
            if count % 2000 < batch_size:
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

                ram = psutil.Process().memory_info().rss / (1024 * 1024)
                vram = torch.cuda.memory_allocated() / (1024*1024) if torch.cuda.is_available() else 0
                # Per-instance latency over the most recent (up to 20) batches;
                # divide by the real window length, which can be below 20.
                recent = latencies[-20:]
                avg_lat = sum(recent) / len(recent) / batch_size

                stats_dict = {
                    "Instancia": count, 
                    "Dataset": dataset, 
                    "Seed": seed,
                    "Accuracy": metric_acc.get(), 
                    "Kappa": metric_kappa.get(),
                    "GMean": metric_gmean.get(),
                    "Latencia_ms": avg_lat, 
                    "Drifts": model_artelmb.total_drifts,
                    "RAM_MB": ram, 
                    "VRAM_MB": vram,
                    "T_Prep": dur_prep, "T_Pred": dur_pred, "T_Learn": dur_learn
                }
                log_results_to_csv(output_file, stats_dict)
                print(f"Inst: {count} | Acc: {stats_dict['Accuracy']:.2%} | Kappa: {stats_dict['Kappa']:.2f} | Drifts: {stats_dict['Drifts']}")

In [9]:
def run_static_imbalance_suite(datasets_list=['agrawal', 'sea'], seeds=[123456789], ir_list=[0.1, 0.05, 0.01]):
    """Run ``main`` for every (dataset, seed, imbalance ratio) combination.

    Args:
        datasets_list: Dataset names to run.
        seeds: Seeds to run for each dataset.
        ir_list: Minority-class ratios, e.g. 0.1 (10%), 0.05 (5%), 0.01 (1%).

    A failing experiment does not stop the suite: the error and its traceback
    are printed, the combination is recorded, and the final message reports
    the failures instead of always claiming success.
    """
    import traceback  # function-scope import; only needed on the error path

    total_exps = len(datasets_list) * len(seeds) * len(ir_list)
    curr = 1
    failures = []

    print(f"Iniciando bateria de {total_exps} experimentos...")

    for ds in datasets_list:
        for seed in seeds:
            for ir in ir_list:
                print(f"\n[{curr}/{total_exps}] Processando: {ds} | IR: {ir*100}% | Seed: {seed}")
                try:
                    main(
                        dataset=ds,
                        ir=ir,
                        seed=seed,
                        exp_type='static_ir',
                        n_models=30,
                        batch_size=32
                    )
                except Exception as e:
                    print(f"Erro no experimento {ds} IR {ir}: {e}")
                    # Keep the stack trace: the one-line message alone rarely
                    # identifies where the experiment broke.
                    traceback.print_exc()
                    failures.append((ds, seed, ir))
                curr += 1

    if failures:
        print(f"\nBateria de testes concluída com {len(failures)} falha(s): {failures}")
    else:
        print("\nBateria de testes concluída com sucesso!")

In [10]:
def run_full_experiment_suite():
    """Run the full grid: synthetic imbalanced streams first, then Elec2.

    Synthetic part: every combination of dataset (agrawal, sea), lambda
    (6, 10), imbalance ratio (0.1, 0.01) and seed, capped at 100000
    instances. Real part: Elec2 with each lambda, reading the whole stream.
    """
    base_seed = 123456789
    lambda_grid = [6, 10]
    shared = dict(n_models=30, batch_size=32)

    # Same nesting order as explicit loops: dataset -> lambda -> ratio -> seed.
    synthetic_grid = [
        (name, lam, ratio, run_seed)
        for name in ['agrawal', 'sea']
        for lam in lambda_grid
        for ratio in [0.1, 0.01]
        for run_seed in [base_seed]
    ]

    print("Iniciando Suíte de Experimentos Otimizada")

    for name, lam, ratio, run_seed in synthetic_grid:
        print(f"\n# Executando: {name} | IR: {ratio} | Lambda: {lam} | Seed: {run_seed}")
        main(
            dataset=name,
            ir=ratio,
            seed=run_seed,
            exp_type='static_ir',
            n_instances=100000,
            lambda_val=lam,
            **shared,
        )

    # Real dataset: no imbalance wrapper and no instance cap.
    print("\n# Executando Dataset Real: Elec2")
    for lam in lambda_grid:
        main(
            dataset='elec2',
            seed=base_seed,
            exp_type='binary',
            n_instances=None,
            lambda_val=lam,
            **shared,
        )

In [11]:
if __name__ == "__main__":
    # A Jupyter kernel is recognised by 'ipykernel' or '-f' appearing in any
    # command-line token; everything else with arguments is a terminal run.
    launched_by_kernel = any(('ipykernel' in token) or ('-f' in token) for token in sys.argv)
    has_cli_args = len(sys.argv) > 1

    if launched_by_kernel or not has_cli_args:
        # Notebook run: edit the call below to choose what is executed.
        # Alternatives: a single main(dataset=..., ir=..., exp_type=...) run,
        # or run_static_imbalance_suite(datasets_list=[...], seeds=[...], ir_list=[...]).
        print("Ambiente Notebook detectado. Usando parâmetros padrão...")
        run_full_experiment_suite()
    else:
        # Terminal run: every option name matches a keyword argument of main().
        cli = argparse.ArgumentParser()
        option_table = (
            ('--dataset', str, 'elec2'),
            ('--ir', float, None),
            ('--seed', int, 123456789),
            ('--exp_type', str, 'binary'),
            ('--n_models', int, 30),
            ('--n_classes', int, 2),
            ('--batch_size', int, 32),
            ('--n_instances', int, 100000),
            ('--lambda_val', int, 6),
        )
        for flag, kind, default in option_table:
            cli.add_argument(flag, type=kind, default=default)
        options = cli.parse_args()

        main(**vars(options))


Ambiente Notebook detectado. Usando parâmetros padrão...
Iniciando Suíte de Experimentos Otimizada

# Executando: agrawal | IR: 0.1 | Lambda: 6 | Seed: 123456789
Aplicando Static Imbalance Ratio: 10.0%
Executando: AGRAWAL | Tipo: static_ir | Seed: 123456789 | N_Feat: 10
Inst: 2016 | Acc: 96.18% | Kappa: 0.76 | Drifts: 1
Inst: 4000 | Acc: 96.28% | Kappa: 0.77 | Drifts: 19
Inst: 6016 | Acc: 96.78% | Kappa: 0.80 | Drifts: 38
Inst: 8000 | Acc: 96.67% | Kappa: 0.79 | Drifts: 58
Inst: 10016 | Acc: 96.60% | Kappa: 0.78 | Drifts: 81
Inst: 12000 | Acc: 96.55% | Kappa: 0.78 | Drifts: 95
Inst: 14016 | Acc: 96.53% | Kappa: 0.78 | Drifts: 110
Inst: 16000 | Acc: 96.51% | Kappa: 0.78 | Drifts: 133
Inst: 18016 | Acc: 96.44% | Kappa: 0.77 | Drifts: 158
Inst: 20000 | Acc: 96.37% | Kappa: 0.77 | Drifts: 184
Inst: 22016 | Acc: 96.31% | Kappa: 0.77 | Drifts: 201
Inst: 24000 | Acc: 96.17% | Kappa: 0.76 | Drifts: 221
Inst: 26016 | Acc: 96.13% | Kappa: 0.76 | Drifts: 239
Inst: 28000 | Acc: 96.13% | Kappa: 0.7