<a href="https://colab.research.google.com/github/sihuniiii/mytest/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import random

def generate_reference_to_file(filename, length, chunk_size=10_000_000):
    bases = ['A', 'C', 'G', 'T']
    with open(filename, 'w') as f:
        written = 0
        while written < length:
            to_write = min(chunk_size, length - written)
            seq_chunk = ''.join(random.choices(bases, k=to_write))
            f.write(seq_chunk)
            written += to_write

def simulate_ancient_reads_to_file(
    reference_file,
    reads_filename,
    truth_filename,
    read_len,
    num_reads,
    mutation_rate=0.02
):
    with open(reference_file, 'r') as f:
        reference_seq = f.read().strip()
    L = len(reference_seq)

    with open(reads_filename, 'w') as rf, open(truth_filename, 'w') as tf:
        for i in range(num_reads):
            start = random.randint(0, L - read_len)
            original = list(reference_seq[start : start + read_len])
            mutated = []
            for j, base in enumerate(original):
                prob = mutation_rate
                if j < 5 or j >= read_len - 5:
                    prob *= 5
                if random.random() < prob:
                    if base == 'C':
                        mutated.append('T')
                    elif base == 'G':
                        mutated.append('A')
                    else:
                        mutated.append(random.choice(['A', 'C', 'G', 'T']))
                else:
                    mutated.append(base)
            read_str = ''.join(mutated)
            rf.write(read_str + '\n')
            tf.write(f"{start}\n")

def run_simulation():
    settings = [
        (1_000_000,100_000,"1M", "100K"),
        (10_000_000,1_000_000,"10M", "1M"),
        (100_000_000,10_000_000,"100M","10M"),
        (1_000_000_000,100_000_000,"1T", "100M"),
    ]

    # Reference 생성
    for ref_len, _, ref_tag, _ in settings:
        ref_filename = f"reference_{ref_tag}.txt"
        if not os.path.exists(ref_filename):
            generate_reference_to_file(ref_filename, ref_len)

    for ref_len, num_reads, ref_tag, read_tag in settings:
        ref_filename   = f"reference_{ref_tag}.txt"
        reads_filename = f"mammoth_reads_{read_tag}.txt"
        truth_filename = f"ground_truth_{read_tag}.txt"
        read_len = 100

        if not (os.path.exists(reads_filename) and os.path.exists(truth_filename)):
            simulate_ancient_reads_to_file(
                reference_file=ref_filename,
                reads_filename=reads_filename,
                truth_filename=truth_filename,
                read_len=read_len,
                num_reads=num_reads,
                mutation_rate=0.01
            )

if __name__ == "__main__":
    run_simulation()

> Reference file generated: reference_1M.txt (length=1000000)
> Reference file generated: reference_10M.txt (length=10000000)
> Reference file generated: reference_100M.txt (length=100000000)
> Reference file generated: reference_1T.txt (length=1000000000)
> Simulating reads for reference_1M.txt ...
  - mammoth_reads_100K.txt: simulated 10000/100000 reads
  - mammoth_reads_100K.txt: simulated 20000/100000 reads
  - mammoth_reads_100K.txt: simulated 30000/100000 reads
  - mammoth_reads_100K.txt: simulated 40000/100000 reads
  - mammoth_reads_100K.txt: simulated 50000/100000 reads
  - mammoth_reads_100K.txt: simulated 60000/100000 reads
  - mammoth_reads_100K.txt: simulated 70000/100000 reads
  - mammoth_reads_100K.txt: simulated 80000/100000 reads
  - mammoth_reads_100K.txt: simulated 90000/100000 reads
  - mammoth_reads_100K.txt: simulated 100000/100000 reads
> Reads simulated: mammoth_reads_100K.txt (num_reads=100000, read_len=100)
> Ground truth saved: ground_truth_100K.txt
> Simulat

In [None]:
import os

#파일 불러오기
def load_reference_to_string(filename):
    with open(filename, 'r') as f:
        return f.read().strip()

def load_reads_from_file(filename):
    with open(filename, 'r') as f:
        return [line.strip() for line in f]

