Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 134 additions & 127 deletions assembly2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Slightly different assembly implementation"""

from pydna.utils import shift_location as _shift_location
from pydna.utils import shift_location as _shift_location, flatten
from pydna._pretty import pretty_str as _pretty_str
from pydna.common_sub_strings import common_sub_strings
from pydna.dseqrecord import Dseqrecord as _Dseqrecord
Expand All @@ -10,6 +10,125 @@
from Bio.SeqFeature import SimpleLocation
from pydna.utils import shift_location

def assembly2str(assembly):
return str(tuple(f'{u}{lu}:{v}{lv}' for u, v, lu, lv in assembly))

def assembly_is_valid(fragments, assembly, is_circular, use_all_fragments):
"""Function used to filter paths returned from the graph, see conditions tested below.
"""
# Linear assemblies may get begin-1-end, begin-2-end, these are removed here.
if len(assembly) == 0:
return False

if use_all_fragments and len(fragments) != len(set(flatten(map(abs, e[:2]) for e in assembly))):
return False

# Here we check whether subsequent pairs of fragments are compatible, for instance:
# Compatible (overlap of 1 and 2 occurs before overlap of 2 and 3):
# (1,2,[2:9],[0:7]), (2,3,[12:19],[0:7])
# -- A --
# 1 gtatcgtgt -- B --
# 2 atcgtgtactgtcatattc
# 3 catattcaa
# Incompatible (overlap of 1 and 2 occurs after overlap of 2 and 3):
# (1,2,[2:9],[13:20]), (2,3,[0:7],[0:7])
# -- A --
# 1 -- B -- gtatcgtgt
# 2 catattcccccccatcgtgtactgt
# 3 catattcaa
# Redundant: overlap of 1 and 2 ends at the same spot as overlap of 2 and 3
# (1,2,[2:9],[1:8]), (2,3,[0:8],[0:8])
# -- A --
# gtatcgtgt
# catcgtgtactgtcatattc
# catcgtgtactgtcatattc
# -- B ---
if is_circular:
# In a circular assembly, first and last fragment must be the same
if assembly[0][0] != assembly[-1][1]:
return False
edge_pairs = zip(assembly, assembly[1:] + assembly[:1])
else:
edge_pairs = zip(assembly, assembly[1:])

for (u1, v1, _, start_location), (u2, v2, end_location, _) in edge_pairs:
# Incompatible as described in figure above
if start_location.parts[-1].end >= end_location.parts[0].end:
return False

return True

def assemble(fragments, assembly, is_circular):
"""Execute an assembly, from the representation returned by get_linear_assemblies or get_circular_assemblies."""

subfragment_representation = edge_representation2subfragment_representation(assembly, is_circular)

# Length of the overlaps between consecutive assembly fragments
fragment_overlaps = [len(e[-1]) for e in assembly]

subfragments = get_assembly_subfragments(fragments, subfragment_representation)

out_dseqrecord = _Dseqrecord(subfragments[0])

for fragment, overlap in zip(subfragments[1:], fragment_overlaps):
# Shift the features of the right fragment to the left by `overlap`
new_features = [f._shift(len(out_dseqrecord)-overlap) for f in fragment.features]
# Join the left sequence including the overlap with the right sequence without the overlap
out_dseqrecord = _Dseqrecord(out_dseqrecord.seq + fragment.seq[overlap:], features=out_dseqrecord.features + new_features)

# For circular assemblies, close the loop and wrap origin-spanning features
if is_circular:
overlap = fragment_overlaps[-1]
# Remove trailing overlap
out_dseqrecord = _Dseqrecord(out_dseqrecord.seq[:-overlap], features=out_dseqrecord.features, circular=True)
for feature in out_dseqrecord.features:
if feature.location.parts[0].start >= len(out_dseqrecord) or feature.location.parts[-1].end > len(out_dseqrecord):
# Wrap around the origin
feature.location = _shift_location(feature.location, 0, len(out_dseqrecord))

return out_dseqrecord

def edge_representation2subfragment_representation(assembly, is_circular):
"""
Turn this kind of edge representation fragment 1, fragment 2, right edge on 1, left edge on 2
a = [(1, 2, 'loc1a', 'loc2a'), (2, 3, 'loc2b', 'loc3b'), (3, 1, 'loc3c', 'loc1c')]
Into this: fragment 1, left edge on 1, right edge on 1
b = [(1, 'loc1c', 'loc1a'), (2, 'loc2a', 'loc2b'), (3, 'loc3b', 'loc3c')]
"""

if is_circular:
temp = list(assembly[-1:]) + list(assembly)
else:
temp = [(None, assembly[0][0], None, None)] + list(assembly) + [(assembly[-1][1], None, None, None)]
edge_pairs = zip(temp, temp[1:])
subfragment_representation = list()
for (u1, v1, _, start_location), (u2, v2, end_location, _) in edge_pairs:
subfragment_representation.append((v1, start_location, end_location))

return subfragment_representation

def get_assembly_subfragments(fragment, subfragment_representation):
"""From the fragment representation returned by edge_representation2subfragment_representation, get the subfragments that are joined together.

