# 1. Anotador de Proteínas Hipotéticas

# 2. Instalação e configuração rápida

In [1]:
# Instala pacotes Python essenciais
!pip install -q requests pandas biopython bioservices tqdm openpyxl matplotlib seaborn

In [9]:
# Imports centrais (somente uma vez)
from google.colab import drive
from pathlib import Path
import subprocess, os, requests, pprint
import pandas as pd
import Bio
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from Bio.Seq import Seq
import matplotlib
import matplotlib.pyplot as plt
import bioservices, tqdm

In [3]:
# Opcional: instalar DIAMOND (defina INSTALL_DIAMOND True/False)
INSTALL_DIAMOND = True
if INSTALL_DIAMOND:
    try:
        subprocess.run(["apt-get","update"], check=True)
        subprocess.run(["apt-get","install","-y","diamond-aligner"], check=True)
    except Exception:
        url = "https://github.com/bbuchfink/diamond/releases/latest/download/diamond-linux64.tar.gz"
        subprocess.run(["wget","-q", url, "-O", "/tmp/diamond.tar.gz"], check=True)
        subprocess.run(["tar","-xzf","/tmp/diamond.tar.gz","-C","/tmp"], check=True)
        binpath = "/tmp/diamond"
        os.chmod(binpath, 0o755)
        subprocess.run(["mv", binpath, "/usr/local/bin/diamond"], check=True)
    subprocess.run(["diamond","version"], check=False)

In [4]:
# Monta Drive (se preciso) e mostra versões das bibliotecas usadas
drive.mount('/content/drive', force_remount=False)

print("requests", requests.__version__)
print("pandas", pd.__version__)
print("biopython", getattr(Bio, "__version__", "unknown"))
print("bioservices", getattr(bioservices, "__version__", "unknown"))
print("tqdm", getattr(tqdm, "__version__", "unknown"))
print("matplotlib", matplotlib.__version__)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
requests 2.32.4
pandas 2.2.2
biopython 1.86
bioservices unknown
tqdm 4.67.1
matplotlib 3.10.0


# 3. Parâmetros do usuário

In [5]:
# Parâmetros editáveis pelo usuário (ajuste conforme necessário)
FASTA_PATH = "/content/drive/MyDrive/protein_annotation/input_sequences.fasta"
OUTDIR = "/content/drive/MyDrive/protein_annotation/results"
RUN_DIAMOND = True
DIAMOND_DB = "/content/diamond_db/uniprot.dmnd"
UNIPROT_REST = "https://rest.uniprot.org"
TIMEOUT = 30  # segundos
BATCH_SIZE = 200

IDENTITY_THRESHOLD = 70.0  # porcentagem
COVERAGE_THRESHOLD = 70.0  # porcentagem
EVALUE_CUTOFF = 1e-5

MIN_LENGTH = 50
MAX_LENGTH = 5000

RUN_INTERPRO = False
RUN_SIGNALP = False
RUN_TMHMM = False

CACHE_DIR = f"{OUTDIR}/cache"
LOG_LEVEL = "INFO"

In [6]:
# Busca um nome no UniProt, permite selecionar hits e salva FASTA em FASTA_PATH
PAGE_SIZE = 20

query = input("Digite o nome da proteína para buscar no UniProt (ou pressione Enter para pular): ").strip()
if not query:
    print("Busca ignorada.")
