# POC: Busca Vetorial Multi-Campo para DNE Real

Sistema de busca inteligente com embeddings separados por campo, scoring dinâmico e filtro por UF.

## Arquitetura
1. **EmbeddingService**: Normalização + geração de embeddings
2. **IndexBuilder**: Construção de índices FAISS por campo
3. **SearchEngine**: Busca vetorial com agregação ponderada
4. **Dataset Real**: Carrega DNE de `data/dne.parquet`

## Features
- ✅ Busca multi-campo (logradouro, bairro, cidade)
- ✅ Pesos dinâmicos baseados em campos presentes
- ✅ Filtro por UF para maior determinismo
- ✅ CEP como match exato (não vetorial)
- ✅ Normalização de abreviações
- ✅ Threshold 0.8 para alta confiança

In [None]:
import pandas as pd
import numpy as np
import json
import re
import faiss
import pickle
from pathlib import Path
from typing import Dict, List, Optional
from sentence_transformers import SentenceTransformer
from unidecode import unidecode
from tqdm import tqdm

## 1. EmbeddingService - Normalização e Embeddings

In [None]:
class EmbeddingService:
    """Serviço para normalização e geração de embeddings de endereços"""
    
    def __init__(self, model_name: str = "neuralmind/bert-base-portuguese-cased"):
        """Inicializa o serviço de embeddings"""
        print(f"Carregando modelo: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()
    
    @staticmethod
    def normalize_text(text: str) -> str:
        """Normaliza texto de endereço brasileiro"""
        if not text or not isinstance(text, str):
            return ""
        
        text = unidecode(text)
        text = text.lower()
        
        replacements = {
            r'\br\.': 'rua',
            r'\bav\.': 'avenida',
            r'\btrav\.': 'travessa',
            r'\balam\.': 'alameda',
            r'\bpca\.': 'praca',
            r'\bjd\.': 'jardim',
            r'\bvl\.': 'vila',
            r'\bcj\.': 'conjunto',
            r'\bqd\.': 'quadra',
            r'\blt\.': 'lote',
        }
        
        for pattern, replacement in replacements.items():
            text = re.sub(pattern, replacement, text)
        
        text = re.sub(r'[^\w\s]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        
        return text
    
    def embed_text(self, text: str) -> np.ndarray:
        """Gera embedding para um texto"""
        normalized = self.normalize_text(text)
        if not normalized:
            return np.zeros(self.embedding_dim, dtype=np.float32)
        return self.model.encode(normalized, convert_to_numpy=True, show_progress_bar=False)
    
    def embed_address_fields(self, address: Dict[str, str]) -> Dict[str, np.ndarray]:
        """Gera embeddings para cada campo do endereço"""
        embeddings = {}
        for field in ['logradouro', 'bairro', 'cidade']:
            text = address.get(field, '')
            embeddings[field] = self.embed_text(text)
        return embeddings
    
    def embed_batch(self, texts: list) -> np.ndarray:
        """Gera embeddings para um lote de textos"""
        normalized_texts = [self.normalize_text(t) for t in texts]
        normalized_texts = [t if t else " " for t in normalized_texts]
        embeddings = self.model.encode(
            normalized_texts, 
            convert_to_numpy=True, 
            show_progress_bar=True,
            batch_size=32
        )
        return embeddings.astype(np.float32)

## 2. IndexBuilder - Construção de Índices FAISS

In [None]:
class IndexBuilder:
    """Construtor de índices FAISS para busca vetorial de endereços"""
    
    def __init__(self, embedding_service: EmbeddingService):
        self.embedding_service = embedding_service
        self.indices = {}
        self.dataframe = None
    
    def build_indices(self, df: pd.DataFrame, fields: list = None) -> dict:
        """Constrói índices FAISS para cada campo"""
        if fields is None:
            fields = ['logradouro', 'bairro', 'cidade']
        
        self.dataframe = df.copy()
        n_records = len(df)
        
        print(f"Construindo índices FAISS para {n_records} endereços")
        
        for field in fields:
            texts = df[field].fillna('').astype(str).tolist()
            embeddings = self.embedding_service.embed_batch(texts)
            dimension = embeddings.shape[1]
            index = faiss.IndexFlatL2(dimension)
            index.add(embeddings)
            self.indices[field] = index
        
        return self.indices
    
    def save_indices(self, output_dir: str):
        """Salva índices FAISS e dataframe em disco"""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        for field, index in self.indices.items():
            index_file = output_path / f"{field}_index.faiss"
            faiss.write_index(index, str(index_file))
        
        df_file = output_path / "addresses.parquet"
        self.dataframe.to_parquet(df_file, index=False)
        
        metadata = {
            'fields': list(self.indices.keys()),
            'n_records': len(self.dataframe),
            'embedding_dim': self.embedding_service.embedding_dim
        }
        metadata_file = output_path / "metadata.pkl"
        with open(metadata_file, 'wb') as f:
            pickle.dump(metadata, f)
    
    def load_indices(self, input_dir: str):
        """Carrega índices FAISS e dataframe do disco"""
        input_path = Path(input_dir)
        
        metadata_file = input_path / "metadata.pkl"
        with open(metadata_file, 'rb') as f:
            metadata = pickle.load(f)
        
        df_file = input_path / "addresses.parquet"
        self.dataframe = pd.read_parquet(df_file)
        
        for field in metadata['fields']:
            index_file = input_path / f"{field}_index.faiss"
            self.indices[field] = faiss.read_index(str(index_file))
        
        return self.indices, self.dataframe

## 3. SearchEngine - Motor de Busca Multi-Campo

In [None]:
class SearchEngine:
    """Motor de busca vetorial com pesos dinâmicos por campo"""
    
    def __init__(
        self, 
        embedding_service: EmbeddingService,
        indices: Dict[str, faiss.Index],
        dataframe: pd.DataFrame
    ):
        self.embedding_service = embedding_service
        self.indices = indices
        self.dataframe = dataframe
        
        self.base_weights = {
            'with_cep': {
                'cep': 0.30,
                'logradouro': 0.40,
                'bairro': 0.20,
                'cidade': 0.10
            },
            'without_cep': {
                'logradouro': 0.55,
                'bairro': 0.25,
                'cidade': 0.20
            }
        }
        
        self.use_uf_filter = True
        
        self.confidence_threshold = 0.8
    
    def _get_dynamic_weights(self, query: Dict[str, str]) -> Dict[str, float]:
        """Calcula pesos dinâmicos baseado nos campos presentes na query"""
        has_cep = bool(query.get('cep'))
        weights = self.base_weights['with_cep' if has_cep else 'without_cep'].copy()
        available_fields = [f for f in ['logradouro', 'bairro', 'cidade'] if query.get(f)]
        filtered_weights = {k: v for k, v in weights.items() if k in available_fields or k == 'cep'}
        total_weight = sum(filtered_weights.values())
        if total_weight > 0:
            normalized_weights = {k: v / total_weight for k, v in filtered_weights.items()}
        else:
            normalized_weights = filtered_weights
        return normalized_weights
    
    def _calculate_field_similarity(
        self, 
        field: str, 
        query_embedding: np.ndarray, 
        top_k: int = 100
    ) -> tuple:
        """Calcula similaridade para um campo específico"""
        index = self.indices[field]
        query_embedding = query_embedding.reshape(1, -1).astype(np.float32)
        distances, indices = index.search(query_embedding, top_k)
        similarities = 1.0 / (1.0 + distances[0])
        return similarities, indices[0]
    
    def _calculate_cep_match(self, query_cep: str, db_cep: str) -> float:
        """Calcula match exato ou parcial de CEP"""
        if not query_cep or not db_cep:
            return 0.0
        
        query_clean = query_cep.replace('-', '').replace('.', '')
        db_clean = db_cep.replace('-', '').replace('.', '')
        
        if query_clean == db_clean:
            return 1.0
        
        if len(query_clean) >= 5 and len(db_clean) >= 5:
            if query_clean[:5] == db_clean[:5]:
                return 0.5
        
        return 0.0
    
    def search(
        self, 
        query: Dict[str, str], 
        top_k: int = 5,
        search_k: int = 100
    ) -> str:
        """Realiza busca vetorial com scoring dinâmico"""
        weights = self._get_dynamic_weights(query)
        query_embeddings = self.embedding_service.embed_address_fields(query)
        
        candidate_scores = {}
        field_scores_map = {}
        
        for field in ['logradouro', 'bairro', 'cidade']:
            if not query.get(field):
                continue
            
            query_emb = query_embeddings[field]
            similarities, indices = self._calculate_field_similarity(field, query_emb, search_k)
            weight = weights.get(field, 0.0)
            
            for idx, sim in zip(indices, similarities):
                if self.use_uf_filter and query.get('uf'):
                    db_uf = self.dataframe.iloc[idx]['uf']
                    if db_uf != query['uf']:
                        continue
                
                if idx not in candidate_scores:
                    candidate_scores[idx] = 0.0
                    field_scores_map[idx] = {}
                
                candidate_scores[idx] += weight * sim
                field_scores_map[idx][field] = float(sim)
        
        if query.get('cep'):
            cep_weight = weights.get('cep', 0.0)
            for idx in candidate_scores.keys():
                db_cep = self.dataframe.iloc[idx]['cep']
                cep_score = self._calculate_cep_match(query.get('cep'), db_cep)
                candidate_scores[idx] += cep_weight * cep_score
                field_scores_map[idx]['cep'] = cep_score
        
        sorted_candidates = sorted(
            candidate_scores.items(), 
            key=lambda x: x[1], 
            reverse=True
        )[:top_k]
        
        results = []
        for idx, score in sorted_candidates:
            row = self.dataframe.iloc[idx]
            
            if score >= self.confidence_threshold:
                confidence = "high"
            elif score >= 0.6:
                confidence = "medium"
            else:
                confidence = "low"
            
            result = {
                "address": {
                    "logradouro": row['logradouro'],
                    "bairro": row['bairro'],
                    "cidade": row['cidade'],
                    "uf": row['uf'],
                    "cep": row['cep']
                },
                "score": float(score),
                "confidence": confidence,
                "field_scores": field_scores_map.get(idx, {})
            }
            results.append(result)
        
        response = {
            "results": results,
            "query": query,
            "total_found": len(results),
            "weights_used": weights
        }
        
        return json.dumps(response, ensure_ascii=False, indent=2)

## 4. Carregamento do DNE Real

In [None]:
# Caminho para o arquivo DNE
dne_path = Path('../data/dne.parquet')

if not dne_path.exists():
    raise FileNotFoundError(
        f"Arquivo DNE não encontrado em: {dne_path}\n"
        "Por favor, coloque o arquivo 'dne.parquet' na pasta 'data/'"
    )

print(f"Carregando DNE de: {dne_path}")
df_dne_raw = pd.read_parquet(dne_path)

print(f"\nDataset carregado: {len(df_dne_raw)} registros")
print(f"\nColunas disponíveis: {df_dne_raw.columns.tolist()}")

# Mapear colunas do DNE para formato esperado pelo sistema
column_mapping = {
    'logradouro_completo': 'logradouro',
    'bairro_completo': 'bairro',
    'cidade_completo': 'cidade'
}

# Renomear colunas
df_dne = df_dne_raw.rename(columns=column_mapping)

print(f"\nColunas mapeadas:")
for orig, new in column_mapping.items():
    if orig in df_dne_raw.columns:
        print(f"  {orig} → {new}")

# Validar colunas obrigatórias após mapeamento
required_cols = ['logradouro', 'bairro', 'cidade', 'uf', 'cep']
missing_cols = [col for col in required_cols if col not in df_dne.columns]

if missing_cols:
    raise ValueError(
        f"Colunas obrigatórias faltando no DNE: {missing_cols}\n"
        f"Colunas esperadas no arquivo: ['logradouro_completo', 'bairro_completo', 'cidade_completo', 'uf', 'cep']"
    )

print(f"\nDistribuição por UF:")
print(df_dne['uf'].value_counts())

print(f"\nRegistros com bairro vazio: {(df_dne['bairro'].fillna('') == '').sum()}")
print(f"Registros com logradouro vazio: {(df_dne['logradouro'].fillna('') == '').sum()}")

df_dne.head()

## 5. Pipeline Completo - Execução

In [None]:
embedding_service = EmbeddingService(model_name="neuralmind/bert-base-portuguese-cased")

In [None]:
index_builder = IndexBuilder(embedding_service)
indices = index_builder.build_indices(df_dne)

In [None]:
search_engine = SearchEngine(
    embedding_service=embedding_service,
    indices=indices,
    dataframe=df_dne
)
print(f"Motor de busca inicializado (threshold: {search_engine.confidence_threshold})")

## 6. Testes de Busca

In [None]:
# Exemplo de busca - ajuste conforme seus dados reais
query_example = {
    'logradouro': 'Rua das Flores',
    'bairro': 'Centro',
    'cidade': 'São Paulo',
    'uf': 'SP',
    'cep': '01000-000'
}

print("=== Teste de Busca ===")
print(f"Query: {query_example}\n")
result = search_engine.search(query_example, top_k=5)
print(result)

## 7. Salvar Índices (Opcional)

Salva os índices FAISS para reutilização futura sem precisar reconstruir

In [None]:
# Descomente para salvar os índices
# index_builder.save_indices('../data/indices')
# print("Índices salvos em: data/indices/")

## 8. Carregar Índices Salvos (Opcional)

Carrega índices previamente salvos para busca rápida

In [None]:
# Descomente para carregar índices salvos
# index_builder_loaded = IndexBuilder(embedding_service)
# indices_loaded, df_loaded = index_builder_loaded.load_indices('../data/indices')
# search_engine_loaded = SearchEngine(embedding_service, indices_loaded, df_loaded)
# print("Índices carregados com sucesso!")