In [0]:
%pip install -q pypdf python-docx python-pptx sentence-transformers faiss-cpu tqdm google-generativeai anthropic openai
# NOTE(review): none of the packages above is version-pinned, so a fresh run may
# resolve to different releases than the ones this notebook was written against.
# Consider pinning (pkg==x.y.z) once known-good versions are confirmed.


In [0]:
# Restart the Python process; presumably needed so the packages installed by the
# preceding %pip cell are importable in the cells below (confirm against the
# Databricks documentation for your runtime).
dbutils.library.restartPython()

In [0]:
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple
import os, re, json, time, uuid, math
from datetime import datetime

import numpy as np
from tqdm import tqdm

from pyspark.sql import functions as F
from pyspark.sql import types as T

from pypdf import PdfReader
import docx
from pptx import Presentation

# Embeddings e reranking 
from sentence_transformers import SentenceTransformer
import faiss

# LLMs
import openai
import anthropic
import google.generativeai as genai
import requests

# Databricks secrets
# Provider API keys, read from Databricks secrets as (scope, key) pairs.
# NOTE(review): these four variables are not referenced again in the visible
# notebook; the LLM cell re-reads the same secrets into environment variables.
OPENAI_KEY   = dbutils.secrets.get("OPENAI",   "OPENAI_API_KEY")
ANTH_KEY     = dbutils.secrets.get("CLAUDE",   "ANTHROPIC_API_KEY")
GEMINI_KEY   = dbutils.secrets.get("GEMINI",   "GEMINI_API_KEY")
DEEPSEEK_KEY = dbutils.secrets.get("DEEPSEEK", "DEEPSEEK_API_KEY")

# Path of the document to ingest
DOC_PATH = "/Volumes/bronze/default/documentos_agent/Manual de Grandes Felinos CarnÃ­voros.pdf"

# Catalog and schema that hold every table written by this notebook.
CATALOG = "bronze"
SCHEMA  = "default"

# Fully qualified table names (the cells below create them with USING DELTA).
TABLE_DOCS_RAW   = f"{CATALOG}.{SCHEMA}.docs_raw"
TABLE_CHUNKS     = f"{CATALOG}.{SCHEMA}.docs_chunks"
TABLE_EMBEDS     = f"{CATALOG}.{SCHEMA}.docs_embeddings"
TABLE_AUDIT      = f"{CATALOG}.{SCHEMA}.rag_audit"

# Avoid appends (Free Edition limitation): every write uses overwrite mode.
OVERWRITE_MODE = "overwrite"

# Chunking parameters, in characters.
CHUNK_SIZE = 800
CHUNK_OVERLAP = 120

# Embedding model (multilingual, open; works well for PT-BR and many other languages).
# Embeddings are numeric representations of objects such as words, sentences,
# images, audio or even users and items: they turn that data into dense vectors.
EMBED_MODEL_NAME = "BAAI/bge-m3"

# Default top-k: how many chunks the retrieval step returns and passes to the
# LLM as context for the answer.
TOP_K_DEFAULT = 6



# Seed NumPy's global random number generator.
np.random.seed(42)

# NOTE(review): the original comment stated that this seed forces FAISS to build
# its vector index the same way on every run. Nothing in this notebook shows
# FAISS reading NumPy's RNG, and the index built later (IndexFlatIP) is a flat
# exact-search index -- confirm whether this seed is needed at all.


In [0]:
def read_pdf(path: str) -> str:
    """Return the text of a PDF, one page per segment, joined with newlines.

    A page whose extraction raises, or yields nothing, contributes an empty
    string, so one unreadable page does not abort the whole document.
    """
    def _page_text(page) -> str:
        # Best-effort extraction: swallow per-page failures on purpose.
        try:
            return page.extract_text() or ""
        except Exception:
            return ""

    return "\n".join(_page_text(page) for page in PdfReader(path).pages)

