In [1]:
import random
import numpy as np
import itertools

Functions for De Bruijn graph

In [2]:
def debruijnize(reads):
    # Initialize sets to store nodes, nodes that are not starts, and list to store edges
    nodes = set()
    not_starts = set()
    edges = []
    
    # Iterate over each read in the input list of reads
    for r in reads:
        # Extract the substrings by removing the last and first characters
        r1 = r[:-1]
        r2 = r[1:]
        
        # Add the substrings to the set of nodes
        nodes.add(r1)
        nodes.add(r2)
        
        # Create an edge between the substrings and append it to the list of edges
        edges.append((r1, r2))
        
        # Add the second substring to the set of nodes that are not starting nodes
        not_starts.add(r2)
    
    # Return a tuple containing the set of nodes, list of edges, and list for start node if exists
    return (nodes, edges, list(nodes - not_starts))

def build_k_mer(str, k):
    # Initialize an empty list to store the k-mers
    k_mers = []
    
    # Iterate over the string with a sliding window of size k
    for i in range(0, len(str) - k + 1):
        # Extract a k-mer substring starting from index i
        k_mer = str[i:k+i]
        
        # Append the extracted k-mer to the list
        k_mers.append(k_mer)
    
    # Return the list of k-mers
    return k_mers

def make_node_edge_map(edges):
    # Initialize an empty dictionary to store node-edge mappings
    node_edge_map = {}
    
    # Iterate over each edge in the list of edges
    for e in edges:
        # Extract the source node from the edge
        n = e[0]
        
        # Check if the source node already exists in the mapping
        if n in node_edge_map:
            # If it exists, append the destination node to its list of connected nodes
            node_edge_map[n].append(e[1])
        else:
            # If it doesn't exist, create a new entry with the source node as key
            # and a list containing the destination node as its value
            node_edge_map[n] = [e[1]]
    
    # Return the node-edge mapping dictionary
    return node_edge_map

def eulerian_trail(m, v):
    # Copy the input node-edge mapping to a local variable
    nemap = m
    
    # Initialize an empty list to store the result trail
    result_trail = []
    
    # Set the starting node for the trail
    start = v
    
    # Append the starting node to the result trail
    result_trail.append(start)
    
    # Loop until the trail is completed
    while(True):
        # Initialize an empty list to store the current trail
        trail = []
        
        # Initialize the previous node to the starting node
        previous = start
        
        # Traverse the current trail until it loops back to the starting node
        while(True):
            # Check if the previous node has any outgoing edges
            if(previous not in nemap):
                break
            
            # Pop the next node from the list of outgoing edges of the previous node
            next = nemap[previous].pop()
            
            # Remove the previous node from the mapping if it has no more outgoing edges
            if(len(nemap[previous]) == 0):
                nemap.pop(previous, None)
            
            # Append the next node to the current trail
            trail.append(next)
            
            # Break the loop if the next node is the starting node, indicating completion of the trail
            if(next == start):
                break
            
            # Update the previous node to the next node for the next iteration
            previous = next
        
        # Combine the current trail with the result trail
        index = result_trail.index(start)
        result_trail = result_trail[0:index + 1] + trail + result_trail[index + 1:len(result_trail)]
        
        # Choose a new start node if there are remaining edges
        if(len(nemap) == 0):
            break
        
        # Find a new start node from the result trail
        found_new_start = False
        for n in result_trail:
            if n in nemap:
                start = n
                found_new_start = True
                break
        
        # Break the loop if no new start node is found
        if not found_new_start:
            break
    
    # Return the result trail
    return result_trail


def assemble_trail(trail):
    # Check if the trail is empty
    if len(trail) == 0:
        return ""
    
    # Initialize the result string with the first node's substring excluding its last character
    result = trail[0][:-1]
    
    # Concatenate the last character of each node in the trail to the result string
    for node in trail:
        result += node[-1]
    
    # Return the assembled result string
    return result

