In [3]:
import math
import time
from Bio.Seq import Seq
from collections import Counter, defaultdict
from typing import Dict, List, Tuple
import numpy as np
import sys

sys.path.insert(0, '..')  
from src.config import KNOWN_RBS_MOTIFS, START_CODONS, STOP_CODONS,  MIN_ORF_LENGTH, LENGTH_REFERENCE_BP
from src.config import SCORE_WEIGHTS, START_CODON_WEIGHTS, START_SELECTION_WEIGHTS, FIRST_FILTER_THRESHOLD, SECOND_FILTER_THRESHOLD
from src.traditional_methods import find_orfs_candidates
from functools import lru_cache


In [None]:

# =============================================================================
# RBS (RIBOSOME BINDING SITE) PREDICTION
# =============================================================================

def find_purine_rich_regions_new(
    sequence: str, 
    min_length: int = 4, 
    min_purine_content: float = 0.6
) -> List[Dict]:
    """Find purine-rich regions using sliding window optimization."""
    purine_regions = []
    seq_len = len(sequence)
    
    if seq_len < min_length:
        return purine_regions
    
    # Pre-compute purine array
    is_purine = [1 if base in 'AG' else 0 for base in sequence]
    
    # CRITICAL: Match original iteration order!
    # Outer loop: iterate over START positions (like original)
    for start in range(seq_len):
        max_length = min(9, seq_len - start + 1)
        
        # Initialize first window at this start position
        if max_length > min_length:
            purine_count = sum(is_purine[start:start + min_length])
            
            # Check first length
            length = min_length
            if length <= seq_len - start:
                purine_fraction = purine_count / length
                if purine_fraction >= min_purine_content:
                    purine_regions.append({
                        'sequence': sequence[start:start + length],
                        'start': start,
                        'end': start + length,
                        'purine_content': purine_fraction,
                        'length': length
                    })
            
            # Slide to longer lengths at this start position
            for length in range(min_length + 1, max_length):
                if start + length > seq_len:
                    break
                
                # Add one more base to the window
                purine_count += is_purine[start + length - 1]
                
                purine_fraction = purine_count / length
                if purine_fraction >= min_purine_content:
                    purine_regions.append({
                        'sequence': sequence[start:start + length],
                        'start': start,
                        'end': start + length,
                        'purine_content': purine_fraction,
                        'length': length
                    })
    
    return purine_regions

def score_motif_similarity_new(sequence: str) -> Tuple[float, str]:
    """Score sequence similarity to known RBS motifs."""
    best_score = 0.0
    best_motif = None
    
    for motif in KNOWN_RBS_MOTIFS:
        for offset in range(max(len(sequence), len(motif))):
            matches = 0
            total_positions = 0
            
            for i in range(len(sequence)):
                motif_pos = i + offset
                if 0 <= motif_pos < len(motif):
                    total_positions += 1
                    if sequence[i] == motif[motif_pos]:
                        matches += 1
            
            if total_positions > 0:
                similarity = matches / total_positions
                
                overlap_length = total_positions
                motif_weight = len(motif) / 6.0  # AGGAGG gets weight 1.0
                
                score = similarity * overlap_length * motif_weight
                
                if score > best_score:
                    best_score = score
                    best_motif = motif
    
    return best_score, best_motif


