In [35]:
import openai
import os
from docx import Document
from chromadb import PersistentClient
from chromadb.utils import embedding_functions
from chromadb.config import Settings
from spacy import load

In [None]:
# Separe os tokens em partes de max_tokens
def separar_tokens(tokens, max_tokens):
    lista_nova = []
    posicao = 0
    posicao_old = 0
    contador = 0

    while posicao < len(tokens)-1:
        contador+=1
        tcks = tokens[posicao_old:max_tokens*contador]
        for i, x in enumerate(tcks):
            if x == "\n" or i == len(tcks) -1:
                posicao = i + posicao_old
        lista_nova.append(tokens[posicao_old:posicao])
        posicao_old = posicao

    return lista_nova

def separar_sentencas(textos):
    nlp = load("pt_core_news_lg")
    docs = [nlp(t) for t in textos]
    sentencas = []
    for doc in docs:
        tokens = []
        for token in doc:
            if not token.is_stop and not token.is_punct:
                tokens.append(token.lemma_)
        sentencas.append(" ".join(tokens))
    return sentencas

In [43]:
def ler_arquivo(caminho:str) -> str:
    try:
        with open(caminho, "r") as file:
            return file.read().strip()
    except FileNotFoundError:
        print(f"O arquivo '{caminho}' não foi encontrado.")
        return ""
    
def separar_sentencas(textos) -> list[str]:
    nlp = load("pt_core_news_lg")
    docs = [nlp(t) for t in textos]
    sentencas = []
    for doc in docs:
        tokens = []
        for token in doc:
            if not token.is_stop and not token.is_punct:
                tokens.append(token.lemma_)
        sentencas.append(" ".join(tokens))
    return sentencas
    
def preprocessar_docx(doc, tradutor=None) -> list[str]:
        documento = '\n'.join(parag.text for parag in doc.paragraphs)
        textos = documento.split("####")
        textos = separar_sentencas(textos)

        if tradutor:
            textos_traduzidos = []
            for doc in textos:
                textos_traduzidos.append(tradutor(doc)["response"]["choices"][0]["message"]["content"])
            
            return textos_traduzidos
        else:
            return textos

In [24]:
doc = Document("documentos/Clinica dente saudavel.docx")
preprocessar_docx(doc)

