In [1]:
import os
import numpy as np
import pandas as pd
import torch
from tqdm import tqdm
import msprime
import multiprocessing
from torch.utils.data import TensorDataset

# FASTA DNA Sequences -> torch

In [2]:
def get_sequence(lines):
    sequence = ""
    for i in range(1,len(lines)):
        sequence += lines[i][:-1]
    return sequence

In [3]:
#Get the master sequence which we will assume to be the ancestor
file = open("mtDNA/master_sequence.fasta", 'r')
lines = file.readlines()
ancestral_seq = get_sequence(lines)
ancestral_seq = list(ancestral_seq)
print(len(ancestral_seq))

16569


## Root distribution to choose, count from the ancestor

In [4]:
ancestral_seq_cut = ancestral_seq[:4000]

print(f"P(A) = {ancestral_seq_cut.count('A')/len(ancestral_seq_cut):.2f}")
print(f"P(T) = {ancestral_seq_cut.count('T')/len(ancestral_seq_cut):.2f}")
print(f"P(C) = {ancestral_seq_cut.count('C')/len(ancestral_seq_cut):.2f}")
print(f"P(G) = {ancestral_seq_cut.count('G')/len(ancestral_seq_cut):.2f}")

P(A) = 0.32
P(T) = 0.23
P(C) = 0.29
P(G) = 0.16


In [5]:
for i in range(len(ancestral_seq)):
    if ancestral_seq[i] == 'N':
        print(i)
ancestral_seq[3106] = 'A'

3106


In [6]:
alleles = ["A", "T", "C", "G", "N"]
numbers = {allele: i for i, allele in enumerate(alleles)}
numbers["N"] = 0
ancestral_seq_cut_numbers = [numbers[allele] for allele in ancestral_seq_cut]

# From csv to txt

In [8]:
#Load the position_informed_mitochondirial_mutations dataset
df = pd.read_csv("mtDNA/position_informed_mitochondirial_mutations.csv", sep=";")
snp_pos = [df["Variants"][i].split("_") for i in range(len(df))]

In [9]:
#We delete mutations that are amplifications
to_drop = [[j for j in range(len(snp_pos[i])) if (snp_pos[i][j].count("A")+snp_pos[i][j].count("C")+snp_pos[i][j].count("T")+snp_pos[i][j].count("G")+snp_pos[i][j].count("d"))>1] for i in range(len(snp_pos))]
for i in range(len(to_drop)):
    for j in reversed(to_drop[i]):  # To avoid index error/complications
        snp_pos[i].pop(j)

In [10]:
#We create the list of sequences 
sequences = [ancestral_seq.copy() for _ in range(len(snp_pos))]
pos_count = [0 for _ in range(len(ancestral_seq))]
count = 0
for i in range(len(snp_pos)):
    for j in range(len(snp_pos[i])):
        if int(snp_pos[i][j][:-1])>len(ancestral_seq):
            raise ValueError("The snp position is greater than the length of the ancestral sequence")
        if int(snp_pos[i][j][:-1])<1:
            raise ValueError("The snp position is smaller than 1")
        if snp_pos[i][j][-1]==ancestral_seq[int(snp_pos[i][j][:-1])-1]:
            raise ValueError(f"The ancestral sequence is the same as the mutated sequence, i={i}, j={j}")
        sequences[i][int(snp_pos[i][j][:-1])-1] = snp_pos[i][j][-1]
        pos_count[int(snp_pos[i][j][:-1])-1] += 1
        count +=1
print(f"There are {count} mutations in the dataset")
pos = [i for i in range(len(pos_count)) if pos_count[i]>0]
print(f"There are {len(pos)} SNPs in the dataset")
#Count specific mutations from a nucleotide to another
alleles = ["A", "T", "C", "G"]
count = {ancestral_allele: {new_allele: 0 for new_allele in alleles} for ancestral_allele in alleles}
total_count = 0
for i in range(len(snp_pos)):
    for j in range(len(snp_pos[i])):
        mutation_pos = int(snp_pos[i][j][:-1]) - 1
        if mutation_pos in pos:
            total_count +=1
            count[ancestral_seq[int(snp_pos[i][j][:-1])-1]][snp_pos[i][j][-1]] +=1

print(f"Total count: {total_count}")
print(count)

There are 5351 mutations in the dataset
There are 477 SNPs in the dataset
Total count: 5351
{'A': {'A': 0, 'T': 103, 'C': 64, 'G': 1680}, 'T': {'A': 24, 'T': 0, 'C': 1516, 'G': 18}, 'C': {'A': 21, 'T': 921, 'C': 0, 'G': 19}, 'G': {'A': 970, 'T': 3, 'C': 12, 'G': 0}}


In [11]:
#Export sequences with number 0,1,2,3
sequences = sequences[:100]
sequences_cut_numbers = [[numbers[allele] for allele in sequence[:4000]] for sequence in sequences]
torch.save(sequences_cut_numbers, "real_descendants.pt")
torch.save(ancestral_seq_cut_numbers, "real_ancestor.pt")

