In [1]:
import os
import numpy as np
import pandas as pd

In [2]:
def generate_random_sequence_advanced(n, filename=None):
    # numpy의 고급 난수 생성기 설정
    rng = np.random.default_rng()
    
    # 0(A), 1(T), 2(C), 3(G)로 직접 숫자 생성
    sequence = rng.integers(0, 4, size=n, dtype=np.uint8)
    
    if filename is None:
        filename = f'dna_{n}.bin'
    
    # 바이너리 모드로 파일 저장
    try:
        with open(filename, 'wb') as f:
            sequence.tofile(f)
        print(f"시퀀스가 {filename}에 성공적으로 저장되었습니다.")
    except Exception as e:
        print(f"파일 저장 중 오류 발생: {e}")
    
    return sequence

def read_sequence(filename):
    # 저장된 시퀀스를 읽어오는 함수
    try:
        sequence = np.fromfile(filename, dtype=np.uint8)
        print(f"파일에서 {len(sequence)} 개의 염기를 읽었습니다.")
        return sequence
    except Exception as e:
        print(f"파일 읽기 중 오류 발생: {e}")
        return None

def convert_to_bases(sequence):
    # 필요한 경우 숫자를 염기로 변환
    base_map = {0: 'A', 1: 'T', 2: 'C', 3: 'G'}
    return ''.join(base_map[x] for x in sequence)

In [None]:
# 시퀀스 생성 및 저장
sequence = generate_random_sequence_advanced(10**6, "test_dna.bin")

# 저장된 시퀀스 읽기
loaded_sequence = read_sequence("test_dna.bin")

# 염기서열로 변환하여 확인
print(convert_to_bases(loaded_sequence[:20]))  # 처음 20개의 염기 출력

시퀀스가 test_dna.bin에 성공적으로 저장되었습니다.
파일에서 1000000 개의 염기를 읽었습니다.
TGACCTCCATCAGTACCCAA


