In [9]:
from Bio import SeqIO
import pandas as pd
import gffutils
import os
import yaml

In [None]:
class GenomeManager:
    def __init__(self, base_path: str, genome_id: str):
        """
        base_path: root directory containing all genome subfolders (e.g. "data/ncbi_dataset/data")
        genome_id: name of the genome folder (e.g. "GCA_900636475.1")
        """
        self.genome_dir = os.path.join(base_path, genome_id)
        self.fna_path = None
        self.gtf_path = None
        self.gbff_path = None
        self.gff_path = None
        self._find_files()

    def _find_files(self):
        """Scan the genome directory and assign file paths for .fna, .gtf, .gbff, .gff"""
        for fname in os.listdir(self.genome_dir):
            lower = fname.lower()
            full = os.path.join(self.genome_dir, fname)
            if lower.endswith(".fna") or lower.endswith(".fa") or lower.endswith(".fasta"):
                self.fna_path = full
            elif lower.endswith(".gtf"):
                self.gtf_path = full
            elif lower.endswith(".gbff") or lower.endswith(".gbk") or lower.endswith(".gbff"):
                self.gbff_path = full
            elif lower.endswith(".gff") or lower.endswith(".gff3"):
                self.gff_path = full

    def summary(self):
        """Print what files are detected in this genome folder."""
        print(f"Genome directory: {self.genome_dir}")
        print(f"  FASTA (.fna): {self.fna_path}")
        print(f"  GTF (.gtf):   {self.gtf_path}")
        print(f"  GBFF (.gbff): {self.gbff_path}")
        print(f"  GFF (.gff/.gff3): {self.gff_path}")

    def read_fna(self, sequence_itself: bool = True):
        """Read the FASTA (.fna) file, return list of SeqRecord objects."""
        if not self.fna_path:
            raise FileNotFoundError("No .fna file found in genome directory.")
        genome = list(SeqIO.parse(self.fna_path, "fasta"))

        if not len(genome) == 1:
            raise ValueError(f"Expected 1 sequence in .fna file, found {len(genome)}")

        seq = genome[0]
        
        if sequence_itself: return seq.seq
        else: return seq

    def load_gff_from_gffutils(self):
        """Load GFF file using gffutils."""
        if not self.gff_path:
            raise FileNotFoundError("No .gff file found in genome directory.")
        # Implement GFF parsing logic here
        
        gff_db = gffutils.create.create_db(data=self.gff_path, dbfn=':memory:', force=True)
        return gff_db
    
    def load_gff(self):
        """Load GFF file using gffutils."""
        if not self.gff_path:
            raise FileNotFoundError("No .gff file found in genome directory.")
        # Implement GFF parsing logic here
        
        # Use read_csv with comment="#" so lines starting with “#” are ignored
        gff_db = pd.read_csv(self.gff_path, sep="\t", comment="#", header=None,
            names=["seqid", "source", "feature", "start", "end", "score", "strand", "phase", "attributes"],
            dtype={
                "seqid": str,
                "source": str,
                "feature": str,
                "start": int,
                "end": int,
                "score": str,    # may be “.” or numeric
                "strand": str,
                "phase": str,
                "attributes": str
            },
            # In case there are “bad lines” (e.g. wrong number of columns), skip them
            on_bad_lines="skip"
        )

        return gff_db
    
    def extract_genes(self, find_method: str = "random", n: int = 100):
        """Extract gene sequences from fasta file based on GFF annotations."""

        if find_method not in ["random", "first"]:
            raise ValueError(f"Unknown find_method: {find_method}")

        gff_db = self.load_gff()
        genome_seq = self.read_fna(sequence_itself=True)

        # Filter for gene features
        genes = gff_db[gff_db['feature'] == 'gene']
        if genes.empty:
            raise ValueError("No gene features found in GFF file.")
        elif len(genes) < n:
            raise ValueError(f"Requested {n} genes, but only found {len(genes)} in GFF file.")
        
        if find_method == "random":
            sampled_genes = genes.sample(n=n)
        elif find_method == "first":
            sampled_genes = genes.head(n)

        gene_sequences = []
        for _, row in sampled_genes.iterrows():
            start = row['start'] - 1  # Convert to 0-based index
            end = row['end']          # End is inclusive in GFF but this is 1-based
            strand = row['strand']
            gene_seq = genome_seq[start:end]
            if strand == '-':
                gene_seq = gene_seq.reverse_complement()
            gene_sequences.append(gene_seq)

        return gene_sequences