def load_truth_positions(filename):
    return [int(line.strip()) for line in open(filename, 'r')]


# Minimizer 인덱스
def build_minimizer_index(reference, k=20, w=8, max_occ=500):
    index = {}
    num_kmers = len(reference) - k + 1
    for i in range(num_kmers - w + 1):
        window_kmers = [reference[j:j + k] for j in range(i, i + w)]
        mn = min(window_kmers)
        mn_pos = i + window_kmers.index(mn)
        index.setdefault(mn, []).append(mn_pos)

    filtered = {m: poses for m, poses in index.items() if len(poses) <= max_occ}
    return filtered

# 매칭 알고리즘
def minimizer_match(
    reference,index,read,
    k=20,w=8,
    max_mismatch=2,seed_min=2
):
    read_kmers = [read[i:i + k] for i in range(len(read) - k + 1)]
    read_min_pairs = []
    for i in range(len(read_kmers) - w + 1):
        window = read_kmers[i:i + w]
        mn = min(window)
        mn_idx = window.index(mn)
        read_pos = i + mn_idx
        read_min_pairs.append((mn, read_pos))

    delta_counts = {}
    for mn, rpos in read_min_pairs:
        if mn not in index:
            continue
        for refpos in index[mn]:
            delta = refpos - rpos
            delta_counts[delta] = delta_counts.get(delta, 0) + 1

    candidates = [d for d, cnt in delta_counts.items() if cnt >= seed_min]
    best_pos, best_mm = -1, max_mismatch + 1

    for delta in candidates:
        if delta < 0 or delta + len(read) > len(reference):
            continue
        window_seq = reference[delta : delta + len(read)]
        mismatches = sum(1 for a, b in zip(read, window_seq) if a != b)
        if mismatches <= max_mismatch and mismatches < best_mm:
            best_mm = mismatches
            best_pos = delta

    return best_pos, best_mm

# 복원 함수
def reconstruct_genome_with_reads(
    reference,
    reads,
    truth_positions,
    index,
    k=20, w=8, max_mismatch=2, seed_min=2
):
    reconstructed = list(reference)
    matched_reads = 0
    total_reads = len(reads)

    for i, read in enumerate(reads):
        true_pos = truth_positions[i]
        pred_pos, mm = minimizer_match(
            reference, index, read,
            k=k, w=w, max_mismatch=max_mismatch, seed_min=seed_min
        )
        # mismatch 허용 범위에서 복원
        if pred_pos == true_pos and mm <= max_mismatch:
            for j, base in enumerate(read):
                reconstructed[pred_pos + j] = base
            matched_reads += 1

    return ''.join(reconstructed), matched_reads, total_reads

# 정확도 계산
def evaluate_reconstruction(reference, reconstructed):
    assert len(reference) == len(reconstructed)
    L = len(reference)
    matches = sum(1 for a, b in zip(reference, reconstructed) if a == b)
    return matches / L

# 매핑 및 평가
def run_mapping_and_evaluation():

    K = 20
    W = 8
    MAX_OCC = 500
    MAX_MM = 2
    SEED_MIN = 2

    pairs = [
        ("reference_1M.txt",  "mammoth_reads_100K.txt",  "ground_truth_100K.txt"),
        ("reference_10M.txt", "mammoth_reads_1M.txt",   "ground_truth_1M.txt"),
        ("reference_100M.txt","mammoth_reads_10M.txt",  "ground_truth_10M.txt"),
        #("reference_1T.txt",  "mammoth_reads_100M.txt", "ground_truth_100M.txt"),
    ]

    for ref_file, read_file, truth_file in pairs:
        if not (os.path.exists(ref_file) and os.path.exists(read_file) and os.path.exists(truth_file)):
            print(f"> Skipping {ref_file} / {read_file} / {truth_file}: 파일이 존재하지 않음.")
            continue

        print(f"\n=== Processing {ref_file} & {read_file} ===")
        reference = load_reference_to_string(ref_file)
        reads = load_reads_from_file(read_file)
        truth_positions = load_truth_positions(truth_file)

        print("> Building minimizer index ...")
        index = build_minimizer_index(reference, k=K, w=W, max_occ=MAX_OCC)
        print(f"  - Unique minimizers after filtering: {len(index)}")

        print("> Performing mapping & reconstruction ...")
        reconstructed, matched_reads, total_reads = reconstruct_genome_with_reads(
            reference, reads, truth_positions, index,
            k=K, w=W, max_mismatch=MAX_MM, seed_min=SEED_MIN
        )

        read_level_acc = matched_reads / total_reads * 100
        base_level_acc = evaluate_reconstruction(reference, reconstructed) * 100

        print(f"  * Read-level mapping accuracy (exact 위치 매칭 & ≤{MAX_MM} mismatch): "
              f"{matched_reads}/{total_reads} = {read_level_acc:.2f}%")
        print(f"  * Base-level reconstruction accuracy: {base_level_acc:.2f}%")

