Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 201 additions & 41 deletions assembly2.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
)
from pydna._pretty import pretty_str as _pretty_str
from pydna.common_sub_strings import common_sub_strings as common_sub_strings_str
from pydna.common_sub_strings import terminal_overlap as terminal_overlap_str
from pydna.dseqrecord import Dseqrecord as _Dseqrecord
from pydna.dseq import Dseq as _Dseq
import networkx as _nx
Expand All @@ -20,6 +19,57 @@
import regex


def gather_overlapping_locations(locs: list[Location], fragment_length: int):
"""
Turn a list of locations into a list of tuples of those locations, where each tuple contains
locations that overlap. For example, if locs = [loc1, loc2, loc3], and loc1 and loc2 overlap,
the output will be [(loc1, loc2), (loc3,)].
"""
# Make a graph with all the locations as nodes
G = _nx.Graph()
for i, loc in enumerate(locs):
G.add_node(i, location=loc)

# Add edges between nodes that overlap
for i in range(len(locs)):
for j in range(i + 1, len(locs)):
if _locations_overlap(locs[i], locs[j], fragment_length):
G.add_edge(i, j)

# Get groups of overlapping locations
groups = list()
for loc_set in _nx.connected_components(G):
groups.append(tuple(locs[i] for i in loc_set))

# Sort by location of the first element in each group (does not matter which since they are overlapping)
groups.sort(key=lambda x: _location_boundaries(x[0])[0])

return groups


def assembly_checksum(G: _nx.MultiDiGraph, edge_list):
"""Calculate a checksum for an assembly, from a list of edges in the form (u, v, key)."""
checksum_list = list()
for edge in edge_list:
u, v, key = edge
checksum_list.append(G.get_edge_data(u, v, key)['uid'])

return min('-'.join(checksum_list), '-'.join(checksum_list[::-1]))


def unique_linear_paths(G: _nx.MultiDiGraph):
# We remove the begin and end nodes
edge_lists = [x[1:-1] for x in _nx.all_simple_edge_paths(G, 'begin', 'end')]
# For each path, we create a unique checksum
checksums = [assembly_checksum(G, edge_list) for edge_list in edge_lists]
# We remove duplicates
out = list()
for i in range(len(edge_lists)):
if checksums[i] not in checksums[:i]:
out.append(edge_lists[i])
return out


def ends_from_cutsite(cutsite: tuple[tuple[int, int], AbstractCut], seq: _Dseq):
if cutsite is None:
raise ValueError('None is not supported')
Expand All @@ -40,12 +90,28 @@ def ends_from_cutsite(cutsite: tuple[tuple[int, int], AbstractCut], seq: _Dseq):
return ('blunt', ''), ('blunt', '')


def restriction_ligation_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, enzymes=RestrictionBatch, partial=False):
def restriction_ligation_overlap(
seqx: _Dseqrecord, seqy: _Dseqrecord, enzymes=RestrictionBatch, partial=False, allow_blunt=False
):
"""Find overlaps. Like in stiky and gibson, the order matters"""
cuts_x = seqx.seq.get_cutsites(*enzymes)
cuts_y = seqy.seq.get_cutsites(*enzymes)
# If blunt ends are allowed, something similar to this could be done to allow
# joining with linear sequence ends, but for now it messes up with the only_adjacent_edges
# case
# if allow_blunt:
# if not seqx.circular:
# cuts_x.append(((len(seqx), 0), None))
# if not seqy.circular:
# cuts_y.append(((0, 0), None))
matches = list()
for cut_x, cut_y in _itertools.product(cuts_x, cuts_y):
# A blunt end
if allow_blunt and cut_x[0][1] == cut_y[0][1] == 0:
matches.append((cut_x[0][0], cut_y[0][0], 0))
continue

# Otherwise, test overhangs
overlap = sum_is_sticky(ends_from_cutsite(cut_x, seqx.seq)[0], ends_from_cutsite(cut_y, seqy.seq)[1], partial)
if not overlap:
continue
Expand All @@ -62,6 +128,18 @@ def restriction_ligation_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, enzymes=R
return matches


def combine_algorithms(*algorithms):
"""Combine algorithms, if any of them returns a match, the match is returned."""

def combined(seqx, seqy, limit):
matches = list()
for algorithm in algorithms:
matches += algorithm(seqx, seqy, limit)
return matches

return combined


