In [1]:
import dasi

In [88]:
import pytest
from os.path import dirname, abspath, join
from pyblast.utils import load_genbank_glob, load_fasta_glob, make_linear, make_circular
from dasi import BioBlastFactory

PRIMERS = "primers"
TEMPLATES = "templates"
QUERIES = "queries"

def blast_factory(here):
    factory = BioBlastFactory()

    primers = make_linear(
        load_fasta_glob(join(here, "data/test_data/primers/primers.fasta"))
    )
    templates = load_genbank_glob(join(here, "data/test_data/genbank/templates/*.gb"))
    queries = make_circular(
        load_genbank_glob(
            join(here, "data/test_data/genbank/designs/pmodkan-ho-pact1-z4-er-vpr.gb")
        )
    )

    factory.add_records(primers, PRIMERS)
    factory.add_records(templates, TEMPLATES)
    factory.add_records(queries, QUERIES)

    return factory

here = './tests'
factory = blast_factory(here)

  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))
  % location_line))


In [89]:
from dasi import AlignmentContainer, Constants
from dasi.utils import perfect_subject
import networkx as nx
from os.path import join

def assembly_graph(blast_factory, here):
    blast = blast_factory("templates", "queries")
    primer_blast = blast_factory("primers", "queries")

    blast.quick_blastn()
    primer_blast.quick_blastn_short()

    results = blast.get_perfect()
    primer_results = primer_blast.get_perfect()

    primer_results = [p for p in primer_results if perfect_subject(p["subject"])]
    print("Number of perfect primers: {}".format(len(primer_results)))
    # primer_results = [p for p in primer_results if p['subject']['start'] == 1]

    # combine the sequence databases (for now)
    seqdb = {}
    seqdb.update(blast.seq_db.records)
    seqdb.update(primer_blast.seq_db.records)

    container = AlignmentContainer(seqdb)

    # load the results (potential PCR Products)
    container.load_blast_json(results, Constants.PCR_PRODUCT)

    # load the primer results
    container.load_blast_json(primer_results, Constants.PRIMER)

    # group by query_regions
    groups = container.alignment_groups

    print("Number of types: {}".format(len(container.groups_by_type)))
    print("Number of groups: {}".format(len(groups)))

    # build assembly graph
    G = container.build_assembly_graph()
    # print()
    print("=== Assembly Graph ===")
    print(nx.info(G))
    assert G.number_of_edges()

    # print_graph
#     nx.write_gexf(G, join(here, "out", "graph.gexf"))
    
    return {
        "G": G,
        "container": container,
    }

In [90]:
results = assembly_graph(factory, here)
results

CMD: makeblastdb -dbtype nucl -title ac247bf8-72c3-4c51-b103-8691c91818e3 -out /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpordm9jsa/ac247bf8-72c3-4c51-b103-8691c91818e3 -in /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpvwstr_bc.fasta
CMD: blastn -db /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpordm9jsa/ac247bf8-72c3-4c51-b103-8691c91818e3 -out /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpordm9jsa/tmp1r50qezf -query /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmphhjmysj0.fasta -outfmt "7 qacc sacc score evalue bitscore length nident gapopen gaps qlen qstart qend slen sstart send sstrand qseq sseq"
CMD: makeblastdb -dbtype nucl -title 02b3ab3d-2a0c-4472-9b30-fb176607cd38 -out /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpo41v63ma/02b3ab3d-2a0c-4472-9b30-fb176607cd38 -in /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpqb9ozrig.fasta
CMD: blastn -db /var/folders/fr/yc9x0z2s39lcj4f9d95wvrpr0000gn/T/tmpo41v63ma/02b3ab3d-2a0c-4472-9b30-fb176607cd38 -

{'G': <networkx.classes.digraph.DiGraph at 0xa356c1898>,
 'container': <dasi.AlignmentContainer at 0x1a5d173f60>}

In [99]:
import pylab as plt
import seaborn as sns
import numpy as np
from numpy import inf
from more_itertools import pairwise

def shortest_path_matrix(G):
    nodelist = G.nodes()
    matrix = np.array(nx.floyd_warshall_numpy(G, nodelist=nodelist, weight='weight'))
    return matrix

def plot_shortest_path_matrix(matrix):
    plot_matrix = matrix.copy()
    plot_matrix[plot_matrix == inf] = 10000
    plot_matrix = np.nan_to_num(plot_matrix)

    fig = plt.figure(figsize=(12,10))
    ax = fig.gca()
    step = 1
    sns.heatmap(plot_matrix[::step, ::step], ax=ax)
    plt.show()
    
def shortest_cycle_candidates(matrix, nodelist):
    cycles = []
    paths = []
    for i, _ in enumerate(matrix):
        for j, _ in enumerate(matrix[i]):
            a = matrix[i, j]
            b = matrix[j, i]
            if i == j:
                continue

            anode = nodelist[i]
            bnode = nodelist[j]
            if a != np.inf:
                paths.append((anode, bnode, a))
            if b != np.inf:
                paths.append((bnode, anode, b))
            if a != np.inf and b != np.inf:
                cycles.append((anode, bnode, a, b, a + b))
    cycles = sorted(cycles, key=lambda c: c[-1])
    paths = sorted(paths, key=lambda c: c[-1])
    return cycles, paths