if __name__ == "__main__":
    run_mapping_and_evaluation()



=== Processing reference_1M.txt & mammoth_reads_100K.txt ===
> Building minimizer index ...
  - Unique minimizers after filtering: 241634
> Performing mapping & reconstruction ...
  * Read-level mapping accuracy (exact 위치 매칭 & ≤2 mismatch): 87743/100000 = 87.74%
  * Base-level reconstruction accuracy: 99.08%

=== Processing reference_10M.txt & mammoth_reads_1M.txt ===
> Building minimizer index ...
  - Unique minimizers after filtering: 2416185
> Performing mapping & reconstruction ...
  * Read-level mapping accuracy (exact 위치 매칭 & ≤2 mismatch): 875852/1000000 = 87.59%
  * Base-level reconstruction accuracy: 99.07%

=== Processing reference_100M.txt & mammoth_reads_10M.txt ===
> Building minimizer index ...


In [2]:
!pwd

/content


In [None]:
import random
import sys

# ----------------------------
# 1) Reference 생성 및 저장
# ----------------------------
def generate_artificial_elephant_genome(length=10000):
    """
    코끼리 유전체처럼 보이는 염기서열을 인위적으로 생성.
    length: 생성 길이 (기본 10,000bp; 실제 테스트 시 더 늘려도 됨)
    """
    bases = ['A', 'C', 'G', 'T']
    genome = ''.join(random.choices(bases, k=length))
    return genome

def save_reference_as_txt(sequence, filename="reference_10K.txt"):
    with open(filename, "w") as f:
        f.write(sequence)

# ----------------------------
# 2) Short Read 시뮬레이션
# ----------------------------
def load_reference(filename="reference_10K.txt"):
    """
    저장된 reference 파일을 한 줄 문자열로 읽어서 반환.
    """
    with open(filename, "r") as f:
        return f.read().strip()

def simulate_ancient_reads(
    reference_seq,
    read_len=100,
    num_reads=10000,
    mutation_rate=0.01
):
    """
    reference_seq에서 num_reads개 만큼 랜덤하게 잘라내고,
    “고대 DNA 특성”에 따라 말단(C→T, G→A) 변이 삽입.
    반환: (reads_list, truth_list)
      - reads_list: 생성된 read 문자열 리스트
      - truth_list: (read_id, start_position, mutation_count) 튜플 리스트
    """
    reads = []
    truth = []
    L = len(reference_seq)
    for i in range(num_reads):
        # 0 ~ L-read_len 사이에서 랜덤 시작점 선택
        start = random.randint(0, L - read_len)
        original = list(reference_seq[start:start + read_len])
        mutated = []
        mutation_count = 0

        for j in range(read_len):
            # 말단(j<5 또는 j>=read_len-5)에선 변이 확률을 5배 증가
            prob = mutation_rate
            if j < 5 or j >= read_len - 5:
                prob *= 5

            base = original[j]
            if random.random() < prob:
                # C→T, G→A 우선 치환. 나머지는 랜덤 선택
                if base == 'C':
                    mutated.append('T')
                elif base == 'G':
                    mutated.append('A')
                else:
                    mutated.append(random.choice(['A', 'C', 'G', 'T']))
                mutation_count += 1
            else:
                mutated.append(base)

        read_str = ''.join(mutated)
        reads.append(read_str)
        truth.append((f'read_{i}', start, mutation_count))

    return reads, truth

