# Parse and extract the needed data in a consistent format


In [None]:
import pandas as pd
from pathlib import Path

## Parse Kim Excel


In [None]:
# Locations of the raw Kim Excel workbook and of the CSV extracted from it
DATA_PATH = Path("..") / "data"
INPUT_PATH = DATA_PATH / "raw" / "2023_Kim.xlsx"
OUTPUT_PATH = DATA_PATH / "interim" / "toxprot_2017_old.csv"

# Worksheet to read and the subset of its columns that is kept
SHEET_NAME = "ToxProt11.2017"
COLUMNS_TO_EXTRACT = [
    "Entry",
    "Organism",
    "Protein families",
    "Length (aa)",
    "Fragments",
    "Toxic dose",
    "PTM",
]

# Load only the selected columns, then shorten two headers
# ('Length (aa)' -> 'Length', 'Fragments' -> 'Fragment')
df = pd.read_excel(
    INPUT_PATH, sheet_name=SHEET_NAME, usecols=COLUMNS_TO_EXTRACT
).rename(columns={"Length (aa)": "Length", "Fragments": "Fragment"})

# The target folder may not exist yet; create it before writing the CSV
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(OUTPUT_PATH, index=False)

# Short report on what was written
print(f"Data extracted and saved to {OUTPUT_PATH}")
print(f"Shape of the extracted data: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
print("First few rows:")
display(df.head())


## Parse SwissProt 2017-11 (extracted from [FTP](https://ftp.uniprot.org/pub/databases/uniprot/previous_major_releases/release-2017_11/))


In [None]:
def update_protfams(df):
    """Normalise the ``Protein families`` column of *df* in place.

    Every value is cut at the first ``.``, ``,`` or ``;`` so that only the
    leading family name remains, after which a fixed table of known aliases
    is mapped onto canonical conotoxin superfamily names. The number of
    distinct families is printed after each of the two steps.

    Args:
        df: DataFrame with a ``Protein families`` column; it is modified.

    Returns:
        The same DataFrame object that was passed in.
    """
    column = "Protein families"

    # Alias -> canonical superfamily name
    canonical_names = {
        "I1 superfamily": "Conotoxin I1 superfamily",
        "O1 superfamily": "Conotoxin O1 superfamily",
        "O2 superfamily": "Conotoxin O2 superfamily",
        "E superfamily": "Conotoxin E superfamily",
        "F superfamily": "Conotoxin F superfamily",
        "Conotoxin M family": "Conotoxin M superfamily",
        "Conotoxin B2 family": "Conotoxin B2 superfamily",
        "Conotoxin O1 family": "Conotoxin O1 superfamily",
        "Conotoxin O2 family": "Conotoxin O2 superfamily",
    }

    # Step 1: keep only the text before the first delimiter
    leading_family = df[column].str.split(r"[.,;]").str[0]
    df[column] = leading_family
    n_after_split = df[column].nunique()
    print(f"Unique protein families after splitting: {n_after_split}")

    # Step 2: exact-match replacement of the known aliases
    df[column] = df[column].replace(canonical_names)
    n_after_mapping = df[column].nunique()
    print(f"Unique protein families after processing: {n_after_mapping}")

    return df


# Define the generalizable function to create a FASTA file
def create_fasta_file(
    df: pd.DataFrame,
    entry_col: str,
    sequence_col: str,
    fasta_output_path: Path,
    signal_peptide_range_column: str = None,
):
    """Write the sequences of *df* to a FASTA file, optionally without signal peptides.

    One record (``>entry`` header line followed by the sequence on a single
    line) is written per row. Rows whose sequence is missing, not a string,
    or empty are skipped, as are rows whose sequence becomes empty once the
    signal peptide has been removed.

    Args:
        df: Table holding the entries and sequences.
        entry_col: Name of the column used for the FASTA header.
        sequence_col: Name of the column holding the sequence.
        fasta_output_path: Destination file (``Path`` or ``str``); missing
            parent directories are created.
        signal_peptide_range_column: Optional name of a column with ranges
            such as ``"1-22"``. When given and present in *df*, the residues
            up to and including the range end are cut from the sequence.
            Values that cannot be parsed leave the sequence untouched.
    """
    fasta_output_path = Path(fasta_output_path)
    fasta_output_path.parent.mkdir(parents=True, exist_ok=True)

    # Column-wise iteration avoids building one Series per row (iterrows).
    if signal_peptide_range_column and signal_peptide_range_column in df.columns:
        signal_ranges = df[signal_peptide_range_column]
    else:
        signal_ranges = [None] * len(df)

    # Explicit encoding keeps the output independent of the platform default.
    with open(fasta_output_path, "w", encoding="utf-8") as f_out:
        for entry, sequence, signal_range in zip(
            df[entry_col], df[sequence_col], signal_ranges
        ):
            # Skip missing values (NaN/None) and empty strings
            if not isinstance(sequence, str) or not sequence:
                continue

            mature_sequence = _strip_signal_peptide(sequence, signal_range)

            # A range covering the whole sequence leaves nothing to write
            if mature_sequence:
                f_out.write(f">{entry}\n{mature_sequence}\n")

    print(f"FASTA file created at {fasta_output_path}")


def _strip_signal_peptide(sequence, signal_range):
    """Return *sequence* without the signal peptide described by *signal_range*.

    *signal_range* is expected to look like ``"1-22"`` (1-based, inclusive),
    in which case the first 22 residues are dropped. Anything else (not a
    string, no hyphen, non-numeric or non-positive end) returns *sequence*
    unchanged.
    """
    if not isinstance(signal_range, str) or "-" not in signal_range:
        return sequence

    try:
        # The hyphen check above guarantees that index 1 exists.
        end_pos_1based = int(signal_range.split("-")[1])
    except ValueError:
        # e.g. "1-foo" or "1-": not a usable range, keep the full sequence
        return sequence

    if end_pos_1based <= 0:
        return sequence

    # A 1-based inclusive end equals the 0-based start of the mature chain.
    return sequence[end_pos_1based:]

In [None]:
def process_toxprot_tsv(
    tsv_input_path: Path,
    update_protfams_func,
    create_fasta_func,
    display_func=None,
):
    """
    Process a ToxProt TSV file:
    - Reads the known columns that are present in the TSV
    - Renames 'Post-translational modification' to 'PTM'
    - Updates protein families
    - Merges the three per-aspect Gene Ontology columns into 'Gene Ontology (GO)'
    - Writes a FASTA file (removing signal peptide if a range column is present)
    - Writes a cleaned CSV (without sequence, signal peptide range and
      per-aspect GO columns)
    - Displays summary info

    The CSV and FASTA files are written next to the input file, using the
    input file name with its last suffix replaced by '.csv' / '.fasta'.

    Args:
        tsv_input_path (Path): Path to the input TSV file (a str also works).
        update_protfams_func (callable): Called with the DataFrame; must
            return the DataFrame to continue with.
        create_fasta_func (callable): Called as
            ``create_fasta_func(df, "Entry", "Sequence", fasta_path, sp_col)``
            where ``sp_col`` is 'Signal peptide (range)' or None.
        display_func (callable, optional): Function used to show the head of
            the CSV data. When None (default), the ``display`` name provided
            by IPython/Jupyter is used if it exists, otherwise ``print``.

    Returns:
        None
    """
    import pandas as pd

    # Resolve the default lazily so that merely defining this function does
    # not require an IPython session.
    if display_func is None:
        try:
            display_func = display  # noqa: F821 - injected by IPython/Jupyter
        except NameError:
            display_func = print

    # Derive the outputs by replacing only the last suffix, so a name such as
    # "toxprot.2017.tsv" becomes "toxprot.2017.csv" rather than "toxprot.csv".
    tsv_input_path = Path(tsv_input_path)
    csv_output_path = tsv_input_path.with_suffix(".csv")
    fasta_output_path = tsv_input_path.with_suffix(".fasta")

    # Columns of interest; each one is read only if the file contains it
    required_cols = [
        "Entry",
        "Organism",
        "Organism (ID)",
        "Protein families",
        "Length",
        "Fragment",
        "Toxic dose",
        "Post-translational modification",
        "Sequence",
        "Signal peptide (range)",
        "Protein existence",
    ]
    go_aspect_cols = [
        "Gene Ontology (biological process)",
        "Gene Ontology (cellular component)",
        "Gene Ontology (molecular function)",
    ]
    wanted_cols = set(required_cols) | {"Gene Ontology (GO)"} | set(go_aspect_cols)

    # A callable `usecols` lets pandas filter on the header it parsed itself,
    # so the file does not have to be opened a second time.
    df = pd.read_csv(
        tsv_input_path, sep="\t", usecols=lambda col: col in wanted_cols
    )

    # `rename` ignores keys that are not present, so no existence check is needed
    df = df.rename(columns={"Post-translational modification": "PTM"})

    # Update protein families
    df = update_protfams_func(df)

    # Merge the per-aspect GO columns (those present) into a single column;
    # an existing 'Gene Ontology (GO)' column is overwritten in that case.
    go_merge_cols = [col for col in go_aspect_cols if col in df.columns]
    if go_merge_cols:
        # A plain list also works for an empty table, where df.apply(axis=1)
        # would return a DataFrame instead of a Series.
        df["Gene Ontology (GO)"] = [
            _merge_go_terms(values)
            for values in zip(*(df[col] for col in go_merge_cols))
        ]

    # Create FASTA file if 'Sequence' and 'Entry' columns exist
    if "Sequence" in df.columns and "Entry" in df.columns:
        signal_peptide_col = (
            "Signal peptide (range)" if "Signal peptide (range)" in df.columns else None
        )
        create_fasta_func(
            df, "Entry", "Sequence", fasta_output_path, signal_peptide_col
        )

    # Columns for the CSV: everything except the sequence, the signal peptide
    # range and the per-aspect GO columns that were merged above
    drop_cols = ["Sequence", "Signal peptide (range)"] + go_merge_cols
    columns_for_csv = [col for col in df.columns if col not in drop_cols]

    # Save CSV
    df.to_csv(csv_output_path, index=False, columns=columns_for_csv)

    # Display info
    print(f"TSV data processed. FASTA file created at {fasta_output_path}.")
    print(
        f"CSV data (without sequence and original GO columns) saved to {csv_output_path}"
    )

    df_csv_content_view = df[columns_for_csv]
    print(f"Shape of the data saved to CSV: {df_csv_content_view.shape}")
    print(f"Columns in CSV: {df_csv_content_view.columns.tolist()}")
    print("First few rows (as saved to CSV):")
    display_func(df_csv_content_view.head())


def _merge_go_terms(values):
    """Join the non-missing, non-blank *values* with '; ' (pd.NA if none remain)."""
    import pandas as pd

    terms = [str(val).strip() for val in values if pd.notnull(val)]
    terms = [term for term in terms if term]
    return "; ".join(terms) if terms else pd.NA


# Run the TSV pipeline on the 2017 ToxProt export; the CSV and FASTA outputs
# are written alongside the input file
process_toxprot_tsv(
    tsv_input_path=DATA_PATH / "interim" / "toxprot_2017.tsv",
    update_protfams_func=update_protfams,
    create_fasta_func=create_fasta_file,
    display_func=display,
)

## Parse SwissProt release: [2025-01](https://ftp.uniprot.org/pub/databases/uniprot/previous_major_releases/release-2025_01/knowledgebase/)


In [None]:
# Run the TSV pipeline on the 2025 ToxProt export; the CSV and FASTA outputs
# are written alongside the input file
process_toxprot_tsv(
    tsv_input_path=DATA_PATH / "interim" / "toxprot_2025.tsv",
    update_protfams_func=update_protfams,
    create_fasta_func=create_fasta_file,
    display_func=display,
)

# Get taxonomy info

Process taxonomic information from a CSV file using taxopy.


In [None]:
import pandas as pd
import taxopy
from pathlib import Path


def setup_db_paths():
    """Create the taxopy cache directory if needed and return its paths.

    The directory is ``~/.cache/taxopy_db``.

    Returns:
        A 4-tuple ``(db_dir, nodes_file, names_file, merged_file)`` of
        ``Path`` objects; the three files are ``nodes.dmp``, ``names.dmp``
        and ``merged.dmp`` inside ``db_dir`` (they are not created here).
    """
    db_dir = Path.home() / ".cache" / "taxopy_db"
    db_dir.mkdir(parents=True, exist_ok=True)

    nodes_file, names_file, merged_file = (
        db_dir / dump_name for dump_name in ("nodes.dmp", "names.dmp", "merged.dmp")
    )
    return db_dir, nodes_file, names_file, merged_file


def initialize_taxdb():
    """Initialize and return the taxonomy database.

    When ``nodes.dmp`` and ``names.dmp`` already exist in the cache directory
    returned by ``setup_db_paths()``, the database is built from those files;
    otherwise taxopy is asked to download the dump into that directory and
    keep the files for the next run.

    Returns:
        The ``taxopy.TaxDb`` instance.
    """
    # Get the database paths
    db_dir, nodes_file, names_file, merged_file = setup_db_paths()

    if nodes_file.exists() and names_file.exists():
        print(f"Loading existing taxopy database from {db_dir}")
        # Only nodes.dmp and names.dmp are checked above, so merged.dmp may be
        # absent; pass it only when it is really there instead of handing
        # taxopy a path to a missing file.
        merged_dmp = str(merged_file) if merged_file.exists() else None
        return taxopy.TaxDb(
            nodes_dmp=str(nodes_file),
            names_dmp=str(names_file),
            merged_dmp=merged_dmp,
        )

    print(f"Downloading taxopy database to {db_dir}")
    return taxopy.TaxDb(taxdb_dir=str(db_dir), keep_files=True)


def get_taxonomy_info(taxon_id, taxdb):
    """Look up a taxon and return its name plus selected rank names.

    Args:
        taxon_id: Taxonomy identifier handed to ``taxopy.Taxon``.
        taxdb: The taxopy database to resolve the identifier against.

    Returns:
        dict with the keys ``taxon_name``, ``phylum``, ``class``, ``order``,
        ``family``, ``genus`` and ``species``. Ranks that are absent from the
        taxon's ``rank_name_dictionary`` are returned as empty strings.
    """
    taxon = taxopy.Taxon(taxon_id, taxdb)
    rank_names = taxon.rank_name_dictionary

    info = {"taxon_name": taxon.name}
    # Ranks are copied from highest to lowest; missing ranks default to ""
    for rank in ("phylum", "class", "order", "family", "genus", "species"):
        info[rank] = rank_names.get(rank, "")
    return info


def build_taxonomy_cache(df, taxdb):
    """Resolve every distinct, non-missing ``Organism (ID)`` of *df* once.

    Returns:
        dict mapping each organism ID to the dictionary produced by
        ``get_taxonomy_info`` for it.
    """
    distinct_ids = df["Organism (ID)"].unique()
    return {
        organism_id: get_taxonomy_info(organism_id, taxdb)
        for organism_id in distinct_ids
        if pd.notna(organism_id)
    }


def add_taxonomy_columns(df, taxonomy_cache):
    """Return a copy of *df* extended with taxonomy columns.

    For every row the ``Organism (ID)`` value is looked up in
    *taxonomy_cache* and the columns ``Scientific_Name``, ``Phylum``,
    ``Class``, ``Order``, ``Family``, ``Genus`` and ``Species`` are appended
    (in that order). IDs missing from the cache, and keys missing from a
    cache entry, yield empty strings.

    Args:
        df: DataFrame with an ``Organism (ID)`` column; it is not modified.
        taxonomy_cache: dict mapping organism IDs to dictionaries with the
            keys ``taxon_name``, ``phylum``, ``class``, ``order``,
            ``family``, ``genus`` and ``species``.

    Returns:
        A new DataFrame with the original columns followed by the taxonomy
        columns, on the same index as *df*.
    """
    # Output column -> key inside a cache entry
    column_to_key = {
        "Scientific_Name": "taxon_name",
        "Phylum": "phylum",
        "Class": "class",
        "Order": "order",
        "Family": "family",
        "Genus": "genus",
        "Species": "species",
    }

    # Build plain dict records rather than one pd.Series per row: this is
    # much cheaper, and it still yields all taxonomy columns for an empty
    # table (Series.apply would return an empty Series there).
    records = [
        {column: entry.get(key, "") for column, key in column_to_key.items()}
        for entry in (
            taxonomy_cache.get(taxon_id, {}) for taxon_id in df["Organism (ID)"]
        )
    ]
    taxonomy_df = pd.DataFrame(
        records, index=df.index, columns=list(column_to_key)
    )

    # Concatenate the new columns with the original dataframe
    return pd.concat([df, taxonomy_df], axis=1)


def process_dataframe(input_path, output_path, taxdb):
    """Load a CSV, add taxonomy columns, drop 'Organism', and save the result.

    Args:
        input_path: CSV file to read; it must contain an ``Organism (ID)``
            column.
        output_path: CSV file to write (``str`` or ``Path``). Missing parent
            directories are created.
        taxdb: taxopy database used to resolve the organism IDs.
    """
    # Load the dataframe
    print(f"Loading data from {input_path}")
    df = pd.read_csv(input_path)

    # Resolve each distinct organism ID once
    print("Building taxonomy cache...")
    taxonomy_cache = build_taxonomy_cache(df, taxdb)

    # Add taxonomy columns
    print("Adding taxonomy columns...")
    df = add_taxonomy_columns(df, taxonomy_cache)

    # Remove the Organism column if it exists
    if "Organism" in df.columns:
        print("Removing 'Organism' column...")
        df = df.drop(columns=["Organism"])

    # Save the updated dataframe; like the other writers in this file, make
    # sure the target directory exists first so a fresh checkout does not
    # fail on a missing output folder.
    print(f"Saving processed data to {output_path}")
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"Processing complete. Data saved to {output_path}")