else:
    params = {"query": query, "format": "json", "size": PAGE_SIZE, "fields": "accession,protein_name,organism_name,length"}
    try:
        resp = requests.get(f"{UNIPROT_REST}/uniprotkb/search", params=params, timeout=TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        print("Erro na busca UniProt:", e)
        raise SystemExit

    data = resp.json()
    results = data.get("results", [])
    if not results:
        print("Nenhum resultado encontrado para:", query)
    else:
        entries = []
        for r in results:
            acc = r.get("primaryAccession") or r.get("accession") or ""
            pname = ""
            pd_desc = r.get("proteinDescription", {}) or {}
            rn = pd_desc.get("recommendedName")
            if isinstance(rn, dict):
                full = rn.get("fullName")
                pname = full.get("value", "") if isinstance(full, dict) else str(full or "")
            if not pname:
                pname = pd_desc.get("submissionNames", [{}])[0].get("value", "") or r.get("uniProtkbId", "")
            org = (r.get("organism") or {}).get("scientificName", "") or r.get("organism", "")
            length = (r.get("sequence") or {}).get("length") or r.get("length", "")
            entries.append({"acc": acc, "name": pname, "organism": org, "length": length})

        for i, e in enumerate(entries, 1):
            print(f"{i}. {e['acc']} | {e['name']} | {e['organism']} | len={e['length']}")

        choice = input("Escolha número(s) separados por vírgula, 'a' para salvar todos, ou Enter para cancelar: ").strip().lower()
        if not choice:
            print("Operação cancelada.")
        else:
            if choice == "a":
                indices = list(range(1, len(entries) + 1))
            else:
                try:
                    indices = [int(x.strip()) for x in choice.split(",") if x.strip()]
                except ValueError:
                    print("Entrada inválida. Operação cancelada.")
                    indices = []

            if indices:
                # Salva as sequências selecionadas sobrescrevendo o arquivo (modo "w")
                os.makedirs(os.path.dirname(FASTA_PATH) or ".", exist_ok=True)
                saved = 0
                with open(FASTA_PATH, "w") as fh:
                  for idx in indices:
                    if 1 <= idx <= len(entries):
                      acc = entries[idx - 1]["acc"]
                      try:
                        fa = requests.get(f"{UNIPROT_REST}/uniprotkb/{acc}.fasta", timeout=TIMEOUT)
                        fa.raise_for_status()
                        fh.write(fa.text.rstrip() + "\n")
                        saved += 1
                      except Exception as e:
                          print(f"Falha ao baixar {acc}: {e}")
                    else:
                      print(f"Índice fora do intervalo: {idx}")
                if saved:
                    print(f"{saved} sequência(s) salva(s) em: {FASTA_PATH}")
                else:
                    print("Nenhuma sequência foi salva.")

Digite o nome da proteína para buscar no UniProt (ou pressione Enter para pular): P68871
1. P68871 | Hemoglobin subunit beta | Homo sapiens | len=147
Escolha número(s) separados por vírgula, 'a' para salvar todos, ou Enter para cancelar: a
1 sequência(s) salva(s) em: /content/drive/MyDrive/protein_annotation/input_sequences.fasta


In [None]:
# Valida caminhos e cria diretórios necessários
FASTA_PATH = os.path.expanduser(FASTA_PATH)
OUTDIR = os.path.expanduser(OUTDIR)
CACHE_DIR = os.path.expanduser(CACHE_DIR)
DIAMOND_DB = os.path.expanduser(DIAMOND_DB)

os.makedirs(OUTDIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)

print("Parâmetros atuais:")
pprint.pprint({
    "FASTA_PATH": FASTA_PATH,
    "OUTDIR": OUTDIR,
    "CACHE_DIR": CACHE_DIR,
    "RUN_DIAMOND": RUN_DIAMOND,
    "DIAMOND_DB": DIAMOND_DB,
    "UNIPROT_REST": UNIPROT_REST,
    "TIMEOUT": TIMEOUT,
    "BATCH_SIZE": BATCH_SIZE,
    "IDENTITY_THRESHOLD": IDENTITY_THRESHOLD,
    "COVERAGE_THRESHOLD": COVERAGE_THRESHOLD,
    "EVALUE_CUTOFF": EVALUE_CUTOFF,
    "MIN_LENGTH": MIN_LENGTH,
    "MAX_LENGTH": MAX_LENGTH,
    "RUN_INTERPRO": RUN_INTERPRO,
    "RUN_SIGNALP": RUN_SIGNALP,
    "RUN_TMHMM": RUN_TMHMM,
    "LOG_LEVEL": LOG_LEVEL,
})

if not Path(FASTA_PATH).is_file():
    print(f"AVISO: arquivo FASTA não encontrado em: {FASTA_PATH}")
else:
    print("Arquivo FASTA encontrado.")

Parâmetros atuais:
{'BATCH_SIZE': 200,
 'CACHE_DIR': '/content/drive/MyDrive/protein_annotation/results/cache',
 'COVERAGE_THRESHOLD': 70.0,
 'DIAMOND_DB': '/content/diamond_db/uniprot.dmnd',
 'EVALUE_CUTOFF': 1e-05,
 'FASTA_PATH': '/content/drive/MyDrive/protein_annotation/input_sequences.fasta',
 'IDENTITY_THRESHOLD': 70.0,
 'LOG_LEVEL': 'INFO',
 'MAX_LENGTH': 5000,
 'MIN_LENGTH': 50,
 'OUTDIR': '/content/drive/MyDrive/protein_annotation/results',
 'RUN_DIAMOND': True,
 'RUN_INTERPRO': False,
 'RUN_SIGNALP': False,
 'RUN_TMHMM': False,
 'TIMEOUT': 30,
 'UNIPROT_REST': 'https://rest.uniprot.org'}
Arquivo FASTA encontrado.


# 4. Carregar e checar FASTA

In [None]:
# Carrega e valida o FASTA
import re
AA_RE = re.compile(r'^[A-Za-z\*\-\.]+$')

if not os.path.isfile(FASTA_PATH):
    raise FileNotFoundError(f"FASTA não encontrado: {FASTA_PATH}")

records = []
with open(FASTA_PATH, "r") as fh:
    for rec in SeqIO.parse(fh, "fasta"):
        seq = str(rec.seq).strip()
        records.append({
            "seq_id": rec.id,
            "header": rec.description,
            "sequence": seq,
            "length": len(seq),
            "valid_chars": bool(AA_RE.match(seq))
        })

if not records:
    raise ValueError("Nenhuma sequência lida do FASTA.")

df = pd.DataFrame(records)
seq_counts = df["sequence"].value_counts()
id_counts = df["seq_id"].value_counts()

df["duplicate_sequence"] = df["sequence"].map(lambda s: seq_counts.get(s, 0) > 1)
df["duplicate_id"] = df["seq_id"].map(lambda i: id_counts.get(i, 0) > 1)

print(f"Total sequências lidas: {len(df)}")
print(f"Sequências únicas (por sequência): {seq_counts.size}")
print(f"IDs duplicados: {int(df['duplicate_id'].sum())}")
print(f"Sequências com caracteres inválidos: {int((~df['valid_chars']).sum())}")
print("Comprimentos: min=%d median=%d max=%d" % (int(df['length'].min()), int(df['length'].median()), int(df['length'].max())))

Total sequências lidas: 1
Sequências únicas (por sequência): 1
IDs duplicados: 0
Sequências com caracteres inválidos: 0
Comprimentos: min=147 median=147 max=147


In [None]:
# Filtra por comprimento, remove duplicatas por sequência (mantém a primeira) e salva FASTA filtrado
FILTERED_FASTA = os.path.join(OUTDIR, "input_sequences.filtered.fasta")

kept = []
seen_seqs = set()
for _, row in df.iterrows():
    L = row["length"]
    if not row["valid_chars"]:
        continue
    if L < MIN_LENGTH or L > MAX_LENGTH:
        continue
    if row["sequence"] in seen_seqs:
        continue
    seen_seqs.add(row["sequence"])
    rec = SeqRecord(Seq(row["sequence"]), id=row["seq_id"], description=row["header"])
    kept.append(rec)

if kept:
    with open(FILTERED_FASTA, "w") as outfh:
        SeqIO.write(kept, outfh, "fasta")
print(f"Sequências mantidas: {len(kept)}; arquivo salvo em: {FILTERED_FASTA}")

Sequências mantidas: 1; arquivo salvo em: /content/drive/MyDrive/protein_annotation/results/input_sequences.filtered.fasta


In [None]:
# Gera um histograma simples dos comprimentos e salva em OUTDIR
png_path = os.path.join(OUTDIR, "length_distribution.png")

plt.figure(figsize=(6,3.5))
plt.hist(df['length'], bins=40, color="#2a9d8f", edgecolor="#073642")
plt.xlabel("Comprimento (aa)")
plt.ylabel("Número de sequências")
plt.title("Distribuição de comprimentos")
plt.tight_layout()
plt.savefig(png_path, dpi=150)
plt.close()
print("Histograma salvo em:", png_path)

Histograma salvo em: /content/drive/MyDrive/protein_annotation/results/length_distribution.png


# 5. Busca por similaridade + consulta UniProt

In [None]:
# Baixa e prepara banco de dados UniProt Swiss-Prot para DIAMOND
print("Preparando banco de dados DIAMOND...")
os.makedirs(os.path.dirname(DIAMOND_DB), exist_ok=True)

uniprot_url = "https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/complete/uniprot_sprot.fasta.gz"
fasta_gz = "/tmp/uniprot_sprot.fasta.gz"
fasta_file = "/tmp/uniprot_sprot.fasta"

print("Baixando UniProt Swiss-Prot (~500MB comprimido, ~1GB descomprimido)...")
print("Isso pode levar alguns minutos...")
subprocess.run(["wget", "-q", "--show-progress", uniprot_url, "-O", fasta_gz], check=True)

print("Descomprimindo...")
subprocess.run(["gunzip", "-f", fasta_gz], check=True)

print("Criando banco DIAMOND...")
subprocess.run(["diamond", "makedb", "--in", fasta_file, "-d", DIAMOND_DB.replace(".dmnd", "")], check=True)

print(f"✓ Banco DIAMOND criado em: {DIAMOND_DB}")
subprocess.run(["rm", fasta_file], check=False)
print("Arquivos temporários removidos.")

In [None]:
# Executa DIAMOND BLASTP se configurado
diamond_results = []
if RUN_DIAMOND:
    if not Path(DIAMOND_DB).is_file():
        print(f"AVISO: banco DIAMOND não encontrado em {DIAMOND_DB}")
        print("Pulando busca DIAMOND.")
    else:
        diamond_out = os.path.join(OUTDIR, "diamond_results.tsv")
        cmd = [
            "diamond", "blastp",
            "-d", DIAMOND_DB,
            "-q", FILTERED_FASTA,
            "-o", diamond_out,
            "--outfmt", "6", "qseqid", "sseqid", "pident", "length", "qlen", "slen", "evalue", "bitscore",
            "--max-target-seqs", "5",
            "--evalue", str(EVALUE_CUTOFF),
            "--sensitive"
        ]
        print("Executando DIAMOND BLASTP...")
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
            print(f"DIAMOND concluído. Resultados em: {diamond_out}")
            
            if os.path.isfile(diamond_out) and os.path.getsize(diamond_out) > 0:
                df_diamond = pd.read_csv(diamond_out, sep="\t", header=None,
                                        names=["qseqid","sseqid","pident","length","qlen","slen","evalue","bitscore"])
                df_diamond["coverage"] = (df_diamond["length"] / df_diamond["qlen"]) * 100
                df_diamond = df_diamond[
                    (df_diamond["pident"] >= IDENTITY_THRESHOLD) &
                    (df_diamond["coverage"] >= COVERAGE_THRESHOLD) &
                    (df_diamond["evalue"] <= EVALUE_CUTOFF)
                ]
                diamond_results = df_diamond.to_dict("records")
                print(f"Hits filtrados: {len(diamond_results)}")
            else:
                print("Nenhum resultado DIAMOND encontrado.")
        except subprocess.CalledProcessError as e:
            print(f"Erro ao executar DIAMOND: {e}")
else:
    print("Busca DIAMOND desabilitada.")

In [None]:
# Extrai IDs únicos do UniProt dos hits DIAMOND
uniprot_ids = set()
if diamond_results:
    for hit in diamond_results:
        sseqid = hit["sseqid"]
        parts = sseqid.split("|")
        if len(parts) >= 2:
            uniprot_ids.add(parts[1])
        else:
            uniprot_ids.add(sseqid.split()[0])

print(f"IDs únicos do UniProt para consulta: {len(uniprot_ids)}")

# 6. Domínios / sinais (opcional)

#7. Agregação e confiança

#8. Exportar relatório

# 9. Notas finais e metadata

In [None]:
# Consulta metadados do UniProt em lotes
from tqdm.notebook import tqdm
import time

uniprot_metadata = {}
ids_list = list(uniprot_ids)

if ids_list:
    for i in tqdm(range(0, len(ids_list), BATCH_SIZE), desc="Consultando UniProt"):
        batch = ids_list[i:i+BATCH_SIZE]
        query = " OR ".join([f"accession:{uid}" for uid in batch])
        params = {
            "query": query,
            "format": "json",
            "size": len(batch),
            "fields": "accession,protein_name,gene_names,organism_name,length,cc_function,ft_domain,go"
        }
        
        try:
            resp = requests.get(f"{UNIPROT_REST}/uniprotkb/search", params=params, timeout=TIMEOUT)
            resp.raise_for_status()
            data = resp.json()
            
            for entry in data.get("results", []):
                acc = entry.get("primaryAccession", "")
                
                prot_desc = entry.get("proteinDescription", {})
                rec_name = prot_desc.get("recommendedName", {})
                full_name = rec_name.get("fullName", {})
                protein_name = full_name.get("value", "") if isinstance(full_name, dict) else str(full_name or "")
                
                genes = entry.get("genes", [])
                gene_name = genes[0].get("geneName", {}).get("value", "") if genes else ""
                
                organism = entry.get("organism", {}).get("scientificName", "")
                length = entry.get("sequence", {}).get("length", 0)
                
                functions = [c.get("texts", [{}])[0].get("value", "") 
                           for c in entry.get("comments", []) 
                           if c.get("commentType") == "FUNCTION"]
                function = functions[0] if functions else ""
                
                domains = [f.get("type", "") 
                          for f in entry.get("features", []) 
                          if f.get("type") == "Domain"]
                
                go_terms = []
                for go in entry.get("uniProtKBCrossReferences", []):
                    if go.get("database") == "GO":
                        go_id = go.get("id", "")
                        props = go.get("properties", [])
                        go_term = next((p.get("value") for p in props if p.get("key") == "GoTerm"), "")
                        if go_id and go_term:
                            go_terms.append(f"{go_id}:{go_term}")
                
                uniprot_metadata[acc] = {
                    "protein_name": protein_name,
                    "gene_name": gene_name,
                    "organism": organism,
                    "length": length,
                    "function": function,
                    "domains": domains,
                    "go_terms": go_terms
                }
        except Exception as e:
            print(f"Erro no lote {i//BATCH_SIZE + 1}: {e}")
        
        time.sleep(0.5)
    
    print(f"Metadados coletados: {len(uniprot_metadata)} entradas")
else:
    print("Nenhum ID para consultar no UniProt.")

In [None]:
# Combina resultados DIAMOND com metadados UniProt
annotated_results = []
for hit in diamond_results:
    sseqid = hit["sseqid"]
    parts = sseqid.split("|")
    uniprot_id = parts[1] if len(parts) >= 2 else sseqid.split()[0]
    
    metadata = uniprot_metadata.get(uniprot_id, {})
    
    annotated_results.append({
        "query_id": hit["qseqid"],
        "uniprot_id": uniprot_id,
        "identity": hit["pident"],
        "coverage": hit["coverage"],
        "evalue": hit["evalue"],
        "bitscore": hit["bitscore"],
        "protein_name": metadata.get("protein_name", ""),
        "gene_name": metadata.get("gene_name", ""),
        "organism": metadata.get("organism", ""),
        "function": metadata.get("function", ""),
        "domains": "; ".join(metadata.get("domains", [])),
        "go_terms": "; ".join(metadata.get("go_terms", []))
    })

df_annotated = pd.DataFrame(annotated_results)
annotation_file = os.path.join(OUTDIR, "protein_annotations.tsv")
df_annotated.to_csv(annotation_file, sep="\t", index=False)
print(f"Anotações salvas em: {annotation_file}")
print(f"Total de anotações: {len(df_annotated)}")