<a href="https://colab.research.google.com/github/jjjung99/SD-design/blob/main/500%2C000%EA%B0%9C_%EC%83%98%ED%94%8C%2BSD%EC%A0%84%EC%88%98%EC%A1%B0%EC%82%AC_Base_line.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# ============================
# Cell 1. WT 분석 (SD–ASD ΔG 분포)
#  - MG1655 genome + GFF 필요
#  - 결과: WT_ASD_RNA, DG_MIN, DG_MAX, CROSS_CUTOFF, WT_SD_SUBSET
# ============================

!pip install -q ViennaRNA
!pip install -q biopython

import RNA
from Bio import SeqIO
import numpy as np
import random

genome_fasta_path = "/content/GCF_000005845.2_ASM584v2_genomic.fna"
gff_path          = "/content/genomic.gff"

def dna_to_rna(seq: str) -> str:
    return seq.upper().replace("T", "U")

def revcomp_dna(seq: str) -> str:
    comp = str.maketrans("ACGTacgt", "TGCAtgca")
    return seq.translate(comp)[::-1]

def is_sd_candidate(motif: str) -> bool:
    motif = motif.upper().replace("U", "T")
    L = len(motif)
    if L < 4 or L > 6:
        return False
    purines = motif.count("A") + motif.count("G")
    if purines < int(0.6 * L):
        return False
    if ("GG" not in motif) and ("AGG" not in motif) and ("GAG" not in motif):
        return False
    return True

def calc_duplex_energy(asd_rna: str, sd_dna: str) -> float:
    sd_rna = dna_to_rna(sd_dna)
    duplex = RNA.duplexfold(asd_rna, sd_rna)
    return duplex.energy

def load_genome_and_features():
    genome_dict = {}
    for record in SeqIO.parse(genome_fasta_path, "fasta"):
        genome_dict[record.id] = str(record.seq).upper()

    features = []
    with open(gff_path, "r") as gff:
        for line in gff:
            if not line.strip() or line.startswith("#"):
                continue
            cols = line.rstrip().split("\t")
            if len(cols) != 9:
                continue
            # Corrected: Added 'score' variable to unpack all 9 columns
            seqid, source, ftype, start, end, score, strand, phase, attrs = cols
            features.append((seqid, ftype, int(start), int(end), strand, attrs))
    return genome_dict, features

def extract_wt_asd(genome_dict, features):
    ASD_list = []
    for seqid, ftype, start, end, strand, attrs in features:
        if ftype == "rRNA" and "16S ribosomal RNA" in attrs:
            if seqid not in genome_dict:
                continue
            full_rrna = genome_dict[seqid][start-1:end]
            if strand == "-":
                full_rrna = revcomp_dna(full_rrna)
            ASD_list.append(full_rrna[-10:])
    if not ASD_list:
        raise RuntimeError("16S rRNA에서 ASD 후보를 찾지 못했습니다.")
    return dna_to_rna(ASD_list[0])   # RNA

def get_upstream_20(genome_dict, seqid, start, end, strand, window=20):
    chrom = genome_dict.get(seqid)
    if chrom is None:
        return None
    L = len(chrom)
    if strand == "+":
        s0 = start - 1
        if s0 < window:
            return None
        return chrom[s0-window:s0].upper()
    elif strand == "-":
        e0 = end - 1
        if e0 + window >= L:
            return None
        downstream = chrom[e0+1:e0+1+window]
        return revcomp_dna(downstream).upper()
    else:
        return None