def predict_rbs_simple_new(sequence: str, orf: Dict, upstream_length: int = 20) -> Dict:
    """Predict RBS using purine content, spacing, and motif similarity."""
    start_pos = orf['start']
    
    if start_pos < upstream_length:
        return {
            'rbs_score': -5.0,
            'spacing_score': 0.0,
            'motif_score': 0.0,
            'best_sequence': None,
            'best_motif': None,
            'spacing': 0,
            'position': 0
        }

    upstream_start = start_pos - upstream_length
    upstream_seq = sequence[upstream_start:start_pos]
    
    purine_regions = find_purine_rich_regions_new(upstream_seq, min_length=4, min_purine_content=0.6)
    
    best_score = -5.0
    best_prediction = None
    
    for region in purine_regions:
        sd_candidate = region['sequence']
        spacing = len(upstream_seq) - region['end']
        
        if spacing < 4 or spacing > 12:
            continue
        elif 6 <= spacing <= 8:
            spacing_score= 3.0  # Optimal
        elif 5 <= spacing <= 10:
            spacing_score= 2.5  # Very good
        elif 4 <= spacing <= 12:
            spacing_score= 1.5  # Good
        
        motif_score, best_motif = score_motif_similarity_new(sd_candidate)
        purine_bonus = (region['purine_content'] - 0.6) * 2.0
        

        combined_score = (
            spacing_score * 2.0 +    
            motif_score * 1.5 +      
            purine_bonus             
        )
        
        if combined_score > best_score:
            best_score = combined_score
            best_prediction = {
                'rbs_score': combined_score,
                'spacing_score': spacing_score,
                'motif_score': motif_score,
                'best_sequence': sd_candidate,
                'best_motif': best_motif,
                'spacing': spacing,
                'position': region['start'],
                'purine_content': region['purine_content'],
                'length': region['length']
            }
    
    return best_prediction or {
        'rbs_score': -5.0,
        'spacing_score': 0.0,
        'motif_score': 0.0,
        'best_sequence': None,
        'best_motif': None,
        'spacing': 0,
        'position': 0
    }


# =============================================================================
# ORF DETECTION
# =============================================================================

def find_orfs_candidates_new(sequence: str, min_length: int = 100) -> List[Dict]:
    """Detect all ORF candidates with dual coordinates and RBS scores."""
    orfs = []
    
    sequences = [
        ('forward', sequence),
        ('reverse', str(Seq(sequence).reverse_complement()))
    ]
    seq_len = len(sequence)

    print("Detecting ORFs and calculating RBS...")

    for strand_name, seq in sequences:
        for frame in range(3):
            active_starts = [] 
            
            for i in range(frame, len(seq) - 2, 3):
                codon = seq[i:i+3]
                
                if len(codon) != 3:
                    break
                
                if codon in START_CODONS:
                    active_starts.append((i, codon))
                    
                elif codon in STOP_CODONS and active_starts:
                    for start_pos, start_codon in active_starts:
                        orf_length = i + 3 - start_pos
                        
                        if orf_length >= min_length:
                            # Create ORF with dual coordinates
                            if strand_name == 'forward':
                                orf = {
                                    'start': start_pos + 1,
                                    'end': i + 3,
                                    'genome_start': start_pos + 1,
                                    'genome_end': i + 3,
                                    'length': orf_length,
                                    'frame': frame,
                                    'strand': 'forward',
                                    'start_codon': start_codon,
                                    'sequence': seq[start_pos:i+3]
                                }
                            else:  # reverse strand
                                orf = {
                                    'start': start_pos + 1,
                                    'end': i + 3,
                                    'genome_start': seq_len - (i + 3) + 1,
                                    'genome_end': seq_len - start_pos,
                                    'length': orf_length,
                                    'frame': frame,
                                    'strand': 'reverse',
                                    'start_codon': start_codon,
                                    'sequence': seq[start_pos:i+3]
                                }
                            
                            # Calculate RBS for this ORF
                            rbs_result = predict_rbs_simple_new(seq, orf, upstream_length=20)
                            orf['rbs_score'] = rbs_result['rbs_score']
                            orf['rbs_motif'] = rbs_result.get('best_motif')
                            orf['rbs_spacing'] = rbs_result.get('spacing', 0)
                            orf['rbs_sequence'] = rbs_result.get('best_sequence')
                            
                            orfs.append(orf)
                    
                    active_starts = []
    
    print(f"Complete: {len(orfs):,} ORFs detected with RBS scores")
    return orfs


In [None]:
import gc
import time

