# setup

In [None]:
# Root folder that is walked to find the images and documents to index.
PATH_ARQUIVOS = r"data"
# OCR language code handed to Tesseract; the author notes "por" and "eng" as options.
LANG = "por" # por eng
persist_directory = "chroma/chroma_db"  # Directory where the Chroma database is persisted

## torch

In [None]:
import torch

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Using device:', device)
print()

# CUDA-only diagnostics: device name plus allocated/reserved memory in GiB.
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))
    print('Memory Usage:')
    print('Allocated:', round(torch.cuda.memory_allocated(0) / 2 ** 30, 1), 'GB')
    print('Cached:   ', round(torch.cuda.memory_reserved(0) / 2 ** 30, 1), 'GB')

## docling

In [None]:
from docling.datamodel.base_models import InputFormat
from docling_core.types.doc import ImageRefMode
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode, EasyOcrOptions, TesseractOcrOptions, OcrMacOptions
from docling.datamodel.settings import settings

# Scale factor for generated page/picture images (passed as images_scale below).
IMAGE_RESOLUTION_SCALE = 2.0

# Define pipeline options for PDF processing
pipeline_options = PdfPipelineOptions(
    do_table_structure=True,  # Enable table structure detection
    do_ocr=True,  # Enable OCR
    # Full-page OCR and language selection; exactly one ocr_options line is active.
    #ocr_options=EasyOcrOptions(force_full_page_ocr=True, lang=["en"]),  # Alternative engine: EasyOCR
    ocr_options=TesseractOcrOptions(force_full_page_ocr=True, lang=[LANG]),  # Active engine: Tesseract, using the LANG constant
    #ocr_options = OcrMacOptions(force_full_page_ocr=True, lang=['en-US']),  # Alternative engine: macOS OCR
    table_structure_options=dict(
        do_cell_matching=False,  # Use text cells predicted from table structure model
        mode=TableFormerMode.ACCURATE  # Use more accurate TableFormer model
    ),
    generate_page_images=True,  # Enable page image generation
    generate_picture_images=True,  # Enable picture image generation
    images_scale=IMAGE_RESOLUTION_SCALE, # Set image resolution scale (scale=1 corresponds to a standard 72 DPI image)
)

# Shared converter used by every later cell. Only the PDF format gets the custom
# pipeline options above; no other format is configured here.
doc_converter_global = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
    }
)

## Testes

### docling test

In [None]:
from pathlib import Path
from PyPDF2 import PdfReader, PdfWriter

# Read the source PDF and prepare a writer for the single extracted page.
reader = PdfReader(r"data\pdfs\WEF_The_Global_Cooperation_Barometer_2024.pdf")
writer = PdfWriter()

# Destination of the one-page PDF; later cells convert and then delete this file.
OUTPUT = Path(r"data\pdfs\page_14.pdf")

# Only write (and report success) when page 14 really exists. Writing
# unconditionally would produce an empty PDF and a misleading success message.
if len(reader.pages) >= 14:
    # Add page 14 (index 13 since it's 0-based)
    writer.add_page(reader.pages[13])

    # Save the extracted page to a new PDF file
    with OUTPUT.open('wb') as output_pdf:
        writer.write(output_pdf)

    print(f"Page 14 has been saved to {OUTPUT}")
else:
    print("The PDF does not contain 14 pages.")

In [None]:
# Convert the one-page PDF at OUTPUT and display its Markdown export
# (image_mode=EMBEDDED is requested for the pictures).
result = doc_converter_global.convert(OUTPUT)
display(result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED))

In [None]:
# Clean up the temporary one-page PDF, reporting whether there was anything to delete.
if OUTPUT.exists():
    OUTPUT.unlink()
    print(f"Arquivo {OUTPUT} excluído com sucesso.")
else:
    print(f"Arquivo {OUTPUT} não existe.")

### html

