In [23]:
import os
from dotenv import load_dotenv
import google.generativeai as genai
from google.api_core.exceptions import ResourceExhausted
import time
import json
from json import JSONDecodeError
import re
from tqdm import tqdm

In [2]:
# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()

# Acessa a chave de API do Gemini
GEMINI_API_KEY  = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)

# Define o modelo
generation_config = {
  "temperature": 0.3,
}

safety_settings={
    'HATE': 'BLOCK_NONE',
    'HARASSMENT': 'BLOCK_NONE',
    'SEXUAL' : 'BLOCK_NONE',
    'DANGEROUS' : 'BLOCK_NONE'
    }


In [3]:
def load_diseases_from_json(file_path):
    """
    Carrega as doenças de um arquivo JSON, que pode ser uma lista ou um dicionário.

    Args:
        file_path (str): O caminho para o arquivo JSON.

    Returns:
        list: Uma lista contendo as doenças.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)

        if isinstance(data, list):  # Verifica se é uma lista
            return data  # Se for lista, retorna diretamente
        elif isinstance(data, dict):  # Verifica se é um dicionário
            return list(data.keys())  # Se for dicionário, retorna as chaves

    except FileNotFoundError:
        print(f"Erro: Arquivo '{file_path}' não encontrado.")
        return []
    except json.JSONDecodeError:
        print(f"Erro: Falha ao decodificar o JSON em '{file_path}'.")
        return []


In [4]:
diseases_medline = load_diseases_from_json('empty_symptoms_medline.json')
print(len(diseases_medline))
diseases_who = load_diseases_from_json('empty_symptoms_who.json')
print(len(diseases_who))

90
151


In [5]:
def merge_disease_lists(list1, list2):
    """
    Mescla duas listas de doenças, priorizando a primeira lista em caso de duplicatas.

    Argumentos:
        list1 (list): A primeira lista de doenças.
        list2 (list): A segunda lista de doenças.

    Retorna:
        list: Uma lista mesclada de doenças, priorizando a primeira lista em caso de duplicatas.
    """
    # Use um conjunto para verificação rápida de duplicatas
    disease_set = set(list1)
    merged_list = list1.copy()  # Comece com a primeira lista
    
    # Adicione doenças da segunda lista se elas não estiverem na primeira lista
    for disease in list2:
        if disease not in disease_set:
            merged_list.append(disease)
            disease_set.add(disease)
    
    return merged_list



In [6]:
diseases_raw = merge_disease_lists(diseases_medline, diseases_who)
len(diseases_raw)

240

In [7]:
model_diseases = genai.GenerativeModel(model_name="gemini-1.0-pro",      
                              generation_config=generation_config,
                              safety_settings=safety_settings)

In [8]:
def is_disease(disease: str, model_diseases=model_diseases) -> bool:
    """
    Verifica se um determinado nome corresponde a uma doença, usando um modelo especificado ou um padrão.

    Args:
        nome (str): O nome a ser verificado.
        modelo_doencas: O modelo de classificação de doenças (opcional).
            Se None, tenta usar um 'modelo_doencas' global, se disponível.

    Returns:
        bool: True se o nome for uma doença, False caso contrário.
    """

    try:
        prompt = f"""As an expert medical professional, is "{disease}" a recognized disease?
        Output only:
            1: if the name is identified as a disease.
            0: otherwise.

        Examples:
            Antibiotic Resistance: 0
            vaccine: 0
            Anaemia: 1

        Your turn:
            {disease}:
        """
        resposta = model_diseases.generate_content(prompt)
        match = re.search(r"[01]", resposta.text)  
        if match:
            return bool(int(match.group(0)))  # Converte '1' para True, '0' para False
        else:
            print(f"Formato de resposta inesperado para '{disease}': {resposta.text}")
            return False

    except (AttributeError, TypeError, ValueError) as e:
        print(f"Erro ao processar '{disease}': {e}")
        return False

In [12]:
def fetch_valid_diseases(diseases_raw):
    delay = 10  # Initial delay
    max_retries = 3  # Maximum number of retries
    retries = 0
    diseases = []
    for disease in tqdm(diseases_raw, desc="Fetching Valid Diseases", unit="disease"):
        while retries < max_retries:
            try:
                if is_disease(disease):
                    diseases.append(disease)
                break  
            except ResourceExhausted:
                retries += 1
                delay *= 2  
                time.sleep(delay)  
        retries = 0  
        time.sleep(delay)  
    return diseases

In [13]:
diseases = fetch_valid_diseases(diseases_raw)

Fetching Valid Diseases: 100%|██████████| 10/10 [01:47<00:00, 10.74s/disease]


In [24]:
def save_to_json(data, filename, indent=4):
    """
    Salva um dicionário em um arquivo JSON.

    Args:
        data: O dicionário a ser salvo.
        filename: O nome do arquivo JSON.
    """
    try:
        with open(filename, "w") as json_file:
            json.dump(data, json_file, indent=indent)
    except TypeError as e:
        raise TypeError(f"Erro ao serializar dados para JSON: {e}")
    except OSError as e:
        raise OSError(f"Erro ao salvar o arquivo JSON: {e}")
    except JSONDecodeError as e:
        raise JSONDecodeError(f"Erro ao codificar dados em JSON: {e}")

In [25]:
save_to_json(diseases, "diseases_gemini.json", indent=4)

In [26]:
def create_few_shot_from_json(file_path, limit=10):
    """
    Carrega exemplos few-shot a partir de um arquivo JSON, com limite opcional.

    Args:
        file_path: O caminho para o arquivo JSON contendo os dados.
        limit: O número máximo de exemplos a serem retornados (padrão: 10).

    Returns:
        Uma lista de strings formatadas para few-shot learning, limitada ao número especificado.
    """
    try:
        with open(file_path, 'r') as file:
            data_dict = json.load(file)

        few_shot_examples = [
            f"{tipo}: {texto}"
            for key, values in data_dict.items()
            for tipo, texto in [("input", key), ("output", ", ".join(map(str, values)))]
        ][:limit]  # Limita a lista aos primeiros 'limit' itens
        return few_shot_examples
    except FileNotFoundError:
        print(f"Arquivo não encontrado: {file_path}")
        return []
    except json.JSONDecodeError:
        print(f"Erro ao decodificar o JSON em: {file_path}")
        return []

In [27]:
# Uso da função
few_shot = create_few_shot_from_json('diseases_symptoms_medline.json')

In [29]:
instruction_symptoms = """You are an expert medical professional, equipped with comprehensive knowledge of human diseases and their symptoms
You create correct and simple symptoms list that is easy to understand.
Without title
Without comments
"""

model_symptoms = genai.GenerativeModel(model_name="gemini-1.5-pro",
                              generation_config=generation_config,
                              safety_settings=safety_settings,
                              system_instruction=instruction_symptoms)

In [28]:
def generate_symptoms_with_ai(few_shot, diseases, model_symptoms=model_symptoms, delay=60):
    """
    Gera sintomas para doenças usando um modelo de IA.

    Args:
        model: O modelo de IA a ser usado.
        few_shot: Exemplos few-shot para o modelo.
        diseases: Uma lista de doenças para as quais gerar sintomas.
        delay: Tempo de espera entre as chamadas à API (em segundos).

    Returns:
        dict: Um dicionário onde as chaves são as doenças e os valores são listas de sintomas.
    """

    diseases_symptoms_ai = {}

    for disease in tqdm(diseases, desc="Generating Symptoms", unit="disease"):
        output_text = few_shot + [f'input: {disease}', 'output:']
        
        try:
            response = model_symptoms.generate_content(output_text)
            if not response.parts or len(response.parts) != 1 or "text" not in response.parts[0]:
                raise ValueError("A resposta não contém uma parte de texto válida.")
            
            symptoms = re.sub(r"[^a-zA-Z\s,]", "", response.text)
            symptoms_list = [symptom.strip() for symptom in symptoms.split(', ')]
            diseases_symptoms_ai[disease] = symptoms_list
        except (AttributeError, TypeError, ValueError) as e:
            print(f"Erro ao gerar sintomas para '{disease}': {e}")
        
        time.sleep(delay)

    return diseases_symptoms_ai

In [20]:
# Uso da função (exemplo)
diseases_symptoms_ai = generate_symptoms_with_ai(few_shot=few_shot, diseases=diseases)

Generating Symptoms: 100%|██████████| 7/7 [07:12<00:00, 61.74s/disease]


In [22]:
# Salva o dicionário em um arquivo JSON
save_to_json(diseases_symptoms_ai, "diseases_symptoms_gemini.json")