def save_reads_and_truth(
    reads,
    truth,
    reads_filename="mammoth_reads_10K.txt",
    truth_filename="ground_truth_10K.txt"
):
    # reads만 한 줄씩 저장
    with open(reads_filename, "w") as f:
        for read in reads:
            f.write(read + "\n")
    # ground truth 위치(start position)만 저장
    with open(truth_filename, "w") as f:
        for _, pos, _ in truth:
            f.write(f"{pos}\n")

# ---------------------------------------------------------
# 3) 개선된 Minimizer 인덱스 구축 + Seed Chaining 매칭
# ---------------------------------------------------------
def build_minimizer_index(reference, k=20, w=8, max_occ=500):
    """
    reference: ACGT 문자열
    k: k-mer 길이
    w: 윈도우 크기 (k-mer 개수)
    max_occ: 인덱스에서 허용할 minimizer 최대 출현 횟수
             (너무 자주 나오는 minimizer는 필터링)
    반환: { minimizer_kmer: [pos1, pos2, ...], ... }
    """
    index = {}
    num_kmers = len(reference) - k + 1
    # i: 윈도우의 시작 오프셋(0 ≤ i ≤ num_kmers - w)
    for i in range(num_kmers - w + 1):
        window_kmers = [reference[j:j+k] for j in range(i, i + w)]
        mn = min(window_kmers)
        mn_pos = i + window_kmers.index(mn)  # minimizer의 실제 reference 상 위치
        index.setdefault(mn, []).append(mn_pos)

    # 고빈도 minimizer 제거: 등장 빈도가 max_occ 초과면 제외
    filtered = {m: poses for m, poses in index.items() if len(poses) <= max_occ}
    return filtered

def minimizer_match(
    reference,
    index,
    read,
    k=20,
    w=8,
    max_mismatch=2,
    seed_min=2
):
    """
    reference: 전체 ACGT 문자열
    index: build_minimizer_index 결과물
    read: 하나의 read 문자열
    k, w: build 때 사용한 k-mer, 윈도우 크기
    max_mismatch: 허용 mismatch 개수
    seed_min: seed chaining 시, 동일 delta(오프셋) 후보에 필요한 최소 시드 개수

    동작:
      1) read에서 sliding 최소 minimizer(k-mer)를 구하고, (minimizer, read 내 위치) 쌍 생성
      2) 각 minimizer가 reference 인덱스에서 등장하는 모든 pos마다 (refpos - readpos = delta) 계산하여 빈도 집계
      3) delta 빈도가 seed_min 이상인 δ(후보 alignment 오프셋)만 후보로 선정
      4) 후보 δ마다 read 전체와 reference[δ:δ+len(read)] 간 mismatch를 계산 → best 위치 반환

    반환: (best_pos, best_mismatch)
      - best_pos = 예측된 reference 상 alignment 시작 오프셋,
        예측 실패 시 -1 반환.
      - best_mismatch = 해당 위치에서 계산된 mismatch 개수
    """
    read_kmers = [read[i:i+k] for i in range(len(read) - k + 1)]
    # read 내 윈도우마다 minimizer와 read내 위치를 찾음
    read_min_pairs = []
    for i in range(len(read_kmers) - w + 1):
        window = read_kmers[i:i + w]
        mn = min(window)
        # minimizer 위치: read에서 i부터 i+w-1 사이에 처음 등장한 위치
        mn_idx_in_window = window.index(mn)
        read_pos = i + mn_idx_in_window
        read_min_pairs.append((mn, read_pos))

    # (refpos - readpos = delta) 빈도 세기
    delta_counts = {}
    for (mn, rpos) in read_min_pairs:
        if mn not in index:
            continue
        for refpos in index[mn]:
            delta = refpos - rpos
            delta_counts[delta] = delta_counts.get(delta, 0) + 1

    # seed_min 개수 이상 지원하는 δ만 후보로
    candidates = [d for d, cnt in delta_counts.items() if cnt >= seed_min]
    best_pos, best_mm = -1, max_mismatch + 1

    for delta in candidates:
        if delta < 0 or delta + len(read) > len(reference):
            continue
        window_seq = reference[delta : delta + len(read)]
        # simple Hamming distance(불일치 개수) 계산
        mismatches = sum(1 for a, b in zip(read, window_seq) if a != b)
        if mismatches <= max_mismatch and mismatches < best_mm:
            best_mm = mismatches
            best_pos = delta

    return best_pos, best_mm

