In [1]:
import pandas as pd 
import numpy as np

In [2]:
from Bio import SeqIO
import pandas as pd

def load_fasta(fasta_path, debug=False):
    """
    Load sequences from a FASTA file into a dictionary.
    
    Args:
        fasta_path: Path to the FASTA file.
        debug: Whether to print debug information.
    
    Returns:
        A dictionary of sequences keyed by protein ID.
    """
    sequences = {}
    for record in SeqIO.parse(fasta_path, "fasta"):
        sequences[record.id] = str(record.seq)
    
    if debug:
        print(f"Loaded {len(sequences)} sequences from {fasta_path}")
        print(f"Sample IDs: {list(sequences.keys())[:3]}")
        print(f"Length range: {min(len(seq) for seq in sequences.values())} - {max(len(seq) for seq in sequences.values())}")
    
    return sequences

def create_gene_mapping(file_path):
    """
    Parse a GFF file and create a mapping from genes to transcripts to proteins.
    
    Args:
        file_path (str): Path to the GFF file.
    
    Returns:
        pd.DataFrame: A DataFrame with columns ['gene_id', 'transcript_id', 'protein_id'].
    """
    mappings = []
    with open(file_path, 'r') as file:
        gene_id = None
        transcript_id = None

        for line in file:
            # Skip comments and empty lines
            if line.startswith("#") or not line.strip():
                continue

            # Split the line into columns
            columns = line.strip().split("\t")
            if len(columns) < 9:
                continue  # Skip malformed lines

            feature_type = columns[2]  # The third column indicates the feature type
            attributes = columns[8]  # The ninth column contains the attributes

            # Parse the attributes into a dictionary
            attr_dict = {}
            for attr in attributes.split(";"):
                if "=" in attr:
                    key, value = attr.split("=", 1)
                    attr_dict[key.strip()] = value.strip()

            # Extract gene_id and transcript_id from mRNA or transcript rows
            if feature_type in {"mRNA", "transcript"}:
                transcript_id = attr_dict.get("ID")
                gene_id = attr_dict.get("Parent")

            # Extract protein_id from CDS rows
            elif feature_type == "CDS":
                protein_id = attr_dict.get("protein_source_id") or attr_dict.get("protein_id")
                if gene_id and transcript_id and protein_id:
                    mappings.append((gene_id, transcript_id, protein_id))

    # Create a DataFrame from the mappings
    df = pd.DataFrame(mappings, columns=["gene_id", "transcript_id", "protein_id"])
    df.loc[:, 'gene_id'] = df.gene_id.str.replace("gene-", "")
    df.loc[:, 'transcript_id'] = df.transcript_id.str.replace("rna-", "")
    return df.drop_duplicates().reset_index(drop=True)
    
def get_longest_transcripts(sequences, mapping_df, debug=False):
    """
    Find the longest transcript for each gene using the mapping DataFrame.

    Args:
        sequences: Dictionary of protein sequences keyed by protein ID.
        mapping_df: DataFrame with columns ['gene_id', 'transcript_id', 'protein_id'].
        debug: Whether to print debug information.

    Returns:
        A dictionary of longest transcripts keyed by gene ID.
    """
    # Filter the mapping DataFrame to include only rows where the protein_id exists in the sequences
    mapping_df = mapping_df[mapping_df["protein_id"].isin(sequences.keys())]

    if debug:
        print(f"Filtered mapping file to {len(mapping_df)} rows with valid protein IDs.")
        print(f"Sample mapping rows:\n{mapping_df.head()}")

    # Group transcripts by gene ID
    gene_transcripts = mapping_df.groupby("gene_id").apply(
        lambda group: group[["transcript_id", "protein_id"]].to_dict(orient="records")
    ).to_dict()

    if debug:
        print(f"Found {len(gene_transcripts)} unique genes in the mapping file.")
        print(f"Sample gene-transcript mappings: {list(gene_transcripts.items())[:3]}")

    # Keep the longest transcript for each gene
    longest_transcripts = {}
    for gene_id, transcript_records in gene_transcripts.items():
        # Map protein IDs to sequences and find the longest
        valid_transcripts = [
            (record["transcript_id"], record["protein_id"], sequences[record["protein_id"]])
            for record in transcript_records
            if record["protein_id"] in sequences
        ]

        if not valid_transcripts:
            if debug:
                print(f"Warning: No valid transcripts found for gene {gene_id}")
            continue

        # Find the longest transcript
        longest = max(valid_transcripts, key=lambda x: len(x[2]))
        longest_transcripts[gene_id] = (longest[0], longest[2])  # (transcript_id, sequence)

        if debug and len(valid_transcripts) > 1:
            lengths = [(t[0], len(t[2])) for t in valid_transcripts]
            print(f"\nGene {gene_id} transcripts:")
            for tid, length in lengths:
                print(f"  {tid}: {length} aa{'*' if tid == longest[0] else ''}")

    return longest_transcripts