def blunt_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, limit=None):
"""Find blunt overlaps"""
if seqx.seq.three_prime_end()[0] == 'blunt' and seqy.seq.five_prime_end()[0] == 'blunt':
Expand All @@ -73,10 +151,6 @@ def common_sub_strings(seqx: _Dseqrecord, seqy: _Dseqrecord, limit=25):
return common_sub_strings_str(str(seqx.seq).upper(), str(seqy.seq).upper(), limit)


def terminal_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, limit=25):
return terminal_overlap_str(str(seqx.seq).upper(), str(seqy.seq).upper(), limit)


def gibson_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, limit=25):
"""
The order matters, we want alignments like:
Expand Down Expand Up @@ -620,9 +694,7 @@ class Assembly:
- Circular assemblies: the first subfragment is not reversed, and has the smallest index in the input fragment list.
use_fragment_order is ignored.
- Linear assemblies:
- If use_fragment_order is False, the first fragment is always in the forward orientation.
- If use_fragment_order is True, the first fragment is always the first fragment in the input list,
in forward or reverse order, and the last one is the last fragment in the input list, in forward or reverse order.
- Using uid (see add_edges_from_match) to identify unique edges.

Parameters
----------
Expand Down Expand Up @@ -720,30 +792,24 @@ def add_edges_from_match(self, match, u: int, v: int, first: _Dseqrecord, secnd:

rc_locs = [locs[0]._flip(len(first)), locs[1]._flip(len(secnd))]

# Needed before https://github.com/biopython/biopython/issues/4611
# the parts list of rc_locs that span the origin used to get inverted when flipped.
# For instance, join{[4:6], [0:1]} in a sequence with length 6 would become
# join{[0:2], [5:6]} when flipped. This removed the meaning of origin-spanning.
# It used to be necessary to flip the parts list again.
# for rc_loc in rc_locs:
# if len(rc_loc.parts) > 1:
# rc_loc.parts = rc_loc.parts[::-1]
# Unique id that identifies the edge in either orientation
uid = f'{u}{locs[0]}:{v}{locs[1]}'

combinations = (
(u, v, locs),
(-v, -u, rc_locs[::-1]),
)

for u, v, l in combinations:
self.G.add_edge(u, v, f'{u}{l[0]}:{v}{l[1]}', locations=l)
self.G.add_edge(u, v, f'{u}{l[0]}:{v}{l[1]}', locations=l, uid=uid)

def format_assembly_edge(self, assembly_edge):
"""Go from the (u, v, key) to the (u, v, locu, locv) format."""
u, v, key = assembly_edge
locu, locv = self.G.get_edge_data(u, v, key)['locations']
return u, v, locu, locv

def get_linear_assemblies(self):
def get_linear_assemblies(self, only_adjacent_edges: bool = False):
"""Get linear assemblies, applying the constrains described in __init__, ensuring that paths represent
real assemblies (see assembly_is_valid). Subassemblies are removed (see remove_subassemblies)."""

Expand All @@ -758,18 +824,15 @@ def get_linear_assemblies(self):
G.add_edge(len(self.fragments), 'end')
G.add_edge(-len(self.fragments), 'end')
else:
# Path must start with forward fragment
for node in filter(lambda x: type(x) is int, G.nodes):
if not self.use_fragment_order and node > 0:
G.add_edge('begin', node)
G.add_edge('begin', node)
G.add_edge(node, 'end')

assemblies = [
tuple(map(self.format_assembly_edge, x[1:-1])) for x in _nx.all_simple_edge_paths(G, 'begin', 'end')
]
return remove_subassemblies(
[a for a in assemblies if assembly_is_valid(self.fragments, a, False, self.use_all_fragments)]
)
assemblies = [tuple(map(self.format_assembly_edge, x)) for x in unique_linear_paths(G)]
out = [a for a in assemblies if assembly_is_valid(self.fragments, a, False, self.use_all_fragments)]
if only_adjacent_edges:
out = [a for a in out if self.assembly_uses_only_adjacent_edges(a, False)]
return remove_subassemblies(out)

def cycle2circular_assemblies(self, cycle):
"""Convert a cycle in the format [1, 2, 3] (as returned by _nx.cycles.simple_cycles) to a list of all possible circular assemblies.
Expand All @@ -784,7 +847,7 @@ def cycle2circular_assemblies(self, cycle):
combine.append([(u, v, key) for key in self.G[u][v]])
return [tuple(map(self.format_assembly_edge, x)) for x in _itertools.product(*combine)]