def reliable_benchmark(func, *args, num_runs=5, **kwargs):
    """Run benchmark with proper warm-up and GC control."""
    
    # Warm-up (don't count first run)
    print("Warm-up...", end=" ", flush=True)
    _ = func(*args, **kwargs)
    print("done")
    
    times = []
    for i in range(num_runs):
        # Force garbage collection before each run
        gc.collect()
        
        # Benchmark
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        
        times.append(elapsed)
        print(f"Run {i+1}: {elapsed:.2f}s")
    
    avg = sum(times) / len(times)
    std_dev = (sum((t - avg)**2 for t in times) / len(times)) ** 0.5
    
    print(f"\nAverage: {avg:.2f}s")
    print(f"Std Dev: {std_dev:.2f}s ({std_dev/avg*100:.1f}%)")
    print(f"Range: {min(times):.2f}s - {max(times):.2f}s")
    
    return result, avg, std_dev

# Usage
print("="*80)
print("ORIGINAL")
print("="*80)
orfs_orig, time_orig, std_orig = reliable_benchmark(
    find_orfs_candidates, sequence, min_length=100, num_runs=3
)

print(f"\n{'='*80}")
print("OPTIMIZED")
print("="*80)
orfs_opt, time_opt, std_opt = reliable_benchmark(
    find_orfs_candidates_new, sequence, min_length=100, num_runs=3
)

# Results
print(f"\n{'='*80}")
print("COMPARISON")
print("="*80)
improvement = time_orig - time_opt
percent = (improvement / time_orig) * 100
print(f"\nOriginal:  {time_orig:.2f}s ± {std_orig:.2f}s")
print(f"Optimized: {time_opt:.2f}s ± {std_opt:.2f}s")
print(f"Saved:     {improvement:.2f}s ({percent:.1f}% faster)")

Using cached genome: test_data\NC_000913.3.fasta
Sequence: 4,641,652 bp

COMPARING: find_orfs_candidates() vs find_orfs_candidates_new()

[1/2] Running ORIGINAL method...
Detecting ORFs and calculating RBS...
Complete: 176,315 ORFs detected with RBS scores
      176,315 ORFs in 101.46s

[2/2] Running OPTIMIZED method...
Detecting ORFs and calculating RBS...
Complete: 176,315 ORFs detected with RBS scores
      176,315 ORFs in 77.27s

VERIFICATION

ORF Counts:
  Original:  176,315
  Optimized: 176,315
  [PASS] Counts match!

Comparing ORF details...
  [PASS] All ORF fields match perfectly!

SUCCESS: Optimization verified - results are IDENTICAL!

Performance:
  Original:  101.46s
  Optimized: 77.27s
  Speedup:   31.3% faster

[READY FOR COMMIT]


In [None]:
# ============================================================================
# RBS SCORE VERIFICATION TEST
# ============================================================================
# Specifically tests that RBS scoring is identical after optimization

from Bio import Entrez, SeqIO
from pathlib import Path
import time

# Configuration
EMAIL = "your.email@example.com"  # CHANGE THIS
ACCESSION = "NC_000913.3"  # E. coli K-12 MG1655

# ============================================================================
# STEP 1: Download E. coli genome (only runs once)
# ============================================================================

data_dir = Path('test_data')
data_dir.mkdir(exist_ok=True)
fasta_path = data_dir / f'{ACCESSION}.fasta'

if not fasta_path.exists():
    print("Downloading E. coli genome...")
    Entrez.email = EMAIL
    handle = Entrez.efetch(db="nucleotide", id=ACCESSION, rettype="fasta", retmode="text")
    with open(fasta_path, 'w') as f:
        f.write(handle.read())
    handle.close()
    print(f"Downloaded: {fasta_path}")
else:
    print(f"Using cached genome: {fasta_path}")

# Load sequence
record = SeqIO.read(fasta_path, "fasta")
sequence = str(record.seq)
print(f"Sequence: {len(sequence):,} bp\n")

# ============================================================================
# STEP 2: Run both methods
# ============================================================================

print("="*80)
print("RBS SCORE VERIFICATION TEST")
print("="*80)

# Original method
print("\n[1/2] Running ORIGINAL method...")
start = time.time()
orfs_original = find_orfs_candidates(sequence, min_length=100)
time_original = time.time() - start
print(f"      {len(orfs_original):,} ORFs in {time_original:.2f}s")