def read_docx(path: str) -> str:
    """Return the text of a .docx file: every paragraph's text, newline-joined."""
    document = docx.Document(path)
    return "\n".join(paragraph.text for paragraph in document.paragraphs)

def read_pptx(path: str) -> str:
    """Return the text of a .pptx deck.

    Each slide becomes a "[SLIDE n]" header (1-based) followed by the text of
    every shape that exposes a ``text`` attribute; slides are separated by a
    blank line.
    """
    deck = Presentation(path)
    rendered = []
    for number, slide in enumerate(deck.slides, start=1):
        body = "\n".join(
            shape.text for shape in slide.shapes if hasattr(shape, "text")
        )
        rendered.append(f"[SLIDE {number}]\n" + body)
    return "\n\n".join(rendered)

def load_text_from_path(path: str) -> Tuple[str, str]:
    """Read a document and return ``(text, filetype)``.

    The reader is chosen from the lower-cased file extension; ``filetype`` is
    that extension without the dot ("pdf", "docx" or "pptx").

    Raises:
        ValueError: If the extension is not one of .pdf, .docx, .pptx.
    """
    ext = os.path.splitext(path)[1].lower()
    # Lambdas keep the reader lookup lazy: only the selected reader is touched.
    readers = {
        ".pdf": lambda: read_pdf(path),
        ".docx": lambda: read_docx(path),
        ".pptx": lambda: read_pptx(path),
    }
    if ext not in readers:
        raise ValueError(f"ExtensÃ£o nÃ£o suportada: {ext} (suporte: .pdf, .docx, .pptx)")
    return readers[ext](), ext[1:]


In [0]:
def chunk_text(text: str, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP) -> List[str]:
    """Split ``text`` into chunks of at most ``chunk_size`` characters.

    Paragraphs (separated by one or more blank lines) are packed greedily into
    a buffer. When the next paragraph does not fit, the buffer is emitted and
    a new one is started with the last ``overlap`` characters of the emitted
    chunk followed by that paragraph. Any chunk that is still longer than
    ``chunk_size`` is then cut by a sliding character window that advances
    ``chunk_size - overlap`` characters per step.

    Args:
        text: Raw document text.
        chunk_size: Maximum chunk length in characters; must be positive.
        overlap: Characters shared between consecutive chunks. Negative values
            are treated as 0 and values >= ``chunk_size`` are capped at
            ``chunk_size - 1`` so the sliding window always moves forward.

    Returns:
        The chunks in document order; an empty list for empty text.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive number of characters")
    # Clamp the overlap so that the window step below is always >= 1.
    overlap = max(0, min(overlap, chunk_size - 1))

    # Simple split on blank lines, falling back to the whole text.
    paragraphs = [p.strip() for p in re.split(r"\n{2,}", text) if p.strip()]
    if not paragraphs:
        paragraphs = [text]

    chunks = []
    buf = ""
    for p in paragraphs:
        if len(buf) + len(p) + 1 <= chunk_size:
            buf = (buf + "\n" + p).strip()
        else:
            # Close the current chunk.
            if buf:
                chunks.append(buf)
            # Start a new buffer, seeded with the tail of the previous chunk.
            if chunks and overlap > 0:
                tail = buf[-overlap:]
                buf = (tail + "\n" + p).strip()
            else:
                buf = p
    # Final flush.
    if buf:
        chunks.append(buf)

    # Enforce the size limit: hard-slice anything that is still too long.
    step = chunk_size - overlap
    final = []
    for c in chunks:
        if len(c) <= chunk_size:
            final.append(c)
            continue
        start = 0
        while True:
            final.append(c[start:start + chunk_size])
            # Stop as soon as a window reaches the end of the chunk. The
            # previous version rewound `start` by `overlap` after the last
            # window and therefore looped forever whenever overlap > 0.
            if start + chunk_size >= len(c):
                break
            start += step
    return final


In [0]:
# Ingest the configured document into the raw-documents table.
raw_text, filetype = load_text_from_path(DOC_PATH)

raw_df = spark.createDataFrame(
    [(str(uuid.uuid4()), os.path.basename(DOC_PATH), DOC_PATH, filetype, raw_text, datetime.utcnow())],
    schema=T.StructType([
        T.StructField("doc_id", T.StringType(), False),
        T.StructField("filename", T.StringType(), False),
        T.StructField("path", T.StringType(), False),
        T.StructField("filetype", T.StringType(), False),
        T.StructField("text", T.StringType(), False),
        T.StructField("ingested_at_utc", T.TimestampType(), False),
    ])
)

# CREATE IF NOT EXISTS
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_DOCS_RAW} (
  doc_id STRING,
  filename STRING,
  path STRING,
  filetype STRING,
  text STRING,
  ingested_at_utc TIMESTAMP
) USING DELTA
""")