def write_fasta(sequences, output_path, debug=False):
    """
    Write sequences to a FASTA file with line wrapping at 60 characters.

    Args:
        sequences: Dictionary of sequences keyed by gene ID.
        output_path: Path to the output FASTA file.
        debug: Whether to print debug information.
    """
    with open(output_path, 'w') as f:
        for gene_id, (transcript_id, sequence) in sequences.items():
            f.write(f">{transcript_id}\n")
            for i in range(0, len(sequence), 60):
                f.write(f"{sequence[i:i+60]}\n")
    
    if debug:
        print(f"Wrote {len(sequences)} sequences to {output_path}")
        print(f"Total residues written: {sum(len(seq[1]) for seq in sequences.values())}")


### what we need

Protein fasta, DNA fasta, GFF. Beginning with two references, one input, one output.

1. Load protein fasta + gff and find longest transcripts
2. Subset to only longest transcripts
3. Run Orthofinder
4. Find main orthologs, calculate synteny, find syntenic orthologs
5. Visualise synteny / Orthology in each species
6. Once we have genes, take each gene and each snp, align them, find the new position in the new reference.

Lets locate our reference genomes in the `resources/reference` folder

In [3]:
import glob

vb_refs = [ref.split("reference/")[1].rstrip(".gff") for ref in glob.glob('../resources/reference/*gff')]
np.sort(vb_refs)

array(['AaegyptiLVP_AGWG', 'Aalbopictus_AalbF5', 'AdirusWRAIR2',
       'AfunestusAfunGA1', 'AgambiaePEST', 'AminimusMINIMUS1',
       'AsinensisChina', 'AstephensiUCISS2018',
       'CquinquefasciatusJHB2020', 'LlongipalpisASM'], dtype='<U24')

In [4]:
# genomes = np.array([
#        'AaegyptiLVP_AGWG', 'Aalbopictus_AalbF5', 'AdirusWRAIR2',
#        'AfunestusAfunGA1', 'AgambiaePEST', 'AminimusMINIMUS1',
#        'AsinensisChina', 'AstephensiUCISS2018',
#        'CquinquefasciatusJHB2020', 'LlongipalpisASM']
# )

genomes = ['AgambiaePEST', 'CquinquefasciatusJHB2020', 'LlongipalpisASM']

for g in genomes:
    sequences = load_fasta(f"../resources/reference/{g}_AnnotatedProteins.fasta", debug=True)
    df_mapping = create_gene_mapping(f"../resources/reference/{g}.gff")
    longest_transcripts = get_longest_transcripts(sequences, df_mapping, debug=False)
    write_fasta(longest_transcripts, f"../results/proteome/{g}.fasta", debug=True)
    print("\n")

Loaded 15328 sequences from ../resources/reference/AgambiaePEST_AnnotatedProteins.fasta
Sample IDs: ['AGAP004677.P162', 'AGAP004677.P163', 'AGAP004678-PA']
Length range: 16 - 16221
Wrote 13107 sequences to ../results/proteome/AgambiaePEST.fasta
Total residues written: 7401200


Loaded 24676 sequences from ../resources/reference/CquinquefasciatusJHB2020_AnnotatedProteins.fasta
Sample IDs: ['CQUJHB000005.P10', 'CQUJHB000005.P11', 'CQUJHB000005.P12']
Length range: 36 - 18876
Wrote 15276 sequences to ../results/proteome/CquinquefasciatusJHB2020.fasta
Total residues written: 8390556


Loaded 20287 sequences from ../resources/reference/LlongipalpisASM_AnnotatedProteins.fasta
Sample IDs: ['XP_055676422.1', 'XP_055676423.1', 'XP_055676424.1']
Length range: 28 - 22183
Wrote 11238 sequences to ../results/proteome/LlongipalpisASM.fasta
Total residues written: 6570367




## Run Orthofinder

### Synteny analysis

In [70]:
import subprocess