def true_path(G, c):
    print(c[0])
    path1 = nx.shortest_path(G, c[0], c[1], weight='weight')
    path2 = nx.shortest_path(G, c[1], c[0], weight='weight')
    path = path1 + path2[1:]
    edges = [G[p[0]][p[1]] for p in pairwise(path)]
    return path, edges

G = results['G']
container = results['container']
matrix = shortest_path_matrix(G)
# plot_shortest_path_matrix(matrix)
cycles, paths = shortest_cycle_candidates(matrix, list(G.nodes()))

path, edges = true_path(G, cycles[0])

(4227, 5920, 1, 'PCR_PRODUCT')


In [100]:
print(edges)

[{'weight': 205.89, 'r1': '<Region (4227, 5920, 9408) True start=0>', 'r2': '<Region (5947, 4219, 9408) True start=0>', 'name': 'synthesis'}, {'weight': 204.56, 'r1': '<Region (5947, 4219, 9408) True start=0>', 'r2': '<Region (4227, 5920, 9408) True start=0>', 'name': 'synthesis'}]


In [92]:
path, edges = true_path(G, cycles[0])

for start, end, _, product_type in path:
    groups = [a for a in container.alignment_groups if a.query_region.start == start and a.query_region.end == end]
    for g in groups:
        alignments = g.alignments
        a = g.alignments[0]
        
        subject_key = a.subject_key
        seqdb = container.seqdb
        record = seqdb.get(subject_key)
        
        print("[{}:{}] < {}[{}:{}] {}".format(
            a.query_region.start, 
            a.query_region.end, 
            record.name, 
            a.subject_region.start, 
            a.subject_region.end,
            a.type)
        )

(4227, 5920, 1, 'PCR_PRODUCT')
[4227:5920] < hCas9-VPR[4851:6544] PCR_PRODUCT
[5947:4219] < pMODKan-HO-pACT1-ZEV4[4422:4219] PCR_PRODUCT
[4227:5920] < hCas9-VPR[4851:6544] PCR_PRODUCT


# Synthesis

In [102]:
import itertools
import json
import os
import time
from tqdm import tqdm
import numpy as np
from dasi.log import logger


HERE = '.'


def digitize_dictionary(mydict, min, max):
    bins = list(mydict.keys())
    x = np.arange(min, max + 1)
    key_indices = np.digitize(x, bins)
    values = []
    for index in key_indices:
        if index == len(bins):
            index = index - 1
        values.append(mydict[bins[index]])
    return dict(zip(x, values))


def timeit(method):
    def timed(*args, **kw):
        ts = time.time()
        print(f"Starting {method.__name__}")
        result = method(*args, **kw)
        te = time.time()
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int((te - ts) * 1000)
        else:
            print(f"{method.__name__} took {(te-ts)*1000} ms\n")
        return result

    return timed