# Appends are avoided, so the table is rebuilt as "rows of other documents" +
# "the fresh row" and written with overwrite.
# Rows previously ingested from the same path are dropped first: otherwise every
# re-run of this cell would add another copy of the document under a new doc_id,
# and the chunking cell (which filters by path) would chunk every copy.
# The table is guaranteed to exist by the CREATE above, so a failure to read it
# is a real error. It must not fall back to overwriting the table with only the
# new row, which would silently discard every other document.
existing = spark.table(TABLE_DOCS_RAW).filter(~F.col("path").eqNullSafe(DOC_PATH))
union_df = existing.unionByName(raw_df, allowMissingColumns=True)

union_df.write.mode(OVERWRITE_MODE).format("delta").saveAsTable(TABLE_DOCS_RAW)

display(spark.table(TABLE_DOCS_RAW).limit(5))


In [0]:
from pyspark.sql import functions as F, types as T, Window

# Chunking parameters (same values as the configuration cell).
# CHUNK_SIZE is the length of each chunk in characters.
CHUNK_SIZE = 800

# CHUNK_OVERLAP is the number of characters a chunk shares with the next one:
# out of 800 characters, 120 are common to two consecutive chunks. The overlap
# keeps context from being lost at chunk boundaries.
CHUNK_OVERLAP = 120

# For testing: only the current document, and only non-empty text.
raw_df = (
    spark.table(TABLE_DOCS_RAW)
         .filter(F.col("path") == DOC_PATH)
         .filter(F.col("text").isNotNull() & (F.length("text") > 0))
         .select("doc_id", "text")
)

# Sliding-window step, forced to be >= 1.
# Example: 800 - 120 = 680, so each chunk starts 680 characters after the
# previous one and shares 120 characters with it.
step_size = max(1, CHUNK_SIZE - CHUNK_OVERLAP)
step = F.lit(step_size)

# Window starts: 0, step, 2*step, ...
# The upper bound is (len - CHUNK_SIZE) + (step - 1), so the sequence always
# contains a start s with s + CHUNK_SIZE >= len and the last window reaches the
# end of the text. With the previous bound (len - CHUNK_SIZE) the tail of the
# document was dropped whenever that value was not a multiple of the step
# (e.g. len=1000 produced only the window [0, 800)).
max_start = F.greatest(
    F.length("text") - F.lit(CHUNK_SIZE) + F.lit(step_size - 1),
    F.lit(0),
)
starts = F.sequence(F.lit(0), max_start, step)

# Explode the window starts and cut each chunk with substring (1-indexed in Spark).
chunks_df = (
    raw_df
    .withColumn("start", F.explode(starts))
    .withColumn("chunk_text", F.substring(F.col("text"), F.col("start") + F.lit(1), F.lit(CHUNK_SIZE)))
    .withColumn("chunk_text", F.trim(F.col("chunk_text")))
    .filter(F.length("chunk_text") > 0)
)

# Sequential, 0-based chunk_id per document, ordered by window start.
w = Window.partitionBy("doc_id").orderBy("start")
chunks_df = (
    chunks_df
    .withColumn("chunk_id", F.row_number().over(w) - 1)
    .select("doc_id", "chunk_id", "chunk_text")
)