Subfragments are the slices of the fragments that are joined together

For example:
```
-- A --
cccccgtatcgtgtcccccc
-- B --
acatcgtgtactgtcatattc
```
Subfragments: `cccccgtatcgtgt`, `atcgtgtactgtcatattc`
"""
subfragments = list()
for node, start_location, end_location in subfragment_representation:
seq = fragment[node-1] if node > 0 else fragment[-node-1].reverse_complement()
start = 0 if start_location is None else start_location.parts[0].start
end = None if end_location is None else end_location.parts[-1].end
subfragments.append(seq[start:end])
return subfragments

def is_sublist(sublist, my_list):
"""Returns True if sublist is a sublist of my_list, False otherwise.

Expand Down Expand Up @@ -237,49 +356,6 @@ def __init__(self, frags: list[_Dseqrecord], limit=25, algorithm=common_sub_stri

return

def validate_assembly(self, assembly):
"""Function used to filter paths returned from the graph, see conditions tested below.
"""
# Linear assemblies may get begin-1-end, begin-2-end, these are removed here.
if len(assembly) == 0:
return False

is_circular = assembly[0][0] == assembly[-1][1]

if self.use_all_fragments and (len(assembly) - (1 if is_circular else 0) != len(self.fragments) - 1):
return False

# Here we check whether subsequent pairs of fragments are compatible, for instance:
# Compatible (overlap of 1 and 2 occurs before overlap of 2 and 3):
# -- A --
# gtatcgtgt -- B --
# atcgtgtactgtcatattc
# catattcaa
# Incompatible (overlap of 1 and 2 occurs after overlap of 2 and 3):
# -- A --
# -- B -- gtatcgtgt
# catattcccccccatcgtgtactgt
#
# Redundant: overlap of 1 and 2 is at the same spot as overlap of 2 and 3
# -- A --
# gtatcgtgt
# catcgtgtactgtcatattc
# catcgtgtactgtcatattc
# -- B ---
if is_circular:
edge_pairs = zip(assembly, assembly[1:] + assembly[:1])
else:
edge_pairs = zip(assembly, assembly[1:])

for (u1, v1, key1), (u2, v2, key2) in edge_pairs:
left_edge = self.G[u1][v1][key1]['locations']
right_edge = self.G[u2][v2][key2]['locations']

# Incompatible as described in figure above
if left_edge[1].parts[-1].end >= right_edge[0].parts[0].end:
return False

return True

def remove_subassemblies(self, assemblies):
"""Filter out subassemblies, i.e. assemblies that are contained within another assembly.
Expand All @@ -301,6 +377,13 @@ def remove_subassemblies(self, assemblies):

return filtered_assemblies

def format_assembly_edge(self, assembly_edge):
"""Go from the (u, v, key) to the (u, v, locu, locv) format."""
u, v, key = assembly_edge
locu, locv = self.G.get_edge_data(u, v, key)['locations']
return u, v, locu, locv


def get_linear_assemblies(self):
"""Get linear assemblies, applying the constrains described in __init__, ensuring that paths represent
real assemblies (see validate_assembly). Subassemblies are removed (see remove_subassemblies)."""
Expand All @@ -322,11 +405,11 @@ def get_linear_assemblies(self):
G.add_edge('begin', node)
G.add_edge(node, 'end')

return self.remove_subassemblies(filter(self.validate_assembly, map(lambda x : tuple(x[1:-1]), _nx.all_simple_edge_paths(G, 'begin', 'end'))))
assemblies = [tuple(map(self.format_assembly_edge, x[1:-1])) for x in _nx.all_simple_edge_paths(G, 'begin', 'end')]
return self.remove_subassemblies([a for a in assemblies if assembly_is_valid(self.fragments, a, False, self.use_all_fragments)])

def cycle2circular_assemblies(self, cycle):
"""Convert a cycle in the format [1, 2, 3] (as returned by _nx.cycles.simple_cycles) to a list of all possible circular assemblies
in the format of an assembly (like the output of _nx.all_simple_edge_paths).
"""Convert a cycle in the format [1, 2, 3] (as returned by _nx.cycles.simple_cycles) to a list of all possible circular assemblies.

There may be multiple assemblies for a given cycle,
if there are several edges connecting two nodes, for example two overlaps between 1 and 2, and single overlap between 2 and 3 should
Expand All @@ -336,7 +419,7 @@ def cycle2circular_assemblies(self, cycle):
combine = list()
for u, v in zip(cycle, cycle[1:] + cycle[:1]):
combine.append([(u, v, key) for key in self.G[u][v]])
return list(_itertools.product(*combine))
return [tuple(map(self.format_assembly_edge, x)) for x in _itertools.product(*combine)]

def get_circular_assemblies(self):
"""Get circular assemblies, applying the constrains described in __init__, ensuring that paths represent
Expand All @@ -347,93 +430,17 @@ def get_circular_assemblies(self):
sorted_cycles = filter(lambda x: x[0] > 0, sorted_cycles)
# cycles.simple_cycles returns lists [1,2,3] not assemblies, see self.cycle2circular_assemblies
assemblies = sum(map(self.cycle2circular_assemblies, sorted_cycles),[])

return list(filter(self.validate_assembly, assemblies))

def edge_representation2subfragment_representation(self, assembly):
"""
Turn this kind of edge representation fragment 1, fragment 2, right edge on 1, left edge on 2
a = [(1, 2, 'loc1a', 'loc2a'), (2, 3, 'loc2b', 'loc3b'), (3, 1, 'loc3c', 'loc1c')]
Into this: fragment 1, left edge on 1, right edge on 1
b = [(1, 'loc1c', 'loc1a'), (2, 'loc2a', 'loc2b'), (3, 'loc3b', 'loc3c')]
"""

is_circular = assembly[0][0] == assembly[-1][1]
if is_circular:
temp = list(assembly[-1:]) + list(assembly)
else:
temp = [(None, assembly[0][0], None)] + list(assembly) + [(assembly[-1][1], None, None)]
edge_pairs = zip(temp, temp[1:])
alternative_representation = list()
for (u1, v1, key1), (u2, v2, key2) in edge_pairs:
start_location = None if u1 is None else self.G[u1][v1][key1]['locations'][1]
end_location = None if v2 is None else self.G[u2][v2][key2]['locations'][0]
alternative_representation.append((v1, start_location, end_location))

return alternative_representation


def get_assembly_subfragments(self, assembly_repr_fragments):
"""From the fragment representation returned by edge_representation2subfragment_representation, get the subfragments that are joined together.

Subfragments are the slices of the fragments that are joined together

For example:
```
-- A --
cccccgtatcgtgtcccccc
-- B --
acatcgtgtactgtcatattc
```
Subfragments: `cccccgtatcgtgt`, `atcgtgtactgtcatattc`
"""
subfragments = list()
for node, start_location, end_location in assembly_repr_fragments:
seq = self.G.nodes[node]['seq']
start = 0 if start_location is None else start_location.parts[0].start
end = None if end_location is None else end_location.parts[-1].end
subfragments.append(seq[start:end])
return subfragments

def assemble(self, assembly_repr_edges):
"""Execute an assembly, from the edge representation returned by get_linear_assemblies or get_circular_assemblies."""

assembly_repr_fragments = self.edge_representation2subfragment_representation(assembly_repr_edges)

# Length of the overlaps between consecutive assembly fragments
fragment_overlaps = [len(self.G[u][v][key]['locations'][1]) for u, v, key in assembly_repr_edges]

subfragments = self.get_assembly_subfragments(assembly_repr_fragments)

out_dseqrecord = _Dseqrecord(subfragments[0])

for fragment, overlap in zip(subfragments[1:], fragment_overlaps):
# Shift the features of the right fragment to the left by `overlap`
new_features = [f._shift(len(out_dseqrecord)-overlap) for f in fragment.features]
# Join the left sequence including the overlap with the right sequence without the overlap
out_dseqrecord = _Dseqrecord(out_dseqrecord.seq + fragment.seq[overlap:], features=out_dseqrecord.features + new_features)

# For circular assemblies, close the loop and wrap origin-spanning features
if assembly_repr_fragments[0][1] != None:
overlap = fragment_overlaps[-1]
# Remove trailing overlap
out_dseqrecord = _Dseqrecord(out_dseqrecord.seq[:-overlap], features=out_dseqrecord.features, circular=True)
for feature in out_dseqrecord.features:
if feature.location.parts[0].start >= len(out_dseqrecord) or feature.location.parts[-1].end > len(out_dseqrecord):
# Wrap around the origin
feature.location = _shift_location(feature.location, 0, len(out_dseqrecord))

return out_dseqrecord
return [a for a in assemblies if assembly_is_valid(self.fragments, a, True, self.use_all_fragments)]

def assemble_linear(self):
"""Assemble linear constructs, from assemblies returned by self.get_linear_assemblies."""
assemblies = self.get_linear_assemblies()
return list(map(self.assemble, assemblies))
return [assemble(self.fragments, a, False) for a in assemblies]

def assemble_circular(self):
"""Assemble circular constructs, from assemblies returned by self.get_circular_assemblies."""
assemblies = self.get_circular_assemblies()
return list(map(self.assemble, assemblies))
return [assemble(self.fragments, a, True) for a in assemblies]

def __repr__(self):
# https://pyformat.info
Expand Down
24 changes: 8 additions & 16 deletions dna_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def sort_by(list2sort, reference_list):

def get_cutsite_order(dseqrecord: Dseqrecord, enzymes: RestrictionBatch) -> tuple[list[RestrictionType], list[int]]:
"""Return the cutsites in order."""
cuts = enzymes.search(dseqrecord.seq, linear=dseqrecord.linear)
cuts = enzymes.search(dseqrecord.seq, linear=not dseqrecord.circular)
positions = list()
cutsites = list()
for enzyme in cuts:
Expand All @@ -125,14 +125,14 @@ def get_cutsite_order(dseqrecord: Dseqrecord, enzymes: RestrictionBatch) -> tupl
positions = sorted(positions)

# For digestion of linear sequences, the first one and last one are the molecule ends
if dseqrecord.linear:
if not dseqrecord.circular:
cutsites.insert(0, '')
cutsites.append('')
positions.insert(0, 0)
positions.append(len(dseqrecord))

# For digestion of circular sequences, the first one and last one are the same
if not dseqrecord.linear:
if dseqrecord.circular:
cutsites.append(cutsites[0])
positions.append(positions[0])

Expand Down Expand Up @@ -350,16 +350,11 @@ def correct_name(dseq: Dseqrecord):
def location_sorter(x, y) -> int:
"""
Sort by start, then length, then strand.

It's a bit weird for origin-spanning features in circular DNA,
as the start is always zero, ultimately the reason for this is
so that the features are always in the same order even if sets
are used in the function.
"""
if x.start != y.start:
return x.start - y.start
elif x.end != y.end:
return x.end - y.end
if x.parts[0].start != y.parts[0].start:
return x.parts[0].start - y.parts[0].start
elif x.parts[-1].end != y.parts[-1].end:
return x.parts[-1].end - y.parts[-1].end
return x.strand - y.strand


Expand All @@ -381,10 +376,6 @@ def find_sequence_regex(pattern: str, seq: str, is_circular: bool) -> list[Locat

feature_locations = list()

# Below, we use +1 in "% (len(seq) + 1)" because start / end is a range, for instance for a
# string of length 6 spanned entirely, start=0 end=6, so if you want to fold you need
# to add 1 to the end.

# Strand 1
feature_edges = get_all_regex_feature_edges(pattern, seq, is_circular)
# We use shift_location to format origin-spanning features in circular DNA
Expand Down Expand Up @@ -420,3 +411,4 @@ def perform_homologous_recombination(template: Dseqrecord, insert: Dseqrecord, l
if template.circular:
return (template[edges[1]:edges[0]] + insert).looped()
return template[0:edges[0]] + insert + template[edges[1]:]

Loading