In [1]:
#Step 1 - Import necessary modules
from Bio import Entrez, SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqUtils import gc_fraction
from Bio.SeqFeature import CompoundLocation
from collections import Counter
from skbio import DNA
import pandas as pd
import numpy as np
import ast
import logging
import itertools
from tqdm import tqdm
import time
import os, pathlib
import math

Entrez.email = "praneels2005@gmail.com"

In [2]:
def validate_row(row, index=None):
    """
    Validates a row from cleaned_features.csv.
    Prints EXACTLY which field(s) failed and why.
    Returns True if valid, False otherwise.
    """

    def is_nan(x):
        return isinstance(x, float) and math.isnan(x)

    failed = False   # Will flip to True if any validation rule fails

    prefix = f"[Row {index}] " if index is not None else ""

    # ---------------------------------------------------------
    # 1. Sequence and CDS boundary checks
    # ---------------------------------------------------------

    if is_nan(row["Sequence_length"]) or row["Sequence_length"] <= 0:
        print(prefix + "Invalid Sequence_length:", row["Sequence_length"])
        failed = True

    if is_nan(row["cds_length"]) or row["cds_length"] <= 0:
        print(prefix + "Invalid cds_length:", row["cds_length"])
        failed = True

    if is_nan(row["protein length"]) or row["protein length"] < 0:
        print(prefix + "Invalid protein length:", row["protein length"])
        failed = True

    if is_nan(row["cds_start"]) or not (0 <= row["cds_start"] < row["Sequence_length"]):
        print(prefix + f"Invalid cds_start: {row['cds_start']} (Seq_len={row['Sequence_length']})")
        failed = True

    if is_nan(row["cds_end"]) or not (0 < row["cds_end"] <= row["Sequence_length"]):
        print(prefix + f"Invalid cds_end: {row['cds_end']} (Seq_len={row['Sequence_length']})")
        failed = True

    if not is_nan(row["cds_start"]) and not is_nan(row["cds_end"]):
        if row["cds_end"] <= row["cds_start"]:
            print(prefix + f"cds_end ({row['cds_end']}) <= cds_start ({row['cds_start']})")
            failed = True

    # ---------------------------------------------------------
    # 2. GC content validation
    # ---------------------------------------------------------

    if is_nan(row["GC_content"]) or not (0 <= row["GC_content"] <= 1):
        print(prefix + "Invalid GC_content:", row["GC_content"])
        failed = True

    # ---------------------------------------------------------
    # 3. Nucleotide composition
    # ---------------------------------------------------------

    for col in ["pct_A", "pct_T", "pct_C", "pct_G"]:
        if is_nan(row[col]) or not (0 <= row[col] <= 100):
            print(prefix + f"Invalid nucleotide percentage {col}: {row[col]}")
            failed = True

    # Sum should be around 100, but allow wide tolerance due to rounding.
    nuc_sum = row["pct_A"] + row["pct_T"] + row["pct_C"] + row["pct_G"]

    # Accept anything between 85 and 115 (very lenient),
    if not (85 <= nuc_sum <= 115):
        print(prefix + f"Nucleotide sum ~= {nuc_sum} (expected ≈100%)")
        failed = True

    # ---------------------------------------------------------
    # 4. Amino acid composition
    # ---------------------------------------------------------

    aa_cols = [
        'aa_A','aa_C','aa_D','aa_E','aa_F','aa_G','aa_H','aa_I','aa_K','aa_L',
        'aa_M','aa_N','aa_P','aa_Q','aa_R','aa_S','aa_T','aa_V','aa_W','aa_Y'
    ]

    for col in aa_cols:
        if is_nan(row[col]) or row[col] < 0:
            print(prefix + f"Invalid amino-acid value {col}: {row[col]}")
            failed = True

    # ---------------------------------------------------------
    # 5. Codon usage validation
    # ---------------------------------------------------------

    for col in (c for c in row.index if c.startswith("codon_")):
        if is_nan(row[col]):
            print(prefix + f"NaN codon value {col} → Treating as 0 (not failing)")
            continue
        if row[col] < 0:
            print(prefix + f"Negative codon value {col}: {row[col]}")
            failed = True

    # ---------------------------------------------------------
    # 6. K-mer validation
    # ---------------------------------------------------------

    for col in (c for c in row.index if c.startswith("kmer_")):
        if is_nan(row[col]):
            print(prefix + f"NaN kmer value {col} → Treating as 0 (not failing)")
            continue
        if row[col] < 0:
            print(prefix + f"Negative kmer value {col}: {row[col]}")
            failed = True

    return not failed