# Create the table if it does not exist yet.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_CHUNKS} (
  doc_id STRING,
  chunk_id INT,
  chunk_text STRING
) USING DELTA
""")

# OVERWRITE (no union, nothing collected on the driver).
(chunks_df
 .write
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .format("delta")
 .saveAsTable(TABLE_CHUNKS)
)

display(spark.table(TABLE_CHUNKS).limit(5))


In [0]:
# Load every chunk into driver memory and embed it.
chunks_pd = spark.table(TABLE_CHUNKS).orderBy("doc_id","chunk_id").toPandas()

embed_model = SentenceTransformer(EMBED_MODEL_NAME)
embeds = embed_model.encode(chunks_pd["chunk_text"].tolist(), batch_size=64, normalize_embeddings=True)

# Build the Spark DataFrame with the vector stored as ARRAY<FLOAT>.
def to_list_float32(v: np.ndarray) -> List[float]:
    """Convert a vector to a list of Python floats, rounded through float32."""
    return [float(x) for x in v.astype(np.float32).tolist()]

embedded_rows = [
    (doc_id, int(chunk_id), to_list_float32(vec))
    for doc_id, chunk_id, vec in zip(chunks_pd["doc_id"], chunks_pd["chunk_id"], embeds)
]

emb_schema = T.StructType([
    T.StructField("doc_id", T.StringType(), False),
    T.StructField("chunk_id", T.IntegerType(), False),
    T.StructField("embedding", T.ArrayType(T.FloatType()), False),
])

emb_df = spark.createDataFrame(embedded_rows, emb_schema)

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_EMBEDS} (
  doc_id STRING,
  chunk_id INT,
  embedding ARRAY<FLOAT>
) USING DELTA
""")

# This cell embeds *every* row of TABLE_CHUNKS, so emb_df is already the complete
# embeddings table. It is written as-is: unioning it with the existing rows (as
# before) duplicated every vector on each re-run and kept vectors of doc_ids
# that no longer exist in TABLE_CHUNKS, which the retrieval step cannot map back
# to a chunk text.
emb_df.write.mode(OVERWRITE_MODE).format("delta").saveAsTable(TABLE_EMBEDS)

display(spark.table(TABLE_EMBEDS).limit(5))


In [0]:
# RAG indexing step: load the stored embeddings and build the FAISS index.
emb_pd = spark.table(TABLE_EMBEDS).orderBy("doc_id","chunk_id").toPandas()

# One float32 row per stored embedding.
mat = np.vstack([np.array(vec, dtype="float32") for vec in emb_pd["embedding"]])

# IndexFlatIP scores by inner product. The embeddings were encoded with
# normalize_embeddings=True, so the inner product equals cosine similarity.
index = faiss.IndexFlatIP(mat.shape[1])
index.add(mat)

# Index row -> (doc_id, chunk_id), in the same order the vectors were added.
line2meta = [
    (doc_id, chunk_id)
    for doc_id, chunk_id in zip(emb_pd["doc_id"].tolist(), emb_pd["chunk_id"].tolist())
]

# (doc_id, chunk_id) -> chunk text, used to return the text of each hit.
chunks_map = {
    (row["doc_id"], int(row["chunk_id"])): row["chunk_text"]
    for row in spark.table(TABLE_CHUNKS).collect()
}


In [0]:
# Export each provider API key to the environment. The environment variable
# name is the same as the secret key name; pairs are (secret scope, key name).
os.environ.update(
    (key_name, dbutils.secrets.get(scope, key_name))
    for scope, key_name in (
        ("OPENAI", "OPENAI_API_KEY"),
        ("CLAUDE", "ANTHROPIC_API_KEY"),
        ("GEMINI", "GEMINI_API_KEY"),
        ("DEEPSEEK", "DEEPSEEK_API_KEY"),
    )
)

