## Construção de Interpretadores - RA2

#### Alunos: Marcelo Wzorek Filho

### Instalação de Bibliotecas

In [14]:
!pip install scikit-learn
!pip install faiss-cpu
!python -m spacy download pt_core_news_sm

Collecting pt-core-news-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/pt_core_news_sm-3.8.0/pt_core_news_sm-3.8.0-py3-none-any.whl (13.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m23.4 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('pt_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


### Importações de Bibliotecas

In [15]:
import os
import re
import string
import requests
import nltk
import spacy
import numpy as np
import faiss
import traceback
import time

from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer, util, CrossEncoder
from transformers import pipeline

import gradio as gr

### Tento usar o recurso Question Answering para aprimorar as respostas

In [16]:
try:
    from transformers import pipeline
    QA_AVAILABLE = True
except ImportError:
    QA_AVAILABLE = False

### Finalizo a configuração inicial dos modelos de Processamento de Linguagem Natural.

Garanto que as palavras comuns que não agregam significado estejam disponíveis para serem removidas, otimizando o processamento.

Carrego o modelo de análise linguística do português, incluindo a segmentação de sentenças e o reconhecimento de entidades.

Carrego dois modelos de transformers: SentenceTransformer para gerar embeddings de sentenças e o CrossEncoder para reranking.

Tento carregar um pipeline de Question Answering da biblioteca transformers
Se esse recurso estiver disponivel vai  permitir extrair respostas mais precisas.

In [17]:
try:
    nltk.data.find('corpora/stopwords')
except Exception:
    nltk.download('stopwords')
stop_words = set(stopwords.words('portuguese'))

nlp = spacy.load('pt_core_news_sm')
if 'sentencizer' not in nlp.pipe_names:
    nlp.add_pipe('sentencizer')

EMBEDDING_MODEL_NAME = 'paraphrase-multilingual-MiniLM-L12-v2'
CROSS_ENCODER_NAME = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
cross_encoder = CrossEncoder(CROSS_ENCODER_NAME)

qa_pipeline = None
if QA_AVAILABLE:
    try:
        qa_pipeline = pipeline(
            "question-answering",
            model="pierreguillou/bert-base-cased-squad-v1.1-portuguese",
            tokenizer="pierreguillou/bert-base-cased-squad-v1.1-portuguese"
        )
    except Exception as e:
        qa_pipeline = None

Device set to use cpu


### Preparação do texto do livro.

In [18]:
# Baixa um arquivo de texto a partir de URL do meu GitHub
def download_text_from_github(raw_url: str, output_path: str) -> None:
    print(f"Baixando texto de: {raw_url}")
    resp = requests.get(raw_url)
    resp.raise_for_status()
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(resp.text)
    print(f"Salvo em {output_path}")

# Carrega o conteúdo do arquivo de texto assim que ele já estiver localmente no colab
def load_text_file(path: str, encoding: str = 'utf-8') -> str:
    with open(path, 'r', encoding=encoding) as f:
        return f.read()

# Remove o cabeçalho e rodapé
def strip_gutenberg_headers(text: str) -> str:
    start = re.search(r'CAP[IÍ]TULO\s+I', text, flags=re.IGNORECASE)
    if start:
        text = text[start.start():]
    end = re.search(r'Fim do Texto', text, flags=re.IGNORECASE)
    if end:
        text = text[:end.start()]
    return text

# Normaliza quebras de linha
def normalize_line_breaks(text: str) -> str:
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text

# Divide o texto em sentenças
def split_sentences(text: str) -> list:
    sentences = []
    for doc in nlp.pipe([text], disable=['parser','ner']):
        for sent in doc.sents:
            txt = sent.text.strip()
            if txt:
                sentences.append(txt)
    print(f"Segmentadas {len(sentences)} sentenças.")
    return sentences

# Cria chunks de sentenças
def build_sliding_windows(sentences: list, window_size: int = 4, overlap: int = 2) -> list:
    windows = []
    for i in range(0, len(sentences), window_size - overlap):
        win = sentences[i:i+window_size]
        if win:
            windows.append(" ".join(win))
    print(f"Construídas {len(windows)} janelas deslizantes (window_size={window_size}, overlap={overlap}).")
    return windows

# Limpa e lematiza os textos
def clean_and_lemmatize_texts(texts: list) -> list:
    cleaned = []
    for doc in nlp.pipe(texts, disable=['parser','ner']):
        tokens = []
        for token in doc:
            if token.is_alpha and token.lemma_.lower() not in stop_words:
                tokens.append(token.lemma_.lower())
        cleaned.append(" ".join(tokens))
    return cleaned

# Extrai entidades nomeadas das sentenças (personagens, locais, etc...)
def extract_entities(sentences: list) -> dict:
    entities = {}
    for doc in nlp.pipe(sentences, disable=['parser']):
        for ent in doc.ents:
            key = ent.text.strip()
            entities.setdefault(key, []).append(doc.text)
    print(f"Entidades extraídas: {len(entities)} chaves únicas.")
    return entities

### Funções para indexação e representação vetorial

In [19]:
# Constrói a matriz para os textos limpos e captura a importância das palavras
def build_tfidf(texts_clean: list) -> (TfidfVectorizer, np.ndarray):
    print("Construindo TF-IDF (ngram 1-2, sublinear_tf)...")
    vectorizer = TfidfVectorizer(ngram_range=(1,2), sublinear_tf=True)
    tfidf_matrix = vectorizer.fit_transform(texts_clean)
    print(f"TF-IDF matrix shape: {tfidf_matrix.shape}")
    return vectorizer, tfidf_matrix

# Carrega o modelo de embeddings de sentenças pré-treinado
def load_sentence_embedding_model(model_name: str):
    print(f"Carregando modelo de embeddings: {model_name}")
    return SentenceTransformer(model_name)

# Gera representações numéricas para cada janela de texto
def build_sentence_embeddings(texts: list, model: SentenceTransformer) -> np.ndarray:
    print("Gerando embeddings dos chunks/janelas (isso pode levar um tempo)...")
    embs = model.encode(texts, convert_to_tensor=False, show_progress_bar=True)
    arr = np.array(embs, dtype='float32')
    print(f"Embeddings shape: {arr.shape}")
    return arr

# Constrói um índice para realizar buscas de similaridade vetorial
def build_faiss_index(embeddings: np.ndarray):
    print("Construindo índice FAISS (IndexFlatIP, com L2-normalização)...")
    faiss.normalize_L2(embeddings)
    d = embeddings.shape[1]
    index = faiss.IndexFlatIP(d)
    index.add(embeddings)
    print(f"Índice FAISS com {index.ntotal} vetores, dimensão {d}.")
    return index

Funções de lógica do Chatbot e geração das respostas

In [20]:
# Detecta a intenção da pergunta do usuário
def detect_intent(question: str) -> str:
    q = question.strip().lower()
    if re.match(r'^(quem)\s*', q): return 'quem'
    if re.match(r'^(onde)\s*', q): return 'onde'
    if re.match(r'^(quando)\s*', q): return 'quando'
    if re.match(r'^(o que|oque)\s*', q): return 'o que'
    if re.match(r'^(por que|porque)\s*', q): return 'por que'
    if re.match(r'^(como)\s*', q): return 'como'
    return 'outro'

# O uso de sinônimos acabou sendo usado so para testes, é mutio trabalhoso ================
SINONIMOS = {}

# Expande a consulta do usuário com sinônimos
def expand_query(question: str) -> str:
    doc = nlp(question)
    termos = [token.lemma_.lower() for token in doc if token.pos_ in ('NOUN','ADJ')]
    extras = []
    for t in termos:
        extras.extend(SINONIMOS.get(t, []))
    if extras:
        return question + " " + " ".join(extras)
    return question
# =========================================================================================

# Tenta extrair uma entidade específica da pergunta
def extract_entity_from_question(question: str) -> str:
    m = re.search(r'rio\s+([A-Za-zÀ-ÖØ-öø-ÿ]+)', question, flags=re.IGNORECASE)
    if m:
        return m.group(1).lower()
    return None

# Reordena os resultados da busca usando o modelo CrossEncoder
def rerank_with_cross_encoder(question: str, candidates: list, top_n: int = 5) -> list:
    if not candidates:
        return []
    pairs = [[question, c] for c in candidates]
    scores = cross_encoder.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [c for c, _ in ranked[:top_n]]

# Extrai a sentença mais relevante de uma janela de texto para a resposta
def extract_best_sentence(question: str, window: str) -> str:
    sents = [s.strip() for s in re.split(r'(?<=[.!?])\s+', window) if s.strip()]
    if not sents:
        return window
    pairs = [[question, s] for s in sents]
    scores = cross_encoder.predict(pairs)
    best_idx = int(np.argmax(scores))
    return sents[best_idx]

# Realiza uma busca combinada nas janelas de texto
def hybrid_search_windows(question: str,
                          windows: list,
                          embed_model: SentenceTransformer,
                          faiss_index,
                          tfidf_vectorizer: TfidfVectorizer,
                          tfidf_matrix,
                          semantic_weight: float = 0.7,
                          keyword_weight: float = 0.3,
                          top_k_semantic: int = 20) -> list:
    q_expanded = expand_query(question)

    # Busca Semântica
    query_emb = embed_model.encode(q_expanded, convert_to_tensor=False)
    query_emb = np.array(query_emb, dtype='float32')
    faiss.normalize_L2(query_emb.reshape(1, -1))

    # Verifica a dimensão antes de search
    if query_emb.shape[0] != faiss_index.d:
        raise ValueError(f"Dimensão inconsistente: query {query_emb.shape[0]}, FAISS index {faiss_index.d}")

    D, I = faiss_index.search(query_emb.reshape(1, -1), top_k_semantic)
    top_indices = I[0]
    sem_scores = D[0]

    # Busca por Palavra-chave
    q_clean_list = clean_and_lemmatize_texts([q_expanded])
    q_clean = q_clean_list[0] if q_clean_list else ""
    q_tfidf = tfidf_vectorizer.transform([q_clean]) if q_clean else tfidf_vectorizer.transform([""])
    q_norm = np.linalg.norm(q_tfidf.data) if q_tfidf.data.size > 0 else 0.0

    results = []
    for idx, sem_score in zip(top_indices, sem_scores):
        win = windows[idx]
        win_vec = tfidf_matrix[idx]

        # Calcula similaridade de cossenos
        dot = q_tfidf.dot(win_vec.T).data
        dot_val = float(dot[0]) if dot.size > 0 else 0.0
        win_norm = np.linalg.norm(win_vec.data) if win_vec.data.size > 0 else 0.0
        kw_score = dot_val / (q_norm * win_norm) if (q_norm > 0 and win_norm > 0) else 0.0

        # Combinação ponderada
        final_score = semantic_weight * float(sem_score) + keyword_weight * kw_score
        results.append({
            'window': win,
            'sem_score': float(sem_score),
            'kw_score': kw_score,
            'final_score': final_score
        })
    results_sorted = sorted(results, key=lambda x: x['final_score'], reverse=True)
    return results_sorted

# Formata a resposta na intenção detectada
def template_response(intent: str, target: str = None, content: str = "") -> str:
    if intent == 'quem':
        if target:
            return f"Parece que você quer saber sobre '{target}'. Aqui está o que encontrei:\n\n{content}"
        else:
            return f"Pergunta de tipo 'quem'. Veja abaixo informações relevantes:\n\n{content}"
    if intent == 'onde':
        return f"Sobre localização/ambiente: veja o trecho relevante:\n\n{content}"
    if intent == 'quando':
        return f"Sugestões de trechos com indicação temporal:\n\n{content}"
    if intent == 'o que':
        return f"Explicação/conceito/descrição:\n\n{content}"
    if intent == 'por que':
        return f"Motivações ou razões encontradas no texto:\n\n{content}"
    if intent == 'como':
        return f"Processos ou descrições de modo:\n\n{content}"
    return f"Aqui estão os trechos mais relevantes para sua pergunta:\n\n{content}"

# Orquestra todo o processo de resposta
def respond(question: str,
            windows: list,
            windows_clean: list,
            embed_model: SentenceTransformer,
            faiss_index,
            tfidf_vectorizer: TfidfVectorizer,
            tfidf_matrix,
            entities_dict: dict,
            top_k_semantic: int = 20,
            n_results: int = 5) -> str:
    if not question or not question.strip():
        return "Por favor, digite uma pergunta."

    intent = detect_intent(question)

    # Ajusta pesos da busca híbrida
    if intent in ('como', 'o que', 'por que'):
        semantic_weight, keyword_weight = 0.5, 0.5
    elif intent in ('quem', 'onde', 'quando'):
        semantic_weight, keyword_weight = 0.7, 0.3
    else:
        semantic_weight, keyword_weight = 0.6, 0.4

    # Tratamento para perguntas 'quem' usando entidades
    if intent == 'quem':
        m = re.match(r'quem\s+(é|foi)\s+(.*)\?*', question.strip(), flags=re.IGNORECASE)
        if m:
            entity_query = m.group(2).strip().rstrip('?').strip()
            found_key = None
            for ent in entities_dict.keys():
                if ent.lower() == entity_query.lower():
                    found_key = ent
                    break
            if not found_key:
                for ent in entities_dict.keys():
                    if entity_query.lower() in ent.lower() or ent.lower() in entity_query.lower():
                        found_key = ent
                        break

            if found_key:
                sents = entities_dict[found_key][:n_results]
                content = "\n".join(f"- {s}" for s in sents)
                return template_response(intent, target=found_key, content=content)

    # Realiza a busca híbrida
    results = hybrid_search_windows(
        question, windows, embed_model, faiss_index,
        tfidf_vectorizer, tfidf_matrix,
        semantic_weight=semantic_weight,
        keyword_weight=keyword_weight,
        top_k_semantic=top_k_semantic
    )

    # Filtra os resultados
    entity = extract_entity_from_question(question)
    if entity:
        filtered = [r for r in results if entity in r['window'].lower()]
        if filtered:
            filtered = sorted(filtered, key=lambda x: x['final_score'], reverse=True)
            top_for_rerank = [r['window'] for r in filtered[: top_k_semantic]]
        else:
            top_for_rerank = [r['window'] for r in results[: top_k_semantic]]
    else:
        top_for_rerank = [r['window'] for r in results[: top_k_semantic]]

    # Reranking dos melhores candidatos
    final_windows = rerank_with_cross_encoder(question, top_for_rerank, top_n=n_results)

    parts = []
    for i, w in enumerate(final_windows):
        best_sent = extract_best_sentence(question, w)

        if qa_pipeline:
            try:
                qa_out = qa_pipeline({'question': question, 'context': w})
                answer_span = qa_out.get('answer', '').strip()
                if answer_span and answer_span.lower() not in best_sent.lower():
                    best_sent = answer_span
            except Exception as qa_e:
                print(f"Erro no pipeline de QA para a janela: {qa_e}")
        parts.append(f"➡ Resultado {i+1}:\n{best_sent}")

    # Formata a resposta final
    content = "\n\n".join(parts) if parts else "Não encontrei trechos relevantes no livro para esta pergunta."
    return template_response(intent, content=content)

# Cria e configura a interface usando gradio.
def create_chatbot_interface(
    windows: list,
    windows_clean: list,
    embed_model: SentenceTransformer,
    faiss_index,
    tfidf_vectorizer: TfidfVectorizer,
    tfidf_matrix,
    entities_dict: dict,
    top_k_semantic: int = 20,
    n_results: int = 5
) -> gr.Blocks:
    def respond_chat(question: str) -> str:
        try:
            return respond(
                question,
                windows, windows_clean,
                embed_model,
                faiss_index,
                tfidf_vectorizer,
                tfidf_matrix,
                entities_dict,
                top_k_semantic=top_k_semantic,
                n_results=n_results
            )
        except Exception as e:
            traceback.print_exc()
            return f"Ocorreu um erro interno ao processar sua pergunta. Por favor, tente novamente. Erro: {type(e).__name__}: {e}"

    with gr.Blocks() as demo:
        gr.Markdown("## Chatbot 'O Guarani'")

        output_box = gr.Textbox(interactive=False, label="Resposta do Chatbot", elem_id="chatbot_output", lines=12, placeholder="As respostas aparecerão aqui...")
        input_box = gr.Textbox(lines=3, placeholder="Ex: Quem é Peri? Onde se passa a história? O que acontece no final?", label="Sua Pergunta", elem_id="question_input")

        with gr.Row():
            submit_btn = gr.Button("Enviar Pergunta")
            clear_btn = gr.Button("Limpar Chat")

        submit_btn.click(fn=respond_chat, inputs=input_box, outputs=output_box)
        clear_btn.click(fn=lambda: "", inputs=None, outputs=input_box)

    return demo

### Fluxo de obtenção e rré-processamento do texto

In [21]:
# Registra o tempo de início
start_time = time.time()
# Repositorio onde deixei o texto original de "O Guarani"
raw_url = "https://raw.githubusercontent.com/marcelowf/O-Guarani-Chat-Bot/main/Anexo_Imprimir_O_Guarani.txt"
text_path = 'o_guarani.txt'

# Baixa o textodo meu Github
download_text_from_github(raw_url, text_path)
text = load_text_file(text_path)
print("\nPrimeiras 300 caracteres do texto recuperado:")
print(text[:300])

# Remove cabeçalhos e normaliza quebras de linha
text = strip_gutenberg_headers(text)
text = normalize_line_breaks(text)
print("\nPrimeiras 500 caracteres do texto após stripping e normalização:")
print(text[:500])

# Segmenta o texto em sentenças
sentences = split_sentences(text)
print("\nExemplo de sentenças segmentadas (primeiras 5):")
for i, s in enumerate(sentences[:5]):
    print(f"[{i+1}] {s}")

# Cria chunks de sentenças a partir do texto
windows = build_sliding_windows(sentences, window_size=4, overlap=2)
print("\nExemplo de janelas deslizantes (primeiras 2):")
for i, w in enumerate(windows[:2]):
    print(f"[{i+1}] {w}")

# Limpa e lematiza as palavras dentro das janelas
windows_clean = clean_and_lemmatize_texts(windows)
print("\nExemplo de janelas limpas e lematizadas (primeiras 2):")
for i, wc in enumerate(windows_clean[:2]):
    print(f"[{i+1}] {wc}")

# Extrai entidades nomeadas
entities_dict = extract_entities(sentences)
print("\nAlgumas entidades extraídas (primeiras 5 chaves):")
for i, (entity, sents) in enumerate(list(entities_dict.items())[:5]):
    print(f"- {entity}: {len(sents)} ocorrências")

# Exibe o tempo gasto com o pré-processamento
print(f"\nPré-processamento textual concluído em {time.time() - start_time:.2f}s.")

Baixando texto de: https://raw.githubusercontent.com/marcelowf/O-Guarani-Chat-Bot/main/Anexo_Imprimir_O_Guarani.txt
Salvo em o_guarani.txt

Primeiras 300 caracteres do texto recuperado:
O Guarani


					José de Alencar





Conteúdo exportado da Wikisource em 13 de junho de 2025





Índice





Primeira Parte


Capítulo I: Cenário

Capítulo II: Lealdade

Capítulo III: A bandeira

Capítulo IV: Caçada

Capítulo V: Loura e morena

Capítulo VI: A volta

Capítulo VII: A prece

Capítulo

Primeiras 500 caracteres do texto após stripping e normalização:
Capítulo I: Cenário

Capítulo II: Lealdade

Capítulo III: A bandeira

Capítulo IV: Caçada

Capítulo V: Loura e morena

Capítulo VI: A volta

Capítulo VII: A prece

Capítulo VIII: Três linhas

Capítulo IX: Amor

Capítulo X: Ao alvorecer

Capítulo XI: No banho

Capítulo XII: A onça

Capítulo XIII: Revelação

Capítulo XIV: A Índia

Capítulo XV: Os três

Segunda Parte

Capítulo I: O carmelita

Capítulo II: Iara!

Capítulo III: Gênio do mal

Capítu

### Finalização

In [22]:
# Registra o tempo da vetorização e indexação
start_vectorization_time = time.time()

# Constrói a matriz para guardar a importância de cada palavra nas janelas de texto
# Na maioria dos casos a matrix vai acabar sendo esparsa porque a maioria das palavras do vocabulário geral não aparece na maioria das suas janelas de texto (como foi visto em sala)
tfidf_vectorizer, tfidf_matrix = build_tfidf(windows_clean)
print("\nAlgumas linhas da Matriz TF-IDF (para as primeiras 5 janelas):")
print(tfidf_matrix[:5].toarray())

# Carrega o modelo de embeddings, usado para converter textos em vetores numéricos que capturam o significado semântico
embed_model = load_sentence_embedding_model(EMBEDDING_MODEL_NAME)

# Gera os embeddings para todas as janelas, permitem buscar com base na similaridade de significado
embeddings = build_sentence_embeddings(windows, embed_model)
print("\nForma dos vetores de embeddings (número de janelas x dimensão do vetor):")
print(embeddings.shape)
print("Primeiros 10 elementos do primeiro vetor de embedding (exemplo de representação numérica):")
print(embeddings[0][:10])

# Vetoriza a busca
faiss_index = build_faiss_index(embeddings)

# Exibe o tempo total gasto na fase de vetorização e indexação.
print(f"\nVetorização e indexação concluídas em {time.time() - start_vectorization_time:.2f}s.")

Construindo TF-IDF (ngram 1-2, sublinear_tf)...
TF-IDF matrix shape: (2881, 54268)

Algumas linhas da Matriz TF-IDF (para as primeiras 5 janelas):
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Carregando modelo de embeddings: paraphrase-multilingual-MiniLM-L12-v2
Gerando embeddings dos chunks/janelas (isso pode levar um tempo)...


Batches:   0%|          | 0/91 [00:00<?, ?it/s]

Embeddings shape: (2881, 384)

Forma dos vetores de embeddings (número de janelas x dimensão do vetor):
(2881, 384)
Primeiros 10 elementos do primeiro vetor de embedding (exemplo de representação numérica):
[ 0.08045717  0.16845477  0.14468509  0.08331416 -0.14752582  0.00966158
 -0.13236022  0.05228558  0.08348729 -0.09050141]
Construindo índice FAISS (IndexFlatIP, com L2-normalização)...
Índice FAISS com 2881 vetores, dimensão 384.

Vetorização e indexação concluídas em 298.21s.


### Chamando a Interface

In [23]:
# Passando os componentes pré-processados e modelos carregados para a interface
interface = create_chatbot_interface(
    windows, windows_clean,
    embed_model,
    faiss_index,
    tfidf_vectorizer,
    tfidf_matrix,
    entities_dict,
    top_k_semantic=20,
    n_results=5
)

interface.launch(debug=True)

It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://25d79140db6b8f36b8.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://25d79140db6b8f36b8.gradio.live