['Sobre a Clínica\nA Clínica Dente Saudável foi fundada em 2015 e é especializada em odontologia Preventiva.\nEstamos localizados na R. Dr. Joaquim de Souza Campos Júnior, 145 - Bonfim, Campinas - SP, 13070-718.\nNosso horário de atendimento é de segunda a sexta, das 8h às 18h, e aos sábados das 8h ao meio-dia.\nAceitamos os planos odontológicos Uniodonto, Sorridente e Saúde Bucal.\n\n',
 '\n\nEspecialidades\n•\tDentística - tratamentos como restaurações, clareamento dental, aparelho ortodôntico.\n•\tPeriodontia - tratamento de gengivite, periodontite e doenças na gengiva.\n•\tEndodontia - tratamentos de canal.\n•\tOdontopediatria - odontologia focada no público infantil.\n',
 '\n\nProcedimentos\n•\tConsulta inicial\n•\tLimpeza\n•\tRestauração dentária\n•\tRaspagem supragengival e subgengival\n•\tFacetas dentais\n•\tClareamento dental\n•\tAparelho ortodôntico\n•\tCirurgias orais menores\n•\tExtrações dentárias simples e complexas\n•\tTratamento de canal\n',
 '\n\nSobre o Consultório\nN

In [48]:
class ChromaDBManager:
    def __init__(self, path:str, collection_name:str, ef):
        self.client = PersistentClient(path=path, settings=Settings(allow_reset=True))
        self.collection = self.client.get_or_create_collection(name=collection_name, embedding_function=ef)
    
    def get_max_id(self) -> int:
        ids_str = self.collection.get()["ids"]
        
        return max([int(num) for num in ids_str]) + 1 if ids_str else 0

    def ler_todos_docxs(self, path:str, tradutor=None) -> list[str]:
        sentences = []

        for filename in os.listdir(path):
            if filename.endswith(".docx"):
                doc = Document(os.path.join(path, filename))
                preprocess = preprocessar_docx(doc, tradutor)
                sentences.extend(preprocess)

        if sentences==[]:
            print("Não há arquivos .docx na pasta")
            return False
        
        sentences_ids = [str(i) for i in range(len(sentences))]
        self.collection.add(documents=sentences, ids=sentences_ids)

    def query(self, query_texts:str, n_results:int = 5) -> dict:
        return self.collection.query(query_texts=query_texts, n_results=n_results)

In [49]:
class Assistant:
    def __init__(self, nome_da_empresa:str, api_key_path:str = "credentials/apiKey", pasta_database:str = "dataBase", colecao:str = "colecao1"):
        self.nome_da_empresa = nome_da_empresa
        self.pasta_database = pasta_database
        self.colecao = colecao
        self.openai_api_key = ler_arquivo(api_key_path)
        self.historico_atual = []
        self.openai_ef = embedding_functions.OpenAIEmbeddingFunction(api_key=self.openai_api_key, model_name="text-embedding-ada-002")
        self.db_manager = ChromaDBManager(path=pasta_database, collection_name=colecao, ef=self.openai_ef)

    def enviar_gpt(self, system_role:str, database:str, quest:str, collection_response) -> dict:
        openai.api_key = self.openai_api_key

        historico = self.historico_atual[-20:]
        historico_string = ""
        for dict in historico:
            if dict["role"] == "user":
                historico_string += f"User: {dict['content']}\n"
            if dict["role"] == "assistant":
                historico_string += f"Assistant: {dict['content']}\n"

        historico_string = separar_sentencas([historico_string])

        prompt = [
            {"role": "system", "content": system_role},
            {"role": "user", "content": f"'database':\n###\n{database}\n###\n\n 'historico':\n###\n{historico_string}\n###\n\n Pergunta do usuário: {quest}\n"}
        ]

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=prompt,
            temperature=0.6,
            max_tokens=150,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0.1
        )
        
        return {'response': response, 'prompt': prompt, 'collection_response': collection_response}
    
    def gpt_tradutor_en(self, pergunta:str) -> dict:
        openai.api_key = self.openai_api_key
        prompt=[{"role": "system","content": "Por favor traduza para o inglês tudo que o usuário enviar. E se o usuário enviar algo em inglês repita exatamente o que ele disse."},
                {"role": "user","content": "I like guarana, and you?"},
                {"role": "assistant","content": "I like guarana, and you?"},
                {"role": "user","content": "O que é ChromaDB?"},
                {"role": "assistant","content": "What is ChromaDB?"},
                {"role": "user","content": pergunta}]

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=prompt,
            temperature=0.5,
            max_tokens=100,
            top_p=1,
            frequency_penalty=1,
            presence_penalty=0
        )

        return {'response': response, 'prompt': prompt}

    def query(self, pergunta:str, traduzir:bool=False) -> dict:
        if traduzir:
            pergunta_eng = self.gpt_tradutor_en(pergunta=pergunta)["response"]["choices"][0]["message"]["content"]
            
        collection_response = self.db_manager.query(query_texts= pergunta if not traduzir else pergunta_eng, n_results=5)
        collection_database = collection_response["documents"][0]
        system_role = (
            f"Você é um assistente inteligente, educado e dedicado da {self.nome_da_empresa}, que responde perguntas do usuário. "
            "Você possui 'dataBase' para buscar informações e 'historico' para lembrar da conversa com o usuário. "
            "Se você não souber como responder o usuário, responda com: 'Infelizmente essa informação não está na minha base de dados.' "
            "Você deve responder usando um máximo de 150 tokens. "
            "Agora, com base nos seguintes dados, responda a pergunta do usuário: "
        )

        return self.enviar_gpt(system_role=system_role, database=collection_database, quest=pergunta, collection_response=collection_response)
    
    def atualizar_documentos(self, traduzir:bool = False):
        print("\nResetando cliente...")
        self.db_manager.client.reset()
        print("\nCliente resetado.")
        
        print("\nAtualizando cliente...")
        self.db_manager = ChromaDBManager(path=self.pasta_database, collection_name=self.colecao, ef=self.openai_ef)
        print("\nCliente atualizado.")
        
        print("\nLendo todos os documentos...")
        self.db_manager.ler_todos_docxs("documentos", self.gpt_tradutor_en if traduzir else None)
        print("\nDocumentos atualizados.")


    def salvar_historico_atual(self, historico):
        self.historico_atual.extend(historico)

In [50]:
cliente = PersistentClient(path="dataBase")

assistente = Assistant(nome_da_empresa="Clinica Dente Saudavel")
# chroma = ChromaDBManager("dataBase", "colecao2")
# chroma2 = ChromaDBManager("dataBase", "colecao1")

In [39]:
assistente.atualizar_documentos(traduzir=False)


Resetando cliente...