def call_openai(prompt: str, model="gpt-4o-mini", temperature=0.0) -> str:
    """Send ``prompt`` to an OpenAI chat model and return the reply text.

    The API key is read from the OPENAI_API_KEY environment variable. A fixed
    system message asks for answers in Brazilian Portuguese that cite only the
    supplied context.

    Args:
        prompt: Full user prompt (question plus retrieved context).
        model: OpenAI chat model name.
        temperature: Sampling temperature.

    Returns:
        The content of the first choice, or "" when the API returns no content.
    """
    # Use the client class through the top-level `openai` import: the bare name
    # `OpenAI` is only imported in a later cell, so this function used to depend
    # on that cell having been executed first.
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    resp = client.chat.completions.create(
        model=model,
        messages=[
            {"role":"system","content":"Responda em portuguÃªs do Brasil e cite apenas o contexto fornecido."},
            {"role":"user","content": prompt}
        ],
        temperature=temperature
    )
    # `content` may be None; keep the declared `str` return type.
    return resp.choices[0].message.content or ""


def call_anthropic(prompt: str, model="claude-3-haiku-20240307", temperature=0.0) -> str:
    """Send ``prompt`` to an Anthropic model and return the reply text.

    The API key is read from ANTHROPIC_API_KEY, the reply is capped at 1500
    tokens, and the text of every "text" content block is concatenated.
    """
    client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    request = dict(
        model=model,
        max_tokens=1500,
        temperature=temperature,
        system="Responda em portuguÃªs do Brasil e cite apenas o contexto fornecido.",
        messages=[{"role": "user", "content": prompt}],
    )
    reply = client.messages.create(**request)

    # Keep only the text blocks of the response.
    text_blocks = (block.text for block in reply.content if block.type == "text")
    return "".join(text_blocks)


def call_gemini(prompt: str, model="gemini-1.5-flash", temperature=0.0) -> str:
    """Send ``prompt`` to a Gemini model and return ``response.text``.

    The API key is read from GEMINI_API_KEY. The instruction sentence is sent
    as the first content part, followed by the prompt itself.
    """
    genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

    parts = [
        {"text": "Responda em portuguÃªs do Brasil e cite apenas o contexto fornecido."},
        {"text": prompt},
    ]
    response = genai.GenerativeModel(model).generate_content(
        parts,
        generation_config={"temperature": temperature},
    )
    return response.text


def call_deepseek(prompt: str, model="deepseek-chat", temperature=0.0) -> str:
    """Send ``prompt`` to the DeepSeek chat-completions endpoint over HTTP.

    The API key is read from DEEPSEEK_API_KEY and sent as a bearer token. The
    request times out after 60 seconds; HTTP error statuses raise via
    ``raise_for_status``. Returns the content of the first choice.
    """
    api_key = os.getenv("DEEPSEEK_API_KEY")
    messages = [
        {"role":"system","content":"Responda em portuguÃªs do Brasil e cite apenas o contexto fornecido."},
        {"role":"user","content": prompt},
    ]

    response = requests.post(
        "https://api.deepseek.com/chat/completions",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json={"model": model, "messages": messages, "temperature": temperature},
        timeout=60,
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]


def call_llm(provider: str, prompt: str, model: str=None, temperature: float=0.0) -> str:
    """Route ``prompt`` to the chosen provider and return its answer.

    ``provider`` is matched case-insensitively after stripping whitespace.
    When ``model`` is falsy, the provider's default model is used.

    Raises:
        ValueError: If the provider is not openai, anthropic, gemini or deepseek.
    """
    # Lambdas defer both the call and the name lookup to the selected backend.
    backends = {
        "openai": lambda: call_openai(prompt, model or "gpt-4o-mini", temperature),
        "anthropic": lambda: call_anthropic(prompt, model or "claude-3-haiku-20240307", temperature),
        "gemini": lambda: call_gemini(prompt, model or "gemini-1.5-flash", temperature),
        "deepseek": lambda: call_deepseek(prompt, model or "deepseek-chat", temperature),
    }
    backend = backends.get(provider.strip().lower())
    if backend is None:
        raise ValueError("Provider invÃ¡lido. Use: openai | anthropic | gemini | deepseek")
    return backend()