def find_wt_sd_and_energies(genome_dict, features, WT_ASD_RNA):
    energies = []
    wt_sd_list = []
    for seqid, ftype, start, end, strand, attrs in features:
        if ftype != "CDS":
            continue
        upstream = get_upstream_20(genome_dict, seqid, start, end, strand, 20)
        if upstream is None:
            continue
        window = upstream.upper()
        best_motif = None
        best_dG = None
        for k in [4, 5, 6]:
            for i in range(0, len(window)-k+1):
                m = window[i:i+k]
                if not is_sd_candidate(m):
                    continue
                dG = calc_duplex_energy(WT_ASD_RNA, m)
                if (best_dG is None) or (dG < best_dG):
                    best_dG = dG
                    best_motif = m
        if best_motif is not None:
            energies.append(best_dG)
            wt_sd_list.append(best_motif)

    if not energies:
        raise RuntimeError("어떤 CDS에서도 SD 후보를 찾지 못했습니다.")
    return np.array(energies, float), wt_sd_list

# ---- 실행 ----
genome_dict, features = load_genome_and_features()
print(f"[INFO] Loaded chromosomes: {list(genome_dict.keys())}")
print(f"[INFO] Total features in GFF: {len(features)}")

WT_ASD_RNA = extract_wt_asd(genome_dict, features)
print(f"[INFO] WT_ASD_RNA (16S 3' tail) = {WT_ASD_RNA}")

energies, WT_SD_DNA_LIST = find_wt_sd_and_energies(genome_dict, features, WT_ASD_RNA)

mean_dG = float(energies.mean())
std_dG  = float(energies.std())

# 타깃 범위: mean \u00b1 0.4 (너가 이미 본 [-6.110, -5.310]이 여기서 나왔음)
DG_MIN  = mean_dG - 0.400
DG_MAX  = mean_dG + 0.400

# 직교성 기준: WT 평균보다 1\u03c3 약한 결합 이상만 허용
CROSS_CUTOFF = mean_dG + std_dG

print("\n=== WT SD–WT ASD \u0394G 통계 ===")
print(f"Count : {len(energies)}")
print(f"Mean  : {mean_dG:.3f} kcal/mol")
print(f"Std   : {std_dG:.3f} kcal/mol")
print(f"TARGET_DG range   : [{DG_MIN:.3f}, {DG_MAX:.3f}]")
print(f"CROSS_CUTOFF (weak): \u0394G >= {CROSS_CUTOFF:.3f} kcal/mol")

# 직교성 테스트에 사용할 WT SD subset (계산량 줄이기용)
WT_SD_SUBSET_SIZE = 300
if len(WT_SD_DNA_LIST) > WT_SD_SUBSET_SIZE:
    WT_SD_SUBSET = random.sample(WT_SD_DNA_LIST, WT_SD_SUBSET_SIZE)
else:
    WT_SD_SUBSET = WT_SD_DNA_LIST[:]

print(f"[INFO] WT_SD_SUBSET size for orthogonality check: {len(WT_SD_SUBSET)}")

