Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 54 additions & 23 deletions assembly2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Slightly different assembly implementation"""

from pydna.utils import shift_location as _shift_location, flatten
from pydna.utils import shift_location as _shift_location, flatten, location_boundaries as _location_boundaries
from pydna._pretty import pretty_str as _pretty_str
from pydna.common_sub_strings import common_sub_strings as common_sub_strings_str
from pydna.common_sub_strings import terminal_overlap as terminal_overlap_str
Expand All @@ -16,36 +16,45 @@
def ends_from_cutsite(cutsite: tuple[tuple[int,int],AbstractCut], seq: _Dseq):
if cutsite is None:
raise ValueError('None is not supported')
cut_watson, cut_crick = cutsite[0]
enz = cutsite[1]
if enz.ovhg < 0:

cut_watson, cut_crick, ovhg = seq.get_cut_parameters(cutsite, is_left=None)
if ovhg < 0:
# TODO check the edge in circular
return (
("5'", str(seq[cut_watson:cut_crick].reverse_complement()).lower()),
("5'", str(seq[cut_watson:cut_crick]).lower()),
)
elif enz.ovhg > 0:
elif ovhg > 0:
return (
("3'", str(seq[cut_crick:cut_watson]).lower()),
("3'", str(seq[cut_crick:cut_watson].reverse_complement()).lower()),
)

return ('blunt', ''), ('blunt', '')

def restriction_ligation_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, enzymes=RestrictionBatch):
def restriction_ligation_overlap(seqx: _Dseqrecord, seqy: _Dseqrecord, enzymes=RestrictionBatch, partial=False):
"""Find overlaps. Like in stiky and gibson, the order matters"""
cuts_x = seqx.seq.get_cutsites(*enzymes)
cuts_y = seqy.seq.get_cutsites(*enzymes)
matches = list()
for cut_x, cut_y in _itertools.product(cuts_x, cuts_y):
overlap = sum_is_sticky(
ends_from_cutsite(cut_x, seqx.seq)[0],
ends_from_cutsite(cut_y, seqy.seq)[1]
ends_from_cutsite(cut_y, seqy.seq)[1],
partial
)
if overlap:
left_x = cut_x[0][0] if cut_x[1].ovhg < 0 else cut_x[0][1]
left_y = cut_y[0][0] if cut_y[1].ovhg < 0 else cut_y[0][1]
matches.append((left_x, left_y, overlap))
if not overlap:
continue
x_watson, x_crick, x_ovhg = seqx.seq.get_cut_parameters(cut_x, is_left=False)
y_watson, y_crick, y_ovhg = seqy.seq.get_cut_parameters(cut_y, is_left=True)
# Positions where the overlap would start for full overlap
left_x = x_watson if x_ovhg < 0 else x_crick
left_y = y_watson if y_ovhg < 0 else y_crick

# Correct por partial overlaps
left_x += abs(x_ovhg) - overlap

matches.append((left_x, left_y, overlap))
return matches


Expand Down Expand Up @@ -215,7 +224,7 @@ def assembly_is_valid(fragments, assembly, is_circular, use_all_fragments, fragm
for (u1, v1, _, start_location), (u2, v2, end_location, _) in edge_pairs:
# Incompatible as described in figure above
fragment = fragments[abs(v1)-1]
if not fragment.circular and start_location.parts[-1].end >= end_location.parts[0].end:
if not fragment.circular and _location_boundaries(start_location)[1] >= _location_boundaries(end_location)[1]:
return False

if fragments_only_once:
Expand Down Expand Up @@ -250,7 +259,8 @@ def assemble(fragments, assembly, is_circular):
# Remove trailing overlap
out_dseqrecord = _Dseqrecord(fill_dseq(out_dseqrecord.seq)[:-overlap], features=out_dseqrecord.features, circular=True)
for feature in out_dseqrecord.features:
if feature.location.parts[0].start >= len(out_dseqrecord) or feature.location.parts[-1].end > len(out_dseqrecord):
start, end = _location_boundaries(feature.location)
if start >= len(out_dseqrecord) or end > len(out_dseqrecord):
# Wrap around the origin
feature.location = _shift_location(feature.location, 0, len(out_dseqrecord))

Expand Down Expand Up @@ -308,18 +318,30 @@ def get_assembly_subfragments(fragments: list[_Dseqrecord], subfragment_represen
subfragments = list()
for node, start_location, end_location in subfragment_representation:
seq = fragments[node-1] if node > 0 else fragments[-node-1].reverse_complement()
start = 0 if start_location is None else start_location.parts[0].start
end = None if end_location is None else end_location.parts[-1].end
# Special case, some of it could be handled by better Dseqrecord slicing in the future
if seq.circular and start_location == end_location:
# This could be definitely be done better, but for now it works:
dummy_cut = ((start, end), type('DynamicClass', (), {'ovhg': start-end})())
open_seq = seq.apply_cut(dummy_cut, dummy_cut)
subfragments.append(_Dseqrecord(fill_dseq(open_seq.seq), features=open_seq.features))
continue
subfragments.append(seq[start:end])
subfragments.append(extract_subfragment(seq, start_location, end_location))
return subfragments

def extract_subfragment(seq: _Dseqrecord, start_location: Location, end_location: Location):
"""Extract a subfragment from a sequence, given the start and end locations of the subfragment."""
start = 0 if start_location is None else _location_boundaries(start_location)[0]
end = None if end_location is None else _location_boundaries(end_location)[1]

# Special case, some of it could be handled by better Dseqrecord slicing in the future
if seq.circular and start_location == end_location:
# The overhang is different for origin-spanning features, for instance
# for a feature join{[12:13], [0:3]} in a sequence of length 13, the overhang
# is -4, not 9
ovhg = start-end if end > start else start - end - len(seq)
dummy_cut = ((start, ovhg), None)
open_seq = seq.apply_cut(dummy_cut, dummy_cut)
return _Dseqrecord(fill_dseq(open_seq.seq), features=open_seq.features)

# TODO: remove with https://github.com/BjornFJohansson/pydna/issues/161
if seq.circular and start == end:
return seq.shifted(start)

return seq[start:end]

def is_sublist(sublist, my_list, my_list_is_cyclic=False):
"""Returns True if sublist is a sublist of my_list (can be treated as cyclic), False otherwise.