In [0]:
def embed_query(q: str) -> np.ndarray:
    """Encode a query with the notebook's embedding model.

    Returns the normalized embedding of ``q`` as a float32 vector.
    """
    vectors = embed_model.encode([q], normalize_embeddings=True)
    return vectors[0].astype("float32")

def retrieve(q: str, k: int=TOP_K_DEFAULT) -> List[Tuple[str, int, str]]:
    """Return up to ``k`` chunks for query ``q`` from the FAISS index.

    Each hit is ``(doc_id, chunk_id, chunk_text)``, in the order returned by
    the index search. Negative row ids in the search result are skipped.
    """
    query_matrix = embed_query(q).reshape(1, -1)
    _scores, rows = index.search(query_matrix, k)

    hits = []
    for row in rows[0]:
        if row < 0:
            continue
        doc_id, chunk_id = line2meta[row]
        chunk_id = int(chunk_id)
        hits.append((doc_id, chunk_id, chunks_map[(doc_id, chunk_id)]))
    return hits

def build_prompt(question: str, passages: List[Tuple[str,int,str]]) -> str:
    """Assemble the RAG prompt for ``question``.

    Every passage ``(doc_id, chunk_id, text)`` is rendered as a block headed by
    its 0-based index in brackets, so the model can cite it as [i]. The blocks
    are placed between the TRECHOS markers, followed by the question.
    """
    ctx = "\n\n".join(
        f"[{i}] doc_id={doc_id} chunk_id={chunk_id}\n{text}"
        for i, (doc_id, chunk_id, text) in enumerate(passages)
    )
    return f"""VocÃª Ã© um assistente RAG. Responda **APENAS** com base nos trechos abaixo.
Se faltar evidÃªncia, diga explicitamente que nÃ£o sabe.
Inclua referÃªncias entre colchetes usando os Ã­ndices [i] mostrados nos trechos relevantes.

<<<TRECHOS>>>
{ctx}
<<<FIM TRECHOS>>>

Pergunta: {question}
Resposta:"""


