In [1]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [6]:
!pip install river

Collecting river
  Downloading river-0.21.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Downloading river-0.21.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m27.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: river
Successfully installed river-0.21.2


In [9]:
import pandas as pd
import numpy as np
from river import forest, metrics, drift, stats
import random
import hashlib
import pickle
from datetime import datetime
import os
import traceback
from typing import Optional, Dict, Any
from collections import Counter

# Definir sementes para reprodutibilidade
random.seed(42)
np.random.seed(42)

def convert_unix_timestamp(ts):
    """Converte timestamp Unix em milissegundos para datetime"""
    return pd.to_datetime(ts, unit='ms')

def consistent_hash(value):
    """Função de hash consistente usando SHA-256"""
    return int(hashlib.sha256(str(value).encode('utf-8')).hexdigest(), 16) % 1000

class OnlineNewsRecommender:
    def __init__(
        self,
        n_models=10,
        drift_detector=drift.ADWIN(),
        top_k=10
    ):
        # Inicializa um normalizador para cada feature numérica
        self.scalers = {
            'user_id_hash': stats.Mean(),
            'article_id_hash': stats.Mean(),
            'hour_sin': stats.Mean(),
            'hour_cos': stats.Mean(),
            'day_sin': stats.Mean(),
            'day_cos': stats.Mean(),
            'month_sin': stats.Mean(),
            'month_cos': stats.Mean(),
            'hour': stats.Mean(),
            'article_popularity': stats.Mean(),
            'user_activity': stats.Mean()
        }

        self.vars = {k: stats.Var() for k in self.scalers.keys()}

        # Modelo base com detector de drift
        self.model = forest.ARFClassifier(
            n_models=n_models,
            drift_detector=drift_detector,
            grace_period=50,
            max_features='sqrt',
            seed=42,
            leaf_prediction='nb'
        )

        # Métricas online
        self.metrics = {
            'accuracy': metrics.Accuracy(),
            'f1': metrics.F1(),
            'precision': metrics.Precision(),
            'recall': metrics.Recall(),
            'roc_auc': metrics.ROCAUC(),
            'log_loss': metrics.LogLoss()
        }

        # Contadores e estado
        self.article_counter = Counter()
        self.user_counter = Counter()
        self.current_top = set()
        self.top_k = top_k

    def _normalize_feature(self, name, value):
        """Normaliza uma feature usando média e variância online"""
        # Atualiza estatísticas
        self.scalers[name].update(value)
        self.vars[name].update(value)

        # Calcula z-score
        mean = self.scalers[name].get()
        var = self.vars[name].get()
        std = np.sqrt(var) if var > 0 else 1

        return (value - mean) / (std + 1e-8)

    def _extract_features(self, x):
        """
        Extrai as features do exemplo de entrada.

        Parameters:
        -----------
        x : dict
            Dicionário contendo os dados do exemplo atual.

        Returns:
        --------
        dict
            Dicionário com as features normalizadas.
        """
        timestamp = convert_unix_timestamp(x['click_timestamp'])

        # Features básicas
        features = {
            'user_id_hash': consistent_hash(x['user_id']),
            'article_id_hash': consistent_hash(x['click_article_id']),
            'hour_sin': np.sin(2 * np.pi * timestamp.hour / 24),
            'hour_cos': np.cos(2 * np.pi * timestamp.hour / 24),
            'day_sin': np.sin(2 * np.pi * timestamp.dayofweek / 7),
            'day_cos': np.cos(2 * np.pi * timestamp.dayofweek / 7),
            'month_sin': np.sin(2 * np.pi * timestamp.month / 12),
            'month_cos': np.cos(2 * np.pi * timestamp.month / 12),
            'hour': timestamp.hour,
        }

        # Calcula popularidade excluindo a interação atual
        article_count = self.article_counter.get(x['click_article_id'], 0)
        user_count = self.user_counter.get(x['user_id'], 0)

        total_interactions = sum(self.article_counter.values()) or 1
        total_users = sum(self.user_counter.values()) or 1

        features.update({
            'article_popularity': article_count / total_interactions,
            'user_activity': user_count / total_users,
        })

        # Normaliza features
        normalized_features = {
            name: self._normalize_feature(name, value)
            for name, value in features.items()
            if name in self.scalers
        }

        # Adiciona features categóricas sem normalização
        normalized_features['weekend'] = int(timestamp.dayofweek >= 5)

        return normalized_features

    def learn_one(self, x):
        """Aprende com um exemplo"""
        # Atualiza top articles antes de processar a interação atual para evitar vazamento
        if sum(self.article_counter.values()) % 100 == 0:
            self.current_top = set(
                article for article, _ in
                self.article_counter.most_common(self.top_k)
            )

        # Define target (1 se artigo está no top-k)
        target = int(x['click_article_id'] in self.current_top)

        # Extrai features
        features = self._extract_features(x)

        # Treina o modelo
        self.model.learn_one(features, target)

        # Atualiza contadores após definir o target
        self.article_counter[x['click_article_id']] += 1
        self.user_counter[x['user_id']] += 1

        return self

    def predict_one(self, x):
        """Faz predição para um exemplo"""
        features = self._extract_features(x)
        return self.model.predict_one(features)

    def predict_proba_one(self, x):
        """Retorna probabilidades para um exemplo"""
        features = self._extract_features(x)
        return self.model.predict_proba_one(features)

    def save(self, path):
        """Salva o modelo completo"""
        with open(path, 'wb') as f:
            pickle.dump(self, f)

    @classmethod
    def load(cls, path):
        """Carrega o modelo completo"""
        with open(path, 'rb') as f:
            return pickle.load(f)