class GibsonAssemblyCost(object):
    # -----------------------------------------------
    # Parameters
    # -----------------------------------------------

    PRIMER_EXTENSION = (0, 40)
    PRIMER_COST = 0.60  # / bp
    ULTRAMER_EXTENSION = (40, 180)
    ULTRAMER_COST = 0.80  # / bp
    INF = 1000000.0

    GENE_SYNTHESIS_COST_RANGES = [
        {"lower": 0, "upper": 0, "cost": 0.0},
        {"lower": 100, "upper": 500, "cost": 89.0},
        {"lower": 501, "upper": 750, "cost": 129.0},
        {"lower": 751, "upper": 1000, "cost": 149.0},
        {"lower": 1001, "upper": 1250, "cost": 209.0},
        {"lower": 1251, "upper": 1500, "cost": 249.0},
        {"lower": 1501, "upper": 1750, "cost": 289.0},
        {"lower": 1751, "upper": 2000, "cost": 329.0},
        {"lower": 2001, "upper": INF, "cost": INF},
    ]

    SYNTHESIS_GAP_SPAN_RANGE = (-300, 1000, 10)
    SYNTHESIS_SIZE_OPTIONS = (100, 2000, 10)

    JUNCTION_EFFICIENCY = [
        {"lower": -INF, "upper": 10, "cost": 0.0},
        {"lower": 11, "upper": 19, "cost": 0.1},
        {"lower": 20, "upper": 30, "cost": 0.75},
        {"lower": 31, "upper": 100, "cost": 0.9},
        {"lower": 101, "upper": INF, "cost": 0.0},
    ]

    JUNCTION_GAP_SPAN_RANGE = (-300, 1000, 5)

    def __init__(self):
        self._gap_cost_dict_both_extendable = None
        self._gap_cost_dict_one_extendable = None
        self._gap_cost_dict_none_extendable = None
        self._gap_cost_dict_synthesis_none_extendable = None
        self._gap_cost_dict_synthesis_one_extendable = None
        self._gap_cost_dict_synthesis_extendable = None

    # -----------------------------------------------
    # Properties
    # -----------------------------------------------
    @property
    def synthesis_gap_bins(self):
        return np.arange(*self.SYNTHESIS_GAP_SPAN_RANGE)

    @property
    def junction_gap_bins(self):
        return np.arange(*self.JUNCTION_GAP_SPAN_RANGE)

    # -----------------------------------------------
    # Primer, Ultramer, Synthesis options
    # -----------------------------------------------

    def primer_extension_options(self):
        primer_options = []
        for extension in range(*self.PRIMER_EXTENSION):
            primer_options.append(
                {
                    "extension": extension,
                    "cost": self.PRIMER_COST * (20.0 + extension),
                    "name": "primer"
                }
            )
        return primer_options
        # primer_options.append((0, 0, "no primer"))

    def ultramer_extension_options(self):
        ultramer_options = []
        for extension in range(*self.ULTRAMER_EXTENSION):
            ultramer_options.append(
                {
                    "extension": extension,
                    "cost": self.ULTRAMER_COST * (20.0 + extension),
                    "name": "ultra-mer"
                }
            )
        return ultramer_options

    def synthesis_options(self):
        synthesis_options = []
        for gene_size in range(*self.SYNTHESIS_SIZE_OPTIONS):
            synthesis_options.append(
                {
                    "size": gene_size,
                    "cost": self._synthesis_cost(gene_size),
                    "name": "synthesis"
                }
            )
        return synthesis_options

    # -----------------------------------------------
    # Methods
    # -----------------------------------------------

    @staticmethod
    def find_in_range(x, ranges, lower_label="lower", upper_label="upper", label="cost"):
        for range in ranges:
            lower = range[lower_label]
            upper = range[upper_label]
            if lower <= x <= upper:
                return range[label]

    def _synthesis_cost(self, size):
        return self.find_in_range(size, self.GENE_SYNTHESIS_COST_RANGES)

    def _junction_efficiency(self, overlap):
        return self.find_in_range(overlap, self.JUNCTION_EFFICIENCY)

    def _optimize_primer(self, primer_choices):
        """
        Create a dictionary of gap_span to best_extension case.

        :param gap_span_start:
        :type gap_span_start:
        :param gap_span_end:
        :type gap_span_end:
        :param gap_span_step:
        :type gap_span_step:
        :param primer_choices:
        :type primer_choices:
        :return:
        :rtype:
        """
        results = {}

        # iterate through gap span
        for gap_span in tqdm(range(*self.JUNCTION_GAP_SPAN_RANGE), desc='optimizing primers'):
            results[gap_span] = None
            for choice in primer_choices:
                if not isinstance(choice, tuple):
                    choice = (choice,)
                extensions = [x['extension'] for x in choice]
                costs = [x['cost'] for x in choice]
                names = [x['name'] for x in choice]

                total_extension = sum(extensions)
                total_cost = sum(costs)
                junction_efficiency = self._junction_efficiency(-gap_span + total_extension)

                # score
                score = self.INF
                if junction_efficiency > 0:
                    score = total_cost / junction_efficiency

                _x = results[gap_span]
                if _x is None or score < _x['score']:
                    results[gap_span] = {
                        "score": score,
                        "cost": total_cost,
                        "efficiency": junction_efficiency,
                        "condition": f"{names}",
                        "extension": f"{extensions}"
                    }
        return results

    def _optimize_synthesis_options(self, synthesis_options, left_ext_dict, right_ext_dict):
        """

        :param gap_span_start:
        :type gap_span_start:
        :param gap_span_end:
        :type gap_span_end:
        :param gap_span_step:
        :type gap_span_step:
        :param synthesis_options:
        :type synthesis_options:
        :param extension_dict:
        :type extension_dict:
        :return:
        :rtype:
        """

        # for gap in gap_span_options
        # for gene in gene_options
        # for offset in offset_options
        # best primer option for left_offset
        # best primer option for right offset



        results = {}

        # digitize dictionary so that it includes all possible values
        largest_overlap = self.SYNTHESIS_SIZE_OPTIONS[1] - self.SYNTHESIS_GAP_SPAN_RANGE[0]
        smallest_overlap = self.SYNTHESIS_SIZE_OPTIONS[0] - self.SYNTHESIS_GAP_SPAN_RANGE[1]
        left_ext_dict = digitize_dictionary(left_ext_dict, -largest_overlap, -smallest_overlap)
        right_ext_dict = digitize_dictionary(right_ext_dict, -largest_overlap, -smallest_overlap)

        # optimize for gap spane
        for gap_span in tqdm(range(*self.SYNTHESIS_GAP_SPAN_RANGE), desc="optimizing synthesis"):
            results[gap_span] = None

            # optimize for gene size
            for gene in synthesis_options:
                max_overlap = gene['size'] - gap_span

                extension_params = {
                    'score': self.INF,
                    'cost': self.INF,
                    'eff': 0.0,
                    'left': None,
                    'right': None,
                    'gene_offset': None
                }

                # optimize for offset
                for offset in range(0, max_overlap):
                    gap_left = -max_overlap + offset
                    gap_right = -max_overlap - gap_left
                    ext_left = left_ext_dict[gap_left]
                    ext_right = right_ext_dict[gap_right]
                    ext_cost = ext_left['cost'] + ext_right['cost']
                    ext_eff = ext_left['efficiency'] * ext_right['efficiency']
                    ext_score = self.INF
                    if ext_eff > 0:
                        ext_score = ext_cost / ext_eff

                    if ext_score < extension_params['score']:
                        extension_params['score'] = ext_score
                        extension_params['cost'] = ext_cost
                        extension_params['eff'] = ext_eff
                        extension_params['left'] = ext_left
                        extension_params['right'] = ext_right
                        extension_params['gene_offset'] = offset

                cost = gene['cost'] + extension_params['cost']
                eff = extension_params['eff']
                score = self.INF
                if eff > 0:
                    score = cost / eff

                if results[gap_span] is None or score < results[gap_span]['score']:
                    results[gap_span] = {
                        'score': score,
                        'cost': cost,
                        'gene_cost': gene['cost'],
                        'extension': extension_params,
                        'gene_size': gene['size'],
                        'eff': eff
                    }
        return results

    def gap_cost_dict_both_extendable(self):
        if self._gap_cost_dict_both_extendable is None:
            both_extendable_options = list(itertools.combinations(
                self.primer_extension_options() + self.ultramer_extension_options(), 2))
            self._gap_cost_dict_both_extendable = self._optimize_primer(both_extendable_options)
        return self._gap_cost_dict_both_extendable

    def gap_cost_dict_one_extendable(self):
        if self._gap_cost_dict_one_extendable is None:
            self._gap_cost_dict_one_extendable = self._optimize_primer(
                self.primer_extension_options() + self.ultramer_extension_options())
        return self._gap_cost_dict_one_extendable

    def gap_cost_dict_none_extendable(self):
        if self._gap_cost_dict_none_extendable is None:
            self._gap_cost_dict_none_extendable = self._optimize_primer(
                [{"extension": 0, "cost": 1.0, "name": "no primer"}])
        return self._gap_cost_dict_none_extendable

    def _gap_cost_dict_synthesis(self, left_ext=False, right_ext=False):
        left = right = self.gap_cost_dict_none_extendable()
        if left_ext:
            left = self.gap_cost_dict_one_extendable()
        if right_ext:
            right = self.gap_cost_dict_one_extendable()
        return self._optimize_synthesis_options(self.synthesis_options(), left, right)

    def gap_cost_dict_synthesis_none_extendable(self):
        if self._gap_cost_dict_synthesis_none_extendable is None:
            self._gap_cost_dict_synthesis_none_extendable = self._gap_cost_dict_synthesis(left_ext=False,
                                                                                          right_ext=False)
        return self._gap_cost_dict_synthesis_none_extendable

    def gap_cost_dict_synthesis_one_extendable(self):
        if self._gap_cost_dict_synthesis_one_extendable is None:
            self._gap_cost_dict_synthesis_one_extendable = self._gap_cost_dict_synthesis(left_ext=True, right_ext=False)
        return self._gap_cost_dict_synthesis_one_extendable

    def gap_cost_dict_synthesis_extendable(self):
        if self._gap_cost_dict_synthesis_extendable is None:
            self._gap_cost_dict_synthesis_extendable = self._gap_cost_dict_synthesis(left_ext=True, right_ext=True)
        return self._gap_cost_dict_synthesis_extendable

    @classmethod
    def _path_name(cls):
        name = cls.__name__
        path = os.path.join(HERE, name + ".json")
        return path

    def save(self):
        with open(self._path_name(), 'w') as handle:
            json.dump(self.__dict__, handle)

    @classmethod
    def load(cls):
        if os.path.isfile(cls._path_name()):
            with open(cls._path_name(), 'rb') as handle:
                data = json.load(handle)
            x = cls()
            x.__dict__.update(data)
            return x

    def compute(self):

        logger.debug("Computing gap cost none extendable")
        self.gap_cost_dict_none_extendable()

        logger.debug("Computing gap cost one extendable")
        self.gap_cost_dict_one_extendable()

        logger.debug("Computing gap cost both extendable")
        self.gap_cost_dict_both_extendable()

        logger.debug("Computing synthesis cost none extendable")
        self.gap_cost_dict_synthesis_none_extendable()

        logger.debug("Computing synthesis cost one extendable")
        self.gap_cost_dict_synthesis_one_extendable()

        logger.debug("Computing synthesis cost both extendable")
        self.gap_cost_dict_synthesis_extendable()

    def conditions(self):
        dicts = [
            [0, False, self.gap_cost_dict_none_extendable()],
            [1, False, self.gap_cost_dict_one_extendable()],
            [2, False, self.gap_cost_dict_both_extendable()],
            [0, True, self.gap_cost_dict_synthesis_none_extendable()],
            [1, True, self.gap_cost_dict_synthesis_one_extendable()],
            [2, True, self.gap_cost_dict_synthesis_extendable()],
        ]

        for d in dicts:
            d[-1] = digitize_dictionary(d[-1], -500, 5000)
        return dicts

    def filter_by_condition(self, n, syn):
        dicts = self.conditions()
        if not syn:
            dicts = list(filter(lambda x: not x[1], dicts))
        return list(filter(lambda x: x[0] == n, dicts))

    def gap_cost_dict(self, min, max, e=0, syn=True):
        dicts = self.filter_by_condition(e, syn)
        data = {}
        for span in tqdm(range(min, max+1)):
            values = np.arrayx([d[-1][span]['score'] for d in dicts])
            data[span] = np.min(values)
        return data

    def gap_cost(self, span, e=0, syn=True):
        dicts = self.filter_by_condition(e, syn)

        values = [d[-1][span]['score'] for d in dicts]
        index = values.index(min(values))
        return dicts[index][-1][span]['score']

        # condition 1: no synthesis, both extendable
        # condition 2: no synthesis, one extendable
        # condition 3: no synthesis, none extendable
        # condition 4: synthesis, both extendable
        # condition 5: synthesis, one extendable
        # condition 6: synthesis, none extendable