def get_circular_assemblies(self):
def get_circular_assemblies(self, only_adjacent_edges: bool = False):
"""Get circular assemblies, applying the constrains described in __init__, ensuring that paths represent
real assemblies (see assembly_is_valid)."""
# The constrain of circular sequence is that the first node is the fragment with the smallest index in its initial orientation,
Expand All @@ -793,7 +856,10 @@ def get_circular_assemblies(self):
sorted_cycles = filter(lambda x: x[0] > 0, sorted_cycles)
# cycles.simple_cycles returns lists [1,2,3] not assemblies, see self.cycle2circular_assemblies
assemblies = sum(map(self.cycle2circular_assemblies, sorted_cycles), [])
return [a for a in assemblies if assembly_is_valid(self.fragments, a, True, self.use_all_fragments)]
out = [a for a in assemblies if assembly_is_valid(self.fragments, a, True, self.use_all_fragments)]
if only_adjacent_edges:
out = [a for a in out if self.assembly_uses_only_adjacent_edges(a, True)]
return out

def format_insertion_assembly(self, assembly):
"""Sorts the fragment representing a cycle so that they represent an insertion assembly if possible,
Expand Down Expand Up @@ -834,12 +900,14 @@ def format_insertion_assembly(self, assembly):
shift_by = (edge_pair_index[0] + 1) % len(assembly)
return assembly[shift_by:] + assembly[:shift_by]

def get_insertion_assemblies(self):
def get_insertion_assemblies(self, only_adjacent_edges: bool = False):
"""Assemblies that represent the insertion of a fragment or series of fragment inside a linear construct. For instance,
digesting CCCCGAATTCCCCGAATTC with EcoRI and inserting the fragment with two overhangs into the EcoRI site of AAAGAATTCAAA.
This is not so much meant for the use-case of linear fragments that represent actual linear fragments, but for linear
fragments that represent a genome region. This can then be used to simulate homologous recombination.
"""
if only_adjacent_edges:
raise NotImplementedError('only_adjacent_edges not implemented for insertion assemblies')

# We find cycles first
assemblies = sum(map(self.cycle2circular_assemblies, _nx.cycles.simple_cycles(self.G)), [])
Expand All @@ -853,21 +921,107 @@ def get_insertion_assemblies(self):
if assembly_is_valid(self.fragments, a, False, self.use_all_fragments, is_insertion=True)
]

def assemble_linear(self):
def assemble_linear(self, only_adjacent_edges: bool = False):
"""Assemble linear constructs, from assemblies returned by self.get_linear_assemblies."""
assemblies = self.get_linear_assemblies()
assemblies = self.get_linear_assemblies(only_adjacent_edges)
return [assemble(self.fragments, a, False) for a in assemblies]

def assemble_circular(self):
def assemble_circular(self, only_adjacent_edges: bool = False):
"""Assemble circular constructs, from assemblies returned by self.get_circular_assemblies."""
assemblies = self.get_circular_assemblies()
assemblies = self.get_circular_assemblies(only_adjacent_edges)
return [assemble(self.fragments, a, True) for a in assemblies]

def assemble_insertion(self):
def assemble_insertion(self, only_adjacent_edges: bool = False):
"""Assemble insertion constructs, from assemblies returned by self.get_insertion_assemblies."""
assemblies = self.get_insertion_assemblies()
assemblies = self.get_insertion_assemblies(only_adjacent_edges)
return [assemble(self.fragments, a, False) for a in assemblies]

def get_locations_on_fragments(self) -> dict[int, dict[str, list[Location]]]:
"""Get a dictionary where the keys are the nodes in the graph, and the values are dictionaries with keys
`left`, `right`, containing (for each fragment) the locations where the fragment is joined to another fragment on its left
and right side. The values in `left` and `right` are often the same, except in restriction-ligation with partial overlap enabled,
where we can end up with a situation like this:

GGTCTCCCCAATT and aGGTCTCCAACCAA as fragments

# Partial overlap in assembly 1[9:11]:2[8:10]
GGTCTCCxxAACCAA
CCAGAGGGGTTxxTT

# Partial overlap in 2[10:12]:1[7:9]
aGGTCTCCxxCCAATT
tCCAGAGGTTGGxxAA

Would return
{
1: {'left': [7:9], 'right': [9:11]},
2: {'left': [8:10], 'right': [10:12]},
-1: {'left': [2:4], 'right': [4:6]},
-2: {'left': [2:4], 'right': [4:6]}
}