In [4]:
def generate_reads_with_coverage(file_path, output_csv=None, desired_coverage=30):
    """
    주어진 염기서열과 coverage를 고려하여 리드 테이블을 생성합니다.
    각 케이스는 DBG에 최적화된 리드 길이와 리드 개수를 계산하여 생성합니다.
    
    Parameters:
    - file_path (str): 원본 염기서열이 저장된 파일 경로
    - output_csv (str): 결과를 저장할 CSV 파일 경로 (선택사항)
    - desired_coverage (int): 염기서열 전체에 대한 목표 coverage. 기본값은 30

    Returns:
    - pd.DataFrame: 각 케이스의 리드 길이와 개수를 담은 데이터프레임
    """
    
    # 시퀀스 읽기
    try:
        sequence = np.fromfile(file_path, dtype=np.uint8)
        genome_length = len(sequence)
        print(f"시퀀스 길이: {genome_length}")
    except Exception as e:
        print(f"시퀀스 파일 읽기 오류: {str(e)}")
        return None
        
    read_length_arr = [30,50,75,100]
    
    cases = []
    for i, read_length in enumerate(read_length_arr):
        k_mer_length = (2 * read_length) // 3
        if k_mer_length % 2 == 0:
            k_mer_length -= 1
        k_mer_length = max(15, min(31, k_mer_length))
        
        read_count = int((desired_coverage * genome_length) / read_length)
        read_count = max(2, read_count)
        
        print(f"\n{'='*50}")
        print(f"Case {i+1} 상세 정보:")
        print(f"리드 길이: {read_length}")
        print(f"목표 리드 개수: {read_count}")
        print(f"k-mer 길이: {k_mer_length}")
        print(f"전체 시퀀스 길이: {genome_length}")
        
        reads = []
        max_start_pos = genome_length - read_length
        
        if max_start_pos < 0:
            print("경고: 리드 길이가 시퀀스 길이보다 큽니다!")
            reads.append(sequence[:read_length])
        else:
            # 균등 간격 계산
            step_size = max(1, max_start_pos // (read_count - 1))
            print(f"리드 간 간격: {step_size}")
            
            # 리드 생성 및 검증
            for start_pos in range(0, max_start_pos + 1, step_size):
                if len(reads) < read_count:
                    read = sequence[start_pos:start_pos + read_length]
                    if len(read) == read_length:  # 리드 길이 검증
                        reads.append(read)
                    else:
                        print(f"경고: 잘못된 리드 길이 - 위치: {start_pos}, 길이: {len(read)}")
        
        # 부족한 리드 추가
        remaining_reads = read_count - len(reads)
        if remaining_reads > 0:
            print(f"추가로 생성할 리드 수: {remaining_reads}")
            for _ in range(remaining_reads):
                start_pos = np.random.randint(0, max_start_pos + 1)
                read = sequence[start_pos:start_pos + read_length]
                if len(read) == read_length:
                    reads.append(read)
                else:
                    print(f"경고: 랜덤 리드 생성 실패 - 위치: {start_pos}, 길이: {len(read)}")
        
        # 최종 검증
        print(f"\n최종 결과:")
        print(f"생성된 총 리드 수: {len(reads)}")
        print(f"첫 번째 리드 길이: {len(reads[0])}")
        print(f"마지막 리드 길이: {len(reads[-1])}")
        
        # 리드 길이 분포 확인
        lengths = [len(read) for read in reads]
        if len(set(lengths)) > 1:
            print("경고: 리드 길이가 일정하지 않습니다!")
            print(f"리드 길이 분포: {set(lengths)}")
        
        # 첫 번째와 마지막 리드의 실제 데이터 확인
        print("\n리드 데이터 샘플:")
        print(f"첫 번째 리드: {convert_to_bases(reads[0])[:30]}...")
        print(f"마지막 리드: ...{convert_to_bases(reads[-1])[-30:]}")
        
        cases.append({
            'Case': f'Case {read_count}',
            'Read Length': read_length,
            'Read Count': read_count,
            'Coverage': desired_coverage,
            'k-mer Length': k_mer_length,
            'Reads': reads
        })
    
    # 데이터프레임으로 결과 반환
    df = pd.DataFrame(cases)
    
    # CSV 파일로 저장
    if output_csv:
        try:
            df.to_csv(output_csv, index=False)
            print(f"\n리드 테이블이 성공적으로 {output_csv}에 저장되었습니다.")
        except Exception as e:
            print(f"\nCSV 파일 저장 중 오류 발생: {str(e)}")
    
    return df


In [5]:
# 예제 실행
filename = "test_dna.bin"

# 파일에서 읽어와 리드 테이블 생성
read_table = generate_reads_with_coverage(
    filename, 
    #output_csv="reads_table.csv",
    desired_coverage=30)

시퀀스 길이: 1000000

Case 1 상세 정보:
리드 길이: 30
목표 리드 개수: 1000000
k-mer 길이: 19
전체 시퀀스 길이: 1000000
리드 간 간격: 1
추가로 생성할 리드 수: 29

최종 결과:
생성된 총 리드 수: 1000000
첫 번째 리드 길이: 30
마지막 리드 길이: 30

리드 데이터 샘플:
첫 번째 리드: TGACCTCCATCAGTACCCAATTATAGCCAA...
마지막 리드: ...GTCAACATTGTTAGTTAGACCAAGCCTCCT

Case 2 상세 정보:
리드 길이: 50
목표 리드 개수: 600000
k-mer 길이: 31
전체 시퀀스 길이: 1000000
리드 간 간격: 1

최종 결과:
생성된 총 리드 수: 600000
첫 번째 리드 길이: 50
마지막 리드 길이: 50

리드 데이터 샘플:
첫 번째 리드: TGACCTCCATCAGTACCCAATTATAGCCAA...
마지막 리드: ...GAGCAGCGCTTTCAATCAGGGTATCGGTGG

Case 3 상세 정보:
리드 길이: 75
목표 리드 개수: 400000
k-mer 길이: 31
전체 시퀀스 길이: 1000000
리드 간 간격: 2

최종 결과:
생성된 총 리드 수: 400000
첫 번째 리드 길이: 75
마지막 리드 길이: 75

리드 데이터 샘플:
첫 번째 리드: TGACCTCCATCAGTACCCAATTATAGCCAA...
마지막 리드: ...CGATCCGATCAGATACCCTCTGGAGGGCGT

Case 4 상세 정보:
리드 길이: 100
목표 리드 개수: 300000
k-mer 길이: 31
전체 시퀀스 길이: 1000000
리드 간 간격: 3

최종 결과:
생성된 총 리드 수: 300000
첫 번째 리드 길이: 100
마지막 리드 길이: 100

리드 데이터 샘플:
첫 번째 리드: TGACCTCCATCAGTACCCAATTATAGCCAA...
마지막 리드: ...TGGGCTCAGAACTGCTGGACCATGCGTCAG


In [6]:
read_table

Unnamed: 0,Case,Read Length,Read Count,Coverage,k-mer Length,Reads
0,Case 1000000,30,1000000,30,19,"[[1, 3, 0, 2, 2, 1, 2, 2, 0, 1, 2, 0, 3, 1, 0,..."
1,Case 600000,50,600000,30,31,"[[1, 3, 0, 2, 2, 1, 2, 2, 0, 1, 2, 0, 3, 1, 0,..."
2,Case 400000,75,400000,30,31,"[[1, 3, 0, 2, 2, 1, 2, 2, 0, 1, 2, 0, 3, 1, 0,..."
3,Case 300000,100,300000,30,31,"[[1, 3, 0, 2, 2, 1, 2, 2, 0, 1, 2, 0, 3, 1, 0,..."


In [7]:
# De Bruijn Graph를 이용한 시퀀스 재구성
def construct_debruijn_graph(reads, k):
    edges = {}
    # k-mer 빈도수 추적
    kmer_frequency = {}
    
    for read in reads:
        # 각 리드의 모든 k-mer 추출
        for i in range(len(read) - k + 1):
            kmer = tuple(read[i:i+k])
            next_kmer = tuple(read[i+1:i+k+1])
            
            # k-mer 빈도수 업데이트
            kmer_frequency[kmer] = kmer_frequency.get(kmer, 0) + 1
            
            if kmer not in edges:
                edges[kmer] = {}
            if next_kmer in edges[kmer]:
                edges[kmer][next_kmer] += 1
            else:
                edges[kmer][next_kmer] = 1
    
    # 낮은 빈도수의 k-mer 필터링 (노이즈 제거)
    min_frequency = 2  # 최소 빈도수 임계값
    filtered_edges = {}
    for kmer, next_kmers in edges.items():
        if kmer_frequency[kmer] >= min_frequency:
            filtered_edges[kmer] = {
                next_kmer: count 
                for next_kmer, count in next_kmers.items() 
                if kmer_frequency.get(next_kmer, 0) >= min_frequency
            }
    
    return filtered_edges

def find_eulerian_path(graph):
    # 진입/진출 차수 계산
    in_degree = {}
    out_degree = {}
    
    for node, edges in graph.items():
        out_degree[node] = sum(edges.values())
        for next_node, count in edges.items():
            in_degree[next_node] = in_degree.get(next_node, 0) + count
    
    # 시작 노드 찾기 (진입차수 < 진출차수)
    start = None
    for node in graph:
        if in_degree.get(node, 0) < out_degree.get(node, 0):
            start = node
            break
    
    if not start:
        start = max(graph.keys(), key=lambda x: out_degree.get(x, 0))
    
    path = []
    stack = [(start, [])]  # (현재 노드, 현재까지의 경로)
    
    while stack:
        current, current_path = stack[-1]
        
        if current not in graph or not graph[current]:
            # 더 이상 진행할 수 없는 경우
            path.append(current)
            stack.pop()
            continue
        
        # 다음 노드 선택 (가중치가 가장 높은 엣지)
        next_node = max(graph[current].items(), key=lambda x: x[1])[0]
        
        # 엣지 제거
        graph[current][next_node] -= 1
        if graph[current][next_node] == 0:
            del graph[current][next_node]
        if not graph[current]:
            del graph[current]
        
        stack.append((next_node, current_path + [current]))
    
    return path[::-1]

def reconstruct_sequence(reads, k):
    """
    개선된 시퀀스 재구성 함수
    """
    # De Bruijn Graph 구성
    graph = construct_debruijn_graph(reads, k)
    
    if not graph:
        print("경고: 그래프가 비어있습니다!")
        return ""
    
    # 오일러 경로 찾기
    path = find_eulerian_path(graph)
    
    # 경로를 시퀀스로 변환
    base_map = {0: 'A', 1: 'T', 2: 'C', 3: 'G'}
    
    # 첫 번째 k-mer 전체를 포함
    sequence = [base_map[x] for x in path[0]]
    
    # 나머지 k-mer의 마지막 염기만 추가
    for node in path[1:]:
        sequence.append(base_map[node[-1]])
    
    reconstructed = ''.join(sequence)
    
    print(f"재구성된 시퀀스 길이: {len(reconstructed)}")
    print(f"사용된 고유 k-mer 수: {len(graph)}")
    
    return reconstructed


In [10]:
filename = "test_dna.bin"

# 길이 10,000의 염기서열 생성 및 파일 저장
original_sequence = generate_random_sequence_advanced(10**4, filename)

# 리드 테이블 생성
reads_df = generate_reads_with_coverage(
    file_path=filename,
    #output_csv="reads_table.csv",
    desired_coverage=30
)

for i in range(len(reads_df)):
    # 첫 번째 케이스의 리드들을 이용하여 시퀀스 재구성
    k = reads_df.iloc[i]['k-mer Length']
    reads = reads_df.iloc[i]['Reads']

    print(f"\n재구성 시작:")
    print(f"사용할 k-mer 길이: {k}")
    print(f"리드 개수: {len(reads)}")
    print(f"첫 번째 리드 예시: {convert_to_bases(reads[0])}")

    reconstructed_sequence = reconstruct_sequence(reads, k)

    # 결과 비교
    original_bases = convert_to_bases(original_sequence)
    print(f"\n결과 비교:")
    print(f"원본 시퀀스 길이: {len(original_bases)}")
    print(f"재구성된 시퀀스 길이: {len(reconstructed_sequence)}")
    print(f"\n원본 시퀀스 (처음 100bp): {original_bases[:100]}")
    print(f"재구성 시퀀스 (처음 100bp): {reconstructed_sequence[:100]}")

    # 일치율 계산
    min_len = min(len(original_bases), len(reconstructed_sequence))
    matches = sum(1 for i in range(min_len) if original_bases[i] == reconstructed_sequence[i])
    similarity = (matches / min_len) * 100
    print(f"\n일치율: {similarity:.2f}%")

시퀀스가 test_dna.bin에 성공적으로 저장되었습니다.
시퀀스 길이: 10000

Case 1 상세 정보:
리드 길이: 30
목표 리드 개수: 10000
k-mer 길이: 19
전체 시퀀스 길이: 10000
리드 간 간격: 1
추가로 생성할 리드 수: 29

최종 결과:
생성된 총 리드 수: 10000
첫 번째 리드 길이: 30
마지막 리드 길이: 30

리드 데이터 샘플:
첫 번째 리드: TGTAGCAGCTCTCTCACTTCCGAAAGGACA...
마지막 리드: ...ACCCCCGGAGTTGCGGGGAGGGGGCCTTCA

Case 2 상세 정보:
리드 길이: 50
목표 리드 개수: 6000
k-mer 길이: 31
전체 시퀀스 길이: 10000
리드 간 간격: 1

최종 결과:
생성된 총 리드 수: 6000
첫 번째 리드 길이: 50
마지막 리드 길이: 50

리드 데이터 샘플:
첫 번째 리드: TGTAGCAGCTCTCTCACTTCCGAAAGGACA...
마지막 리드: ...ATGGGTCATATAATGATCATTCATATAACA

Case 3 상세 정보:
리드 길이: 75
목표 리드 개수: 4000
k-mer 길이: 31
전체 시퀀스 길이: 10000
리드 간 간격: 2

최종 결과:
생성된 총 리드 수: 4000
첫 번째 리드 길이: 75
마지막 리드 길이: 75

리드 데이터 샘플:
첫 번째 리드: TGTAGCAGCTCTCTCACTTCCGAAAGGACA...
마지막 리드: ...CTGTGCTCTTGAAATAAGCCCGGTGAAGCG

Case 4 상세 정보:
리드 길이: 100
목표 리드 개수: 3000
k-mer 길이: 31
전체 시퀀스 길이: 10000
리드 간 간격: 3

최종 결과:
생성된 총 리드 수: 3000
첫 번째 리드 길이: 100
마지막 리드 길이: 100

리드 데이터 샘플:
첫 번째 리드: TGTAGCAGCTCTCTCACTTCCGAAAGGACA...
마지막 리드: ...CTTGCCTGATTACTATACGCTAGGACCATG

재