[INFO] Loaded chromosomes: ['NC_000913.3']
[INFO] Total features in GFF: 9523
[INFO] WT_ASD_RNA (16S 3' tail) = CACCUCCUUA

=== WT SD–WT ASD ΔG 통계 ===
Count : 4101
Mean  : -5.710 kcal/mol
Std   : 1.968 kcal/mol
TARGET_DG range   : [-6.110, -5.310]
CROSS_CUTOFF (weak): ΔG >= -3.742 kcal/mol
[INFO] WT_SD_SUBSET size for orthogonality check: 300


In [4]:
# ============================================
# Cell 2. O-SD 설계 (최종 통합본: SD 전수조사 + multiprocessing)
# ============================================

import RNA, random, math
import multiprocessing as mp

# --- 공통 유틸 ---
def dna_to_rna(seq: str) -> str:
    return seq.upper().replace("T", "U")

def rna_revcomp(seq: str) -> str:
    comp = str.maketrans("AUCGaucg", "UAGCuagc")
    return seq.translate(comp)[::-1]

def dna_revcomp(seq: str) -> str:
    comp = str.maketrans("ATGCatgc", "TACGtacg")
    return seq.translate(comp)[::-1]

def calc_duplex_energy(asd_rna: str, sd_dna: str) -> float:
    sd_rna = dna_to_rna(sd_dna)
    duplex = RNA.duplexfold(asd_rna, sd_rna)
    return duplex.energy

def random_AU(n: int):
    return "".join(random.choice("AT") for _ in range(n))

def random_nt(n: int):
    return "".join(random.choice("ATGC") for _ in range(n))


# ===================================================
# 0. DESIGN 고정 요소들 (baseline 그대로 유지)
# ===================================================

UTR_50 = "ATATAGGCATAGCGCACAGACAGATAAAAATTACAGAGTACACAACATCC"

EMPTY_ORF = (
    "CTGCTGGGTGAGCTTTCTCCGTAAACTTAAAGGAAAAGATTCCGTTGAAAGATT"
    "CAAAGCTATCGTTCAGCGTATACAAGAGACTTCCTCCTGAGACTCGTGTTCCC"
    "GTACCGAACTCT"
)

CODING_SEQ = """ATGCGAGTGTTGAAGTTCGGCGGTACATCAGTGGCAAATGCAGAACGTTTTCTGCGTGTTGCCGAT...""".replace("\n","")


print("[INFO] WT_ASD_RNA =", WT_ASD_RNA)
print("[INFO] DG_MIN, DG_MAX =", DG_MIN, DG_MAX)
print("[INFO] CROSS_CUTOFF =", CROSS_CUTOFF)
print("[INFO] WT_SD_SUBSET size:", len(WT_SD_SUBSET))


# ===================================================
# 1. SD 전수조사(4^6 = 4096개의 SD) → GOOD_SD_LIST 생성
# ===================================================

def precompute_good_sds():
    good = []

    for idx in range(4**6):   # 4096 조합 전체 탐색
        tmp = idx
        s = []
        for _ in range(6):
            tmp, r = divmod(tmp, 4)
            s.append("ATGC"[r])
        sd_dna = "".join(s)

        # SD candidate rule 유지 (너 baseline 그대로 사용)
        pur = sd_dna.count("A") + sd_dna.count("G")
        if pur < 4:  # 6mer에서 4/6 = 66% purine 이상
            continue
        if ("GG" not in sd_dna) and ("AGG" not in sd_dna) and ("GAG" not in sd_dna):
            continue

        sd_rna = dna_to_rna(sd_dna)
        ortho_asd_rna = rna_revcomp(sd_rna)

        # 1) O-ASD : O-SD ΔG (핵심 조건)
        E_orth = calc_duplex_energy(ortho_asd_rna, sd_dna)
        if not (DG_MIN <= E_orth <= DG_MAX):
            continue

        # 2) WT-ASD : O-SD 직교성
        E_cross1 = calc_duplex_energy(WT_ASD_RNA, sd_dna)
        if E_cross1 < CROSS_CUTOFF:
            continue

        # 3) O-ASD : WT-SD 직교성
        E_cross2_min = math.inf
        for wt_sd in WT_SD_SUBSET:
            dG = calc_duplex_energy(ortho_asd_rna, wt_sd)
            if dG < E_cross2_min:
                E_cross2_min = dG
            if E_cross2_min < CROSS_CUTOFF:
                break

        if E_cross2_min < CROSS_CUTOFF:
            continue

        good.append({
            "SD_DNA": sd_dna,
            "E_orth": E_orth,
            "E_cross1": E_cross1,
            "E_cross2_min": E_cross2_min
        })

    return good


print("[INFO] SD 전수조사 시작...")
GOOD_SD_LIST = precompute_good_sds()
print("[INFO] GOOD_SD_LIST size =", len(GOOD_SD_LIST))


# ===================================================
# 2. 후보 1개 생성 (AU/random + GOOD_SD_LIST 중 SD 선택 + hairpin)
# ===================================================

def make_and_score_candidate(seed=None):
    if seed is not None:
        random.seed(seed)

    au_dna = random_AU(10)
    spacer_dna = random_nt(6)

    # ---- 여기 핵심! ----
    sd_info = random.choice(GOOD_SD_LIST)
    sd_dna = sd_info["SD_DNA"]

    sd_rna = dna_to_rna(sd_dna)
    ortho_asd = rna_revcomp(sd_rna)

    # SD 단계에서 이미 조건 만족하므로 가져오기
    E_orth = sd_info["E_orth"]
    E_cross1 = sd_info["E_cross1"]
    E_cross2_min = sd_info["E_cross2_min"]

    # --- full DNA construct ---
    full_dna = (
        UTR_50 + au_dna + sd_dna + spacer_dna + "ATG" +
        EMPTY_ORF + CODING_SEQ
    )

    # --- hairpin ΔG 체크 ---
    local_dna = UTR_50 + au_dna + sd_dna + spacer_dna
    local_rna = dna_to_rna(local_dna)
    structure, mfe = RNA.fold(local_rna)

    if mfe < -3.0:
        return None

    return {
        "AU_DNA": au_dna,
        "SD_DNA": sd_dna,
        "spacer_DNA": spacer_dna,
        "full_dna": full_dna,
        "E_orth": E_orth,
        "E_cross1": E_cross1,
        "E_cross2_min": E_cross2_min,
        "mfe_local": mfe
    }


# ===================================================
# 3. Filter 조건 (이미 SD 전수조사에서 통과했으므로 hairpin만 체크)
# ===================================================

def passes_filters(c):
    if c is None:
        return False
    if c["mfe_local"] < -3.0:
        return False
    return True


# ===================================================
# 4. multiprocessing worker
# ===================================================

def worker_generate(n_samples, seed):
    random.seed(seed)
    acc = []
    for _ in range(n_samples):
        cand = make_and_score_candidate()
        if passes_filters(cand):
            acc.append(cand)
    return acc


# ===================================================
# 5. 병렬 실행
# ===================================================

TOTAL_SAMPLES = 500000  # ★ 너가 요청한 50만 샘플
N_PROCESSES = max(1, mp.cpu_count() - 1)
SAMPLES_PER_WORKER = TOTAL_SAMPLES // N_PROCESSES

print(f"[INFO] Using {N_PROCESSES} workers, {SAMPLES_PER_WORKER} samples per worker")

if __name__ == "__main__":
    with mp.Pool(N_PROCESSES) as pool:
        results = pool.starmap(
            worker_generate,
            [(SAMPLES_PER_WORKER, i * 77779) for i in range(N_PROCESSES)]
        )

    candidates = [c for sub in results for c in sub]

    print(f"\n[INFO] 총 시도 샘플 수 : ~{TOTAL_SAMPLES}")
    print(f"[INFO] 최종 후보 수   : {len(candidates)}")

    # Sort by closeness to center of TARGET ΔG
    TARGET_DG_MEAN = (DG_MIN + DG_MAX)/2
    candidates_sorted = sorted(
        candidates,
        key=lambda x: abs(x["E_orth"] - TARGET_DG_MEAN)
    )

    TOP_K = min(15, len(candidates_sorted))
    print(f"\n[TOP {TOP_K} candidates]")

    for i, c in enumerate(candidates_sorted[:TOP_K], 1):
        AU = dna_to_rna(c["AU_DNA"])
        SD = dna_to_rna(c["SD_DNA"])
        SPC = dna_to_rna(c["spacer_DNA"])
        OASD = rna_revcomp(SD)

        print(f"\n--- Candidate #{i} ---")
        print("AU (RNA):", AU)
        print("SD (RNA):", SD)
        print("Spacer:  ", SPC)
        print("O-ASD:   ", OASD)
        print("E_orth:", c["E_orth"])
        print("E_cross1:", c["E_cross1"])
        print("E_cross2_min:", c["E_cross2_min"])
        print("hairpin ΔG:", c["mfe_local"])


[INFO] WT_ASD_RNA = CACCUCCUUA
[INFO] DG_MIN, DG_MAX = -6.110290173128505 -5.310290173128505
[INFO] CROSS_CUTOFF = -3.7424900198527995
[INFO] WT_SD_SUBSET size: 300
[INFO] SD 전수조사 시작...
[INFO] GOOD_SD_LIST size = 16
[INFO] Using 1 workers, 500000 samples per worker

[INFO] 총 시도 샘플 수 : ~500000
[INFO] 최종 후보 수   : 162371

[TOP 15 candidates]

--- Candidate #1 ---
AU (RNA): AAAAAUUAAU
SD (RNA): CAUGAG
Spacer:   GCCGAG
O-ASD:    CUCAUG
E_orth: -5.7
E_cross1: -1.5
E_cross2_min: -3.4
hairpin ΔG: -1.090000033378601

--- Candidate #2 ---
AU (RNA): UUAAAUAAUA
SD (RNA): GGUUAG
Spacer:   GCAAUU
O-ASD:    CUAACC
E_orth: -5.7
E_cross1: -2.7
E_cross2_min: -2.3
hairpin ΔG: -2.200000047683716

--- Candidate #3 ---
AU (RNA): AAAAAUAUAA
SD (RNA): GGUUAG
Spacer:   AUUACU
O-ASD:    CUAACC
E_orth: -5.7
E_cross1: -2.7
E_cross2_min: -2.3
hairpin ΔG: -0.6000000238418579

--- Candidate #4 ---
AU (RNA): UUUAAAAAUU
SD (RNA): GUUAGG
Spacer:   ACUACC
O-ASD:    CCUAAC
E_orth: -5.7
E_cross1: -1.8
E_cross2_min: -3.7
h

4096개 SD만 전수조사

50만개의 AU/spacer 랜덤 조합 실행

전체 20억 조합을 계산하는 것이 아니라

20억 중 “가능성이 있는 SD 시퀀스의 subset”만 효율적으로 서치하는 구조

전체 공간(full search space)

AU-rich(10nt): 2¹⁰ = 1,024

SD(6nt): 4⁶ = 4,096

spacer(6nt): 4⁶ = 4,096

총 조합은:
1,024 × 4,096 × 4,096 ≈ 17억(1.7 billion)

각 SD별로:

O-ASD–O-SD ΔG

WT-ASD–O-SD ΔG

O-ASD–WT-SD ΔG

WT 기반 타깃 ΔG 범위

WT 기반 직교성 기준

이 조건을 전부 통과한 SD만 GOOD_SD_LIST에 저장.

보통 4096 중 수십 개~수백 개 정도만 살아남는다.

SD는 “랜덤 4,096개”가 아니라

GOOD_SD_LIST에서만 랜덤 선택

그러면 조합 수는 이렇게 바뀜:

기존 방식

랜덤 SD 중 대부분이 조건 탈락
→ 후처리 단계에서 거의 다 버려짐
→ 후보수 거의 증가 안 함

새로운 방식

SD 전수조사로 “가능성 있는 SD만” 남겨놓고
AU-rich + spacer는 랜덤하게 생성해서
hairpin 조건만 평가

즉 전체 1.7 billion 공간을 다 탐색하는 것이 아니고:


실제로 계산되는 조합 수
✔ SD 전수조사: 4,096개 → GOOD_SD_LIST 크기 (예: 60개)
✔ 2단계 AU/spacer 랜덤: 500,000개

따라서 실제 탐색하는 조합:

GOOD_SD_LIST_size × AU/spacer 랜덤 구성수

예를 들어 GOOD_SD_LIST가 60개라면:

실제 조합 공간은 약:

60 × 매우 많은 AU/spacer 조합 중 50만 랜덤 샘플링


즉 20억 공간 중 “가능성 있는 SD” 부분만 확률적으로 서치하는 구조.

전수조사를 하는 부분은 SD 영역(4096개) 뿐이고,
AU-rich + spacer는 50만 랜덤 샘플만 평가한다.

따라서 전체 4096 × 500,000 = 20억을 계산하는 것이 아니다.