In [3]:

def codon_usage(seq):
    '''
    Codon usage bias captures how frequently each of the 64 possible codons appears in the CDS of a gene.
    '''
    dna = DNA(str(seq))
    
    # iterate codons safely; this handles ambiguous bases and validates codons
    codons = [str(dna[i:i+3]) for i in range(0, len(dna)-len(dna) % 3, 3)]

    counts = Counter(codons)
    total = sum(counts.values())

    all_codons = [a+b+c for a in "ATCG" for b in "ATCG" for c in "ATCG"]
    
    usage = {codon: (counts.get(codon, 0) / total) for codon in all_codons}
    
    return usage

In [4]:
def K_MER(Sequence, k=2):
     # generate ALL possible kmers (16 for k=2)
    bases = ["A", "C", "G", "T"]
    all_kmers = ["".join(p) for p in itertools.product(bases, repeat=k)]
    
    Sequence = str(Sequence).upper()
    KMERS = []
    for i in range(0,len(Sequence) - k + 1):
        #All possible 2-MERS in the sequence
        KMERS.append(Sequence[i:i+k])
    counts = Counter(KMERS)
    total = len(KMERS)
    
    return {f"kmer_{mer}": counts.get(mer, 0) / total if total > 0 else 0 for mer in all_kmers}

In [5]:
# list of all 20 standard amino acids
ALL_AMINO_ACIDS = list("ACDEFGHIKLMNPQRSTVWY")

def amino_acid_composition(protein_seq):
    total = len(protein_seq)
    counts = Counter(protein_seq)
    comp = {aa: (count / total) * 100 for aa, count in counts.items()}
    return {aa: comp.get(aa, 0.0) for aa in ALL_AMINO_ACIDS}

In [6]:
def extract_features(gene_name,record):
    cds_start = None
    cds_end = None
    cds_seq = None
    protein = ""
    
    for feature in record.features:
        if feature.type == "CDS":
            # gene_name = feature.qualifiers.get("gene",['unknown_gene'])[0]
            # detects multi-exon CDS
            # genes used are disease oriented or housekeeping genes 
            if isinstance(feature.location, CompoundLocation):
                # count exons from a CompoundLocation
                exon_count = len(feature.location.parts)
            else:
                exon_count = 1
            cds_start = int(feature.location.start)
            cds_end = int(feature.location.end)
            
            cds_seq = feature.location.extract(record.seq)
            if "translation" in feature.qualifiers:
                protein = feature.qualifiers["translation"][0]
            else:
                protein = cds_seq.translate(to_stop=True)

            break
            
    # ---- Safety fallbacks ----
    if cds_seq is None:
        cds_seq = ""
        exon_count = 0
        protein = ""
        
    #Basic sequence metadata
    data = {  'Gene Name': gene_name,
              'Accession': record.annotations.get('accessions', ["UNKNOWN"]),
              'Organism': record.annotations.get('organism', "UNKNOWN"),
              'Taxonomy': record.annotations.get('taxonomy', []),
              'Molecule_type': record.annotations.get('molecule_type', "UNKNOWN"),
              'Sequence_length': len(record.seq),
              'cds_start': cds_start,
              'cds_end': cds_end,
              'cds_length':len(cds_seq),
              'protein length':len(protein),
              'pct_A': (record.seq.count("A")/len(record.seq)*100),
              'pct_T': (record.seq.count("T")/len(record.seq)*100),
              'pct_C': (record.seq.count("C")/len(record.seq)*100),
              'pct_G': (record.seq.count("G")/len(record.seq)*100),
              'GC_content': gc_fraction(record.seq),
            **{f"codon_{codon}":pct for codon, pct in codon_usage(cds_seq).items()},
            **{f"{kmer}": pct for kmer, pct in K_MER(cds_seq, 2).items()},
            **{f"aa_{aa}": pct for aa, pct in amino_acid_composition(protein).items()}}

    #File paths
    mrna_path = f"sequences/{gene_name}/{gene_name}_mrna.fasta"
    cds_path = f"sequences/{gene_name}/{gene_name}_cds.fasta"
    protein_path = f"sequences/{gene_name}/{gene_name}_protein.fasta"

    # Created SeqRecord objects
    # mRNA sequence
    mrna_record = SeqRecord(record.seq, id=f"{gene_name}_mRNA", description="")
    # CDS nucleotide sequence
    cds_record = SeqRecord(cds_seq, id=f"{gene_name}_CDS", description="")
    # Amino acid sequence
    protein_record = SeqRecord(Seq(protein), id=f"{gene_name}_protein", description="")
    
    with open(mrna_path, "w") as f:
        SeqIO.write(mrna_record, f, "fasta")
    
    with open(cds_path, "w") as f:
        SeqIO.write(cds_record, f, "fasta")
    
    with open(protein_path, "w") as f:
        SeqIO.write(protein_record, f, "fasta")
    return data