# Optimized method  
print("\n[2/2] Running OPTIMIZED method...")
start = time.time()
orfs_optimized = find_orfs_candidates_new(sequence, min_length=100)
time_optimized = time.time() - start
print(f"      {len(orfs_optimized):,} ORFs in {time_optimized:.2f}s")

# ============================================================================
# STEP 3: RBS Score Verification
# ============================================================================

print(f"\n{'='*80}")
print("RBS SCORE VERIFICATION")
print("="*80)

# Check RBS fields exist
rbs_fields = ['rbs_score', 'rbs_spacing', 'rbs_motif', 'rbs_sequence']
print(f"\nChecking RBS fields: {', '.join(rbs_fields)}")

# Count ORFs with RBS data
orig_with_rbs = sum(1 for orf in orfs_original if orf.get('rbs_score') is not None)
opt_with_rbs = sum(1 for orf in orfs_optimized if orf.get('rbs_score') is not None)

print(f"  Original:  {orig_with_rbs:,} ORFs with RBS scores")
print(f"  Optimized: {opt_with_rbs:,} ORFs with RBS scores")

# Detailed RBS comparison
rbs_score_mismatches = []
rbs_spacing_mismatches = []
rbs_motif_mismatches = []
rbs_sequence_mismatches = []

for i, (orig, opt) in enumerate(zip(orfs_original, orfs_optimized)):
    # Check rbs_score
    if orig.get('rbs_score') != opt.get('rbs_score'):
        if isinstance(orig.get('rbs_score'), float) and isinstance(opt.get('rbs_score'), float):
            if abs(orig['rbs_score'] - opt['rbs_score']) > 1e-10:
                rbs_score_mismatches.append((i, orig['rbs_score'], opt['rbs_score']))
        else:
            rbs_score_mismatches.append((i, orig.get('rbs_score'), opt.get('rbs_score')))
    
    # Check rbs_spacing
    if orig.get('rbs_spacing') != opt.get('rbs_spacing'):
        rbs_spacing_mismatches.append((i, orig.get('rbs_spacing'), opt.get('rbs_spacing')))
    
    # Check rbs_motif
    if orig.get('rbs_motif') != opt.get('rbs_motif'):
        rbs_motif_mismatches.append((i, orig.get('rbs_motif'), opt.get('rbs_motif')))
    
    # Check rbs_sequence
    if orig.get('rbs_sequence') != opt.get('rbs_sequence'):
        rbs_sequence_mismatches.append((i, orig.get('rbs_sequence'), opt.get('rbs_sequence')))

# ============================================================================
# STEP 4: Report Results
# ============================================================================

print(f"\n{'='*80}")
print("DETAILED RBS COMPARISON")
print("="*80)

all_rbs_match = True

print(f"\nRBS Score mismatches: {len(rbs_score_mismatches)}")
if rbs_score_mismatches:
    all_rbs_match = False
    print("  First 5 mismatches:")
    for i, orig_val, opt_val in rbs_score_mismatches[:5]:
        print(f"    ORF {i}: {orig_val:.6f} != {opt_val:.6f}")

print(f"\nRBS Spacing mismatches: {len(rbs_spacing_mismatches)}")
if rbs_spacing_mismatches:
    all_rbs_match = False
    print("  First 5 mismatches:")
    for i, orig_val, opt_val in rbs_spacing_mismatches[:5]:
        print(f"    ORF {i}: {orig_val} != {opt_val}")

print(f"\nRBS Motif mismatches: {len(rbs_motif_mismatches)}")
if rbs_motif_mismatches:
    all_rbs_match = False
    print("  First 5 mismatches:")
    for i, orig_val, opt_val in rbs_motif_mismatches[:5]:
        print(f"    ORF {i}: '{orig_val}' != '{opt_val}'")

print(f"\nRBS Sequence mismatches: {len(rbs_sequence_mismatches)}")
if rbs_sequence_mismatches:
    all_rbs_match = False
    print("  First 5 mismatches:")
    for i, orig_val, opt_val in rbs_sequence_mismatches[:5]:
        print(f"    ORF {i}: '{orig_val}' != '{opt_val}'")

# ============================================================================
# FINAL VERDICT
# ============================================================================