gac = GibsonAssemblyCost()
print("loading")
gac.load()
print('computing')
gac.compute()
gac.save()

d = gac.gap_cost_dict(-200, 1000)
pass


# gac.compute()
# gac.save()
#
# r1 = gac.gap_cost_dict_synthesis_none_extendable()
# for k, v in r1.items():
#     print(f"{k} {v}")

# r2 = gac.gap_cost_dict_synthesis_one_extendable()
# for k, v in r2.items():
#     print(f"{k} {v}")

# r3 = gac.gap_cost_dict_synthesis_extendable()
# for k, v in r3.items():
#     print(f"{k} {v}")

# r4 = gac.gap_cost_dict_both_extendable()
# for k, v in r4.items():
#     print(f"{k} {v}")

# r5 = gac.gap_cost_dict_one_extendable()
# for k, v in r5.items():
#     print(f"{k} {v}")

# r6 = gac.gap_cost_dict_none_extendable()
# for k, v in r6.items():
#     print(f"{k} {v}")

import matplotlib.pyplot as plt

X = list(range(-400, 1000, 5))
Y = []
for i in X:
    Y.append(gac.gap_cost(i)['score'])


plt.figure()
plt.scatter(X, Y, c='brown')
plt.scatter(list(r1.keys()), [v['score'] for _, v in r1.items()], c='r')
plt.scatter(list(r2.keys()), [v['score'] for _, v in r2.items()], c='b')
plt.scatter(list(r3.keys()), [v['score'] for _, v in r3.items()], c='g')
plt.scatter(list(r4.keys()), [v['score'] for _, v in r4.items()], c='c')
plt.scatter(list(r5.keys()), [v['score'] for _, v in r5.items()], c='purple')
plt.scatter(list(r6.keys()), [v['score'] for _, v in r6.items()], c='yellow')
plt.ylim(0, 500)
plt.show()