In [None]:
print("Initializing taxonomy database...")
taxdb = initialize_taxdb()

# Annotate each interim ToxProt table with taxonomy, 2017 first and then 2025
for input_path, output_path in (
    ("../data/interim/toxprot_2017.csv", "../data/processed/toxprot_2017.csv"),
    ("../data/interim/toxprot_2025.csv", "../data/processed/toxprot_2025.csv"),
):
    process_dataframe(input_path, output_path, taxdb)

# Differentiate between marine and terrestrial organisms


In [None]:
import json

# Location of the marine/terrestrial mapping
mapping_path = "../data/raw/marine_terrestrial.json"

# Load the mapping; the encoding is stated explicitly because JSON text is
# UTF-8 and the platform default may differ
with open(mapping_path, "r", encoding="utf-8") as f:
    habitat_mapping = json.load(f)


# Function to determine habitat based on order and genus
def determine_habitat(row):
    """Classify a row as 'terrestrial', 'marine' or 'unknown'.

    The decision uses the module-level ``habitat_mapping``: the row's
    ``Order`` is first checked against ``clear_orders``; if the order is
    listed under ``ambiguous_orders`` instead, the row's ``Genus`` (empty
    string when absent) decides. Terrestrial is tested before marine in both
    steps.
    """
    taxon_order = row["Order"]
    taxon_genus = row.get("Genus", "")
    habitats = ("terrestrial", "marine")

    # Orders whose habitat is unambiguous
    clear_orders = habitat_mapping["clear_orders"]
    for habitat in habitats:
        if taxon_order in clear_orders[habitat]:
            return habitat

    # Orders that need the genus to be resolved
    ambiguous_orders = habitat_mapping["ambiguous_orders"]
    if taxon_order in ambiguous_orders:
        genera_by_habitat = ambiguous_orders[taxon_order]
        for habitat in habitats:
            if taxon_genus in genera_by_habitat.get(habitat, {}):
                return habitat

    # Neither table gave an answer
    return "unknown"


def _annotate_habitat(csv_path, heading):
    """Add a 'Habitat' column to the CSV at *csv_path*, rewriting it in place.

    Prints *heading* followed by the habitat value counts, and returns the
    updated DataFrame.
    """
    frame = pd.read_csv(csv_path)
    frame["Habitat"] = frame.apply(determine_habitat, axis=1)
    print(heading)
    print(frame["Habitat"].value_counts())
    frame.to_csv(csv_path, index=False)
    print(f"Updated {csv_path} with habitat information")
    return frame


# 2017 dataset
csv_path_2017 = "../data/processed/toxprot_2017.csv"
df_2017 = _annotate_habitat(csv_path_2017, "ToxProt 2017 habitat distribution:")

# 2025 dataset (the leading newline separates its report from the 2017 one)
csv_path_2025 = "../data/processed/toxprot_2025.csv"
df_2025 = _annotate_habitat(csv_path_2025, "\nToxProt 2025 habitat distribution:")