"""

locations_on_fragments = dict()
for node in self.G.nodes:
this_dict = {'left': list(), 'right': list()}
for edge in self.G.edges(data=True):
for i, key in enumerate(['right', 'left']):
if edge[i] == node:
edge_location = edge[2]['locations'][i]
if edge_location not in this_dict[key]:
this_dict[key].append(edge_location)
this_dict['left'] = sorted(this_dict['left'], key=lambda x: _location_boundaries(x)[0])
this_dict['right'] = sorted(this_dict['right'], key=lambda x: _location_boundaries(x)[0])
locations_on_fragments[node] = this_dict

return locations_on_fragments

def assembly_uses_only_adjacent_edges(self, assembly, is_circular: bool) -> bool:
"""
Check whether only adjacent edges within each fragment are used in the assembly. This is useful to check if a cut and ligate assembly is valid,
and prevent including partially digested fragments. For example, imagine the following fragment being an input for a digestion
and ligation assembly, where the enzyme cuts at the sites indicated by the vertical lines:

x y z
-------|-------|-------|---------

We would only want assemblies that contain subfragments start-x, x-y, y-z, z-end, and not start-x, y-end, for instance.
The latter would indicate that the fragment was partially digested.
"""

locations_on_fragments = self.get_locations_on_fragments()
for node in locations_on_fragments:
fragment_len = len(self.fragments[abs(node) - 1])
for side in ['left', 'right']:
locations_on_fragments[node][side] = gather_overlapping_locations(
locations_on_fragments[node][side], fragment_len
)

allowed_location_pairs = dict()
for node in locations_on_fragments:
if not is_circular:
# We add the existing ends of the fragment
left = [(None,)] + locations_on_fragments[node]['left']
right = locations_on_fragments[node]['right'] + [(None,)]

else:
# For circular assemblies, we add the first location at the end
# to allow for the last edge to be used
left = locations_on_fragments[node]['left']
right = locations_on_fragments[node]['right'][1:] + locations_on_fragments[node]['right'][:1]

pairs = list()
for pair in zip(left, right):
pairs += list(_itertools.product(*pair))
allowed_location_pairs[node] = pairs

fragment_assembly = edge_representation2subfragment_representation(assembly, is_circular)
for node, start_location, end_location in fragment_assembly:
if (start_location, end_location) not in allowed_location_pairs[node]:
return False
return True

def __repr__(self):
# https://pyformat.info
return _pretty_str(
Expand Down Expand Up @@ -921,8 +1075,11 @@ def __init__(self, frags: tuple[_Dseqrecord, _Dseqrecord, _Dseqrecord], limit=25

return

def get_linear_assemblies(self):
def get_linear_assemblies(self, only_adjacent_edges: bool = False):
"""Adds extra constrains to prevent clashing primers."""
if only_adjacent_edges:
raise NotImplementedError('only_adjacent_edges not implemented for PCR assemblies')

assemblies = super().get_linear_assemblies()
# Error if clashing primers
for a in assemblies:
Expand Down Expand Up @@ -974,14 +1131,17 @@ def __init__(self, frags: [_Dseqrecord], limit=25, algorithm=common_sub_strings)

return

def get_circular_assemblies(self):
def get_circular_assemblies(self, only_adjacent_edges: bool = False):
# We don't want the same location twice
assemblies = filter(lambda x: x[0][2] != x[0][3], super().get_circular_assemblies())
assemblies = filter(lambda x: x[0][2] != x[0][3], super().get_circular_assemblies(only_adjacent_edges))
return [a for a in assemblies if assembly_is_valid(self.fragments, a, True, self.use_all_fragments)]

def get_insertion_assemblies(self):
def get_insertion_assemblies(self, only_adjacent_edges: bool = False):
"""This could be renamed splicing assembly, but the essence is similar"""

if only_adjacent_edges:
raise NotImplementedError('only_adjacent_edges not implemented for insertion assemblies')

def splicing_assembly_filter(x):
# We don't want the same location twice
if x[0][2] == x[0][3]:
Expand Down
3 changes: 2 additions & 1 deletion dna_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@


def sum_is_sticky(three_prime_end: tuple[str, str], five_prime_end: tuple[str, str], partial: bool = False) -> int:
"""Return true if the 3' end of seq1 and 5' end of seq2 ends are sticky and compatible for ligation."""
"""Return the overlap length if the 3' end of seq1 and 5' end of seq2 ends are sticky and compatible for ligation.
Return 0 if they are not compatible."""
type_seq1, sticky_seq1 = three_prime_end
type_seq2, sticky_seq2 = five_prime_end

Expand Down
Loading