In [14]:
import os
import random

from collections import defaultdict
from math import ceil

import distinctipy
from PIL import Image, ImageDraw, ImageFont


# Seed Python's global RNG so repeated runs of the notebook start from the same state.
random.seed(234)


# Reference sequence that the reads are sliced from.
REFERENCE = "CCAACACAACCACCCACACAAAAACCAAAACA"

# Half-open (start, end) index pairs into REFERENCE, one per read.
READ_POS = [
    (21, 28),
    (22, 30),
    (17, 26),
    (18, 27),
    (7, 14),
    (13, 20),
    (16, 25),
    (3, 11),
    (0, 10),
    (6, 13),
]

# The read sequences themselves, in the same order as READ_POS.
READS = [REFERENCE[span[0]:span[1]] for span in READ_POS]

def reads_with_errors(reads):
    """Return a copy of each read with its last nucleotide flipped (A <-> C).

    Args:
        reads: iterable of non-empty strings whose last character is "A" or "C".

    Returns:
        A list with one string per input read; every character except the
        last is unchanged.
    """
    complement = {"C": "A", "A": "C"}
    mutated = []
    for read in reads:
        head, tail = read[:-1], read[-1]
        mutated.append(head + complement[tail])
    return mutated
    
# NOTE: this cell calls helpers that are defined in cells further down the
# notebook (get_kmer_stats, get_kmer_stats_reads, get_reads_colors,
# get_multiplicity, add_reference_header, add_kmer, generate_de_bruijn_graph,
# output_dot_format). Those cells must have been executed before this one.

# Uncomment to drop the reads: with_reads below becomes False and the
# reference-only variant (k-mer staircase + multiplicity footer) is rendered.
#READS = []

# --- Configuration ---------------------------------------------------------
KMER_SIZE = 3
TEXT_HEIGHT = 14   # vertical pitch of one text row
RECT_MARGIN = 5    # not used in this cell
TEXT_WIDTH = 9.5   # horizontal pitch of one character
TEXT_SIZE = 14     # font size handed to ImageFont.load_default

footer = KMER_SIZE == 3        # the footer is only drawn for 3-mers
with_reads = bool(READS)       # draw the reads under the reference header
with_error = False             # flip the last base of every read and highlight it
body = not with_reads          # k-mer staircase is drawn only when there are no reads

# --- K-mer statistics ------------------------------------------------------
kmer_stats, max_kmer_cnt = get_kmer_stats(REFERENCE, KMER_SIZE)
max_mult = max_kmer_cnt
if with_reads:
    reads_colors = get_reads_colors(READS)
    kmer_stats_reads, max_kmer_read_cnt = get_kmer_stats_reads(
        READS, reads_colors, KMER_SIZE, with_error=with_error
    )
    max_mult = max_kmer_read_cnt

# --- Canvas geometry -------------------------------------------------------
# The image is exactly as wide as the reference and as tall as the sections
# that are switched on.
IMAGE_WIDTH = ceil(len(REFERENCE) * TEXT_WIDTH + 2)
HEADER_HEIGHT = (len(READS) + 1) * TEXT_HEIGHT if with_reads else 0
BODY_HEIGHT = (len(REFERENCE) - KMER_SIZE + 2) * TEXT_HEIGHT if body else 0
FOOTER_HEIGHT = (max_mult + 1) * TEXT_HEIGHT if footer else TEXT_HEIGHT
IMAGE_HEIGHT = HEADER_HEIGHT + FOOTER_HEIGHT + BODY_HEIGHT

image_size = (IMAGE_WIDTH, IMAGE_HEIGHT)
img = Image.new(mode="RGB", size=image_size, color="white")

# `draw` and `color` keep these exact names: other cells may refer to them.
draw = ImageDraw.Draw(img)
draw.rectangle(((0, 0), (IMAGE_WIDTH - 1, IMAGE_HEIGHT - 1)), outline="blue")

color = "black"  # text colour used for every label
font = ImageFont.load_default(size=TEXT_SIZE)

# --- Header: the reference sequence ----------------------------------------
add_reference_header(draw, REFERENCE, font, text_color=color, fill_color="lightblue")