## 1. Construct the Position Probbility Matrix (PPM) 

In [41]:
# Load YAML file and read data paths
with open('config.yaml', 'r') as f:
    config = yaml.safe_load(f)

data_path = config['data_path']
ref_genome = config['ref_genome']

#Loading reference genome
ref_genome_path = os.path.join(data_path, ref_genome)
print(f"Reference genome path: {ref_genome_path}")
ref_genome = GenomeManager(config['data_path'], config['ref_genome'])

# Load genome files from data path
genome = ref_genome.read_fna(sequence_itself=True)
gff_database = ref_genome.load_gff()
genome

Reference genome path: data/ncbi_dataset/data\GCA_900636475.1


Seq('GAATTATCGGAGGAGCGACTTGGCGACAAGTTGTGATGGTGGTAGGAATTGTTT...AGG')

Locating 1100 genes of the organism that have been predicted based upon homology.

In [42]:
genes = ref_genome.extract_genes(find_method="random", n=1100)
print(f"Extracted {len(genes)} gene sequences.")
print(genes[:5])  # Print first 5 gene sequences as a sample

Extracted 1100 gene sequences.
[('ID=gene-NCTC10713_00727;Name=ciaH;gbkey=Gene;gene=ciaH;gene_biotype=protein_coding;locus_tag=NCTC10713_00727', Seq('ATGCTGAATAAACTAAAAAAAACATGGTATGCGGATGATTTTTCTTATTTCATT...TAA')), ('ID=gene-NCTC10713_01775;Name=gcp;gbkey=Gene;gene=gcp;gene_biotype=protein_coding;locus_tag=NCTC10713_01775', Seq('ATGAATGATAGATATATTTTAGCTTTTGAGACGTCCTGCGATGAGACTAGCGTG...TAA')), ('ID=gene-NCTC10713_01700;Name=rpsM;gbkey=Gene;gene=rpsM;gene_biotype=protein_coding;locus_tag=NCTC10713_01700', Seq('ATGGCTCGTATTGCTGGAGTTGATATTCCAAATGACAAACGTGTAGTTGTTTCA...TAA')), ('ID=gene-NCTC10713_01523;Name=NCTC10713_01523;gbkey=Gene;gene_biotype=tRNA;locus_tag=NCTC10713_01523', Seq('TGGTCTCATAGCTCAGCTGGATAGAGCATTCGCCTTCTAAGCGAACGGTCGCAG...CAT')), ('ID=gene-NCTC10713_01087;Name=NCTC10713_01087;gbkey=Gene;gene_biotype=protein_coding;locus_tag=NCTC10713_01087', Seq('ATGGTTTGGATTGTAAATATACTTTATAATTTGGTAAGAGTGTATTCATTTATT...TAA'))]


## 3. Other DNAs PPM

In [31]:
# load genome files from data path
genomes = [f for f in os.listdir(data_path) if os.path.isdir(os.path.join(data_path, f))]

# Print the list of genome files
print("Genome files in data path:")
for genome_path in genomes:
    genome_path = os.path.join(data_path, genome_path)
    print(genome_path)


Genome files in data path:
data/ncbi_dataset/data\GCA_001457635.1
data/ncbi_dataset/data\GCA_019046945.1
data/ncbi_dataset/data\GCA_019048645.1
data/ncbi_dataset/data\GCA_900475505.1
data/ncbi_dataset/data\GCA_900636475.1
data/ncbi_dataset/data\GCA_900637025.1