In [0]:
# Audit table: one row per question answered (created only if missing).
# Columns: run id and UTC timestamp, provider/model used, the question, the
# requested top_k, the doc_ids/chunk_ids of the passages sent to the LLM, the
# measured latency in milliseconds and a preview of the response.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_AUDIT} (
  run_id STRING,
  ts_utc TIMESTAMP,
  provider STRING,
  model STRING,
  question STRING,
  top_k INT,
  doc_ids ARRAY<STRING>,
  chunk_ids ARRAY<INT>,
  latency_ms DOUBLE,
  response_preview STRING
) USING DELTA
""")

def audit_log(provider: str, model: str, question: str, top_k: int,
              passages: List[Tuple[str,int,str]], latency_ms: float, response_preview: str):
    """Record one question/answer run in the audit table.

    Appends are avoided in this notebook, so the table is rewritten as
    "existing rows + the new row" with overwrite mode.

    Args:
        provider: LLM provider name.
        model: Model name; a falsy value is stored as "".
        question: The user question.
        top_k: Number of passages requested.
        passages: Retrieved ``(doc_id, chunk_id, text)`` tuples; only the ids
            are stored.
        latency_ms: Measured latency in milliseconds.
        response_preview: The answer; only its first 1000 characters are
            stored. ``None`` is stored as "".
    """
    # An answer may be None (e.g. a provider returned no content); slicing None
    # would raise and the preview column is declared non-nullable.
    preview = (response_preview or "")[:1000]
    row = [(str(uuid.uuid4()),
            datetime.utcnow(),
            provider, model or "", question, int(top_k),
            [p[0] for p in passages],
            [p[1] for p in passages],
            float(latency_ms),
            preview)]
    df = spark.createDataFrame(row, schema=T.StructType([
        T.StructField("run_id", T.StringType(), False),
        T.StructField("ts_utc", T.TimestampType(), False),
        T.StructField("provider", T.StringType(), False),
        T.StructField("model", T.StringType(), False),
        T.StructField("question", T.StringType(), False),
        T.StructField("top_k", T.IntegerType(), False),
        T.StructField("doc_ids", T.ArrayType(T.StringType()), False),
        T.StructField("chunk_ids", T.ArrayType(T.IntegerType()), False),
        T.StructField("latency_ms", T.DoubleType(), False),
        T.StructField("response_preview", T.StringType(), False),
    ]))
    # No append: union with the existing rows and OVERWRITE.
    # Only a missing table falls back to writing the new row alone. Any other
    # failure while reading the history now propagates, instead of being
    # swallowed and followed by an overwrite that would erase the audit log.
    if spark.catalog.tableExists(TABLE_AUDIT):
        out = spark.table(TABLE_AUDIT).unionByName(df, allowMissingColumns=True)
    else:
        out = df
    out.write.mode(OVERWRITE_MODE).format("delta").saveAsTable(TABLE_AUDIT)


In [0]:
from datetime import datetime, timezone
from openai import OpenAI

# ask(): retrieve context, query the LLM and write the run to the audit table.
def ask(
    question: str,
    provider: str="openai",
    model: str=None,
    k: int=TOP_K_DEFAULT,
    temperature: float=0.0
) -> Dict[str, Any]:
    """Answer ``question`` with retrieval-augmented generation.

    Retrieves ``k`` passages, builds the prompt, calls the selected provider
    and logs the run. The logged latency covers retrieval plus the LLM call.

    Returns:
        A dict with the provider, model (as passed in), question, top_k, the
        index/doc_id/chunk_id of each passage used, and the answer.
    """
    started = time.time()

    # Retrieval + generation
    passages = retrieve(question, k=k)
    prompt = build_prompt(question, passages)
    answer = call_llm(provider, prompt, model=model, temperature=temperature)

    elapsed_ms = (time.time() - started)*1000.0

    # Audit
    audit_log(provider, model or "", question, k, passages, elapsed_ms, answer)

    passages_used = []
    for position, (doc_id, chunk_id, _text) in enumerate(passages):
        passages_used.append({"i": position, "doc_id": doc_id, "chunk_id": chunk_id})

    return {
        "provider": provider,
        "model": model,
        "question": question,
        "top_k": k,
        "passages_used": passages_used,
        "answer": answer,
    }

# ConversaÃ§Ã£o interativa
print("ðŸ¤– Agente RAG ativado! Para sair, digite: sair")

provider = input("Escolha o provedor (openai/anthropic/gemini/deepseek): ").strip().lower()
if provider not in ["openai","anthropic","gemini","deepseek"]:
    print("Provedor invÃ¡lido, usando openai.")
    provider = "openai"

model = input("Modelo (pressione Enter para padrÃ£o): ").strip() or None

# Only a malformed number falls back to 0. A bare `except:` would also swallow
# KeyboardInterrupt/SystemExit and make the prompt impossible to interrupt.
try:
    temperature = float(input("Temperatura (0â€“1, padrÃ£o 0): ") or 0)
except ValueError:
    temperature = 0

print(f"\nâœ… Provider: {provider}")
print(f"âœ… Modelo: {model or 'padrÃ£o'}")
print(f"âœ… Temperatura: {temperature}")
print("âœ… RAG carregado")
print("\nComece a conversar!\n")

while True:
    question = input("VocÃª: ")
    # Compare the trimmed text so that "sair " or " exit" also end the session.
    command = question.strip().lower()
    if command in ["sair","exit","quit"]:
        print("ðŸ‘‹ Encerrando conversa")
        break
    # Ignore blank input instead of spending a retrieval + LLM call on it.
    if not command:
        continue

    res = ask(question, provider=provider, model=model, temperature=temperature)
    print(f"\nðŸ¤– IA ({provider}):\n{res['answer']}\n")
