In [None]:
from pathlib import Path
from glob import glob

import requests
import pandas as pd

In [None]:
# Find the normalised KEGG CSV tables and record the column labels of each one.
datadir = "/local/path/to/data/counts/"
name_suffix = "_trimmed_mean_formatted_clean_normalised"

# Table label (file stem without the common suffix) -> path on disk.
kegg_csv_paths = {}
for csv_path in glob(datadir + "*_normalised.csv"):
    if "KEGG" not in csv_path:
        continue
    kegg_csv_paths[Path(csv_path).stem.replace(name_suffix, "")] = csv_path

# Only the header is of interest, so a single data row is read per table.
files = {}
for table_label, csv_path in kegg_csv_paths.items():
    files[table_label] = pd.read_csv(csv_path, index_col=0, nrows=1).columns.to_list()

for table_label, column_labels in files.items():
    print(table_label, len(column_labels))

### KEGG KO


In [None]:
# Build a KO id -> name lookup from the KEGG `ko` flat file.
with open('/data/gpfs/projects/punim1293/vini/db/kegg/genes/ko/ko', 'r', encoding="utf-8", errors="replace") as ko_file:
    lines = ko_file.readlines()

# Pair every ENTRY line with the first NAME line that follows it. The NAME
# line is found by its field keyword rather than at a fixed offset from ENTRY,
# so a different number of lines in between does not pick up the wrong field,
# and an ENTRY near the end of the file cannot index past the last line.
ko_entries = dict()
ko_id = None
for line in lines:
    if line.startswith("ENTRY"):
        fields = line.split()
        # A new ENTRY always replaces a pending id that never got a NAME.
        ko_id = fields[1] if len(fields) > 1 else None
    elif ko_id is not None and line.startswith("NAME"):
        ko_entries[ko_id] = line
        ko_id = None  # keep only the first NAME line after each ENTRY

# Drop any "ko:" prefix from the ids and reduce each NAME line to the bare
# name: remove the field keyword, cut everything from " [EC" onwards, trim.
ko_entries = {k.replace("ko:", ""): v[len("NAME"):].split(" [EC")[0].strip() for k, v in ko_entries.items()}

# KO columns without an entry in the flat file fall back to their own id.
ko_missing = [i for i in files["KEGG_ko"] if i not in ko_entries]
ko_all = {**ko_entries, **{i: i for i in ko_missing}}

### Refresh for attempt 2 (April 2024)

This was completed after modifying the code in the cell above this one.

In [None]:
# Dump the KO id -> name lookup as a two-column TSV with a header row.
with open("/local/path/to/data/misc/renaming/ko_entries.tsv", "w") as ko_file:
    tsv_rows = ["ko\tname\n"]
    tsv_rows.extend(f"{entry_id}\t{entry_name}\n" for entry_id, entry_name in ko_entries.items())
    ko_file.writelines(tsv_rows)

In [None]:
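# NOTE: the retry loop further down iterates over `missing`. With the line below
# commented out, `missing` is undefined on a fresh kernel; uncomment it (or define
# `missing` some other way) before running that cell.
# missing = pd.read_csv("/local/path/to/data/misc/renaming/ko_missing_2nd_try.csv", header=None)[0].to_list()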

In [None]:

def search_kegg_entry(kegg_identifier, timeout=30):
    """
    Query the KEGG REST `find` endpoint of the `ko` database for an identifier.

    Parameters:
        - kegg_identifier: The identifier appended to the `find/ko/` URL.
        - timeout: Seconds `requests.get` may wait on the server before giving
          up. Without a timeout a stalled connection would block forever.

    Returns:
        - The stripped response text on HTTP 200.
        - A string starting with "Error: " for any other status code or when
          the request itself fails. This function reports failures through
          its return value and does not raise.
    """
    base_url = "https://rest.kegg.jp/find/ko/"
    full_url = f"{base_url}{kegg_identifier}"

    try:
        response = requests.get(full_url, timeout=timeout)
    except Exception as e:
        # Deliberately broad: callers rely on getting an "Error: ..." string
        # back rather than an exception (this includes timeouts).
        return f"Error: {e}"

    if response.status_code == 200:
        # The body is treated as plain text.
        return response.text.strip()
    return f"Error: {response.status_code} - {response.text}"

# for kegg_identifier in missing:
#     result = search_kegg_entry(kegg_identifier)
#     print(f"Information for {kegg_identifier}:\n{result}")


In [None]:
# Look up every identifier in `missing` with `search_kegg_entry` and append the
# successful responses to the output file.
failed = 0
success = dict()

with open("/local/path/to/data/misc/renaming/ko_entries_2nd_try.csv", "a") as f:
    for i, kegg_identifier in enumerate(missing):
        # Skip identifiers that were already fetched earlier in this run.
        if kegg_identifier in success:
            continue
        if i % 10 == 0:
            print(f"Processed {i}/{len(missing)} entries.", end="\r")
        try:
            result = search_kegg_entry(kegg_identifier)
        except Exception:
            # `Exception` instead of a bare `except` so that interrupting the
            # kernel (KeyboardInterrupt) still stops the loop.
            failed += 1
            continue
        if result.startswith("Error:"):
            # `search_kegg_entry` reports failures as "Error: ..." strings
            # rather than raising; count them and keep them out of the file.
            failed += 1
            continue
        success[kegg_identifier] = result
        # Errors while writing are not swallowed: they would recur for every
        # remaining entry, so they should surface immediately.
        f.write(result + "\n")
        f.flush()

print("Done. Failed:", failed)

In [None]:
# Display the number of lookups the loop above counted as failed.
failed

### KEGG Pathway

In [None]:
def parse_kegg_file(file_path):
    """
    Read a hierarchical pathway list into nested dictionaries.

    Lines starting with a single "#" open a category, lines starting with "##"
    open a subcategory of the current category, and every other non-empty line
    is split on tabs into an id (first field) and a name (second field).

    Parameters:
        - file_path: Path of the list file to read.

    Returns:
        - A dict of the form {category: {subcategory: {id: name}}}. Entries
          that appear before any subcategory of their category are stored
          directly as {category: {id: name}}.
    """
    hierarchy = {}
    category = None
    subcategory = None

    with open(file_path, 'r') as handle:
        for raw_line in handle:
            entry = raw_line.strip()
            if not entry:
                continue

            # "##" has to be tested first: such a line also starts with "#".
            if entry.startswith("##"):
                subcategory = entry[2:].strip()
                hierarchy[category][subcategory] = {}
                continue

            if entry.startswith("#"):
                category = entry[1:].strip()
                subcategory = None  # a new category has no open subcategory yet
                hierarchy[category] = {}
                continue

            fields = entry.split("\t")
            kegg_id, pathway_name = str(fields[0]), str(fields[1])
            if subcategory is None:
                hierarchy[category][kegg_id] = pathway_name
            else:
                hierarchy[category][subcategory][kegg_id] = pathway_name

    return hierarchy


def find_key_in_nested_dict(nested_dict, target_key):
    """
    Depth-first lookup of a key anywhere inside nested dictionaries.

    Items are visited in insertion order. A dict value is searched completely
    before the next sibling key is examined, so the first match in that
    traversal order wins.

    Parameters:
        - nested_dict: Dictionary whose values may themselves be dictionaries.
        - target_key: Key to look for at any depth.

    Returns:
        - The value stored under the first matching key, or None when the key
          does not occur.
    """
    for key, value in nested_dict.items():
        if key == target_key:
            return value
        if not isinstance(value, dict):
            continue
        found = find_key_in_nested_dict(value, target_key)
        if found is not None:
            return found
    return None

# Name every pathway column by looking it up in the parsed pathway hierarchy.
kegg_pathways = parse_kegg_file("/data/gpfs/projects/punim1293/vini/db/kegg/pathway/pathway.list")

pathway_entries = {}
pathway_missing = []
for pathway in files["KEGG_Pathway"]:
    # The last five characters of the column label are the lookup key.
    pathway_name = find_key_in_nested_dict(kegg_pathways, str(pathway[-5:]))
    if pathway_name is None:
        pathway_missing.append(pathway)
    else:
        pathway_entries[pathway] = pathway_name

# Pathways without a match keep their own label as name.
pathway_all = dict(pathway_entries)
pathway_all.update(zip(pathway_missing, pathway_missing))

### KEGG RClass

In [None]:
# KO <-> RClass link table, indexed by RClass id.
rclass = pd.read_csv("/data/gpfs/projects/punim1293/vini/db/kegg/genes/ko/ko_rclass.list", sep="\t", names="ko rclass".split(), index_col="rclass")

# Build the index lookup once. Rebuilding a list of the whole index for every
# membership test made the split below quadratic.
rclass_index = set(rclass.index)
rclass_ids = [f"rc:{i}" for i in files["KEGG_rclass"]]
rclass_found = [rc for rc in rclass_ids if rc in rclass_index]
rclass_missing = [rc for rc in rclass_ids if rc not in rclass_index]

# `ko_entries` is keyed by KO ids without a "ko:" prefix, so remove that prefix
# from the link table (if present) before mapping ids to names; otherwise every
# prefixed id would map to NaN.
ko_names = rclass.loc[rclass_found, "ko"].astype(str).str.replace("ko:", "", regex=False).map(ko_entries)

# Collect the KO names per RClass; `i == i` drops NaN (ids without a name).
rclass_entries = ko_names.groupby(ko_names.index).apply(list).to_dict()
rclass_entries = {k: [i for i in v if i == i] for k, v in rclass_entries.items()}

# RClass ids absent from the link table keep their own id as label.
rclass_all = {**rclass_entries, **{i: i for i in rclass_missing}}

### KEGG Reaction

In [None]:
# Work out, for every reaction column, which BRITE .keg file mentions it.
with open("/data/gpfs/projects/punim1293/vini/db/kegg/brite/br/br.list") as br_file:
    br_rows = [row.split() for row in br_file if row.startswith("br:")]

# Second token -> first token of each "br:" row; later rows overwrite earlier ones.
member_to_brite = {tokens[1]: tokens[0] for tokens in br_rows}

brite_by_reaction = {rxn: member_to_brite.get("rn:" + rxn, None) for rxn in files["KEGG_Reaction"]}
reaction_missing = [rxn for rxn, brite_id in brite_by_reaction.items() if brite_id is None]

# Reaction -> file name of its BRITE hierarchy ("br:" removed, ".keg" appended).
lines = {
    rxn: brite_id.replace("br:", "") + ".keg"
    for rxn, brite_id in brite_by_reaction.items()
    if brite_id is not None
}

In [None]:
def search_br_file(file, reaction, parentdir="/data/gpfs/projects/punim1293/vini/db/kegg/brite/br"):
    """
    Return the text that accompanies a reaction id in a BRITE .keg file.

    The first line (in file order) containing `reaction` is used, so the
    result is the same on every run even when several different lines mention
    the reaction.

    Parameters:
        - file: Name of the .keg file inside `parentdir`.
        - reaction: Reaction id to search for (plain substring match).
        - parentdir: Directory holding the .keg files.

    Returns:
        - The matching line without its first character and without the
          reaction id, stripped of surrounding whitespace.
        - An empty list when no line contains the reaction (kept for
          compatibility with existing callers).
    """
    with open(f"{parentdir}/{file}") as br_file:
        match = next((line for line in br_file if reaction in line), None)

    if match is None:
        return []

    # Drop the leading one-character marker, then the id itself.
    label = match[1:].strip()
    return label.replace(reaction, "").strip()

In [None]:
# Resolve each reaction through the .keg file found for it.
reaction_entries = {}
for reaction_id, keg_file in lines.items():
    reaction_entries[reaction_id] = search_br_file(keg_file, reaction_id)

# Reactions without a .keg file keep their own id as label.
reaction_all = dict(reaction_entries)
reaction_all.update(zip(reaction_missing, reaction_missing))

### Save all

In [None]:
# Write the `<name>_entries`, `<name>_missing` and `<name>_all` results of every
# KEGG table to one TSV file each.
for key in files.keys():
    key = key[5:].lower()  # drop the first five characters, e.g. "KEGG_ko" -> "ko"
    for category in ("_entries", "_missing", "_all"):
        fname = key + category
        # Resolve the variable BEFORE opening the file: tables that were not
        # processed in this notebook have no such variable, and opening first
        # would leave an empty file behind and then fail with a KeyError.
        if fname not in globals():
            print(f"Skipping {fname}: no such variable.")
            continue
        values = globals()[fname]
        with open("/data/gpfs/projects/punim1989/biogo-hub/data/misc/renaming/" + fname + ".tsv", "w") as file:
            if isinstance(values, dict):
                # Mappings are written as "key<TAB>value" rows.
                for k, v in values.items():
                    file.write(f"{k}\t{v}\n")
            else:
                # Any other iterable is written one item per line.
                for v in values:
                    file.write(f"{v}\n")