# ---------------------------------------------------------
# 4) 전체 파이프라인 실행
# ---------------------------------------------------------
if __name__ == "__main__":
    # ───────────
    # 1) Reference 생성 & 저장
    # ───────────
    # (테스트를 위해 length=10000; 실제 1M 이상으로 늘려서 쓰세요)
    REF_LEN = 10000
    reference_seq = generate_artificial_elephant_genome(length=REF_LEN)
    save_reference_as_txt(reference_seq, filename="reference_10K.txt")
    print(f"> Generated & saved reference length={REF_LEN}")

    # ───────────
    # 2) Short Read 시뮬레이션 & 저장
    # ───────────
    read_length = 100       # read 길이(예: 100bp)
    num_reads = 10000       # read 개수
    mutation_rate = 0.01    # 기본 변이율
    reads, truth = simulate_ancient_reads(
        reference_seq,
        read_len=read_length,
        num_reads=num_reads,
        mutation_rate=mutation_rate
    )
    save_reads_and_truth(
        reads,
        truth,
        reads_filename="mammoth_reads_10K.txt",
        truth_filename="ground_truth_10K.txt"
    )
    print(f"> Simulated & saved {num_reads} ancient reads + ground truth")

    # ───────────
    # 3) 인덱스 구축
    # ───────────
    # tuning parameter
    K = 20           # k-mer 길이
    W = 8            # 윈도우 크기
    MAX_OCC = 500    # 고빈도 minimizer 허용 최대 등장횟수
    index = build_minimizer_index(
        reference_seq,
        k=K,
        w=W,
        max_occ=MAX_OCC
    )
    print(f"> Built minimizer index (k={K}, w={W}, max_occ={MAX_OCC}) with {len(index)} keys")

    # ───────────
    # 4) 매핑 & 정확도 평가
    # ───────────
    MAX_MM = 2       # 허용 mismatch 개수
    SEED_MIN = 2     # seed chaining 최소 시드 개수

    correct = 0
    for i, read in enumerate(reads):
        true_pos = truth[i][1]  # ground truth start position
        pred_pos, mm = minimizer_match(
            reference_seq,
            index,
            read,
            k=K,
            w=W,
            max_mismatch=MAX_MM,
            seed_min=SEED_MIN
        )
        if pred_pos == true_pos:
            correct += 1

    accuracy = correct / num_reads * 100
    print(f"\n>> Reconstruction accuracy (≤{MAX_MM} mismatches): {accuracy:.2f}%")

    # ───────────
    # (추가) 매핑 결과를 파일에 기록하고 싶다면 주석 해제
    # ───────────
    with open("mapping_results.txt", "w") as outf:
         outf.write("read_id\ttrue_pos\tpred_pos\tmismatch\n")
         for idx, read in enumerate(reads):
             tp = truth[idx][1]
             pp, mm = minimizer_match(
                 reference_seq, index, read,
                 k=K, w=W, max_mismatch=MAX_MM, seed_min=SEED_MIN
             )
             outf.write(f"read_{idx}\t{tp}\t{pp}\t{mm}\n")
    print("> Mapping results saved to mapping_results.txt")


> Generated & saved reference length=10000
> Simulated & saved 10000 ancient reads + ground truth
> Built minimizer index (k=20, w=8, max_occ=500) with 2429 keys

>> Reconstruction accuracy (≤2 mismatches): 87.84%
> Mapping results saved to mapping_results.txt