# --- Reads, each on its own row at its offset in the reference --------------
if with_reads:
    for row, (read, span) in enumerate(zip(READS, READ_POS)):
        origin = (span[0] * TEXT_WIDTH, (row + 1) * TEXT_HEIGHT)

        if with_error:
            # Everything but the last base in the read's colour, then the
            # flipped last base in yellow right after it.
            add_kmer(draw, read[:-1], font, text_color=color, fill_color=reads_colors[read], position=origin)
            add_kmer(
                draw,
                reads_with_errors([read[-1]])[0],
                font,
                text_color=color,
                fill_color="yellow",
                position=(origin[0] + (len(read) - 1) * TEXT_WIDTH, origin[1]),
            )
        else:
            add_kmer(draw, read, font, text_color=color, fill_color=reads_colors[read], position=origin)

# --- Body: one k-mer per row, shifted one character per row -----------------
if body:
    for idx in range(len(REFERENCE) - KMER_SIZE + 1):
        # Rows are spaced by the row pitch (TEXT_HEIGHT), not by the font size.
        position = (1 + TEXT_WIDTH * idx, HEADER_HEIGHT + TEXT_HEIGHT * (idx + 1))
        kmer = REFERENCE[idx : idx + KMER_SIZE]
        add_kmer(draw, kmer, font, text_color=color, fill_color=kmer_stats[kmer]["color"], position=position)

# --- Footer: k-mers stacked by occurrence count ------------------------------
if footer:
    if with_reads:
        kmer_positions = get_multiplicity(
            kmer_stats_reads,
            image_height=IMAGE_HEIGHT,
            image_width=IMAGE_WIDTH,
            text_width=TEXT_WIDTH,
            text_height=TEXT_HEIGHT,
            interleave=False,
        )
        for kmer, positions in kmer_positions.items():
            occurrence_colors = kmer_stats_reads[kmer]["colors"]
            occurrence_errors = kmer_stats_reads[kmer]["errors"]
            for idx, position in enumerate(positions):
                if occurrence_errors[idx]:
                    # Same split as for the reads: last base highlighted in yellow.
                    add_kmer(draw, kmer[:-1], font, text_color=color, fill_color=occurrence_colors[idx], position=position)
                    add_kmer(
                        draw,
                        kmer[-1],
                        font,
                        text_color=color,
                        fill_color="yellow",
                        position=(position[0] + (len(kmer) - 1) * TEXT_WIDTH, position[1]),
                    )
                else:
                    add_kmer(draw, kmer, font, text_color=color, fill_color=occurrence_colors[idx], position=position)
    else:
        kmer_positions = get_multiplicity(
            kmer_stats,
            image_height=IMAGE_HEIGHT,
            image_width=IMAGE_WIDTH,
            text_width=TEXT_WIDTH,
            text_height=TEXT_HEIGHT,
        )
        for kmer, positions in kmer_positions.items():
            for position in positions:
                add_kmer(draw, kmer, font, text_color=color, fill_color=kmer_stats[kmer]["color"], position=position)

# --- Save the image ----------------------------------------------------------
# The file name encodes which sections/options were active.
wr = "_reads_" if with_reads else ""
wb = "_kmers_" if body else ""
wm = "_mult_" if not with_reads else "_cov_"
wm = wm if footer else ""
we = "_err_" if with_error else ""
img.save(f"./dbg{wr}{wb}{wm}{we}_K{KMER_SIZE}.png")

# --- De Bruijn graph ---------------------------------------------------------
# Built from the reads (optionally with errors), or from the reference alone
# when neither reads nor errors are enabled. Edges come from (KMER_SIZE+1)-mers.
sequences = READS if not with_error else reads_with_errors(READS)
sequences = [REFERENCE] if not (with_reads or with_error) else sequences
de_bruijn_graph = generate_de_bruijn_graph(sequences=sequences, k=KMER_SIZE+1)
output_dot_format(de_bruijn_graph, f'de_bruijn_graph{wr}{wb}{wm}{we}_K{KMER_SIZE}.dot', kmer_stats)
# Render the .dot file with Graphviz; the exit status is the cell's output.
os.system(f"dot -Tpng de_bruijn_graph{wr}{wb}{wm}{we}_K{KMER_SIZE}.dot -o de_bruijn_graph{wr}{wb}{wm}{we}_K{KMER_SIZE}.png")

0

In [3]:
def add_reference_header(draw_obj, text, font, text_color, fill_color, position = (1, 0)):
    """Draw `text` on a filled background box.

    The box is the text's bounding box, grown by one pixel above and below.

    Args:
        draw_obj: drawing object providing `textbbox`, `rectangle` and `text`
            (called with the same arguments as below).
        text: string to render.
        font: font object forwarded to `textbbox` and `text`.
        text_color: fill used for the glyphs.
        fill_color: fill used for the background box.
        position: (x, y) anchor forwarded to `textbbox` and `text`.
    """
    # Use the objects that were passed in, not same-named notebook globals.
    left, top, right, bottom = draw_obj.textbbox(position, text, font=font)
    draw_obj.rectangle((left, top - 1, right, bottom + 1), fill=fill_color)
    draw_obj.text(position, text, font=font, fill=text_color)