def test_assembly_debruijn(str, k):
    # Generate all k-mers from the input string
    reads = build_k_mer(str, k)
    
    # Construct the De Bruijn graph from the k-mers
    G = debruijnize(reads)
    
    # Create a node-edge mapping from the edges of the De Bruijn graph
    nemap = make_node_edge_map(G[1])
    
    # Choose a starting node for the Eulerian trail
    start = next(iter(G[2])) if (len(G[2]) > 0) else next(iter(G[0]))
    
    # Find the Eulerian trail in the De Bruijn graph
    trail = eulerian_trail(nemap, start)
    
    # Assemble the Eulerian trail into a string
    return assemble_trail(trail)

def generate_random_genome(length):
    nucleotides = ['A', 'C', 'G', 'T']
    genome = ''.join(random.choices(nucleotides, k=length))
    return genome

Class for adjacency matrix

In [3]:
class Adjacency:
    def __init__(self, kmer_length):
        # Initialize the length of k-mers and generate all possible k-mers of that length
        self.kmer_length = kmer_length
        self.all_kmers = self.get_all_kmers()
        
        # Create a mapping from k-mers to their indices in the list of all k-mers
        self.kmer_to_index = {kmer: i for i, kmer in enumerate(self.all_kmers)}
    
    def graph2adjacency(self, k_mers):
        # Determine the size of the adjacency matrix based on the number of all possible k-mers
        matrix_size = len(self.all_kmers)
        
        # Initialize the adjacency matrix with zeros
        adjacency_matrix = np.zeros((matrix_size, matrix_size), dtype=int)
        
        # Populate the adjacency matrix based on the input k-mers
        for kmer, neighbors in k_mers.items():
            row_index = self.kmer_to_index[kmer]
            for neighbor in neighbors:
                col_index = self.kmer_to_index[neighbor]
                adjacency_matrix[row_index, col_index] += 1
        
        # Return the constructed adjacency matrix
        return adjacency_matrix

    def get_all_kmers(self):
        # Generate all possible k-mers of the specified length
        nucleotides = ['A', 'C', 'G', 'T']
        kmers = [''.join(p) for p in itertools.product(nucleotides, repeat=self.kmer_length)]
        return kmers
    
    def adjacency2graph(self, adjacency_matrix):
        # Initialize an empty dictionary to represent the graph
        graph = {}
        
        # Iterate over each k-mer and its index in the kmer_to_index mapping
        for kmer, index in self.kmer_to_index.items():
            neighbors = []
            
            # Iterate over each entry in the row of the adjacency matrix corresponding to the current k-mer
            for i, val in enumerate(adjacency_matrix[index]):
                # If the value is greater than 0, add the corresponding k-mer to the neighbors list
                if val > 0:
                    neighbors.extend([self.all_kmers[i]] * val)
            
            # If the neighbors list is not empty, add the k-mer and its neighbors to the graph
            if neighbors:
                graph[kmer] = neighbors
        
        # Return the constructed graph
        return graph

Genome -> graph -> adjacency matrix -> graph -> genome

In [15]:
genome_length = 6
reads_length = 3
k_mers_length = reads_length - 1

genome = generate_random_genome(genome_length)
print('Generated genome:', genome)

reads = build_k_mer(genome, reads_length)
print('\nReads:', reads)

nodes, edges, start_node = debruijnize(reads)
print('\nDe Bruijn graph nodes:', nodes)
print('\nDe Bruijn graph edges:', edges)
print('\nDe Bruijn graph start node (If a node with an indegree of 0 exists, \nit will be chosen; otherwise, the first node in set will be selected):', start_node)

node_edge_map = make_node_edge_map(edges)
print('\nNode edge mapping:', node_edge_map)

adjacency = Adjacency(k_mers_length)
adjacency_matrix = adjacency.graph2adjacency(node_edge_map)

print('\nAdjacency matrix:', adjacency_matrix)

node_edge_map_from_adjacency = adjacency.adjacency2graph(adjacency_matrix)
print('\nNode edge mapping from adjacency matrix:', node_edge_map_from_adjacency)

start = start_node[0] if (len(start_node) > 0) else next(iter(nodes))
trail = eulerian_trail(node_edge_map_from_adjacency, start)
rec_genome = assemble_trail(trail)
print('\nReconstructed genome from graph:', rec_genome)

Generated genome: TACCGT

