In [93]:
import random
import numpy as np
import itertools
from Bio import SeqIO
from collections import defaultdict, Counter
import matplotlib.pyplot as plt

In [158]:
# This function mutates num_mutations number of mutations to a diferent nucleotide in a given sequence

def mutate_dna(dna_string, num_mutations):
    dna_list = list(dna_string)
    dna_length = len(dna_list)
    if num_mutations > dna_length:
        raise ValueError("Number of mutations cannot exceed the length of the DNA string.")

    mutation_indices = random.sample(range(dna_length), num_mutations)

    for index in mutation_indices:
        current_base = dna_list[index]
        valid_bases = ['A', 'U', 'C', 'G']
        if current_base not in valid_bases:
            continue
        else:
            valid_bases.remove(current_base) # Ensure the new base is different.
            new_base = random.choice(valid_bases)
            dna_list[index] = new_base

    return "".join(dna_list)  # Convert back to a string

In [103]:
# Returns proportion of k-mers of interest that are found

def proportion_kmers_present(seq,kmers):
    present = 0
    for k in kmers:
        if k in seq:
            present += 1
    return present/len(kmers)

In [96]:
def get_kmers(seq,klen,total_possible_kmers,possible_kmers):
    kmer_counts = defaultdict(int)
    for i in range(0,len(seq)+1-klen):
        kmer = seq[i:i+klen]
        kmer_counts[kmer] += 1
    tmp = []
    tmp2 = []
    for k in possible_kmers:
        freq = kmer_counts.get(k,0)/total_possible_kmers
        tmp.append(str(freq))
        tmp2.append(freq)
    return '\t'.join(tmp),np.array(tmp2)

In [88]:
# returns the reverse complement of a sequence

def revcomp(seq):
    revcomp = ''
    for i in seq[::-1]:
        revcomp += {'A':'U','C':'G','G':'C','U':'A'}.get(i,i)
    return revcomp

In [89]:
# generates a random sequence weighted by the proportion of A's, C's, G's, and U's found in input sequence

def random_seq_generator(seq):
    Counts = Counter(seq)
    L = len(seq)
    A = Counts['A']/L
    C = Counts['C']/L
    G = Counts['G']/L
    U = Counts['U']/L
    return ''.join(random.choices(['A','C','G','U'], weights = [A,C,G,U], k = len(seq))),[str(A),str(C),str(G),str(U)]

In [90]:
f = 'SILVA_138.2_SSURef_NR99_tax_silva.fasta'

In [202]:
## program finds k-mers with highest frequency in 16S sequences

klen = 11
Observation_frequency_threshold = 0.5

kmer_counts = defaultdict(int)
kmers_of_interest = set()
total_seqs = 0

for h,i in enumerate(SeqIO.parse(f,'fasta')):
    id,d,s,L = str(i.id),str(i.description),str(i.seq).upper().replace('T','U'),len(i.seq)
    if 'eukaryota' in d.lower(): continue
    total_seqs += 1
    kmers = set()
    for i in range(0,len(s)+1-klen):
        kmer = s[i:i+klen]
        # rc_kmer = revcomp(kmer)
        # canonical_kmer = min(kmer,rc_kmer)
        kmers.add(kmer)
    for k in kmers:
        kmer_counts[k] += 1
    print(total_seqs,end='\r')
    if total_seqs == 100000: break

for k,v in sorted(kmer_counts.items(),key=lambda x:x[1]):
    if v/total_seqs > Observation_frequency_threshold:
        kmers_of_interest.add(k)
        # print(k,v/total_seqs)

print()
print(f"k-mer length used for analysis: {klen}")
print(f"Observed frequency threshold: {Observation_frequency_threshold}")
print(f"Sequences analyzed: {total_seqs}")
print(f"Total k-mers observed: {len(kmer_counts)}")
print(f"Total k-mers possible: {4**klen}")
print(f"Number of biomarker k-mers identified: {len(kmers_of_interest)}")

100000
k-mer length used for analysis: 11
Observed frequency threshold: 0.5
Sequences analyzed: 100000
Total k-mers observed: 3095050
Total k-mers possible: 4194304
Number of biomarker k-mers identified: 319


In [209]:
## program classifies 16S sequences as natural or modified by the presence of k-mers