print(f"\n{'='*80}")
print("FINAL VERDICT")
print("="*80)

if all_rbs_match:
    print("\n[PASS] All RBS scores are IDENTICAL!")
    print(f"\nPerformance:")
    print(f"  Original:  {time_original:.2f}s")
    print(f"  Optimized: {time_optimized:.2f}s")
    
    if time_optimized < time_original:
        speedup = ((time_original - time_optimized) / time_original) * 100
        print(f"  Speedup:   {speedup:.1f}% faster")
    
    print("\n[READY FOR COMMIT]")
else:
    print("\n[FAIL] RBS scores differ - DO NOT COMMIT!")
    print("The optimization changed the logic.")
    print("Fix the issue before committing.")

print("="*80)

Using cached genome: test_data\NC_000913.3.fasta
Sequence: 4,641,652 bp

RBS SCORE VERIFICATION TEST

[1/2] Running ORIGINAL method...
Detecting ORFs and calculating RBS...


In [16]:
# ============================================================================
# TEST: Compare Purine Region Search Methods
# ============================================================================
# Simple test to verify sliding window gives identical results

# Test on a few sample sequences
test_sequences = [
    "AGGAGGTAAGGAGGTAGCAT",  # 20bp typical upstream
    "AAAAAAAAAA",              # All purines
    "TTTTTTTTTT",              # No purines
    "AGTCAGTCAGTCAGTC",        # Mixed
    "AGGAGG",                  # Exact RBS motif
]

print("="*80)
print("PURINE REGION SEARCH COMPARISON")
print("="*80)

all_match = True
total_time_original = 0
total_time_window = 0

for i, seq in enumerate(test_sequences, 1):
    print(f"\nTest {i}: '{seq}'")
    print("-" * 40)
    
    # Original method
    import time
    start = time.time()
    regions_original = find_purine_rich_regions_new(seq)
    time_original = time.time() - start
    total_time_original += time_original
    
    # Window method
    start = time.time()
    regions_window = find_purine_rich_regions_new_window(seq)
    time_window = time.time() - start
    total_time_window += time_window
    
    # Compare
    match = regions_original == regions_window
    
    print(f"  Original: {len(regions_original)} regions in {time_original*1e6:.1f}us")
    print(f"  Window:   {len(regions_window)} regions in {time_window*1e6:.1f}us")
    print(f"  Match:    {'YES' if match else 'NO'}")
    
    if not match:
        all_match = False
        print("\n  MISMATCH DETAILS:")
        print(f"  Original regions: {len(regions_original)}")
        for r in regions_original[:3]:
            print(f"    {r}")
        print(f"  Window regions: {len(regions_window)}")
        for r in regions_window[:3]:
            print(f"    {r}")

print(f"\n{'='*80}")
if all_match:
    print("SUCCESS: All test sequences match!")
    print(f"Total time - Original: {total_time_original*1e6:.1f}us")
    print(f"Total time - Window:   {total_time_window*1e6:.1f}us")
else:
    print("FAILURE: Methods produce different results!")
    print("Fix the sliding window implementation before proceeding.")
print("="*80)

PURINE REGION SEARCH COMPARISON

Test 1: 'AGGAGGTAAGGAGGTAGCAT'
----------------------------------------
  Original: 71 regions in 0.0us
  Window:   71 regions in 0.0us
  Match:    YES

Test 2: 'AAAAAAAAAA'
----------------------------------------
  Original: 25 regions in 0.0us
  Window:   25 regions in 0.0us
  Match:    YES

Test 3: 'TTTTTTTTTT'
----------------------------------------
  Original: 0 regions in 0.0us
  Window:   0 regions in 0.0us
  Match:    YES

Test 4: 'AGTCAGTCAGTCAGTC'
----------------------------------------
  Original: 9 regions in 0.0us
  Window:   9 regions in 0.0us
  Match:    YES

Test 5: 'AGGAGG'
----------------------------------------
  Original: 6 regions in 0.0us
  Window:   6 regions in 0.0us
  Match:    YES

SUCCESS: All test sequences match!
Total time - Original: 0.0us
Total time - Window:   0.0us
