## Testes usando o pipeline de classificador
- Em conjunto com a API

## 1 - Lendo dados do CSV

In [None]:
import pandas as pd

In [None]:
df_operacao = pd.read_csv("dados_operacao.csv")
df_metanol = pd.read_csv("dados_metanol.csv")

In [None]:
df_operacao.columns, df_metanol.columns

In [None]:
df_operacao = df_operacao[["Conteúdo", "Link"]].rename(columns={"Conteúdo": "conteudo", "Link": "link"})
df_metanol = df_metanol[["conteudo", "link"]]

In [None]:
df_operacao.columns, df_metanol.columns

In [None]:
df = df_operacao + df_metanol
df.head()

## 2 - Injetando dados do dataframe na API

In [None]:
url = "http://localhost:8000/api"

In [None]:
import aiohttp
from tqdm.asyncio import tqdm

In [None]:
async def enviar(LIMITE:int=None):
    async with aiohttp.ClientSession() as session:
        if LIMITE:
            for tweet in tqdm(
                df.head(LIMITE).itertuples(),
                total=LIMITE,
                desc=f"Enviando tweets do csv com limite {LIMITE}"
            ):
                res=await session.post(
                    f"{url}/enqueue",
                    json={"msg":tweet.conteudo}
                )
                res.raise_for_status()
        else:
            for tweet in tqdm(
                df.itertuples(),
                total=len(df),
                desc="Enviando tweets do csv"
            ):
                res=await session.post(
                    f"{url}/enqueue",
                    json={"msg":tweet.conteudo}
                )
                res.raise_for_status()
        

In [None]:
await enviar(2000)
# await enviar(1000)

## 3 - Consumindo dados com o classificador

In [None]:
from app.processor.classifier import BERTClassifier
import asyncio

In [None]:
async def classificar(WORKERS: int = None, BATCH_SIZE: int = None, LIMITE: int = None):
    bert = None
    
    try:
        classifier_kwargs = {}
        if WORKERS is not None:
            classifier_kwargs['num_workers'] = WORKERS
        if BATCH_SIZE is not None:
            classifier_kwargs['batch_size'] = BATCH_SIZE
        
        bert = BERTClassifier(**classifier_kwargs)
        await bert.initialize()

        async def stop():
            if LIMITE:
                await asyncio.sleep(LIMITE)
            else:
                while bert._running:
                    await asyncio.sleep(1)
            await bert.stop()

        await asyncio.gather(
            bert.start_consuming(),
            stop()
        )
    except Exception as e:
        raise
    finally:
        if bert:
            try:
                await bert.stop()
            except:
                pass

#### O classificador é configurável

In [None]:
await classificar(WORKERS=8, BATCH_SIZE=12)

## 4 - Gerador de métricas consumindo do 'dequeue' da API

In [None]:
import json