In [None]:
# Smoke test: run a saved HTML page through the shared converter.
# NOTE(review): the path uses Windows separators; it will not resolve on POSIX systems.
file = Path(r"data\html\python - How to run DeepSeek model locally - Stack Overflow.html")

result = doc_converter_global.convert(file)
display(result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED))

### diversos

In [None]:
# Smoke test: convert a Word document and display its Markdown export.
file = Path(r"data\docs\word.docx")

result = doc_converter_global.convert(file)
display(result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED))

In [None]:
# Smoke test: convert an Excel workbook and display its Markdown export.
file = Path(r"data\docs\xlsx.xlsx")

result = doc_converter_global.convert(file)
display(result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED))

### pytesseract test

In [None]:
import pytesseract
from PIL import Image

#pytesseract.pytesseract.tesseract_cmd = r"E:\programas\ia\Tesseract-OCR\tesseract.exe"

# Smoke test: OCR a screenshot with Tesseract using the configured LANG.
image_path = r"data\imagens\Captura de tela 2025-03-13 085540.png"

# Open inside a context manager so the file handle is released once OCR is done
# (the original left the image file open).
with Image.open(image_path) as image:
    extracted_text = pytesseract.image_to_string(image, lang=LANG)
print(extracted_text)

### Chunking

#### direct chunking

In [None]:
# Convert the Monopoly rules PDF; `result` is reused by the chunking cell below.
result = doc_converter_global.convert(Path(r"data\pdfs\monopoly.pdf"))
display(result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED))

In [None]:
from docling.document_converter import DocumentConverter
from docling.chunking import HybridChunker

#conv_res = DocumentConverter().convert(FIRST_10_PAGES)
#doc = conv_res.document

# Chunk the most recently converted document (`result` from the previous cell)
# with docling's HybridChunker.
chunker = HybridChunker(tokenizer="BAAI/bge-small-en-v1.5")  # set tokenizer as needed
chunk_iter = chunker.chunk(result.document)

# Convert the iterator to a list to count the chunks
chunks = list(chunk_iter)
num_chunks = len(chunks)

# Print the number of chunks
print(f"The document has been divided into {num_chunks} chunks.")

In [None]:
# Show the text of the second chunk (index 1); raises IndexError if fewer than two chunks exist.
print(chunks[1].text)

#### chroma chunking

Obs.: as células abaixo exigem que o Ollama esteja em execução (`ollama serve`).

In [None]:
from langchain_ollama import OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma

In [None]:
# display(result.document.export_to_markdown())
from langchain_core.documents import Document

# Convert the PDF and wrap its Markdown export in a single LangChain Document,
# recording the file path under the "source" metadata key.
local_path = r"data\pdfs\monopoly.pdf"
result = doc_converter_global.convert(Path(local_path))
documento = Document(page_content=result.document.export_to_markdown(image_mode=ImageRefMode.EMBEDDED), metadata={"source": local_path})

print(documento)

In [None]:
# Split the Document into chunks of up to 7500 characters with a 100-character overlap.
# Note: this rebinds `chunks`, replacing the list produced by the HybridChunker cell.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=7500, chunk_overlap=100)
chunks = text_splitter.split_documents([documento])

In [None]:
# Report how many chunks were produced and show the first one.
print(f"num chunks: {len(chunks)}")
print(chunks[0])

# Separar documentos

In [None]:
import os