In [4]:
def add_kmer(draw_obj, text, font, text_color, fill_color, position = (0, 0)):
    """Draw `text` on a filled background box.

    The box is the text's bounding box, grown by one pixel above and below.

    Args:
        draw_obj: drawing object providing `textbbox`, `rectangle` and `text`
            (called with the same arguments as below).
        text: string to render.
        font: font object forwarded to `textbbox` and `text`.
        text_color: fill used for the glyphs.
        fill_color: fill used for the background box.
        position: (x, y) anchor forwarded to `textbbox` and `text`.
    """
    # Use the objects that were passed in, not same-named notebook globals.
    left, top, right, bottom = draw_obj.textbbox(position, text, font=font)
    draw_obj.rectangle((left, top - 1, right, bottom + 1), fill=fill_color)
    draw_obj.text(position, text, font=font, fill=text_color)

In [5]:
def get_multiplicity(kmer_stats, image_height, image_width, text_width, text_height, interleave=True):
    """Compute where to draw every occurrence of every k-mer in the footer.

    Each distinct k-mer gets one column (selected by its "idx"); its
    occurrences are stacked upwards from the bottom edge of the image, one
    text row per occurrence.

    Args:
        kmer_stats: mapping k-mer -> dict with at least "idx" (column index)
            and "cnt" (number of occurrences). All keys are taken to have the
            length of the first key.
        image_height: image height; rows are measured up from this value.
        image_width: image width, used to spread the columns when interleaving.
        text_width: horizontal pitch of one character.
        text_height: vertical pitch of one row.
        interleave: if True, the horizontal space left over after writing all
            k-mers is split evenly between neighbouring columns; if False,
            columns are separated by exactly one character width.

    Returns:
        defaultdict(list) mapping k-mer -> list of (x, y) positions, one per
        occurrence, bottom-most first. Empty when `kmer_stats` is empty.
    """
    kmer_positions = defaultdict(list)
    if not kmer_stats:
        # Nothing to place (and no first key to measure the k-mer length from).
        return kmer_positions

    num_kmers = len(kmer_stats)
    len_kmer = len(next(iter(kmer_stats)))

    # Calculate the white space remaining after all k-mers are written out and
    # divide it equally between the gaps. A single column has no gaps, so the
    # spacing is irrelevant there (and the division would be by zero).
    if num_kmers > 1:
        spacing = (image_width - num_kmers * len_kmer * text_width) / (num_kmers - 1)
    else:
        spacing = 0
    spacings = [spacing] * num_kmers

    for kmer, stats in kmer_stats.items():
        column = stats["idx"]
        # The x coordinate depends only on the column, so compute it once.
        extra_space = sum(spacings[:column]) if interleave else text_width * column
        x_position = 1 + column * len_kmer * text_width + extra_space
        for cnt in range(stats["cnt"]):
            y_position = image_height - text_height * (cnt + 1) - 3
            kmer_positions[kmer].append((x_position, y_position))
    return kmer_positions

In [6]:
def get_kmer_stats(reference, kmer_size):
    """Count the k-mers of `reference` and give each distinct k-mer a colour.

    Args:
        reference: sequence to slide the window over.
        kmer_size: window length.

    Returns:
        (kmer_stats, max_kmer_cnt) where kmer_stats maps each distinct k-mer to
        {"idx": order of first appearance, "cnt": number of occurrences,
        "color": hex string from distinctipy} and max_kmer_cnt is the largest
        "cnt" (0 when there are no k-mers).
    """
    kmer_stats = {}
    max_kmer_cnt = 0
    for start in range(len(reference) - kmer_size + 1):
        kmer = reference[start : start + kmer_size]
        entry = kmer_stats.get(kmer)
        if entry is None:
            # First sighting: idx is the number of distinct k-mers seen so far.
            entry = {"idx": len(kmer_stats), "cnt": 1}
            kmer_stats[kmer] = entry
        else:
            entry["cnt"] += 1
        max_kmer_cnt = max(max_kmer_cnt, entry["cnt"])

    # One colour per distinct k-mer, assigned in order of first appearance.
    palette = distinctipy.get_colors(len(kmer_stats))
    for entry, rgb in zip(kmer_stats.values(), palette):
        entry["color"] = distinctipy.get_hex(rgb)
    return kmer_stats, max_kmer_cnt