class ClassifierMonitor:
    """Monitora e visualiza resultados do classificador"""
    
    def __init__(self, api_url="http://localhost:8000/api"):
        self.api_url = api_url
        self.results = []
    
    async def fetch_message_status(self, session, msg_id):
        """Busca o status de uma mensagem específica"""
        try:
            async with session.get(f"{self.api_url}/dequeue/{msg_id}") as response:
                if response.status == 200:
                    return await response.json()
                return None
        except Exception as e:
            return {"msg_id": msg_id, "error": str(e)}
    
    async def fetch_queue_messages(self, redis_client, queue_name, limit=100):
        """Busca mensagens de uma fila específica"""
        try:
            messages = await redis_client.lrange(queue_name, 0, limit - 1)
            return [json.loads(msg) for msg in messages]
        except Exception as e:
            print(f"Erro ao buscar da fila {queue_name}: {e}")
            return []
    
    async def collect_results(self, limit=None):
        """Coleta todos os resultados classificados da fila de saída"""
        from app.redis import get_async_client
        
        redis_client = await get_async_client()
        
        try:
            classified = await self.fetch_queue_messages(
                redis_client, 
                "norm_queue_out", 
                limit or -1
            )
            
            errors = await self.fetch_queue_messages(
                redis_client, 
                "norm_queue_errors", 
                limit or -1
            )
            
            self.results = classified + errors
            
        finally:
            await redis_client.close()
        
        return self.results
    
    def create_summary_df(self):
        """Cria DataFrame resumido dos resultados coletados"""
        if not self.results:
            return pd.DataFrame()
        
        df = pd.DataFrame(self.results)
        
        if 'classification' in df.columns:
            df['label'] = df['classification'].apply(
                lambda x: x.get('label', 'N/A') if isinstance(x, dict) else 'N/A'
            )
            df['score'] = df['classification'].apply(
                lambda x: x.get('score', 0.0) if isinstance(x, dict) else 0.0
            )
        
        return df
    
    def generate_statistics(self, df):
        """Gera estatísticas resumidas"""
        if df.empty:
            return {}
        
        stats = {
            "total_messages": len(df),
            "classified": len(df[df['status'] == 'classified']),
            "errors": len(df[df['status'] == 'error']),
            "classification_rate": 0.0
        }
        
        if stats['total_messages'] > 0:
            stats['classification_rate'] = (
                stats['classified'] / stats['total_messages'] * 100
            )
        
        if 'label' in df.columns:
            stats['label_distribution'] = df['label'].value_counts().to_dict()
        
        if 'score' in df.columns:
            stats['avg_confidence'] = df[df['status'] == 'classified']['score'].mean()
            stats['min_confidence'] = df[df['status'] == 'classified']['score'].min()
            stats['max_confidence'] = df[df['status'] == 'classified']['score'].max()
        
        if 'error_type' in df.columns:
            error_df = df[df['status'] == 'error']
            if not error_df.empty:
                stats['error_types'] = error_df['error_type'].value_counts().to_dict()
        
        return stats
    
    def display_summary(self, stats):
        """Exibe resumo formatado"""
        print("=" * 60)
        print("RESUMO DOS RESULTADOS DO CLASSIFICADOR")
        print("=" * 60)
        print(f"\nTotal de Mensagens: {stats.get('total_messages', 0)}")
        print(f"Classificadas com Sucesso: {stats.get('classified', 0)}")
        print(f"Erros: {stats.get('errors', 0)}")
        print(f"Taxa de Classificação: {stats.get('classification_rate', 0):.2f}%")
        
        if 'avg_confidence' in stats:
            print(f"\nScores de Confiança:")
            print(f"  Média: {stats['avg_confidence']:.4f}")
            print(f"  Mínimo: {stats['min_confidence']:.4f}")
            print(f"  Máximo: {stats['max_confidence']:.4f}")
        
        if 'label_distribution' in stats:
            print(f"\nDistribuição de Labels:")
            for label, count in stats['label_distribution'].items():
                percentage = (count / stats['total_messages']) * 100
                print(f"  {label}: {count} ({percentage:.2f}%)")
        
        if 'error_types' in stats:
            print(f"\nTipos de Erro:")
            for error_type, count in stats['error_types'].items():
                print(f"  {error_type}: {count}")
        
        print("=" * 60)

In [None]:
async def analyze_results(limit=None):
    """Analisa resultados do classificador e exibe resumo"""
    monitor = ClassifierMonitor()
    
    print("Coletando resultados das filas...")
    results = await monitor.collect_results(limit=limit)
    
    if not results:
        print("Nenhum resultado encontrado nas filas.")
        return None
    
    df = monitor.create_summary_df()
    
    stats = monitor.generate_statistics(df)
    
    monitor.display_summary(stats)
    
    return df, stats

In [None]:
df, stats = await analyze_results(limit=1000)

In [None]:
if df is not None and not df.empty:
    print("\nResultados de Amostra:")
    display(df.head(10))
    
    try:
        import matplotlib.pyplot as plt
        
        if 'label' in df.columns:
            df['label_pt'] = df['label'].map({
                'true': 'Verdadeiro',
                'false': 'Falso',
                True: 'Verdadeiro',
                False: 'Falso'
            }).fillna(df['label'])
            
            fig, axes = plt.subplots(1, 2, figsize=(15, 6))
            
            label_counts = df['label_pt'].value_counts()
            colors = ['#2ecc71', '#e74c3c', '#3498db', '#f39c12']
            explode = [0.05] * len(label_counts)
            
            axes[0].pie(
                label_counts.values, 
                labels=label_counts.index,
                autopct='%1.1f%%',
                startangle=90,
                colors=colors[:len(label_counts)],
                explode=explode,
                shadow=True
            )
            axes[0].set_title('Distribuição de Labels de Classificação', fontsize=14, fontweight='bold')
            
            if 'score' in df.columns:
                classified_scores = df[df['status'] == 'classified']['score']
                axes[1].hist(classified_scores, bins=20, color='#3498db', edgecolor='black', alpha=0.7)
                axes[1].set_title('Distribuição de Score de Confiança', fontsize=14, fontweight='bold')
                axes[1].set_xlabel('Score de Confiança')
                axes[1].set_ylabel('Frequência')
                axes[1].grid(axis='y', alpha=0.3)
            
            plt.tight_layout()
            plt.show()
    except ImportError:
        print("\nInstale matplotlib para visualizações: pip install matplotlib")