def run_orthofinder(input_dir, pixi_env="pixi", additional_args=None, debug=False):
    """
    Run OrthoFinder on a given directory using pixi for environment setup.

    Args:
        input_dir (str): Path to the directory containing input files for OrthoFinder.
        pixi_env (str): The pixi environment command (default is 'pixi').
        additional_args (list): Additional arguments to pass to the OrthoFinder command.
        debug (bool): Whether to print debug information.

    Returns:
        None
    """
    # Ensure additional_args is a list
    if additional_args is None:
        additional_args = []

    # Construct the OrthoFinder command
    command = [pixi_env, "run", "orthofinder", "-f", input_dir] + additional_args

    if debug:
        print(f"Running OrthoFinder with the following command:")
        print(" ".join(command))
        print(f"Input directory: {input_dir}")
        if additional_args:
            print(f"Additional arguments: {additional_args}")

    try:
        # Run the command and capture the output
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        # Stream the output to the console
        for line in process.stdout:
            print(line, end="")  # Print each line as it is received

        # Wait for the process to complete and capture the return code
        process.wait()

        if process.returncode != 0:
            # If the process failed, print the error output
            error_output = process.stderr.read()
            print(f"\nError: OrthoFinder failed with return code {process.returncode}")
            print(f"Error details:\n{error_output}")
        else:
            if debug:
                print("\nOrthoFinder completed successfully.")

    except FileNotFoundError as e:
        print(f"Error: The command '{command[0]}' was not found. Is pixi installed and in your PATH?")
        if debug:
            print(f"Details: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while running OrthoFinder.")
        if debug:
            print(f"Details: {e}")

In [71]:
# Run OrthoFinder on a directory with additional arguments
input_directory = "../results/proteome/"
additional_arguments = ["-t", "4", "-o", "../results/orthofinder"]  # Example: Use 4 threads
run_orthofinder(input_directory, additional_args=additional_arguments, debug=True)

Running OrthoFinder with the following command:
pixi run orthofinder -f ../results/proteome/ -t 4 -o ../results/orthofinder
Input directory: ../results/proteome/
Additional arguments: ['-t', '4', '-o', '../results/orthofinder']

OrthoFinder version 2.5.5 Copyright (C) 2014 David Emms

2025-01-29 10:24:57 : Starting OrthoFinder 2.5.5
4 thread(s) for highly parallel tasks (BLAST searches etc.)
1 thread(s) for OrthoFinder algorithm

Checking required programs are installed
----------------------------------------
Test can run "mcl -h" - ok
Test can run "fastme -i ../results/orthofinder/Results_Jan29/WorkingDirectory/dependencies/SimpleTest.phy -o ../results/orthofinder/Results_Jan29/WorkingDirectory/dependencies/SimpleTest.tre" - ok

Dividing up work for BLAST for parallel processing
--------------------------------------------------
2025-01-29 10:24:58 : Creating diamond database 1 of 3
2025-01-29 10:24:58 : Creating diamond database 2 of 3
2025-01-29 10:24:58 : Creating diamond database

### Orthofinder results

In [88]:
import os
import pandas as pd