DEBUG: Computing gap cost none extendable
optimizing primers: 100%|██████████| 260/260 [00:00<00:00, 196348.40it/s]
DEBUG: Computing gap cost one extendable
optimizing primers: 100%|██████████| 260/260 [00:00<00:00, 2878.43it/s]
DEBUG: Computing gap cost both extendable
optimizing primers:   0%|          | 0/260 [00:00<?, ?it/s]

loading
computing


optimizing primers: 100%|██████████| 260/260 [00:07<00:00, 36.34it/s]
DEBUG: Computing synthesis cost none extendable
optimizing synthesis: 100%|██████████| 130/130 [00:13<00:00, 22.73it/s]
DEBUG: Computing synthesis cost one extendable
optimizing synthesis: 100%|██████████| 130/130 [00:14<00:00, 22.69it/s]
DEBUG: Computing synthesis cost both extendable
optimizing synthesis: 100%|██████████| 130/130 [00:13<00:00,  9.39it/s]
100%|██████████| 1201/1201 [00:00<00:00, 171917.65it/s]


-300 {'score': 1000000.0, 'cost': 1000089.0, 'gene_cost': 89.0, 'extension': {'score': 1000000.0, 'cost': 1000000.0, 'eff': 0.0, 'left': None, 'right': None, 'gene_offset': None}, 'gene_size': 100, 'eff': 0.0}
-290 {'score': 1000000.0, 'cost': 1000089.0, 'gene_cost': 89.0, 'extension': {'score': 1000000.0, 'cost': 1000000.0, 'eff': 0.0, 'left': None, 'right': None, 'gene_offset': None}, 'gene_size': 100, 'eff': 0.0}
-280 {'score': 1000000.0, 'cost': 1000089.0, 'gene_cost': 89.0, 'extension': {'score': 1000000.0, 'cost': 1000000.0, 'eff': 0.0, 'left': None, 'right': None, 'gene_offset': None}, 'gene_size': 100, 'eff': 0.0}
-270 {'score': 1000000.0, 'cost': 1000089.0, 'gene_cost': 89.0, 'extension': {'score': 1000000.0, 'cost': 1000000.0, 'eff': 0.0, 'left': None, 'right': None, 'gene_offset': None}, 'gene_size': 100, 'eff': 0.0}
-260 {'score': 1000000.0, 'cost': 1000089.0, 'gene_cost': 89.0, 'extension': {'score': 1000000.0, 'cost': 1000000.0, 'eff': 0.0, 'left': None, 'right': None, 'g

TypeError: 'float' object is not subscriptable

In [104]:
gac = GibsonAssemblyCost.load()

In [324]:
from bisect import bisect_left, insort_left, bisect_right
from collections import Sized
from more_itertools import unique_everseen, flatten

def argsort(a):
    x = list(enumerate(a))
    x.sort(key=lambda b: b[1])
    return [_x[0] for _x in x]

class SpanDictionary(Sized):
    
    def __init__(self, data=None):
        self._values = []
        self._mins = []
        self._maxs = []
        if data:
            for k, v in data.items():
                self[k[0], k[1]] = v
                
    def values(self):
        return self._values[:]
    
    def keys(self):
        return self.ranges()
        
    def ranges(self, data=False):
        if not data:
            return zip(self._mins, self._maxs)
        return zip(self._mins, self._maxs, self._values)
    
    def items(self):
        for k in self:
            yield k, self[k]
    
    def __setitem__(self, key, val):
        if not isinstance(key, tuple) or len(key) != 2:
            raise ValueError("Must set using [mn, mx]")
        if key[0] > key[1]:
            raise ValueError("Min must be creater than max when setting indices.")
        i = bisect_left(self._mins, key[0])
        self._values.insert(i, val)
        self._mins.insert(i, key[0])
        self._maxs.insert(i, key[1])
        
    def __getitem__(self, key):
        i = bisect_right(self._mins, key)
   
        indices = []
        for j, mx in enumerate(self._maxs[:i]):
            if key < mx:
                indices.append(j)
        return [self._values[n] for n in indices]
    
    def to_dict(self):
        return dict(zip(self.ranges(), self.values()))
    
    def min(self):
        return min(self._mins)
    
    def max(self):
        return max(self._maxs)
    
    def __iter__(self):
        return unique_everseen(flatten(range(a[0], a[1]) for a in self.ranges()))
    
    def __len__(self):
        return len(self._values)
    
    def __repr__(self):
        return str(self)
    
    def __str__(self):
        return "{}({})".format(self.__class__.__name__, str(self.to_dict()))  
    
d = SpanDictionary()
d[4, 4] = 5
d[3, 4] = 8
d[4, 5] = 10
d[8, 10] = 20
d[8, 9] = 25
for i in d:
    print(i)

3
4
8
9


In [331]:
import functools

def partialclass(cls, *args, **kwds):

    class PartialClass(cls):
        __init__ = functools.partialmethod(cls.__init__, *args, **kwds)
    return PartialClass

Span = partialclass(SpanDictionary, valuefunc=lambda x: x)

Span

__main__.partialclass.<locals>.PartialClass

In [332]:
class Cost(object):
    INF = 100000
    MIN_PRIMER_ANNEAL = 20
    PRIMER_COST_PER_BP = SpanDictionary({
        (0, 40): {'base': 0, 'bp': 0.6},
        (40, 80): {'base': 0, 'bp': 0.8}
    })
    PRIMER_COST = 0.60
    GENE_SYNTHESIS_COST_RANGES = SpanDictionary({
        (0, 1): 0,
        (1, 100): INF,
        (100, 500): 89.0,
        (500, 750): 129.0,
        (750, 1000): 149.0,
        (1000, 1250): 209.0,
        (1250, 1750): 249.0,
        (1750, 2000): 289.0,
        (2000, INF): INF
        
    })
    JUNCTION_EFFICIENCY = SpanDictionary({
        (-INF, 10): 0.0,
        (10, 20): 0.1,
        (20, 30): 0.75,
        (31, 100): 0.9,
        (101, INF): 0.0
    })
    
    def primer_extension_options(self):
        return dict(self.PRIMER_COST_PER_BP.items())
    
    def _optimize_primer(self, primer_choices):
        """
        Create a dictionary of gap_span to best_extension case.

        :param gap_span_start:
        :type gap_span_start:
        :param gap_span_end:
        :type gap_span_end:
        :param gap_span_step:
        :type gap_span_step:
        :param primer_choices:
        :type primer_choices:
        :return:
        :rtype:
        """
        results = {}

        # iterate through gap span
        for gap_span in tqdm(range(*self.JUNCTION_GAP_SPAN_RANGE), desc='optimizing primers'):
            results[gap_span] = None
            for choice in primer_choices:
                if not isinstance(choice, tuple):
                    choice = (choice,)
                extensions = [x['extension'] for x in choice]
                costs = [x['cost'] for x in choice]
                names = [x['name'] for x in choice]

                total_extension = sum(extensions)
                total_cost = sum(costs)
                junction_efficiency = self._junction_efficiency(-gap_span + total_extension)

                # score
                score = self.INF
                if junction_efficiency > 0:
                    score = total_cost / junction_efficiency

                _x = results[gap_span]
                if _x is None or score < _x['score']:
                    results[gap_span] = {
                        "score": score,
                        "cost": total_cost,
                        "efficiency": junction_efficiency,
                        "condition": f"{names}",
                        "extension": f"{extensions}"
                    }
        return results


In [335]:

def argsortandval(a):
    x = list(enumerate(a))
    x.sort(key=lambda b: b[1])
    return zip(*[_x for _x in x])

list(argsortandval([14,2,1]))

[(2, 1, 0), (1, 2, 14)]

In [336]:
def digitize_dictionary(mydict, min, max):
    bins = list(mydict.keys())
    x = np.arange(min, max + 1)
    key_indices = np.digitize(x, bins)
    values = []
    for index in key_indices:
        if index == len(bins):
            index = index - 1
        values.append(mydict[bins[index]])
    return dict(zip(x, values))

In [345]:
d = dict(zip(range(10), range(10,20)))
digitize_dictionary(d, -100, 20)

{-100: 10,
 -99: 10,
 -98: 10,
 -97: 10,
 -96: 10,
 -95: 10,
 -94: 10,
 -93: 10,
 -92: 10,
 -91: 10,
 -90: 10,
 -89: 10,
 -88: 10,
 -87: 10,
 -86: 10,
 -85: 10,
 -84: 10,
 -83: 10,
 -82: 10,
 -81: 10,
 -80: 10,
 -79: 10,
 -78: 10,
 -77: 10,
 -76: 10,
 -75: 10,
 -74: 10,
 -73: 10,
 -72: 10,
 -71: 10,
 -70: 10,
 -69: 10,
 -68: 10,
 -67: 10,
 -66: 10,
 -65: 10,
 -64: 10,
 -63: 10,
 -62: 10,
 -61: 10,
 -60: 10,
 -59: 10,
 -58: 10,
 -57: 10,
 -56: 10,
 -55: 10,
 -54: 10,
 -53: 10,
 -52: 10,
 -51: 10,
 -50: 10,
 -49: 10,
 -48: 10,
 -47: 10,
 -46: 10,
 -45: 10,
 -44: 10,
 -43: 10,
 -42: 10,
 -41: 10,
 -40: 10,
 -39: 10,
 -38: 10,
 -37: 10,
 -36: 10,
 -35: 10,
 -34: 10,
 -33: 10,
 -32: 10,
 -31: 10,
 -30: 10,
 -29: 10,
 -28: 10,
 -27: 10,
 -26: 10,
 -25: 10,
 -24: 10,
 -23: 10,
 -22: 10,
 -21: 10,
 -20: 10,
 -19: 10,
 -18: 10,
 -17: 10,
 -16: 10,
 -15: 10,
 -14: 10,
 -13: 10,
 -12: 10,
 -11: 10,
 -10: 10,
 -9: 10,
 -8: 10,
 -7: 10,
 -6: 10,
 -5: 10,
 -4: 10,
 -3: 10,
 -2: 10,
 -1: 10,
 0: 11,


In [465]:
from collections import OrderedDict
from bisect import bisect_left
from sortedcontainers import SortedDict

class InfiniteDict(SortedDict):
    
#     __slots__ = ['_bisect_index']
    
    def __init__(self, *args, ceil=False, **kwargs):
        if not ceil:
            self._bisect_index = 1
        else:
            self._bisect_index = 0
        super().__init__(*args, **kwargs)
        
    def __getitem__(self, k):
        if k in self:
            return super().__getitem__(k)
        else:
            i = self.bisect_left(k)
            if i >= self.keys()[-1]:
                return self.values()[-1]
            x = i - self._bisect_index
            if x >= 0:
                return self.values()[i-self._bisect_index]
            else:
                return self.values()[0]

d = InfiniteDict(ceil=False)


d[1] = 5
d[10] = 4
d[11]

4

In [378]:
from sortedcontainers import SortedDict

In [379]:
(0, 40): {"base": 0, "bp": 0.6, "time": 1.5},
            (40, 80): {"base": 0, "bp": 0.8, "time": 2.0},

0

In [471]:
primer = np.array([0, 0.6, 1.5])

ext = np.array([1.0, 10, 1.0])

In [480]:
np.broadcast_to(ext, (4, 3))

array([[ 1., 10.,  1.],
       [ 1., 10.,  1.],
       [ 1., 10.,  1.],
       [ 1., 10.,  1.]])

In [711]:
# # the base, per-bp costs, time (days)
# primer = np.array([0, 0.6, 1.5])
# ultramer = np.array([0, 0.8, 2.0])

# # we define the ranges for which the primers are valid
# a1 = np.arange(0, 40).reshape(-1, 1)
# a2 = np.arange(40, 80).reshape(-1, 1)
# a = np.concatenate([a1, a2])

In [746]:
np.stack([p1]*10)

array([[0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6]])

In [897]:
a1 = np.arange(0, 61)
a2 = np.arange(45, 201)

np.concatenate([a1, a2])

array([  0,   1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,
        13,  14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,
        26,  27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,
        39,  40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,
        52,  53,  54,  55,  56,  57,  58,  59,  60,  45,  46,  47,  48,
        49,  50,  51,  52,  53,  54,  55,  56,  57,  58,  59,  60,  61,
        62,  63,  64,  65,  66,  67,  68,  69,  70,  71,  72,  73,  74,
        75,  76,  77,  78,  79,  80,  81,  82,  83,  84,  85,  86,  87,
        88,  89,  90,  91,  92,  93,  94,  95,  96,  97,  98,  99, 100,
       101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113,
       114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126,
       127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139,
       140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152,
       153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 16

In [1018]:
from collections.abc import Sized, Iterable

class NpDict(Sized, Iterable):
    
    def __init__(self, size, start, default=0, low_default=None, high_default=None):
        self.a = np.full(size, default)
        self.start = start
        self.default = 0
        self.low = low_default
        self.high = high_default
    
    @classmethod
    def add_to_slice(cls, s, x):
        if isinstance(s, int):
            return s + x
        elif isinstance(s, slice):
            start = s.start
            stop = s.stop
            step = s.step
            if start is not None:
                start += x
            if stop is not None:
                stop += x
            return slice(start, stop, step)
        elif isinstance(s, tuple):
            return tuple(cls.add_to_slice(_s) for _s in s)
        
    def __setitem__(self, k, v):
        index = self.add_to_slice(k, -self.start)
        print("self.a[{index}] = {v}  ".format(index=index, v=v))
        self.a[index] = v  
    
    def __getitem__(self, k):
        i = k - self.start
        if i < 0:
            if self.low:
                return self.low
            return self.a[0]
        elif i > len(self.a):
            if self.high:
                return self.high
            return self.a[-1]
        else:
            return self.a[i]
    
    def __contains__(self, k):
        return k in self.a
    
    def __iter__(self):
        return self.a
        
    def __len__(self):
        return len(self.a)
    
    @classmethod
    def from_tuples(cls, list_of_tuples, default=0, low=None, high=None):
        s = sorted(list_of_tuples)
        size = s[-1][1] - s[0][0]
        start = s[0][0]
        print(start)
        a = cls(size, start, default, low, high)
        for l, h, v in s:
            a[l:h] = v
        return a
    
    def __str__(self):
        return "{}({})(\n{})".format( self.__class__.__name__, self.start, self.a)
        
    def __repr__(self):
        return str(self)
        

In [1019]:
# material cost
p1 = np.array([0, 0.6, 1.5])
p2 = np.array([0, 0.8, 2.5])

# primer lengths for materials above
a1 = np.arange(16, 61)
a2 = np.arange(45, 201)
a = np.concatenate([a1, a2])
a = a.reshape((-1, 1))

# # stacked materials cost
p1 = np.broadcast_to(p1, (a1.shape[0], p1.shape[0]))
p2 = np.broadcast_to(p2, (a2.shape[0], p2.shape[0]))
p = np.concatenate([p1, p2])

# materials cost across indices "a", the primer lengths
# materials cost is independent of span
m = p[:, 1, np.newaxis] * a + p[:, 0, np.newaxis]

# time cost along indices "a", the primer lengths
# time cost is independent of span
t = p[:, 2, np.newaxis]

In [1020]:
jxn = NpDict.from_tuples([
    (10, 20, 0.1),
    (20, 30, 0.75),
    (30, 100, 0.9),
    (100, 120, 0.75),
    (120, 150, 0.3),
], low=0.0, high=0.0)
jxn.a

10
self.a[slice(0, 10, None)] = 0.1  
self.a[slice(10, 20, None)] = 0.75  
self.a[slice(20, 90, None)] = 0.9  
self.a[slice(90, 110, None)] = 0.75  
self.a[slice(110, 140, None)] = 0.3  


array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0])

In [1017]:
# the spanning distance
s = np.arange(1000)
ext = a - min(a)

# # overlap (sum of extensions - span)
o = (ext + ext.T)[:, :, np.newaxis] - s
o.shape
o

TypeError: unsupported operand type(s) for -: 'NpDict' and 'float'

In [816]:
def cross_add(a):
    """Adds element wise to create a nxn matrix."""
    _a, _b = np.meshgrid(a, a)
    return _a + _b

cross_add(m)

array([[  0. ,   0.6,   1.2, ...,  61.6,  62.4,  63.2],
       [  0.6,   1.2,   1.8, ...,  62.2,  63. ,  63.8],
       [  1.2,   1.8,   2.4, ...,  62.8,  63.6,  64.4],
       ...,
       [ 61.6,  62.2,  62.8, ..., 123.2, 124. , 124.8],
       [ 62.4,  63. ,  63.6, ..., 124. , 124.8, 125.6],
       [ 63.2,  63.8,  64.4, ..., 124.8, 125.6, 126.4]])

In [817]:
m

array([[ 0. ],
       [ 0.6],
       [ 1.2],
       [ 1.8],
       [ 2.4],
       [ 3. ],
       [ 3.6],
       [ 4.2],
       [ 4.8],
       [ 5.4],
       [ 6. ],
       [ 6.6],
       [ 7.2],
       [ 7.8],
       [ 8.4],
       [ 9. ],
       [ 9.6],
       [10.2],
       [10.8],
       [11.4],
       [12. ],
       [12.6],
       [13.2],
       [13.8],
       [14.4],
       [15. ],
       [15.6],
       [16.2],
       [16.8],
       [17.4],
       [18. ],
       [18.6],
       [19.2],
       [19.8],
       [20.4],
       [21. ],
       [21.6],
       [22.2],
       [22.8],
       [23.4],
       [32. ],
       [32.8],
       [33.6],
       [34.4],
       [35.2],
       [36. ],
       [36.8],
       [37.6],
       [38.4],
       [39.2],
       [40. ],
       [40.8],
       [41.6],
       [42.4],
       [43.2],
       [44. ],
       [44.8],
       [45.6],
       [46.4],
       [47.2],
       [48. ],
       [48.8],
       [49.6],
       [50.4],
       [51.2],
       [52. ],
       [52

In [708]:
# we represent the cost as a linear combination of 'materials', 'time', and 'efficiency'



# the base, per-bp costs, time (days)
primer = np.array([0, 0.6, 1.5])
ultramer = np.array([0, 0.8, 2.0])

# we define the ranges for which the primers are valid
a1 = np.arange(0, 40).reshape(-1, 1)
a2 = np.arange(40, 80).reshape(-1, 1)
a = np.concatenate([a1, a2])

b = np.ones(a.shape)
x = np.block([b, a, b*20.0])
 
p1 = np.broadcast_to(primer, (a1.shape[0], x.shape[1]))
p2 = np.broadcast_to(ultramer, (a2.shape[0], x.shape[1]))
p = np.concatenate([p1, p2])

P = p.sum(axis=1)

_a, _b = np.meshgrid(P, P)
d = np.expand_dims(_a+_b, axis=2)

_a, _b = np.meshgrid(a, a)
ext = _a + _b
# use ext and junction array to compute efficiency

p[:,:2]
# e = np.array([0, 0, 0, 0.1, 0.2, 0.4, 0.2, 0.8, 0.9, 1.0])
# f = np.multiply(d, e)
# print(f.shape)
# g = f.min(axis=(0,1))

# import pylab as plt
# plt.plot(g)

array([[0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.6],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. , 0.8],
       [0. ,

In [701]:
x = np.zeros(300)
x[-150 + 150:0 + 150] = 10
x

array([10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10., 10.,
       10., 10., 10., 10., 10., 10., 10.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
        0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0

In [610]:
x1 = np.arange(9.0).reshape((3, 3))
x2 = np.arange(3.0)
np.multiply(x1, x2)

array([[ 0.,  1.,  4.],
       [ 0.,  4., 10.],
       [ 0.,  7., 16.]])

In [611]:
x2

array([0., 1., 2.])