def separar_arquivos(diretorio):
    """
    Walk a directory tree and split the files found into images and everything else.

    Args:
        diretorio (str): Path of the directory to scan; subfolders are included.

    Returns:
        tuple: ``(imagens, documentos)`` - two lists of file paths, each path
        being the walked folder joined with the file name. A file counts as an
        image when its extension, compared case-insensitively, is one of
        .jpg, .jpeg, .png, .gif or .bmp; every other file is a document.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')  # extend here if needed

    # Collect (path, lower-cased extension) pairs in walk order.
    found = [
        (os.path.join(pasta, nome), os.path.splitext(nome)[1].lower())
        for pasta, _subpastas, nomes in os.walk(diretorio)
        for nome in nomes
    ]

    # Partition by extension; each list keeps the walk order.
    imagens = [caminho for caminho, sufixo in found if sufixo in image_suffixes]
    documentos = [caminho for caminho, sufixo in found if sufixo not in image_suffixes]
    return imagens, documentos

# Scan the configured data folder once; the results are module-level lists of paths.
imagens, documentos = separar_arquivos(PATH_ARQUIVOS)

# print("Imagens:")
# for imagem in imagens: print(imagem)

# print("\nDocumentos:")
# for documento in documentos: print(documento)

# Document chunks

In [None]:
from langchain_ollama import OllamaEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma

# display(result.document.export_to_markdown())
from langchain_core.documents import Document

# img
import pytesseract
from PIL import Image
#pytesseract.pytesseract.tesseract_cmd = r"E:\programas\ia\Tesseract-OCR\tesseract.exe"
#end img

# Shared splitter used by get_chunks_doc and get_chunks_image:
# chunks of up to 7500 characters with a 100-character overlap.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=7500, chunk_overlap=100)

#local_path = r"data\pdfs\monopoly.pdf"
def get_chunks_doc(local_path, image_mode=None):
    """Convert a document with docling and split its Markdown export into chunks.

    Args:
        local_path: Path of the file to convert (str or path-like).
        image_mode: Optional ``ImageRefMode`` for the Markdown export. When
            omitted, ``ImageRefMode.PLACEHOLDER`` is used so pictures are not
            inlined as base64 data. The previous hard-coded ``EMBEDDED`` mode
            put those payloads into the text that gets chunked and embedded,
            which adds noise and inflates the number of chunks. Pass
            ``ImageRefMode.EMBEDDED`` explicitly to get the old output.

    Returns:
        The list of chunk documents produced by ``text_splitter`` from a single
        Document whose metadata is ``{"source": str(local_path)}``.
    """
    if image_mode is None:
        image_mode = ImageRefMode.PLACEHOLDER

    result = doc_converter_global.convert(Path(local_path))
    markdown = result.document.export_to_markdown(image_mode=image_mode)

    # str() keeps the metadata a plain string even when a Path object is passed in.
    documento = Document(page_content=markdown, metadata={"source": str(local_path)})
    return text_splitter.split_documents([documento])


def get_chunks_image(local_path):
    """Run Tesseract OCR on an image file and split the recognized text into chunks.

    Args:
        local_path: Path of the image file to read.

    Returns:
        The list of chunk documents produced by ``text_splitter`` from a single
        Document holding the OCR text, with ``{"source": local_path}`` metadata.
    """
    # The context manager closes the image file once OCR has finished; the
    # original left the handle open for every image processed.
    with Image.open(local_path) as image:
        extracted_text = pytesseract.image_to_string(image, lang=LANG)

    documento = Document(page_content=extracted_text, metadata={"source": local_path})
    return text_splitter.split_documents([documento])

# Chromadb

Obs.: as células abaixo exigem que o Ollama esteja em execução (`ollama serve`), pois os embeddings são gerados pelo modelo `nomic-embed-text`.

## backup

In [None]:
# no batch

# def check_and_add_document(collection, document, page_index, embedding_model):
#     """Verifica se um documento já existe na coleção e o adiciona se não existir."""
#     document_id = generate_id(document, page_index)

#     # Verifica se o ID já existe na coleção
#     results = collection.get(ids=[document_id])
#     if results['ids'] and document_id in results['ids']:
#         print(f"Documento com ID {document_id} já existe na coleção.")
#         return

#     # Adiciona o documento à coleção
#     embedding = embedding_model.embed_documents([document.page_content])[0]
#     collection.add(documents=[document.page_content], ids=[document_id], embeddings=[embedding], metadatas=[document.metadata])
#     print(f"Documento com ID {document_id} adicionado à coleção.")

# # exemplo inline de chunks
# # chunks = [
# #     Document(page_content="This is a document about pineapple", metadata={"source": "file1.txt"}),
# #     Document(page_content="This is a document about oranges", metadata={"source": "file2.txt"}),
# #     Document(page_content="Another document about pineapple", metadata={"source": "file1.txt"}),
# #     Document(page_content="New document about oranges", metadata={"source": "file2.txt"}),
# # ]

# embedding_model = OllamaEmbeddings(model="nomic-embed-text")
# # Inicializa o cliente ChromaDB
# chroma_client = chromadb.PersistentClient(path=persist_directory, settings=Settings(allow_reset=False))

# collection_name = "local-rag"

# def chroma_indexing(chunks, collection_name = "local-rag"):
#     # Obtém ou cria a coleção
#     collection = chroma_client.get_or_create_collection(name=collection_name)
#     print("Coleção carregada.")

#     # Itera sobre os chunks e os adiciona à coleção, verificando se já existem
#     for i, chunk in enumerate(chunks):
#         check_and_add_document(collection, chunk, i, embedding_model)

#     # Exemplo para verificar o ID do primeiro chunk
#     if chunks:
#         first_chunk_id = generate_id(chunks[0], 0)
#         print(f"ID do primeiro chunk: {first_chunk_id}")

#     # # Exemplo de query para verificar se os dados foram adicionados corretamente
#     # query_embedding = embedding_model.embed_query("fruit information")
#     # query_results = collection.query(query_embeddings=[query_embedding], n_results=2)
#     # print("\nResults of query:")
#     # for result in query_results['documents'][0]:
#     #     print(result)
#     return collection

# batch

## chroma batch

In [None]:
import hashlib
import os
import chromadb
from chromadb.config import Settings
from langchain.docstore.document import Document
#from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaEmbeddings 
from pathlib import Path

def generate_id_filename(filename, page_index):
    """Build an ID of the form ``<sha256 hex of the file's base name>_<page_index>``.

    Only the base name of *filename* is hashed, so the directory part of the
    path does not influence the resulting ID.
    """
    digest = hashlib.sha256(os.path.basename(filename).encode()).hexdigest()
    return "{}_{}".format(digest, page_index)

def generate_id(document, page_index):
    """Return the ID for *document*, derived from its ``source`` metadata and *page_index*.

    The ID is whatever ``generate_id_filename`` produces for the base name of
    ``document.metadata['source']``.
    """
    source_name = os.path.basename(document.metadata['source'])
    return generate_id_filename(source_name, page_index)



# Module-level defaults used by the indexing functions below.
embedding_model = OllamaEmbeddings(model="nomic-embed-text")  # embeddings are requested from Ollama
chroma_client = chromadb.PersistentClient(path=persist_directory, settings=Settings(allow_reset=False))  # on-disk client at persist_directory
COLLECTION_NAME = "local-rag"  # collection used for both indexing and retrieval

def chroma_indexing(path_arquivos=PATH_ARQUIVOS, collection_name=COLLECTION_NAME, embedding_model=embedding_model, chroma_client=chroma_client):
    """Index every file found under *path_arquivos* into a ChromaDB collection.

    Files are split into images and other documents by ``separar_arquivos``.
    Images are chunked with ``get_chunks_image`` and documents with
    ``get_chunks_doc``; images are processed first. A file is skipped when the
    ID of its first chunk (index 0) is already present in the collection.

    Args:
        path_arquivos: Directory to scan.
        collection_name: Name of the collection to get or create.
        embedding_model: Embedding model forwarded to ``chroma_indexing_batch``.
        chroma_client: Client used to get or create the collection.

    Returns:
        The collection that was loaded or created.

    Note:
        The default values are bound when the function is defined, so later
        reassignments of the module-level names do not change them.
    """
    image_paths, document_paths = separar_arquivos(path_arquivos)

    collection = chroma_client.get_or_create_collection(name=collection_name)

    # Each group of paths is paired with the function that turns a file into chunks.
    groups = (
        (image_paths, get_chunks_image),
        (document_paths, get_chunks_doc),
    )
    for paths, extract_chunks in groups:
        for path in paths:
            first_chunk_id = generate_id_filename(path, 0)
            stored_ids = collection.get(ids=[first_chunk_id])['ids']
            if stored_ids and first_chunk_id in stored_ids:
                print(f"Documento com ID {first_chunk_id} | {os.path.basename(path)} já existe na coleção.")
                continue

            chroma_indexing_batch(extract_chunks(path), collection, embedding_model)

    return collection
    

def chroma_indexing_batch(chunks, collection=None, embedding_model=embedding_model):
    """Add the chunks of one file to a ChromaDB collection, skipping IDs already stored.

    Each chunk's ID comes from ``generate_id(chunk, position)``. Existing IDs
    are looked up with a single ``collection.get`` call and all new texts are
    embedded with a single ``embed_documents`` call. The original did one
    lookup and one embedding request per chunk.

    Args:
        chunks: Sequence of chunk documents exposing ``page_content`` and
            ``metadata`` (with a ``source`` entry, as ``generate_id`` requires).
        collection: Target collection; nothing is done when it is ``None``.
        embedding_model: Object providing ``embed_documents(list_of_texts)``.

    Returns:
        None. Progress is reported with ``print``.
    """
    # Explicit None check: the truthiness of a collection object is not a
    # reliable "is it set" test.
    if not chunks or collection is None:
        print('Sem chunks e/ou collection is null')
        return

    chunk_ids = [generate_id(chunk, i) for i, chunk in enumerate(chunks)]

    # One lookup for the whole batch instead of one round trip per chunk.
    existing_ids = set(collection.get(ids=chunk_ids)['ids'] or ())

    pending = []
    for document_id, chunk in zip(chunk_ids, chunks):
        if document_id in existing_ids:
            print(f"Documento com ID {document_id} já existe na coleção.")
        else:
            pending.append((document_id, chunk))

    if pending:
        ids_to_add = [document_id for document_id, _ in pending]
        documents_to_add = [chunk.page_content for _, chunk in pending]
        metadatas_to_add = [chunk.metadata for _, chunk in pending]

        # Embed all new texts in a single request; results are used in input order.
        embeddings_to_add = embedding_model.embed_documents(documents_to_add)

        collection.add(documents=documents_to_add, ids=ids_to_add, embeddings=embeddings_to_add, metadatas=metadatas_to_add)
        print(f"Adicionados {len(documents_to_add)} documentos em lote.")

    # chunks is non-empty here (guarded above), so the first ID always exists.
    print(f"ID do primeiro chunk: {chunk_ids[0]}")


## Chroma indexing

In [None]:
# Index everything under PATH_ARQUIVOS; requires the Ollama embedding model to be reachable.
collection = chroma_indexing(PATH_ARQUIVOS, COLLECTION_NAME, embedding_model, chroma_client)

In [None]:
# Peek at up to five stored entries to confirm the indexing worked.
# collection = chroma_client.get_or_create_collection(name=COLLECTION_NAME)
results = collection.get(limit=5)
display(results)

## backup

In [None]:
# imagens, documentos = separar_arquivos(PATH_ARQUIVOS)
# for documento in documentos:
#     id_aux = generate_id_filename(imagem, 0)


# collection_name = "local-rag"

# for imagem in imagens:
#     chunks = get_chunks_image(imagem)
#     if (chunks is None or len(chunks) <= 0): continue
#     chroma_indexing(chunks, collection_name)

# for documento in documentos:
#     chunks = get_chunks_doc(documento)
#     if (chunks is None or len(chunks) <= 0): continue
#     chroma_indexing(chunks, collection_name)

# print("Documentos indexados")
#----------------------------------------------
# for imagem in imagens:
#     chunks = get_chunks_image(imagem)
#     if chunks:
#         chroma_indexing_batch(chunks, collection_name, embedding_model, chroma_client)

# for documento in documentos:
#     chunks = get_chunks_doc(documento)
#     if chunks:
#         chroma_indexing_batch(chunks, collection_name, embedding_model, chroma_client)

# print("Documentos indexados")
#----------------------------------------------
# chunks_list = []

# for imagem in imagens:
#     chunks = get_chunks_image(imagem)
#     if chunks: chunks_list.append(chunks)

# for documento in documentos:
#     chunks = get_chunks_doc(documento)
#     if chunks: chunks_list.append(chunks)

# chroma_indexing_batch(chunks_list, collection_name, embedding_model, chroma_client)

# print("Documentos indexados")

# Retrieval

In [None]:
from langchain.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_ollama.chat_models import ChatOllama
from langchain_core.runnables import RunnablePassthrough
from langchain.retrievers.multi_query import MultiQueryRetriever

In [None]:
# LLM from Ollama
# The second assignment overwrites the first, so "deepseek-r1" is the model
# actually used; comment that line out to switch back to "llama3.2".
local_model = "llama3.2"
local_model = "deepseek-r1"
llm = ChatOllama(model=local_model)

In [None]:
# Prompt (in Portuguese) given to the multi-query retriever: it asks the LLM for
# five alternative phrasings of the user's question, one per line.
QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template="""Você é um assistente de modelo de linguagem de IA. Sua tarefa é gerar cinco
    versões diferentes da pergunta do usuário fornecida para recuperar documentos relevantes de
    um banco de dados vetorial. Ao gerar múltiplas perspectivas sobre a pergunta do usuário, seu
    objetivo é ajudar o usuário a superar algumas das limitações da pesquisa de similaridade 
    baseada em distância. Forneça essas perguntas alternativas separadas por quebras de linha.
    Pergunta original: {question}""",
)

In [None]:
from langchain_ollama import OllamaEmbeddings 
from langchain_chroma import Chroma

# Re-create the embedding model with the same model name used for indexing.
embedding_model = OllamaEmbeddings(model="nomic-embed-text")

#vectordb = Chroma(persist_directory=persist_directory, embedding_function=embedding_model)
# Wrap the already-open Chroma client and the indexed collection as a LangChain vector store.
vector_db = Chroma(
    client=chroma_client,
    collection_name=COLLECTION_NAME,
    embedding_function=embedding_model
)

# Multi-query retriever built from the vector store's retriever, the LLM and QUERY_PROMPT.
retriever = MultiQueryRetriever.from_llm(
    vector_db.as_retriever(), 
    llm,
    prompt=QUERY_PROMPT
)

# RAG prompt (in Portuguese): answer using only the supplied {context}.
template = """Responda à pergunta com base SOMENTE no seguinte contexto:
{context}
Pergunta: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

In [None]:
# RAG chain: the retriever fills {context}, the raw question passes through as
# {question}; the filled prompt goes to the LLM and the reply is parsed to a string.
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

In [None]:
from IPython.display import display, Markdown
# Example question about the indexed Monopoly rules; the answer is rendered as Markdown.
display(Markdown(chain.invoke("Como jogar Monopoly ?")))

In [None]:
# Example question about a Voice-Enabled AI text and its authors' conclusion.
display(Markdown(chain.invoke("Explique um Voice-Enabled AI e qual a conclusão dos autores ?")))

In [None]:
# Example question about running DeepSeek locally with Python.
display(Markdown(chain.invoke("Como executar o DeepSeek com python local ?")))

# Excluir Coleção

In [None]:
# Delete the collection behind vector_db (COLLECTION_NAME). This call targets that
# one collection, not every collection in the database. Re-run the indexing
# cells afterwards to rebuild it.
vector_db.delete_collection()
print(f"Coleção '{COLLECTION_NAME}' deletada.")