print("Decision_Boundary Total_seqs Proportion_seq_mutate TP TN FP FN")

for proportion_seq_mutate in [1.0,0.9,0.8,0.7,0.6,0.5,0.4,0.3,0.2,0.1,0.01]:
    total_seqs = 0
    Decision_Boundary = 0.5 
    TP,TN,FP,FN = 0,0,0,0
    
    for h,i in enumerate(SeqIO.parse(f,'fasta')):
        id,d,s,L = str(i.id),str(i.description),str(i.seq).upper().replace('T','U'),len(i.seq)
        if 'eukaryota' in d.lower(): continue
        total_seqs += 1
        Counts = Counter(s)
        weights = [Counts['A']/L,Counts['C']/L,Counts['G']/L,Counts['U']/L]
        if proportion_seq_mutate == 1.0:
            random_seq = list(s).copy()
            random.shuffle(random_seq)
        else:
            random_seq = mutate_dna(s, int(proportion_seq_mutate*L))
        present_s = proportion_kmers_present(s,kmers_of_interest)
        present_random = proportion_kmers_present(random_seq,kmers_of_interest)
        # print(total_seqs,end='\r') # progress tracker
        if present_s > Decision_Boundary: # True positive
            TP += 1
        elif present_s <= Decision_Boundary: # False negative
            FN += 1
        if present_random > Decision_Boundary: # False positive
            FP += 1
        elif present_random <= Decision_Boundary: # True negative
            TN += 1
        if total_seqs == 10000:
            break
    
    print(Decision_Boundary,total_seqs,proportion_seq_mutate,TP,TN,FP,FN)

# print(f"Real sequences marked as read (True Positive): {TP}")
# print(f"Random sequences marked as fake (True Negative): {TN}")
# print(f"Random sequences marked as real (False Positive): {FP}")
# print(f"Real sequences marked as fake (False Negative): {FN}")

Decision_Boundary Total_seqs Proportion_seq_mutate TP TN FP FN
0.5 10000 1.0 9350 10000 0 650
0.5 10000 0.9 9350 10000 0 650
0.5 10000 0.8 9350 10000 0 650
0.5 10000 0.7 9350 10000 0 650
0.5 10000 0.6 9350 10000 0 650
0.5 10000 0.5 9350 10000 0 650
0.5 10000 0.4 9350 10000 0 650
0.5 10000 0.3 9350 10000 0 650
0.5 10000 0.2 9350 10000 0 650
0.5 10000 0.1 9350 9999 1 650
0.5 10000 0.01 9350 1107 8893 650


In [101]:
# Program generates boxplots for frequency of observing each k-mer in 16S sequences

klen = 7

possible_kmers = [''.join(k) for k in itertools.product("ACGU", repeat=klen)]
total_seqs = 0

with open("16S_rRNA_kmer_count_matrix.txt",'w') as out:
    out.write('SeqID\tState\tFreq_A\tFreq_C\tFreq_G\tFreq_U\t'+'\t'.join(possible_kmers)+'\n')
    for h,i in enumerate(SeqIO.parse(f,'fasta')):
        id,d,s,L = str(i.id),str(i.description),str(i.seq).upper().replace('T','U'),len(i.seq)
        if 'eukaryota' in d.lower(): continue
        Counts = Counter(s)
        weights = [Counts['A']/L,Counts['C']/L,Counts['G']/L,Counts['U']/L]
        total_seqs += 1
        random_seq = list(s).copy()
        random.shuffle(random_seq)
        weights = '\t'.join(weights)
        total_possible_kmers = L - klen + 1
        s_kmers_print,s_kmers = get_kmers(s,klen,total_possible_kmers,possible_kmers)
        random_kmers_print,random_kmers = get_kmers(random_seq,klen,total_possible_kmers,possible_kmers)
        # diff = sum((s_kmers - random_kmers))/len(random_kmers)
        # print(diff)
        out.write(f"{d}\t{'natural'}\t{weights}\t{s_kmers_print}\n")
        out.write(f"{d}\t{'random_weighted_seq'}\t{weights}\t{random_kmers_print}\n")
        print(total_seqs,end='\r')
        if total_seqs == 10: break


12345678910