Expand Down Expand Up @@ -484,6 +506,15 @@ def add_edges_from_match(self, match, u: int, v: int, first: _Dseqrecord, secnd:
_shift_location(SimpleLocation(y_start, y_start + length), 0, len(secnd))]
rc_locs = [locs[0]._flip(len(first)), locs[1]._flip(len(secnd))]

# the parts list of rc_locs that span the origin get inverted when flipped.
# For instance, join{[4:6], [0:1]} in a sequence with length 6 will become
# join{[0:2], [5:6]} when flipped. This removes the meaning of origin-spanning.
# We fix this by flipping the parts list again.
# TODO: Pending on https://github.com/biopython/biopython/issues/4611
for rc_loc in rc_locs:
if len(rc_loc.parts) > 1:
rc_loc.parts = rc_loc.parts[::-1]

combinations = (
(u, v, locs),
(-v, -u, rc_locs[::-1]),
Expand Down
3 changes: 2 additions & 1 deletion dna_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def sum_is_sticky(three_prime_end: tuple[str,str], five_prime_end: tuple[str,str
sticky_seq2 = str(reverse_complement(sticky_seq2))

ovhg_len = min(len(sticky_seq1), len(sticky_seq2))
for i in range(1, ovhg_len+1):
# [::-1] to try the longest overhangs first
for i in range(1, ovhg_len+1)[::-1]:
if sticky_seq1[-i:] == sticky_seq2[:i]:
return i
else:
Expand Down
37 changes: 37 additions & 0 deletions examples/sequences/golden_gate.gb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
LOCUS pj5_00001 27 bp DNA linear SYN 15-JAN-2024
COMMENT teselagen_unique_id: 5adf735aa1811801e17d8aac
FEATURES Location/Qualifiers
misc_feature 10..17
/label="A"
ORIGIN
1 GGTCTCAatt aAAAAAttaa AGAGACC
//

LOCUS pj5_00001 27 bp DNA linear SYN 15-JAN-2024
COMMENT teselagen_unique_id: 5adf735aa1811801e17d8aac
FEATURES Location/Qualifiers
misc_feature complement(10..17)
/label="B"
ORIGIN
1 GGTCTCAtta aCCCCCatat AGAGACC
//

LOCUS pj5_00001 35 bp DNA linear SYN 15-JAN-2024
COMMENT teselagen_unique_id: 5adf735aa1811801e17d8aac
FEATURES Location/Qualifiers
misc_feature 10..17
/label="C"
ORIGIN
1 GGTCTCAata tGGGGGccgg AGAGACC
//

LOCUS pj5_00001 35 bp DNA circular SYN 15-JAN-2024
COMMENT teselagen_unique_id: 5adf735aa1811801e17d8aac
FEATURES Location/Qualifiers
misc_feature 11..25
/label="VectorOUT"
misc_feature 32..36
/label="VectorIN"
ORIGIN
1 TTTTattaAG AGACCTTTTT GGTCTCAccg gTTTT
//
91 changes: 63 additions & 28 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
read_dsrecord_from_json, request_from_addgene
from pydantic_models import PCRSource, PrimerModel, SequenceEntity, SequenceFileFormat, \
RepositoryIdSource, RestrictionEnzymeDigestionSource, StickyLigationSource, \
UploadedFileSource, HomologousRecombinationSource, GibsonAssemblySource
UploadedFileSource, HomologousRecombinationSource, GibsonAssemblySource, RestrictionAndLigationSource, \
AssemblySource
from fastapi.middleware.cors import CORSMiddleware
from Bio.Restriction.Restriction import RestrictionBatch
from urllib.error import HTTPError, URLError
from fastapi.responses import HTMLResponse
from Bio.Restriction.Restriction_Dictionary import rest_dict
from assembly2 import Assembly, assemble, sticky_end_sub_strings, assembly2str, PCRAssembly, gibson_overlap, filter_linear_subassemblies
from assembly2 import Assembly, assemble, sticky_end_sub_strings, assembly2str, PCRAssembly, \
gibson_overlap, filter_linear_subassemblies, restriction_ligation_overlap
# Instance of the API object
app = FastAPI()

Expand All @@ -36,6 +38,16 @@
# TODO limit the maximum size of submitted files


def format_known_assembly_response(source: AssemblySource, out_sources: list[AssemblySource], fragments: list[Dseqrecord], possible_assemblies: list):
"""Common function for assembly sources, when assembly is known"""
# If a specific assembly is requested
for i, s in enumerate(out_sources):
if s == source:
# TODO: remove enumeration by better parsing
return {'sequences': [format_sequence_genbank(assemble(fragments, possible_assemblies[i], s.circular))], 'sources': [s]}
raise HTTPException(400, 'The provided assembly is not valid.')


@app.get('/')
async def greeting(request: Request):
html_content = f"""
Expand Down Expand Up @@ -179,11 +191,11 @@ async def restriction(source: RestrictionEnzymeDigestionSource,
invalid_enzymes = get_invalid_enzyme_names(source.restriction_enzymes)
if len(invalid_enzymes):
raise HTTPException(404, 'These enzymes do not exist: ' + ', '.join(invalid_enzymes))
enzymes = RestrictionBatch(first=[e for e in source.restriction_enzymes if e is not None])

seqr = read_dsrecord_from_json(sequences[0])
# TODO: return error if the id of the sequence does not correspond

enzymes = RestrictionBatch(first=[e for e in source.restriction_enzymes if e is not None])
cutsites = seqr.seq.get_cutsites(*enzymes)
cutsite_pairs = seqr.seq.get_cutsite_pairs(cutsites)
sources = [RestrictionEnzymeDigestionSource.from_cutsites(*p, source.input, source.id) for p in cutsite_pairs]
Expand Down Expand Up @@ -234,11 +246,7 @@ async def sticky_ligation(source: StickyLigationSource,

# If a specific assembly is requested
if source.assembly is not None:
for i, s in enumerate(out_sources):
if s == source:
# TODO: remove enumeration by better parsing
return {'sequences': [format_sequence_genbank(assemble(fragments, possible_assemblies[i], s.circular))], 'sources': [s]}
raise HTTPException(400, 'The provided assembly is not valid.')
return format_known_assembly_response(source, out_sources, fragments, possible_assemblies)

if len(possible_assemblies) == 0:
raise HTTPException(400, 'No ligations were found.')
Expand Down Expand Up @@ -280,11 +288,7 @@ async def pcr(source: PCRSource,

# If a specific assembly is requested
if source.assembly is not None:
for i, s in enumerate(out_sources):
if s == source:
# TODO: remove enumeration by better parsing
return {'sequences': [format_sequence_genbank(assemble(fragments, possible_assemblies[i], s.circular))], 'sources': [s]}
raise HTTPException(400, 'The provided assembly is not valid.')
return format_known_assembly_response(source, out_sources, fragments, possible_assemblies)

if len(possible_assemblies) == 0:
raise HTTPException(400, 'No pair of annealing primers was found. Try changing the annealing settings.')
Expand All @@ -307,8 +311,7 @@ async def homologous_recombination(
# source.input contains the ids of the sequences in the order template, insert
template, insert = [next((read_dsrecord_from_json(seq) for seq in sequences if seq.id == id), None) for id in source.input]

if template.circular or insert.circular:
# TODO: add support for the other cases
if template.circular:
raise HTTPException(400, 'The template and the insert must be linear.')

# If an assembly is provided, we ignore minimal_homology
Expand All @@ -323,10 +326,7 @@ async def homologous_recombination(

# If a specific assembly is requested
if source.assembly is not None:
for i, s in enumerate(out_sources):
if s == source:
return {'sequences': [format_sequence_genbank(assemble([template, insert], possible_assemblies[i], False))], 'sources': [s]}
raise HTTPException(400, 'The provided assembly is not valid.')
return format_known_assembly_response(source, out_sources, [template, insert], possible_assemblies)

if len(possible_assemblies) == 0:
raise HTTPException(400, 'No homologous recombination was found.')
Expand All @@ -342,23 +342,21 @@ async def homologous_recombination(
))
async def gibson_assembly(source: GibsonAssemblySource,
sequences: conlist(SequenceEntity, min_length=1),
minimal_homology: int = Query(40, description='The minimum homology between consecutive fragments in the assembly.'),):
minimal_homology: int = Query(40, description='The minimum homology between consecutive fragments in the assembly.'),
circular_only: bool = Query(False, description='Only return circular assemblies.')):

fragments = [read_dsrecord_from_json(seq) for seq in sequences]

asm = Assembly(fragments, algorithm=gibson_overlap, limit=minimal_homology, use_all_fragments=True, use_fragment_order=False)
circular_assemblies = asm.get_circular_assemblies()
linear_assemblies = filter_linear_subassemblies(asm.get_linear_assemblies(), circular_assemblies, fragments)
possible_assemblies = circular_assemblies + linear_assemblies
print(possible_assemblies)
possible_assemblies = asm.get_circular_assemblies()
if not circular_only:
possible_assemblies += filter_linear_subassemblies(asm.get_linear_assemblies(), possible_assemblies, fragments)

out_sources = [GibsonAssemblySource.from_assembly(id= source.id, input=source.input, assembly=a, circular=(a[0][0] == a[-1][1])) for a in possible_assemblies]

# If a specific assembly is requested
if source.assembly is not None:
for i, s in enumerate(out_sources):
if s == source:
return {'sequences': [format_sequence_genbank(assemble(fragments, possible_assemblies[i], s.circular))], 'sources': [s]}
raise HTTPException(400, 'The provided assembly is not valid.')
return format_known_assembly_response(source, out_sources, fragments, possible_assemblies)

if len(possible_assemblies) == 0:
raise HTTPException(400, 'No terminal homology was found.')
Expand All @@ -367,3 +365,40 @@ async def gibson_assembly(source: GibsonAssemblySource,

return {'sources': out_sources, 'sequences': out_sequences}

@ app.post('/restriction_and_ligation', response_model=create_model(
'RestrictionAndLigationResponse',
sources=(list[RestrictionAndLigationSource], ...),
sequences=(list[SequenceEntity], ...),
),
summary='Restriction and ligation in a single step. Can also be used for Golden Gate assembly.'
)
async def restriction_and_ligation(source: RestrictionAndLigationSource,
sequences: conlist(SequenceEntity, min_length=1),
allow_partial_overlap: bool = Query(False, description='Allow for partially overlapping sticky ends.'),
circular_only: bool = Query(False, description='Only return circular assemblies.')):

fragments = [read_dsrecord_from_json(seq) for seq in sequences]
invalid_enzymes = get_invalid_enzyme_names(source.restriction_enzymes)
if len(invalid_enzymes):
raise HTTPException(404, 'These enzymes do not exist: ' + ', '.join(invalid_enzymes))
enzymes = RestrictionBatch(first=[e for e in source.restriction_enzymes if e is not None])
algo = lambda x, y, l : restriction_ligation_overlap(x, y, enzymes, allow_partial_overlap)

asm = Assembly(fragments, algorithm=algo, use_fragment_order=False, use_all_fragments=True)

possible_assemblies = asm.get_circular_assemblies()
if not circular_only:
possible_assemblies += filter_linear_subassemblies(asm.get_linear_assemblies(), possible_assemblies, fragments)

out_sources = [RestrictionAndLigationSource.from_assembly(id= source.id, input=source.input, assembly=a, circular=(a[0][0] == a[-1][1]), restriction_enzymes=source.restriction_enzymes) for a in possible_assemblies]

# If a specific assembly is requested
if source.assembly is not None:
return format_known_assembly_response(source, out_sources, fragments, possible_assemblies)

if len(possible_assemblies) == 0:
raise HTTPException(400, 'No terminal homology was found.')

out_sequences = [format_sequence_genbank(assemble(fragments, a, s.circular)) for s, a in zip(out_sources, possible_assemblies)]

return {'sources': out_sources, 'sequences': out_sequences}
Loading