Reads: ['TAC', 'ACC', 'CCG', 'CGT']

De Bruijn graph nodes: {'TA', 'CG', 'AC', 'GT', 'CC'}

De Bruijn graph edges: [('TA', 'AC'), ('AC', 'CC'), ('CC', 'CG'), ('CG', 'GT')]

De Bruijn graph start node (If a node with an indegree of 0 exists, 
it will be chosen; otherwise, the first node in set will be selected): ['TA']

Node edge mapping: {'TA': ['AC'], 'AC': ['CC'], 'CC': ['CG'], 'CG': ['GT']}

Adjacency matrix: [[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0

Testing graph -> adjacency matrix -> graph

In this implementation adjacency matrix has size 4^n x 4^n where n is length of k-mers, so matrix will have the same shape for all lengths of genome

In [5]:
def are_dicts_equal(dict1, dict2):
    # Check if keys are the same
    if sorted(dict1.keys()) != sorted(dict2.keys()):
        return False
    
    # Check if values for each key are the same (ignoring order)
    for key in dict1:
        if sorted(dict1[key]) != sorted(dict2[key]):
            return False
    
    return True

n = 1000
genome_length = 1000
reads_length = 4
adjacency = Adjacency(reads_length-1)

errors = 0
for i in range(n):
    str = generate_random_genome(genome_length)
    reads = build_k_mer(str,reads_length)
    G = debruijnize(reads)
    nemap = make_node_edge_map(G[1])
    matrix = adjacency.graph2adjacency(nemap)
    rev = adjacency.adjacency2graph(matrix)
    if not are_dicts_equal(nemap, rev):
        errors+=1

print("Percentage of errors:", errors/n)

Percentage of errors: 0.0


Testing genome -> graph -> genome

In some cases, the graph may correspond to different genome sequences, leading to the possibility of obtaining incorrect results. I attempted to find Fleury's algorithm for a directed multigraph, or any other algorithm for an Eulerian walk, but found nothing except for this code:  https://medium.com/@han_chen/de-bruijn-graph-a-genome-ec910e79dc26

Empirically, I have found that the smaller the difference between the length of the genome and the length of the reads, the lower the probability of error. Additionally, to increase accuracy, one can utilize information about the initial node in the graph. In this implementation, it is precisely determined only if it has an indegree of 0 in the graph

In [6]:
n = 1000
genome_length = 100
reads_length = 7

errors = 0
for i in range(n):
    genome = generate_random_genome(genome_length)
    dbg_genome = test_assembly_debruijn(genome, reads_length)
    if genome != dbg_genome:
        errors+=1
print("Percentage of errors:", errors/n)

Percentage of errors: 0.033


In [7]:
n = 1000
genome_length = 30
reads_length = 7

errors = 0
for i in range(n):
    genome = generate_random_genome(genome_length)
    dbg_genome = test_assembly_debruijn(genome, reads_length)
    if genome != dbg_genome:
        errors+=1
print("Percentage of errors:", errors/n)

Percentage of errors: 0.005


I tried to implement Fleury's algorithm, but its accuracy in reconstructing the genome was lower

In [8]:
# def fleury(graph, start):
#     # Function to check if the given edge is a bridge
#     def is_bridge(u, v):
#         if len(graph[u]) == 1:
#             return True
#         else:
#             visited = set()
#             stack = [u]
#             while stack:
#                 current = stack.pop()
#                 visited.add(current)
#                 if current in graph:
#                     for neighbor in graph[current]:
#                         if neighbor != v:
#                             if neighbor == u:
#                                 return False
#                             if neighbor not in visited:
#                                 stack.append(neighbor)
#             return True

#     # DFS to traverse the graph
#     def dfs(u):
#         if u in graph:
#             for v in graph[u]:
#                 if (u, v) not in visited and (v, u) not in visited:
#                     if not is_bridge(u, v):
#                         visited.add((u, v))
#                         dfs(v)
#                     else:
#                         visited.add((u, v))
#                         visited.add((v, u))
#                         dfs(v)
#         path.append(u)

#     visited = set()
#     path = []

#     dfs(start)
#     path.reverse()
#     return path