# Extract annotation information from UniProt

Note: Requires an internet connection to download information from UniProt.

## Setup
### Import packages

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from rbc_gem_utils import (
    ANNOTATION_PATH,
    COBRA_CONFIGURATION,
    DATABASE_PATH,
    INTERIM_PATH,
    ROOT_PATH,
    compare_tables,
    read_rbc_model,
    show_versions,
    visualize_comparison,
)
from rbc_gem_utils.database.uniprot import (
    UNIPROT_EXPECTED_VERSION,
    get_isoform_value_from_entry_UniProt,
    get_label_miriam_mapping_UniProt,
    get_query_fields_UniProt,
    get_version_UniProt,
    parse_isoforms_UniProt,
    query_UniProt,
)

# Display versions of last time notebook ran and worked
# (the output of this call is what gets shown beneath the cell).
show_versions()

### Define configuration
#### COBRA Configuration

In [None]:
# Show the COBRA configuration object imported from rbc_gem_utils as the cell output.
COBRA_CONFIGURATION

## Check UniProt version
If the UniProt version does not match the expected version, it is because UniProt has been updated since the last time this code was utilized. 

### Expected UniProt version: 2023_05
* According to [UniProt](https://www.uniprot.org/help/downloads), updates to the database are made every eight weeks. 
* Last release utilized: [2023_05](https://www.uniprot.org/uniprotkb/statistics) published on **Wed Nov 08 2023**.

In [None]:
# `warn` was never imported by the notebook; import it here so the mismatch
# branch emits a warning instead of failing with a NameError.
from warnings import warn

# Compare the UniProt release that is currently served against the release
# this notebook expects. A mismatch is only reported (warning + printout)
# so that the remaining cells can still be run.
version = get_version_UniProt()
if UNIPROT_EXPECTED_VERSION != version:
    warn("Expected and retrieved versions are not the same")
    print(f"Expected UniProt version: {UNIPROT_EXPECTED_VERSION}")
    print(f"Retrieved UniProt version {version}.")
else:
    print("Retrieved version matches expected version.")

In [None]:
# Choose the output sub-directories: the regular database/annotation
# locations when the retrieved UniProt version is the expected one, and the
# interim location for both otherwise.
if UNIPROT_EXPECTED_VERSION == version:
    database_subpath, annotation_subpath = DATABASE_PATH, ANNOTATION_PATH
else:
    # Use different directory paths for unexpected behavior
    database_subpath = annotation_subpath = INTERIM_PATH

database_dirpath = f"{ROOT_PATH}{database_subpath}"
annotation_dirpath = f"{ROOT_PATH}{annotation_subpath}"

## Load RBC-GEM model
### Current Version: 0.2.0

In [None]:
# Read the RBC-GEM model, requesting the "xml" file type; the bare name on the
# last line displays the loaded model as the cell output.
model = read_rbc_model(filetype="xml")
model

## Download data from UniProt

### Get IDs for query
#### Using a customized method

In [None]:
from_db = "GeneID"  # From NCBI Gene ID

annotation_type = "genes"
annotation_key = "custom"

# For every model gene, the ID to query is the part of the gene ID that
# precedes the first "_AT".
gene_to_query_id = {gene.id: gene.id.split("_AT")[0] for gene in model.genes}
df_model = (
    pd.DataFrame.from_dict(
        gene_to_query_id,
        orient="index",
        columns=[annotation_key],
    )
    .rename_axis(annotation_type)
    .reset_index(drop=False)
)

# Unique IDs to submit, plus the gene -> query ID lookup used for mapping
# the results back onto the model later on.
query_ids = df_model[annotation_key].unique()
assert len(set(query_ids)) == len(query_ids), "Duplicate IDs in list to query"
model_search_mapping = df_model.set_index(annotation_type)[annotation_key].to_dict()
print(f"Number of model genes associated with query: {len(model_search_mapping)}")
print(f"Number of unique IDs to query: {len(query_ids)}")

### Set universal query parameters

In [None]:
# Retrieve the UniProt query fields with the `miriam_only` option enabled;
# the bare name on the last line displays them as the cell output.
miriam_query_fields = get_query_fields_UniProt(miriam_only=True)
miriam_query_fields

In [None]:
# Extract all relevant information for now and save
query_parameters = {
    "query": " && ".join(
        [
            "(reviewed:true)",
            "(organism_id:9606)",  # Homo sapiens (Human)
        ]
    ),
    "format": "tsv",
    "size": 500,
    "compressed": True,
    "fields": ",".join(miriam_query_fields),
}

## Run queries

In [None]:
# Tag used as part of the output file names.
database_tag = "UniProt"
# compare: diff new tables against previously saved ones.
# overwrite: write to the primary output directory instead of the interim one.
compare, overwrite = True, True

# Result table of each query round, keyed by a query label.
all_query_results = dict()

### Initial query

In [None]:
query_key = "initial"
# Submit the model's query IDs. The source database comes from `from_db`
# (defined alongside the query IDs) rather than a second hard-coded literal,
# so the two cannot drift apart.
df_results, failed_ids = query_UniProt(
    query_ids,
    query_parameters=query_parameters,
    from_db=from_db,
)
# Surface any IDs that could not be mapped so they can be addressed below.
if failed_ids:
    print(failed_ids)
all_query_results[query_key] = df_results
df_results

### Address failed IDs
IDs that failed mapping are all pseudogenes in this case. 
They have UniProt IDs that can be looked up.

In [None]:
# (failed ID, new ID) pairs: each ID that failed the initial query and the
# UniProt ID to use for it instead.
retry_ids = dict(
    [
        ("2713", "Q14409"),
        ("2974", "O75343"),
        ("6526", "P53794"),
    ]
)

In [None]:
query_key = "retry_1"
# Query the replacement UniProt IDs directly.
df_results, failed_ids = query_UniProt(
    sorted(retry_ids.values()),
    from_db="UniProtKB",
    query_parameters=query_parameters,
)
if failed_ids:
    print(failed_ids)
all_query_results[query_key] = df_results
# Point each affected model gene at its replacement ID, unless that
# replacement failed as well. This query submitted the replacement IDs, so
# the failure check must use the replacement ID, not the original one
# (assumes `failed_ids` lists submitted IDs, as in the initial query).
model_search_mapping.update(
    {
        gene: retry_ids[query_id]
        for gene, query_id in model_search_mapping.items()
        if query_id in retry_ids and retry_ids[query_id] not in failed_ids
    }
)
df_results

## Map query results to model
### Concat and cleanup query results

In [None]:
print(f"Number of unique queries: {len(all_query_results)}")
# Stack the result tables of all query rounds.
df_query_results = pd.concat(tuple(all_query_results.values()))
# De-duplicate while "From" is still a regular column: drop_duplicates()
# ignores the index, so doing it after set_index("From") would also discard
# rows of *different* query IDs that resolve to the same UniProt record,
# leaving those genes without annotations in the later merge.
df_query_results = df_query_results.drop_duplicates().set_index("From")
# Treat empty strings as missing values.
df_query_results = df_query_results.replace("", float("nan"))
df_query_results

### Save extracted data to database

In [None]:
# Database table: the query results without the "From" index, de-duplicated.
df_database = df_query_results.reset_index(drop=True).drop_duplicates()
database_filename = f"{database_tag}_{version}.tsv"

if compare:
    try:
        df_previous = pd.read_csv(
            f"{database_dirpath}/{database_filename}",
            sep="\t",
            index_col=0,
            dtype=str,
        )
    except FileNotFoundError:
        # No previously saved table; compare against an empty one.
        df_previous = pd.DataFrame([], columns=["Entry"], dtype=str)
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    # Compare with "Entry" as the index on both sides.
    df_comparision = compare_tables(
        df_previous.set_index("Entry"),
        df_database.set_index("Entry"),
    )
    ax = visualize_comparison(df_comparision)

# Write to the selected database directory when overwriting, otherwise to
# the interim directory.
output_dirpath = database_dirpath if overwrite else f"{ROOT_PATH}{INTERIM_PATH}"
df_database.to_csv(f"{output_dirpath}/{database_filename}", sep="\t")
df_database

In [None]:
# Table with one row per model gene and the ID it was queried with ("From").
df_model = (
    pd.DataFrame.from_dict({"From": model_search_mapping}, orient="columns")
    .rename_axis(annotation_type)
    .reset_index(drop=False)
)
# Attach the query results to the genes through the shared "From" ID
# (outer join: rows from either side are kept).
df_annotations = df_model.merge(
    df_query_results,
    left_on="From",
    right_index=True,
    how="outer",
)
df_annotations = df_annotations.drop(columns=["From"]).set_index("genes")
df_annotations

### Format UniProt information for annotation files
#### Map to chosen MIRIAMs
Some MIRIAMs may need additional formatting, so the mapping is kept simple for now until formatting methods are developed.

In [None]:
# Keeping it simple for now, group items regardless of isoforms for the time being
uniprot_miriam_mapping = get_label_miriam_mapping_UniProt(
    get_query_fields_UniProt(miriam_only=True)
)
df_annotations = df_annotations.loc[:, list(uniprot_miriam_mapping)].rename(
    uniprot_miriam_mapping, axis=1
)
df_annotations = df_annotations.loc[
    :,
    [
        "uniprot",
        "uniprot.isoform",
        "hgnc.symbol",
        "ncbigene",
        "ccds",
        "refseq",
        "mim",
        "drugbank",
    ],
]
df_annotations

In [None]:
# Identify isoforms
df_isoforms = parse_isoforms_UniProt(
    df_annotations.loc[:, ["uniprot", "uniprot.isoform"]].copy(),
    add_canonical=True,
)
# Genes whose expected isoform is not among the listed isoforms.
double_check = []
# Iterate over a separate selection so df_isoforms can be updated in the loop.
isoform_table = df_isoforms.loc[:, ["uniprot", "uniprot.isoform"]]
for gene, (uniprot_id, isoforms) in isoform_table.iterrows():
    listed_isoforms = isoforms.split(";")
    # The text after the last "_AT" of the gene ID is used as isoform number.
    isonum = gene.split("_AT")[-1]
    expected_isoform = f"{uniprot_id}-{isonum}"
    if expected_isoform in listed_isoforms:
        selected_isoform = expected_isoform
    else:
        double_check.append(gene)
        selected_isoform = float("nan")
    df_isoforms.loc[gene, "uniprot.isoform"] = selected_isoform

df_annotations["uniprot.isoform"] = df_isoforms["uniprot.isoform"]
# Uniprot, UniProt Isoform and the "canonical" custom column are not formatted for isoforms
unformatted_columns = ["uniprot", "uniprot.isoform", "uniprot.canonical"]
columns = df_annotations.columns.difference(unformatted_columns)

for idx, row in df_annotations.loc[:, columns].iterrows():
    id_pair = df_isoforms.loc[idx, ["uniprot", "uniprot.isoform"]].fillna("")
    uniprot_id, isoform_id = id_pair
    # No isoform ID set, just aggregate all without regards to isoform.
    formatted_row = row.apply(
        lambda x: get_isoform_value_from_entry_UniProt(x, isoform_id)
    )
    df_annotations.loc[idx, columns] = formatted_row.values
df_annotations = df_annotations.reset_index(drop=False)

annotation_filename = f"{annotation_type}_{database_tag}.tsv"

if compare:
    try:
        df_previous = pd.read_csv(
            f"{annotation_dirpath}/{annotation_filename}",
            sep="\t",
            index_col=0,
            dtype=str,
        )
    except FileNotFoundError:
        # No previously saved annotations; compare against an empty table.
        df_previous = pd.DataFrame([], columns=[annotation_type], dtype=str)
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    # Comparisons should be done with annotation type as index to ensure correct entries get compared.
    df_comparision = compare_tables(
        df_previous.set_index(annotation_type),
        df_annotations.set_index(annotation_type),
    )
    ax = visualize_comparison(df_comparision)

# Write to the selected annotation directory when overwriting, otherwise to
# the interim directory.
output_dirpath = annotation_dirpath if overwrite else f"{ROOT_PATH}{INTERIM_PATH}"
df_annotations.to_csv(f"{output_dirpath}/{annotation_filename}", sep="\t")
df_annotations

#### Double check
These genes did not have their expected isoform among the isoforms listed in the current database mapping.

In [None]:
# Show the annotation rows of the genes collected in `double_check`.
df_annotations.set_index(annotation_type).loc[double_check]