In [12]:
#ONLY FIRST 4000p
pos = [i for i in pos if i < 4000]
print(f"There are {len(pos)} SNPs in the first 4000 bp")
count = sum([pos_count[i] for i in pos])
print(f"There are {count} mutations in the first 4000 bp")
#Count specific mutations from a nucleotide to another
alleles = ["A", "T", "C", "G"]
count = {ancestral_allele: {new_allele: 0 for new_allele in alleles} for ancestral_allele in alleles}
total_count = 0
for i in range(len(snp_pos)):
    for j in range(len(snp_pos[i])):
        mutation_pos = int(snp_pos[i][j][:-1]) - 1
        if mutation_pos in pos:
            total_count +=1
            count[ancestral_seq[int(snp_pos[i][j][:-1])-1]][snp_pos[i][j][-1]] +=1

print(f"Total count: {total_count}")
print(count)

There are 93 SNPs in the first 4000 bp
There are 1557 mutations in the first 4000 bp
Total count: 1557
{'A': {'A': 0, 'T': 6, 'C': 0, 'G': 678}, 'T': {'A': 11, 'T': 0, 'C': 353, 'G': 0}, 'C': {'A': 0, 'T': 91, 'C': 0, 'G': 0}, 'G': {'A': 409, 'T': 0, 'C': 9, 'G': 0}}


In [13]:
vects = np.eye(16)
alleles = ["A", "T", "C", "G"]
keys = ["AA", "AT", "AC", "AG", "TA", "TT", "TC", "TG", "CA", "CT", "CC", "CG", "GA", "GT", "GC", "GG"]
vects = {key: vects[i] for i, key in enumerate(keys)}

In [14]:
#count frequency of each allele in the dataset, first 4000bp
sequences_cut = [sequences[i][:4000] for i in range(len(sequences))]
p_a = list(np.array(sequences_cut).flatten()).count("A")/np.array(sequences_cut).flatten().shape[0]
p_t = list(np.array(sequences_cut).flatten()).count("T")/np.array(sequences_cut).flatten().shape[0]
p_c = list(np.array(sequences_cut).flatten()).count("C")/np.array(sequences_cut).flatten().shape[0]
p_g = list(np.array(sequences_cut).flatten()).count("G")/np.array(sequences_cut).flatten().shape[0]
print(f"P(A) = {p_a:.4f}")
print(f"P(T) = {p_t:.4f}")
print(f"P(C) = {p_c:.4f}")
print(f"P(G) = {p_g:.4f}")

P(A) = 0.3239
P(T) = 0.2249
P(C) = 0.2876
P(G) = 0.1636


In [15]:
#Keep only the SNPs, not full sequences until position 4000
ancestral_seq_SNPs = [ancestral_seq[i] for i in pos]
SNPs = [[sequences[i][j] for j in pos] for i in range(len(sequences))]
for i in range(len(SNPs)):
    if len(SNPs[i])!=len(SNPs[0]):
        raise ValueError(f"SNP {i} has a different number of SNPs than SNP 0")
snps = np.empty((len(sequences), len(pos) , 16), dtype=np.uint8)
for i in range(len(sequences)):
    for j in range(len(ancestral_seq_SNPs)):
        snps[i][j] = vects[ancestral_seq_SNPs[j]+SNPs[i][j]]
print(snps.shape)

x = np.concatenate([np.expand_dims(np.array(pos).repeat(16).reshape(-1,16), axis=0),snps]) #Concatenate Positions and SNPs
zeros_to_add = 4_000 - x.shape[1]
x = np.pad(x, ((0, 0), (0, zeros_to_add), (0, 0)), mode='constant') # Make the sequences 4000bp long
x = torch.from_numpy(x)[:101] #Keep 100 first sequences + pos row
x = x.unsqueeze(0) #Add a dimension for the batch
Y = torch.zeros(x.shape[0], 17) #Create the Y tensor (even though we don't know the labels, only for fitting purposes)
print(x.shape, Y.shape)

(100, 93, 16)
torch.Size([1, 101, 4000, 16]) torch.Size([1, 17])


In [16]:
torch.save(TensorDataset(x,Y), "mtDNA/real_data/test_dataset_mtDNA_L4.pt")

In [17]:
#Keep full sequences
snps = np.empty((len(sequences), len(sequences[0]) , 16), dtype=np.uint8)
for i in range(len(sequences)):
    for j in range(len(sequences[0])):
        snps[i][j] = vects[ancestral_seq[j]+sequences[i][j]]
print(snps.shape)

x = np.concatenate([np.expand_dims(np.array(np.arange(4000)).repeat(16).reshape(-1,16), axis=0),snps[:,:4000]]) #Concatenate Positions and SNPs
x = torch.from_numpy(x)[:101] #Keep 100 first rows
x = x.unsqueeze(0) #Add a dimension for the batch
Y = torch.zeros(x.shape[0], 17) #Create the Y tensor (even though we don't know the labels, only for fitting purposes)
print(x.shape, Y.shape)

(100, 16569, 16)
torch.Size([1, 101, 4000, 16]) torch.Size([1, 17])


In [18]:
torch.save(TensorDataset(x,Y), "mtDNA/full_seq_real_data/test_dataset_mtDNA_L4.pt")