class OrthoFinderResults:
    """
    A class to parse and store relevant information from OrthoFinder output.
    """

    def __init__(self, results_dir, debug=False):
        """
        Initialize the OrthoFinderResults class.

        Args:
            results_dir (str): Path to the OrthoFinder results directory.
            debug (bool): Whether to print debug information.
        """
        self.results_dir = results_dir
        self.debug = debug
        self.orthologues_stats = None
        self.hierarchical_orthogroups = None
        self.one_to_one_orthologs = None

        if debug:
            print(f"Initializing OrthoFinderResults for directory: {results_dir}")

        # Load relevant files
        self._load_orthologues_stats()
        self._load_hierarchical_orthogroups()

    def _load_orthologues_stats(self):
        """
        Load the one-to-one orthologs statistics file.
        """
        file_path = os.path.join(
            self.results_dir, "Comparative_Genomics_Statistics", "OrthologuesStats_one-to-one.tsv"
        )
        if os.path.exists(file_path):
            if self.debug:
                print(f"Loading one-to-one orthologs from: {file_path}")
            self.orthologues_stats = pd.read_csv(file_path, sep="\t", index_col=0)
        else:
            if self.debug:
                print(f"File not found: {file_path}")
            self.orthologues_stats = None

    def _load_hierarchical_orthogroups(self):
        """
        Load the hierarchical orthogroups file (N0.tsv).
        """
        file_path = os.path.join(
            self.results_dir, "Phylogenetic_Hierarchical_Orthogroups", "N0.tsv"
        )
        if os.path.exists(file_path):
            if self.debug:
                print(f"Loading hierarchical orthogroups from: {file_path}")
            self.hierarchical_orthogroups = pd.read_csv(file_path, sep="\t")
        else:
            if self.debug:
                print(f"File not found: {file_path}")
            self.hierarchical_orthogroups = None

    def get_one_to_one_orthologs(self, species_a, species_b):
        """
        Get one-to-one orthologs between two species.

        Args:
            species_a (str): Name of the first species.
            species_b (str): Name of the second species.

        Returns:
            pd.DataFrame: A DataFrame containing one-to-one orthologs between the two species.
        """
        if self.orthologues_stats is None:
            raise ValueError("One-to-one orthologs statistics file not loaded.")

        if species_a not in self.orthologues_stats.columns or species_b not in self.orthologues_stats.columns:
            raise ValueError(f"Species {species_a} or {species_b} not found in the orthologs statistics.")

        if self.debug:
            print(f"Retrieving one-to-one orthologs between {species_a} and {species_b}.")

        # Filter for one-to-one orthologs
        orthologs = self.orthologues_stats.loc[
            (self.orthologues_stats[species_a] == 1) & (self.orthologues_stats[species_b] == 1)
        ]

        return orthologs

    def get_hierarchical_orthogroup(self, orthogroup_id):
        """
        Get the genes in a specific hierarchical orthogroup.

        Args:
            orthogroup_id (str): The ID of the hierarchical orthogroup.

        Returns:
            pd.DataFrame: A DataFrame containing the genes in the specified orthogroup.
        """
        if self.hierarchical_orthogroups is None:
            raise ValueError("Hierarchical orthogroups file not loaded.")

        if self.debug:
            print(f"Retrieving hierarchical orthogroup: {orthogroup_id}")

        # Filter for the specified orthogroup
        orthogroup = self.hierarchical_orthogroups[
            self.hierarchical_orthogroups["HOG"] == orthogroup_id
        ]

        return orthogroup

    def list_species(self):
        """
        List all species in the OrthoFinder results.

        Returns:
            list: A list of species names.
        """
        if self.orthologues_stats is None:
            raise ValueError("One-to-one orthologs statistics file not loaded.")

        return list(self.orthologues_stats.columns)

    def __repr__(self):
        """
        Provide a pretty string representation of the OrthoFinderResults object.
        """
        repr_str = "OrthoFinderResults Summary\n"
        repr_str += f"Results Directory: {self.results_dir}\n"
        repr_str += "-" * 40 + "\n"

        if self.orthologues_stats is not None:
            repr_str += f"One-to-One Orthologs Loaded: {self.orthologues_stats.shape[0]} genes x {self.orthologues_stats.shape[1]} species\n"
        else:
            repr_str += "One-to-One Orthologs: Not Loaded\n"

        if self.hierarchical_orthogroups is not None:
            repr_str += f"Hierarchical Orthogroups Loaded: {self.hierarchical_orthogroups.shape[0]} orthogroups\n"
        else:
            repr_str += "Hierarchical Orthogroups: Not Loaded\n"

        repr_str += "-" * 40 + "\n"
        repr_str += "Available Methods:\n"
        repr_str += " - list_species(): List all species in the results\n"
        repr_str += " - get_one_to_one_orthologs(species_a, species_b): Retrieve one-to-one orthologs between two species\n"
        repr_str += " - get_hierarchical_orthogroup(orthogroup_id): Retrieve genes in a specific hierarchical orthogroup\n"

        return repr_str

In [93]:
# Initialize the OrthoFinderResults class
results_dir = "../results/orthofinder/Results_Jan29"
irtho = OrthoFinderResults(results_dir, debug=True)

# List all species
species = irtho.list_species()
print("Species:", species)

# Get one-to-one orthologs between two species
orthologs = irtho.get_one_to_one_orthologs("AgambiaePEST-proteins", "Aalbopictus_AalbF5-proteins")
print(orthologs)

# Get a specific hierarchical orthogroup
hog = irtho.get_hierarchical_orthogroup("N0.HOG0000000")
hog

Initializing OrthoFinderResults for directory: ../results/orthofinder/Results_Jan29
Loading one-to-one orthologs from: ../results/orthofinder/Results_Jan29/Comparative_Genomics_Statistics/OrthologuesStats_one-to-one.tsv
Loading hierarchical orthogroups from: ../results/orthofinder/Results_Jan29/Phylogenetic_Hierarchical_Orthogroups/N0.tsv
Species: ['Aalbopictus_AalbF5-proteins', 'AgambiaePEST-proteins', 'CquinquefasciatusJHB2020-proteins']
Retrieving one-to-one orthologs between AgambiaePEST-proteins and Aalbopictus_AalbF5-proteins.
Empty DataFrame
Columns: [Aalbopictus_AalbF5-proteins, AgambiaePEST-proteins, CquinquefasciatusJHB2020-proteins]
Index: []
Retrieving hierarchical orthogroup: N0.HOG0000000


Unnamed: 0,HOG,OG,Gene Tree Parent Clade,Aalbopictus_AalbF5-proteins,AgambiaePEST-proteins,CquinquefasciatusJHB2020-proteins
0,N0.HOG0000000,OG0000000,n0,"XM_019670992.3, XM_029858824.2, XM_019694447.3...",AGAP012560-RA,"CQUJHB004840.R7478, CQUJHB009344.R14386, CQUJH..."