In [7]:
def get_kmer_stats_reads(reads, read_colors, kmer_size, with_error=False):
    """Count k-mers across all reads, remembering which read each came from.

    Args:
        reads: iterable of read strings.
        read_colors: mapping read string -> colour; looked up with the
            original (unmodified) read.
        kmer_size: window length.
        with_error: if true, each read is first passed through
            `reads_with_errors` and the last k-mer of every read is flagged
            as an error.

    Returns:
        (kmer_stats, max_kmer_cnt) where kmer_stats maps each distinct k-mer to
        {"idx": order of first appearance, "cnt": number of occurrences,
        "colors": colour of the source read per occurrence,
        "errors": bool per occurrence} and max_kmer_cnt is the largest "cnt"
        (0 when there are no k-mers).
    """
    kmer_stats = {}
    max_kmer_cnt = 0
    for read in reads:
        sequence = reads_with_errors([read])[0] if with_error else read
        last_start = len(read) - kmer_size
        for start in range(last_start + 1):
            kmer = sequence[start : start + kmer_size]
            # Hardcoded: only the final k-mer of a read can carry the error.
            is_error = bool(with_error) and start == last_start

            entry = kmer_stats.get(kmer)
            if entry is None:
                entry = {"idx": len(kmer_stats), "cnt": 1}
                kmer_stats[kmer] = entry
                entry["colors"] = [read_colors[read]]
                entry["errors"] = [is_error]
            else:
                entry["cnt"] += 1
                entry["colors"].append(read_colors[read])
                entry["errors"].append(is_error)

            max_kmer_cnt = max(max_kmer_cnt, entry["cnt"])

    return kmer_stats, max_kmer_cnt

In [8]:
def get_reads_colors(reads):
    """Assign one distinctipy colour to each read.

    Args:
        reads: sequence of read strings.

    Returns:
        Dict mapping read string -> hex colour. If the same read string occurs
        more than once, the colour of its last occurrence is kept.
    """
    palette = distinctipy.get_colors(len(reads))
    read_to_hex = {}
    for rgb, read in zip(palette, reads):
        read_to_hex[read] = distinctipy.get_hex(rgb)
    return read_to_hex

In [9]:
def generate_de_bruijn_graph(sequences, k):
    """Build a De Bruijn graph as an adjacency list.

    Every k-mer of every sequence contributes one edge from its first k-1
    characters to its last k-1 characters. Repeated k-mers produce repeated
    edges.

    Args:
        sequences: iterable of strings.
        k: length of the k-mers that form the edges.

    Returns:
        Dict mapping each prefix to the list of suffixes it points to, in the
        order the edges were encountered.
    """
    graph = {}
    for sequence in sequences:
        window_count = len(sequence) - k + 1
        for start in range(window_count):
            kmer = sequence[start:start + k]
            source, target = kmer[:-1], kmer[1:]
            # Create the adjacency list on first use, then record the edge.
            graph.setdefault(source, []).append(target)
    return graph


In [10]:
def output_dot_format(graph, output_file, kmer_stats):
    """Write `graph` to `output_file` in Graphviz DOT format.

    All edges are written first, followed by one style line per node that has
    a colour in `kmer_stats`.

    Args:
        graph: dict mapping node -> list of neighbour nodes (repeated
            neighbours yield repeated edges).
        output_file: path of the file to (over)write.
        kmer_stats: mapping node -> dict containing a "color" entry. Nodes
            that are missing, or have no "color", are written without a fill
            colour instead of raising KeyError.
    """
    # Collect every node: sources first, then targets that never appear as a
    # source (sinks), so that those get styled as well.
    nodes = list(graph)
    seen = set(nodes)
    for neighbors in graph.values():
        for neighbor in neighbors:
            if neighbor not in seen:
                seen.add(neighbor)
                nodes.append(neighbor)

    with open(output_file, 'w') as f:
        f.write('digraph DeBruijnGraph {\n')
        for node, neighbors in graph.items():
            for neighbor in neighbors:
                f.write(f'  "{node}" -> "{neighbor}";\n')

        for node in nodes:
            color = kmer_stats.get(node, {}).get("color")
            if color is None:
                # Unknown node: leave it with Graphviz' default appearance.
                continue
            f.write(f'  "{node}" [fillcolor="{color}", style=filled];\n')
        f.write('}\n')