class NewsRecommenderSystem:
    """Sistema para gerenciar treinamento e predição do recomendador de notícias."""

    def __init__(self, model_path=None, max_samples=None, timestamp=None, config=None):
        """
        Inicializa o sistema de recomendação.

        Parameters:
        -----------
        model_path : str, optional
            Caminho para o modelo existente.
        max_samples : int, optional
            Número máximo de amostras a serem processadas.
        timestamp : str, optional
            Timestamp para nomear arquivos.
        config : dict, optional
            Configurações do modelo.
        """
        self.model = None
        self.history = {
            'processed_records': 0,
            'start_time': datetime.now(),
            'metrics': {},
            'dataset_info': {},
            'model_info': {},
            'config': config or {}
        }
        self.max_samples = max_samples
        self.timestamp = timestamp or datetime.now().strftime("%Y%m%d_%H%M%S")

        if model_path:
            self._load_existing_model(model_path)
        else:
            self._initialize_new_model()

    def _load_existing_model(self, model_path):
        """Carrega modelo existente."""
        try:
            self.model = OnlineNewsRecommender.load(model_path)
            self.history['model_info'] = {
                'original_model_path': model_path,
                'load_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        except Exception as e:
            raise Exception(f"Erro ao carregar modelo: {str(e)}")

    def _initialize_new_model(self):
        """Inicializa novo modelo."""
        n_models = self.history['config'].get('n_models', 15)
        delta = self.history['config'].get('delta', 0.001)
        top_k = self.history['config'].get('top_k', 10)

        drift_detector = drift.ADWIN(delta=delta)

        self.model = OnlineNewsRecommender(
            n_models=n_models,
            drift_detector=drift_detector,
            top_k=top_k
        )
        # Salva as configurações do classificador
        self.history['model_info'] = {
            'initialization_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'model_type': 'OnlineNewsRecommender',
            'n_models': n_models,
            'drift_detector': type(drift_detector).__name__,
            'drift_detector_params': drift_detector.__dict__,
            'top_k': top_k,
            'model_params': {
                'n_models': n_models,
                'grace_period': self.model.model.grace_period,
                'max_features': self.model.model.max_features,
                'seed': self.model.model.seed,
                'leaf_prediction': self.model.model.leaf_prediction,
            }
        }

    def validate_dataset(self, dataset: pd.DataFrame) -> bool:
        """
        Valida se o dataset tem as colunas necessárias.

        Parameters:
        -----------
        dataset : pd.DataFrame
            O dataset a ser validado.

        Returns:
        --------
        bool
            True se o dataset é válido, levanta ValueError caso contrário.
        """
        required_columns = ['user_id', 'click_article_id', 'click_timestamp']
        missing_columns = [col for col in required_columns if col not in dataset.columns]

        if missing_columns:
            raise ValueError(f"Colunas faltando no dataset: {missing_columns}")

        return True

    def _update_final_metrics(self):
        """
        Atualiza métricas finais no histórico com informações adicionais.
        """
        n_drifts = sum(self.model.model[n].drift_detector.n_detections
                       for n in range(self.model.model.n_models))
        n_warnings = sum(self.model.model[n].drift_detector.n_warnings
                         for n in range(self.model.model.n_models))
        top_articles = dict(self.model.article_counter.most_common(10))

        self.history['metrics'] = {
            'final_accuracy': self.model.metrics['accuracy'].get(),
            'final_f1': self.model.metrics['f1'].get(),
            'final_precision': self.model.metrics['precision'].get(),
            'final_recall': self.model.metrics['recall'].get(),
            'roc_auc': self.model.metrics['roc_auc'].get(),
            'log_loss': self.model.metrics['log_loss'].get(),
            'processed_records': self.history['processed_records'],
            'processing_time': str(datetime.now() - self.history['start_time']),
            'n_drifts': n_drifts,
            'n_warnings': n_warnings,
            'top_articles': str(top_articles),
            'max_samples': self.max_samples
        }

    def analyze_samples(self, dataset):
        """
        Analisa e mostra informações sobre as amostras do dataset com validações adicionais.

        Parameters:
        -----------
        dataset : pd.DataFrame
            O dataset a ser analisado.

        Returns:
        --------
        pd.DataFrame
            O dataset possivelmente limpo.
        """
        try:
            print("\n=== Análise das Amostras ===")

            # Verifica duplicações
            duplicates = dataset.duplicated().sum()
            if duplicates > 0:
                print(f"Aviso: Encontradas {duplicates:,} linhas duplicadas")
                print("Removendo duplicatas...")
                dataset = dataset.drop_duplicates()

            # Contagem de registros
            total_samples = len(dataset)
            total_interactions = dataset.groupby(['user_id', 'click_article_id', 'click_timestamp']).size().sum()

            if total_samples != total_interactions:
                print(f"Aviso: Possível inconsistência na contagem de amostras")
                print(f"Total de linhas: {total_samples:,}")
                print(f"Total de interações únicas: {total_interactions:,}")

            print(f"Total de amostras: {total_interactions:,}")
            print(f"Período: {dataset['timestamp'].min()} até {dataset['timestamp'].max()}")
            print(f"Usuários únicos: {dataset['user_id'].nunique():,}")
            print(f"Artigos únicos: {dataset['click_article_id'].nunique():,}")

            # Verificar integridade dos dados
            print("\nVerificação de integridade:")
            null_counts = dataset.isnull().sum()
            if null_counts.any():
                print("Valores nulos encontrados:")
                print(null_counts[null_counts > 0])
                # Tratar ou remover valores nulos conforme apropriado
                dataset = dataset.dropna()

            # Análise de timestamps
            invalid_timestamps = pd.to_datetime(dataset['click_timestamp'], unit='ms', errors='coerce').isnull().sum()
            if invalid_timestamps > 0:
                print(f"\nAviso: {invalid_timestamps} timestamps inválidos encontrados")
                # Tratar timestamps inválidos
                dataset = dataset.dropna(subset=['click_timestamp'])

            # Análises adicionais podem ser adicionadas aqui...

            return dataset  # Retorna dataset possivelmente limpo

        except Exception as e:
            print(f"Erro analisando amostras: {str(e)}")
            traceback.print_exc()
            return dataset

    def train_and_predict(self, dataset, train_size=100000, output_model_path=None):
        """
        Treina o modelo com uma quantidade definida de amostras e realiza predições em novas amostras.

        Parameters:
        -----------
        dataset : pd.DataFrame
            O dataset a ser processado.
        train_size : int
            Número de amostras para treinamento.
        output_model_path : str, optional
            Caminho para salvar o modelo atualizado.

        Returns:
        --------
        dict
            O histórico do processamento.
        """
        try:
            self.validate_dataset(dataset)

            # Análise das amostras
            print("\nAnalisando amostras...")
            dataset['timestamp'] = pd.to_datetime(
                dataset['click_timestamp'],
                unit='ms',
                errors='coerce'
            )
            dataset = self.analyze_samples(dataset)

            # Limita o número de amostras se max_samples for definido
            if self.max_samples is not None:
                dataset = dataset.head(self.max_samples)

            self.history['dataset_info'] = {
                'total_records': len(dataset),
                'unique_users': dataset['user_id'].nunique(),
                'unique_articles': dataset['click_article_id'].nunique(),
                'start_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'train_size': train_size,
                'test_size': len(dataset) - train_size,
                'max_samples': self.max_samples
            }

            # Ordena o dataset pelo timestamp
            dataset = dataset.sort_values('timestamp').reset_index(drop=True)

            total_samples = len(dataset)

            if train_size >= total_samples:
                raise ValueError("train_size maior ou igual ao número total de amostras disponíveis.")

            # Dividir dataset em treinamento e teste
            training_data = dataset.iloc[:train_size]
            testing_data = dataset.iloc[train_size:]

            # Treinamento
            print(f"\nTreinando com {len(training_data)} amostras")
            for _, row in training_data.iterrows():
                self.model.learn_one(row.to_dict())
                self.history['processed_records'] += 1

            # Predição
            print(f"\nRealizando predições em {len(testing_data)} amostras")
            predictions = []
            for _, row in testing_data.iterrows():
                x = row.to_dict()
                # Faz predição
                pred = self.model.predict_one(x)
                # Define target
                target = int(x['click_article_id'] in self.model.current_top)
                # Atualiza métricas
                if pred is not None:
                    for metric in self.model.metrics.values():
                        metric.update(target, pred)
                # Armazena predições
                predictions.append({
                    'user_id': x['user_id'],
                    'click_article_id': x['click_article_id'],
                    'prediction': pred,
                    'target': target
                })

            # Salva todas as predições em um único arquivo
            predictions_df = pd.DataFrame(predictions)
            predictions_filename = f'predictions_{self.timestamp}.csv'
            predictions_path = os.path.join('/content/drive/MyDrive/Pesquisa2024/', predictions_filename)
            predictions_df.to_csv(predictions_path, index=False)
            print(f"Predições salvas em '{predictions_path}'")

            # Atualiza o histórico com o caminho das predições
            self.history['predictions_file'] = predictions_path

            if output_model_path:
                self.save_model(output_model_path)

            self._update_final_metrics()
            return self.history

        except Exception as e:
            print(f"Erro fatal processando dataset: {str(e)}")
            traceback.print_exc()
            return None

    def save_model(self, path):
        """Salva o modelo."""
        self.model.save(path)
        self.history['model_info']['final_model_path'] = path
        self.history['model_info']['save_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def save_history(self):
        """
        Salva o histórico do processamento em um arquivo CSV.
        """
        try:
            # Prepara dados para o CSV
            history_data = {
                'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'dataset_size': self.history['dataset_info'].get('total_records'),
                'unique_users': self.history['dataset_info'].get('unique_users'),
                'unique_articles': self.history['dataset_info'].get('unique_articles'),
                'train_size': self.history['dataset_info'].get('train_size'),
                'test_size': self.history['dataset_info'].get('test_size'),
                'processed_records': self.history['metrics'].get('processed_records'),
                'max_samples': self.history['dataset_info'].get('max_samples'),
                'accuracy': self.history['metrics'].get('final_accuracy'),
                'f1_score': self.history['metrics'].get('final_f1'),
                'precision': self.history['metrics'].get('final_precision'),
                'recall': self.history['metrics'].get('final_recall'),
                'roc_auc': self.history['metrics'].get('roc_auc'),
                'log_loss': self.history['metrics'].get('log_loss'),
                'processing_time': self.history['metrics'].get('processing_time'),
                'n_drifts': self.history['metrics'].get('n_drifts'),
                'n_warnings': self.history['metrics'].get('n_warnings'),
                'top_articles': self.history['metrics'].get('top_articles'),
                'predictions_file': self.history.get('predictions_file'),
                'model_path': self.history['model_info'].get('final_model_path'),
                'original_model': self.history['model_info'].get('original_model_path'),
                # Inclui as configurações do modelo
                'model_type': self.history['model_info'].get('model_type'),
                'n_models': self.history['model_info'].get('n_models'),
                'delta': self.history['config'].get('delta'),
                'drift_detector': self.history['model_info'].get('drift_detector'),
                'drift_detector_params': str(self.history['model_info'].get('drift_detector_params')),
                'top_k': self.history['model_info'].get('top_k'),
                'model_params': str(self.history['model_info'].get('model_params'))
            }

            history_df = pd.DataFrame([history_data])

            # Define arquivo apropriado
            base_path = '/content/drive/MyDrive/Pesquisa2024/'
            filename = 'training_history.csv'
            filepath = os.path.join(base_path, filename)

            # Garante que o diretório existe
            os.makedirs(os.path.dirname(filepath), exist_ok=True)

            # Verifica se arquivo existe
            if os.path.exists(filepath):
                existing_df = pd.read_csv(filepath)
                history_df = pd.concat([existing_df, history_df], ignore_index=True)

            history_df.to_csv(filepath, index=False)
            print(f"Histórico salvo em {filepath}")

        except Exception as e:
            print(f"Erro salvando histórico: {str(e)}")
            traceback.print_exc()

def main(input_model: Optional[str] = None,
         dataset_path: Optional[str] = None,
         output_model: Optional[str] = None,
         max_samples: Optional[int] = None,
         train_size: int = 100000,
         config: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
    """
    Função principal para executar o sistema de recomendação.

    Parameters:
    -----------
    input_model : str, optional
        Caminho do modelo existente.
    dataset_path : str
        Caminho do arquivo CSV com os dados.
    output_model : str, optional
        Caminho onde salvar o modelo atualizado.
    max_samples : int, optional
        Número máximo de amostras a serem processadas.
    train_size : int, optional
        Número de amostras para treinamento.
    config : dict, optional
        Configurações do modelo.

    Returns:
    --------
    dict
        Histórico do processamento ou None em caso de erro.
    """
    try:
        if not dataset_path:
            raise ValueError("dataset_path é obrigatório")

        # Diretórios
        base_dir = '/content/drive/MyDrive/Pesquisa2024/'
        models_dir = os.path.join(base_dir, 'models')
        os.makedirs(models_dir, exist_ok=True)

        # Nome do modelo com timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_model_name = f'news_recommender_{timestamp}.pkl'
        output_model_path = output_model or os.path.join(models_dir, output_model_name)

        # Inicializa sistema com as configurações
        system = NewsRecommenderSystem(
            model_path=input_model,
            max_samples=max_samples,
            timestamp=timestamp,
            config=config
        )

        # Carrega dataset
        print(f"Carregando dataset de {dataset_path}...")
        dataset = pd.read_csv(dataset_path)

        # Processa dataset (treinamento e predição)
        history = system.train_and_predict(
            dataset=dataset,
            train_size=train_size,
            output_model_path=output_model_path
        )

        # Salva histórico
        system.save_history()

        return history

    except Exception as e:
        print(f"Erro na execução principal: {str(e)}")
        traceback.print_exc()
        return None

if __name__ == "__main__":

    # Diretórios
    base_dir = '/content/drive/MyDrive/Pesquisa2024/'
    models_dir = os.path.join(base_dir, 'models')
    os.makedirs(models_dir, exist_ok=True)

    # Nome do modelo com timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_model_name = f'news_recommender_{timestamp}.pkl'
    output_model_path = os.path.join(models_dir, output_model_name)

    # Configuração
    config = {
        'n_models': 15,
        'delta': 0.001,
        'top_k': 10
    }

    history = main(
        dataset_path='/content/drive/MyDrive/Pesquisa2024/dataset_interacoes.csv',
        max_samples=1500000,  # Controle de quantidade de amostras
        train_size=100000,     # Número de amostras para treinamento
        config=config
    )

    if history:
        print("\nProcessamento concluído com sucesso!")

        # Mostra estatísticas finais
        print("\n=== Estatísticas Finais ===")
        print(f"Total de registros processados: {history['metrics']['processed_records']:,}")
        print(f"Tempo total de processamento: {history['metrics']['processing_time']}")

        if history['metrics'].get('n_drifts'):
            print(f"Drifts detectados: {history['metrics']['n_drifts']}")

        # Mostra métricas finais
        print("\nMétricas Finais:")
        print(f"Acurácia: {history['metrics']['final_accuracy']:.4f}")
        print(f"F1 Score: {history['metrics']['final_f1']:.4f}")
        print(f"Precisão: {history['metrics']['final_precision']:.4f}")
        print(f"Recall: {history['metrics']['final_recall']:.4f}")
        print(f"ROC AUC: {history['metrics']['roc_auc']:.4f}")
        print(f"Log Loss: {history['metrics']['log_loss']:.4f}")

        # Mostra top artigos se disponível
        if history['metrics'].get('top_articles'):
            print("\nTop Artigos:")
            top_articles = eval(history['metrics']['top_articles'])
            for article, count in top_articles.items():
                print(f"Artigo {article}: {count:,} interações")

Carregando dataset de /content/drive/MyDrive/Pesquisa2024/dataset_interacoes.csv...

Analisando amostras...

=== Análise das Amostras ===
Aviso: Encontradas 153,943 linhas duplicadas
Removendo duplicatas...
Aviso: Possível inconsistência na contagem de amostras
Total de linhas: 2,988,182
Total de interações únicas: 2,988,181
Total de amostras: 2,988,181
Período: 2017-10-01 03:00:00.026000 até 2017-11-13 20:04:14.886000
Usuários únicos: 322,897
Artigos únicos: 46,034

Verificação de integridade:
Valores nulos encontrados:
click_timestamp    1
timestamp          1
dtype: int64

Treinando com 500000 amostras


KeyboardInterrupt: 