In [17]:
# Cell 1 - Imports & configuration
from Bio import SeqIO, pairwise2
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation
from Bio.SeqUtils import gc_fraction
from Bio.pairwise2 import format_alignment
from Bio import Entrez
import matplotlib.pyplot as plt
import numpy as np
import os
import textwrap

# -------- USER CONFIG ----------
DNA_FASTA = "sequence.fasta"            # local DNA FASTA
PROT_FASTA = "rcsb_pdb_4ETQ.fasta"      # local Protein FASTA
OUT_DIR = "pipeline_outputs"            # output directory
ENTREZ_EMAIL = "laxminarasimha2k4@gmail.com"                       # set if you want to fetch from NCBI (optional)
# -------------------------------

if ENTREZ_EMAIL:
    Entrez.email = ENTREZ_EMAIL

os.makedirs(OUT_DIR, exist_ok=True)
print("Config set. Outputs ->", OUT_DIR)

Config set. Outputs -> pipeline_outputs


In [19]:
# Cell 2 - Parsing module
def parse_fasta(path, fmt="fasta"):
    """
    Parse sequences from a file and return list of SeqRecord.
    Accepts FASTA, GenBank, EMBL if format changed.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")
    recs = list(SeqIO.parse(path, fmt))
    if not recs:
        raise ValueError(f"No sequences parsed from {path}")
    return recs

In [20]:
# Cell 3 - Analysis module: stats, codon usage, AA composition, k-mer counts
from collections import Counter

def seq_stats(record):
    s = str(record.seq)
    stats = {
        "id": record.id,
        "description": record.description,
        "length": len(s),
        "gc_percent": round(gc_fraction(s) * 100, 3)
    }
    return stats

def codon_usage(dna_seq):
    seq = str(dna_seq).upper()
    counts = Counter()
    for i in range(0, len(seq) - 2, 3):
        codon = seq[i:i+3]
        if len(codon) == 3:
            counts[codon] += 1
    return dict(counts)

def kmer_counts(seq, k=4):
    seqs = str(seq).upper()
    cnt = Counter()
    for i in range(0, len(seqs) - k + 1):
        cnt[seqs[i:i+k]] += 1
    return dict(cnt)

def aa_composition(prot_seq):
    s = str(prot_seq)
    return dict(Counter(s))

In [21]:
# Cell 4 - ORF finder (robust mapping both strands)
def find_orfs_all_frames(dna_seq, min_aa_len=30):
    """
    Find ORFs in all 6 frames.
    Returns list of dicts: {strand, frame, start, end, aa_seq}
    start,end are 0-based on original DNA, end is exclusive.
    min_aa_len is minimum amino acids (default 30 aa -> 90 nt)
    """
    dna = str(dna_seq).upper()
    seq_len = len(dna)
    rc = str(Seq(dna).reverse_complement())

    orfs = []

    def scan_sequence(sequence, strand, offset_in_original=0):
        # sequence is a linear DNA string in orientation we are scanning
        for frame in range(3):
            i = frame
            while i <= len(sequence) - 3:
                codon = sequence[i:i+3]
                if codon == "ATG":  # start codon
                    # find next stop
                    j = i
                    aa = []
                    stop_found = False
                    while j <= len(sequence) - 3:
                        cod = sequence[j:j+3]
                        aa.append(str(Seq(cod).translate()))
                        if Seq(cod).translate() == "*":
                            stop_found = True
                            break
                        j += 3
                    if stop_found:
                        aa_seq = "".join(aa)[:-1]  # remove '*' from end
                        if len(aa_seq) >= min_aa_len:
                            # map coordinates to original sequence
                            start_in_seq = i
                            end_in_seq = j + 3  # end exclusive in this scanned sequence
                            if strand == +1:
                                start_orig = offset_in_original + start_in_seq
                                end_orig = offset_in_original + end_in_seq
                            else:
                                # when scanning rc, map rc positions to original:
                                # rc index r corresponds to original index: orig = seq_len - r - 1
                                # For stretch [start_in_seq, end_in_seq) on rc, the corresponding original coordinates:
                                # orig_end = seq_len - start_in_seq
                                # orig_start = seq_len - end_in_seq
                                orig_start = seq_len - end_in_seq
                                orig_end = seq_len - start_in_seq
                                start_orig = orig_start
                                end_orig = orig_end
                            orfs.append({
                                "strand": strand,
                                "frame": frame if strand==+1 else -frame-1,
                                "start": int(start_orig),
                                "end": int(end_orig),
                                "aa_seq": Seq( "".join([str(Seq(sequence[k:k+3]).translate()) for k in range(i, j, 3)]) )
                            })
                    # move i to next codon after start to avoid infinite loop
                    i += 3
                else:
                    i += 1

    scan_sequence(dna, +1)
    scan_sequence(rc, -1)
    # sort by length (nt) desc
    orfs.sort(key=lambda o: (o['end'] - o['start']), reverse=True)
    return orfs

# quick note: ORF finder prefers canonical ATG starts and standard stops.

In [22]:
# Cell 5 - Translation helpers
def translate_dna_region(dna_seq, start, end, strand=+1):
    """
    Extract DNA region [start:end) and translate.
    strand = +1 or -1. Returns SeqRecord of protein.
    """
    seq = Seq(str(dna_seq))
    if strand == +1:
        subseq = seq[start:end]
    else:
        subseq = seq[start:end].reverse_complement()
    aa = subseq.translate(to_stop=False)
    rec_id = f"translated_{start}_{end}_{'fwd' if strand==1 else 'rev'}"
    return SeqRecord(aa, id=rec_id, description=f"translation of {start}-{end} strand {strand}")

def save_fasta(seqrec, path):
    SeqIO.write(seqrec, path, "fasta")
    print("Saved:", path)


In [23]:
# Cell 6 - Alignment & alignment metrics
def pairwise_protein_align(seqA, seqB):
    """
    Global alignment with simple scoring. Returns best alignment and identity.
    """
    # Use pairwise2 globalms: match=2, mismatch=-1, gap open=-0.5, gap extend=-0.1
    alns = pairwise2.align.globalms(str(seqA), str(seqB), 2, -1, -0.5, -0.1)
    if not alns:
        return None, 0.0
    best = alns[0]
    a, b, score, begin, end = best
    matches = sum(1 for x,y in zip(a,b) if x == y and x != '-' and y != '-')
    length = sum(1 for x,y in zip(a,b) if x != '-' and y != '-')
    identity = matches / length if length > 0 else 0.0
    return best, identity

def save_alignment_text(aln, path):
    if aln is None:
        print("No alignment to save.")
        return
    with open(path, "w") as fh:
        fh.write(format_alignment(*aln))
    print("Alignment saved to:", path)

In [24]:
# Cell 7 - Visualization utilities
def plot_lengths(dna_rec, prot_rec, translated_rec, outdir):
    labels = ["DNA (nt)", "Provided Protein (aa)", "Translated (aa)"]
    values = [len(dna_rec.seq), len(prot_rec.seq), len(translated_rec.seq)]
    plt.figure(figsize=(6,4))
    plt.bar(labels, values)
    plt.ylabel("Length")
    plt.title("Sequence lengths")
    plt.tight_layout()
    p = os.path.join(outdir, "lengths.png")
    plt.savefig(p)
    plt.close()
    print("Saved:", p)

def plot_gc(dna_rec, outdir):
    val = gc_fraction(dna_rec.seq) * 100
    plt.figure(figsize=(4,3))
    plt.bar(["GC%"], [val])
    plt.ylabel("Percentage")
    plt.title(f"GC content: {dna_rec.id}")
    plt.tight_layout()
    p = os.path.join(outdir, "gc_content.png")
    plt.savefig(p)
    plt.close()
    print("Saved:", p)

def plot_top_codons(codon_dict, outdir, topn=10):
    items = sorted(codon_dict.items(), key=lambda x: x[1], reverse=True)[:topn]
    codons, counts = zip(*items) if items else ([],[])
    plt.figure(figsize=(8,4))
    plt.bar(codons, counts)
    plt.xticks(rotation=45)
    plt.title("Top codons")
    plt.tight_layout()
    p = os.path.join(outdir, "top_codons.png")
    plt.savefig(p)
    plt.close()
    print("Saved:", p)

def plot_alignment_identity(aln, outdir):
    if aln is None:
        print("No alignment to plot.")
        return
    a,b,score,begin,end = aln
    identity = [1 if (x == y and x != '-') else 0 for x,y in zip(a,b)]
    window = max(1, int(len(identity)*0.02))
    running = np.convolve(identity, np.ones(window)/window, mode='valid')
    plt.figure(figsize=(8,3))
    plt.plot(running)
    plt.title("Sliding-window alignment identity (window=%d)"%window)
    plt.ylabel("Identity fraction")
    plt.xlabel("Window position")
    plt.tight_layout()
    p = os.path.join(outdir, "alignment_identity.png")
    plt.savefig(p)
    plt.close()
    print("Saved:", p)


In [25]:
# Cell 8 - Annotation: add predicted CDS features and write GenBank
def annotate_and_write_genbank(dna_rec, orfs, out_path, max_features=10):
    # Ensure annotations exist
    dna_rec.features = getattr(dna_rec, "features", [])
    count = 0
    for i, orf in enumerate(orfs):
        if count >= max_features:
            break
        qualifiers = {
            "note": f"predicted_orf_{i+1}",
            "translation": str(orf["aa_seq"])
        }
        f = SeqFeature(FeatureLocation(int(orf["start"]), int(orf["end"])), type="CDS", qualifiers=qualifiers)
        dna_rec.features.append(f)
        count += 1

    # Minimal annotations required to write GenBank
    dna_rec.annotations = getattr(dna_rec, "annotations", {})
    dna_rec.annotations.setdefault("molecule_type", "DNA")
    dna_rec.annotations.setdefault("source", dna_rec.description)
    # Write GenBank file
    SeqIO.write(dna_rec, out_path, "genbank")
    print("Annotated GenBank written to:", out_path)


In [26]:
# Cell 9 - Optional DB access module (NCBI Entrez)
def fetch_nucleotide(accession):
    if not ENTREZ_EMAIL:
        raise RuntimeError("Set ENTREZ_EMAIL before using Entrez fetch.")
    handle = Entrez.efetch(db="nucleotide", id=accession, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    return rec

def fetch_protein_by_pdb(pdb_id):
    # Some PDB chains are available via 'protein' db using PDB accession; if not, use alternative sources.
    if not ENTREZ_EMAIL:
        raise RuntimeError("Set ENTREZ_EMAIL before using Entrez fetch.")
    handle = Entrez.efetch(db="protein", id=pdb_id, rettype="fasta", retmode="text")
    rec = SeqIO.read(handle, "fasta")
    return rec


In [27]:
# Cell 10 - Run full pipeline (orchestration)
def run_full_pipeline(dna_path=DNA_FASTA, prot_path=PROT_FASTA, outdir=OUT_DIR):
    print("=== Pipeline start ===")
    # 1) Parse
    dna_recs = parse_fasta(dna_path)
    prot_recs = parse_fasta(prot_path)
    dna = dna_recs[0]
    prot = prot_recs[0]
    print("Parsed DNA:", dna.id, "length", len(dna.seq))
    print("Parsed Protein:", prot.id, "length", len(prot.seq))

    # 2) Analysis
    print("\n-- Basic stats --")
    print("DNA stats:", seq_stats(dna))
    print("Protein stats:", seq_stats(prot))

    codons = codon_usage(dna.seq)
    kmers = kmer_counts(dna.seq, k=4)
    aa_comp = aa_composition(prot.seq)
    print("\nTop 8 codons:", sorted(codons.items(), key=lambda x: x[1], reverse=True)[:8])

    # 3) ORF finding
    print("\n-- Finding ORFs (min 30 aa) --")
    orfs = find_orfs_all_frames(dna.seq, min_aa_len=30)
    print(f"Found {len(orfs)} ORFs. Top 3 (start,end,aa_len,strand):")
    for o in orfs[:3]:
        print(" ", o['start'], o['end'], len(o['aa_seq']), o['strand'])

    # 4) Translation: pick longest ORF (if exists) else translate frame 0 to_stop
    if orfs:
        longest = orfs[0]
        translated_rec = translate_dna_region(dna.seq, longest['start'], longest['end'], strand=1 if longest['strand']>0 else -1)
    else:
        translated_rec = SeqRecord(dna.seq.translate(to_stop=True), id=dna.id + "_translated_frame0", description="translated_frame0")
    # save translated
    translated_path = os.path.join(outdir, "translated.fasta")
    save_fasta(translated_rec, translated_path)

    # 5) Alignment (translated vs provided protein)
    print("\n-- Aligning translated protein to provided protein --")
    alignment, identity = pairwise_protein_align(translated_rec.seq, prot.seq)
    if alignment is None:
        print("Alignment not produced.")
    else:
        print(f"Alignment identity: {identity*100:.2f}%")
        save_alignment_text(alignment, os.path.join(outdir, "alignment.txt"))

    # 6) Visualization
    plot_lengths(dna, prot, translated_rec, outdir)
    plot_gc(dna, outdir)
    plot_top_codons(codons, outdir, topn=12)
    plot_alignment_identity(alignment, outdir)

    # 7) Annotation & write GenBank
    gbk_out = os.path.join(outdir, f"{dna.id}_annotated.gb")
    annotate_and_write_genbank(dna, orfs, gbk_out, max_features=10)

    # 8) Save summary report (text)
    summary_path = os.path.join(outdir, "summary.txt")
    with open(summary_path, "w") as fh:
        fh.write("Pipeline Summary\n")
        fh.write("================\n")
        fh.write(f"DNA: {dna.id} length {len(dna.seq)}\n")
        fh.write(f"Protein: {prot.id} length {len(prot.seq)}\n")
        fh.write(f"Translated used: {translated_rec.id} length {len(translated_rec.seq)}\n")
        fh.write(f"ORFs found: {len(orfs)}\n")
        if alignment:
            fh.write(f"Alignment identity: {identity*100:.2f}%\n")
        fh.write("\nTop codons:\n")
        for c,v in sorted(codons.items(), key=lambda x:x[1], reverse=True)[:12]:
            fh.write(f"{c}: {v}\n")
    print("Summary written to:", summary_path)

    print("\n=== Pipeline finished. Outputs in:", outdir, "===\n")

# Run the pipeline now
run_full_pipeline()

=== Pipeline start ===
Parsed DNA: NM_001143.2 length 926
Parsed Protein: 4ETQ_1|Chains length 221

-- Basic stats --
DNA stats: {'id': 'NM_001143.2', 'description': 'NM_001143.2 Homo sapiens amelogenin Y-linked (AMELY), transcript variant 1, mRNA', 'length': 926, 'gc_percent': 51.188}
Protein stats: {'id': '4ETQ_1|Chains', 'description': '4ETQ_1|Chains A, C[auth H]|anti-vaccinia D8L antigen murine monoclonal IgG2a antibody LA5, heavy chain|Mus musculus (10090)', 'length': 221, 'gc_percent': 58.333}

Top 8 codons: [('CAG', 22), ('CCT', 17), ('CCC', 16), ('CCA', 15), ('CTG', 14), ('CAC', 13), ('CAA', 11), ('ATG', 11)]

-- Finding ORFs (min 30 aa) --
Found 45 ORFs. Top 3 (start,end,aa_len,strand):
  192 771 192 1
  192 771 192 1
  192 771 192 1
Saved: pipeline_outputs/translated.fasta

-- Aligning translated protein to provided protein --
Alignment identity: 100.00%
Alignment saved to: pipeline_outputs/alignment.txt
Saved: pipeline_outputs/lengths.png
Saved: pipeline_outputs/gc_content.p