In [7]:
# Gene names extracted from txt file
notebook_dir = pathlib.Path().resolve()
os.chdir(notebook_dir)
file_path = str(notebook_dir / "Genes.txt")

try:
    with open(file_path) as f:
        genes = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
    print(f"ERROR: The file '{file_path}' was not found.")

In [8]:
# Full comprehensive data pipeline
'''
Used mRNA because it is:

consistent across genes

contains complete CDS + protein

avoids genomic complications
'''
'''
BRCA1
DMD
HTT
TP53
APOE
CFTR
FBN1
ACTB
COX1
RPLP0
GAPDH
HBB
B2M
HPRT1
BRCA2
'''
dicts = []

try:
    for gene_name in genes:
        if not gene_name:
                continue

        # Step 1: Search
        try:
            search = Entrez.esearch(
                db="nucleotide",
                term=f"{gene_name}[gene] AND refseq[filter] AND mRNA[filter]",
                retmax=100
            )
            ids = Entrez.read(search)["IdList"]
        except Exception as e:
            print(f"Search failed for {gene_name}: {e}")
            continue

        if not ids:
            print(f"No RefSeq mRNA found for {gene_name}. Skipping.")
            continue

        # Step 2: Fetch  
        try:
            handle = Entrez.efetch(
                db="nucleotide",
                id = ",".join(ids),
                rettype="gb",
                retmode="text"
            )
            records = list(SeqIO.parse(handle, "genbank"))
        except Exception as e:
            print(f"Fetch failed for {gene_name}: {e}")
            continue

        # Process records
        for record in records:
            features = extract_features(gene_name, record)

            if features is None:
                print(f"Invalid feature extraction for {record.id}")
                continue

            dicts.append(features)

            # Saves raw GenBank file
            RawPath = f'raw_data/raw_{record.id}.gb'
            with open(RawPath, 'w') as f:
                f.write(record.format("gb"))
except Exception as e:
    print(f"Unexpected error: {e}")

In [9]:
# Converts dictionary to dataframe
df = pd.DataFrame(dicts)

In [10]:
#Cleaning the dataframe
# Convert Accession from "['NM_000']" → "NM_000"
df["Accession"] = df["Accession"].apply(lambda x: x[0])

# Convert Taxonomy list-string into a simplified string
df["Taxonomy"] = df["Taxonomy"].apply(lambda x: ";".join(x))

In [11]:
# Validating the features of the dataframe
valid_mask = df.apply(validate_row, axis=1)
df_clean = df[valid_mask].reset_index(drop=True)
print(f"Removed {len(df) - len(df_clean)} invalid rows.")

Removed 0 invalid rows.


In [12]:
df_clean.shape

(1375, 115)

In [13]:
# Converting dataframe to csv file
df_clean.to_csv("cleaned_features.csv", index=False)