Cliente resetado.

Atualizando cliente...

Cliente atualizado.

Lendo todos os documentos...

Documentos atualizados.


In [51]:
assistente.db_manager.collection.get()

{'ids': ['0', '1', '2', '3'],
 'embeddings': None,
 'metadatas': [None, None, None, None],
 'documents': ['Clínica \n Clínica Dente Saudável fundar 2015 especializar odontologia Preventiva \n estar localizado R. Dr. Joaquim Souza Campos Júnior 145 Bonfim Campinas SP 13070-718 \n horário atendimento 8h 18h sábado 8h meio-dia \n Aceitamos plano odontológico Uniodonto Sorridente Saúde Bucal \n\n',
  '\n\n Especialidades \n \t Dentística tratamento restauração clareamento dental aparelho ortodôntico \n \t periodontia tratamento gengivite periodontite doença gengiva \n \t Endodontia tratamento canal \n \t Odontopediatria odontologia focar público infantil \n',
  '\n\n Procedimentos \n \t consulta inicial \n \t Limpeza \n \t restauração dentário \n \t Raspagem supragengival subgengival \n \t Facetas dentais \n \t Clareamento dental \n \t aparelho ortodôntico \n \t cirurgia oral pequeno \n \t Extrações dentário simples complexo \n \t tratamento canal \n',
  '\n\n Consultório \n consultório co

In [32]:
assistente.db_manager.collection.query(query_texts="Novidades?", n_results=2)

{'ids': [['1', '3']],
 'distances': [[0.40760153661377624, 0.4114927757845372]],
 'metadatas': [[None, None]],
 'embeddings': None,
 'documents': [['\n\n Especialidades \n \t Dentística tratamento restauração clareamento dental aparelho ortodôntico \n \t periodontia tratamento gengivite periodontite doença gengiva \n \t Endodontia tratamento canal \n \t Odontopediatria odontologia focar público infantil \n',
   '\n\n Consultório \n consultório contar 5 moderno sala atendimento equipar tecnologia ponta foco técnica minimamente invasivo humanizar \n Oferecemos tv streaming sala conforto paciente equipe compor 5 dentista especialista 3 auxiliar 2 recepcionista treinar receber eficiência cortesia \n']]}

In [58]:
pergunta = "Quanto tempo mais ou menos?"
process_return = assistente.query(pergunta)
dict_historico = [
            {"role": "user","content": pergunta},
            {"role": "assistant","content": process_return["response"]["choices"][0]["message"]["content"]}
        ]
assistente.salvar_historico_atual(dict_historico)
assistente.historico_atual

Number of requested results 5 is greater than number of elements in index 4, updating n_results = 4


[{'role': 'user', 'content': 'Qual é a melhor para tratar cárie?'},
 {'role': 'assistant',
  'content': 'A melhor especialidade para tratar cárie é a Dentística, que realiza o tratamento de restauração dentária.'},
 {'role': 'user', 'content': 'Fica longe de Mogi Mirim?'},
 {'role': 'assistant',
  'content': 'A Clínica Dente Saudável está localizada em Campinas, SP. Portanto, fica um pouco distante de Mogi Mirim.'},
 {'role': 'user', 'content': 'Quanto tempo mais ou menos?'},
 {'role': 'assistant',
  'content': 'Infelizmente, não tenho informações suficientes para responder a sua pergunta.'}]

In [61]:
historico = assistente.historico_atual[-20:]
historico_string = ""
for dict in historico:
    if dict["role"] == "user":
        historico_string += f"User: {dict['content']}\n"
    if dict["role"] == "assistant":
        historico_string += f"Assistant: {dict['content']}\n"

historico_string

'User: Qual é a melhor para tratar cárie?\nAssistant: A melhor especialidade para tratar cárie é a Dentística, que realiza o tratamento de restauração dentária.\nUser: Fica longe de Mogi Mirim?\nAssistant: A Clínica Dente Saudável está localizada em Campinas, SP. Portanto, fica um pouco distante de Mogi Mirim.\nUser: Quanto tempo mais ou menos?\nAssistant: Infelizmente, não tenho informações suficientes para responder a sua pergunta.\n'

In [62]:
separar_sentencas([historico_string])

['User bom tratar cárie \n Assistant bom especialidade tratar cárie Dentística realizar tratamento restauração dentário \n User ficar Mogi Mirim \n Assistant Clínica Dente Saudável localizado Campinas SP ficar distante Mogi Mirim \n User \n Assistant infelizmente informação suficiente responder pergunta \n']