In [None]:
import numpy as np
from random import randint


def read_subselection(transcript, target_length):
    """
    Cut a randomly positioned window out of a transcript.

    :param transcript: String to sample from
    :param target_length: int, number of characters wanted in the window
    :return: The transcript unchanged when it is not longer than ``target_length``,
             otherwise a slice of ``target_length`` characters starting at a random offset
    """
    total = len(transcript)
    if total > target_length:
        # randint includes both endpoints, so the last usable start is total - target_length.
        start = randint(0, total - target_length)
        return transcript[start:start + target_length]
    return transcript


def random_impute_n(read, probability):
    """
    Mask nucleotides with "N", deciding independently for every position.

    :param read: Nucleotide read
    :param probability: Probability that any single nucleotide is replaced with "N"
    :return: Modified String, same length as the input
    """
    # One 0/1 draw per position; 1 means "mask this position".
    mask = np.random.choice([0, 1], size=(len(read),), p=[1 - probability, probability])
    return "".join("N" if flag == 1 else base for flag, base in zip(mask, read))


def random_mutation(read, probability):
    """
    Substitute nucleotides with a randomly drawn one, deciding independently per position.

    The replacement is drawn from A/C/G/T without excluding the original base,
    so a "mutated" position may end up unchanged.

    :param read: Nucleotide read
    :param probability: Probability that any single nucleotide is replaced with a random nucleotide
    :return: Modified String, same length as the input
    """
    alphabet = ["A", "C", "G", "T"]
    # One 0/1 draw per position; 1 means "mutate this position".
    mask = np.random.choice([0, 1], size=(len(read),), p=[1 - probability, probability])
    pieces = []
    for flag, base in zip(mask, read):
        if flag == 1:
            # A replacement base is only drawn for positions selected by the mask.
            base = alphabet[randint(0, 3)]
        pieces.append(base)
    return "".join(pieces)


def random_deletion(read, probability):
    """
    Drop nucleotides from a read, deciding independently for every position.

    :param read: Nucleotide read
    :param probability: Probability that any single nucleotide is removed
    :return: Modified String, possibly shorter than the input
    """
    # One 0/1 draw per position; 1 means "delete this position".
    mask = np.random.choice([0, 1], size=(len(read),), p=[1 - probability, probability])
    return "".join(base for flag, base in zip(mask, read) if flag != 1)


def random_insert(nucl):
    """
    Put one randomly chosen nucleotide in front of the given one. EX: "A" -> "CA"

    :param nucl: Nucleotide
    :return: 2 nucleotide string (random base followed by ``nucl``)
    """
    candidates = [prefix + nucl for prefix in ("A", "C", "G", "T")]
    return np.take(candidates, randint(0, 3))


def random_insertion(read, probability):
    """
    Insert random nucleotides into a read, deciding independently for every position.

    A selected position keeps its nucleotide and gains one random nucleotide
    directly in front of it (see ``random_insert``).

    :param read: Nucleotide read
    :param probability: Probability that any single nucleotide gets a random nucleotide prepended
    :return: Modified String, possibly longer than the input
    """
    # One 0/1 draw per position; 1 means "insert before this position".
    mask = np.random.choice([0, 1], size=(len(read),), p=[1 - probability, probability])
    return "".join(random_insert(base) if flag == 1 else base for flag, base in zip(mask, read))


def simulate_read(transcript, n_prob, mutation_prob, delete_prob, insert_prob, target_length):
    """
    Simulate one noisy read drawn from a transcript.

    The whole transcript is corrupted first (mutation, insertion, deletion, then
    N-masking, in that order) and only afterwards is a window of ``target_length``
    cut out of the corrupted sequence.

    :param transcript: Nucleotide transcript
    :param n_prob: Probability of replacing a nucleotide with "N"
    :param mutation_prob: Probability of mutating a nucleotide
    :param delete_prob: Probability of deleting a nucleotide
    :param insert_prob: Probability of inserting a nucleotide.
    :param target_length: Length of read.
    :return: Read with (possibly) modified sequence.
    """
    # Applied top to bottom; each stage receives the previous stage's output.
    stages = (
        (random_mutation, mutation_prob),
        (random_insertion, insert_prob),
        (random_deletion, delete_prob),
        (random_impute_n, n_prob),
    )
    read = transcript
    for corrupt, chance in stages:
        read = corrupt(read=read, probability=chance)
    return read_subselection(transcript=read, target_length=target_length)


def shuffle_x_y_together(a, b):
    """
    Shuffle two numpy arrays along their first dimension with the same random order.

    Rows stay paired, i.e. (Xi, Yi) => (Xj, Yj). Only the first axis is indexed,
    so the arrays may have any number of trailing dimensions (e.g. a 3-D feature
    tensor together with a 1-D or 2-D label array).

    :param a: Numpy Array
    :param b: Numpy Array whose first dimension has the same length as ``a``'s
    :return: Shuffled A, Shuffled B
    :raises ValueError: If the first dimensions of ``a`` and ``b`` differ
    """
    n_rows = a.shape[0]
    # Explicit raise instead of assert: asserts are stripped when Python runs with -O.
    if n_rows != b.shape[0]:
        raise ValueError("Arrays must be of same size")
    # A random permutation is already a valid shuffle order; no argsort needed.
    order = np.random.permutation(n_rows)
    return a[order], b[order]


def build_train_sets(data_samplers,
                     sample_size=1000,
                     test_size=.2):
    """
    Build train and test tensors for one round of training.

    Every sampler contributes ``sample_size`` freshly drawn samples to the train
    set and ``round(sample_size * test_size)`` freshly drawn samples to the test
    set, so all samplers are represented in both sets. The per-sampler results
    are concatenated in sampler order (not shuffled).

    :param data_samplers: Sequence of sampler objects exposing ``build_tensor(size)``
                          and ``build_output_tensor(size)`` (e.g. FastmSampler)
    :param sample_size: Train samples drawn per sampler, default 1000
    :param test_size: Test size as a fraction of sample size. Default .2
    :return: x_train, y_train, x_test, y_test
    :raises ValueError: If ``data_samplers`` is empty
    """
    if len(data_samplers) == 0:
        raise ValueError("data_samplers must contain at least one sampler")

    # test_size scales the number of samples per sampler, not the number of samplers.
    # round() avoids float truncation artifacts such as int(100 * 0.29) == 28.
    test_sample_size = int(round(sample_size * test_size))

    x_train = np.concatenate([sampler.build_tensor(sample_size) for sampler in data_samplers])
    y_train = np.concatenate([sampler.build_output_tensor(sample_size) for sampler in data_samplers])
    x_test = np.concatenate([sampler.build_tensor(test_sample_size) for sampler in data_samplers])
    y_test = np.concatenate([sampler.build_output_tensor(test_sample_size) for sampler in data_samplers])

    return x_train, y_train, x_test, y_test


def sample_from_transcripts(transcripts, n_prob, mutation_prob, delete_prob, insert_prob, target_length, num_samples):
    """
    Draw ``num_samples`` simulated reads from a list of nucleotide transcripts.

    For every sample a transcript is picked at random (with replacement) and
    passed through ``simulate_read`` with the given noise probabilities.

    :param transcripts: Nucleotide transcripts
    :param n_prob: Probability of replacing a nucleotide with "N"
    :param mutation_prob: Probability of mutating a nucleotide
    :param delete_prob: Probability of deleting a nucleotide
    :param insert_prob: Probability of inserting a nucleotide.
    :param target_length: Length of read.
    :param num_samples: Number of reads to simulate
    :return: List of reads with (possibly) modified sequences.
    """
    reads = []
    # All transcript indices are drawn up front, then each one is simulated in turn.
    for chosen in np.random.choice(len(transcripts), num_samples):
        reads.append(
            simulate_read(transcripts[chosen],
                          n_prob=n_prob,
                          mutation_prob=mutation_prob,
                          delete_prob=delete_prob,
                          insert_prob=insert_prob,
                          target_length=target_length)
        )
    return reads


In [None]:
def read_fastm_file(input_filepath, uid=None, sep=None, ender=None):
    """
    Ingests a .fastm file, and returns a list of transcript UIDs and Nucleotide transcripts.

    Each non-blank line is expected to hold exactly one record of the form
    ``<uid chars><name><sep><sequence><ender>``. Blank lines are skipped.

    :param input_filepath: Fastm filepath
    :param uid: Extraneous chars for read UIDs; every occurrence is removed from the line.
                Falls back to the module-level ``UID`` when None.
    :param sep: UID / read separator. Falls back to the module-level ``SEP`` when None.
    :param ender: File endline character; every occurrence is removed from the line.
                  Falls back to the module-level ``ENDER`` when None.
    :return: (tuple) (List of Unique Identifiers, List of Transcripts)
    :raises ValueError: If a non-blank line does not split into exactly a name and a sequence
    """
    if uid is None:
        uid = UID
    if sep is None:
        sep = SEP
    if ender is None:
        ender = ENDER
    transcript_ids = []
    transcripts = []
    with open(input_filepath, 'r') as f:
        # Stream the file line by line instead of loading it whole with readlines().
        for line_number, line in enumerate(f, start=1):
            line = line.replace('\n', '').replace(ender, '')
            # The module-level UID default may itself be None, hence the second check.
            if uid is not None:
                line = line.replace(uid, '')
            if not line.strip():
                # Blank (or marker-only) lines carry no record.
                continue
            fields = line.split(sep)
            if len(fields) != 2:
                raise ValueError(
                    "{}: line {} does not contain exactly one {!r} separator".format(
                        input_filepath, line_number, sep))
            name, sequence = fields
            transcript_ids.append(name)
            transcripts.append(sequence)
    return transcript_ids, transcripts

def reads_to_numpy(reads, array_length, randomize_positions, possible_reverse):
    """
    Encode a list of nucleotide reads and stack the results into one numpy array.

    Each read is encoded by ``sequence_to_array`` (defined elsewhere; presumably a
    1-hot encoding of shape array_length x 4 -- confirm against that helper).

    :param reads: list of Nucleotide reads
    :param array_length: int Array length, must be large enough to fit the provided read
    :param randomize_positions: Can the read be in any position in the array? Else it will be at the beginning
    :param possible_reverse: Bool Can the read be reversed?
    :return: numpy array with one encoded entry per read
    """
    encoded = []
    for single_read in reads:
        encoded.append(sequence_to_array(single_read, array_length, randomize_positions, possible_reverse))
    return np.asarray(encoded)

In [None]:
from random import uniform
import numpy as np


class FastmSampler(object):
    """
    Draws noisy simulated reads from the transcripts of one fastm file and
    packages them, together with a fixed truth vector, as training tensors.
    """

    def __init__(self,
                 fastm_file_path,
                 n_prob_upper_limit,
                 mutation_prob_upper_limit,
                 delete_prob_upper_limit,
                 insert_prob_upper_limit,
                 min_read_length,
                 tensor_length,
                 b_randomize_location,
                 b_randomize_direction,
                 training_output,
                 uid=None,
                 sep=None,
                 ender=None):
        """
        Store the sampling configuration and load all transcripts from the fastm file.

        :param fastm_file_path: Path to fastm file.
        :param n_prob_upper_limit: Upper bound for the chance to impute N's
        :param mutation_prob_upper_limit: Upper bound for the chance to impute mutations
        :param delete_prob_upper_limit: Upper bound for the chance to delete nucleotides
        :param insert_prob_upper_limit: Upper bound for the chance to insert nucleotides
        :param min_read_length: Minimum read length
        :param tensor_length: Target tensor length
        :param b_randomize_location: Randomize location if read length < tensor length
        :param b_randomize_direction: Randomize 3'-5' or 5'-3'?
        :param training_output: Truth output vector eg: [1] or [0, 0, 0, 1] etc
        :param uid: Extraneous chars for read UIDs (optional, forwarded to read_fastm_file)
        :param sep: UID / read separator (optional, forwarded to read_fastm_file)
        :param ender: File endline character (optional, forwarded to read_fastm_file)
        """
        self.fastm_file_path = fastm_file_path

        # Upper bounds of the per-batch noise probabilities.
        self.n_prob_upper_limit = n_prob_upper_limit
        self.mutation_prob_upper_limit = mutation_prob_upper_limit
        self.delete_prob_upper_limit = delete_prob_upper_limit
        self.insert_prob_upper_limit = insert_prob_upper_limit

        # Geometry of the generated reads / tensors.
        self.min_read_length = min_read_length
        self.tensor_length = tensor_length
        self.b_randomize_location = b_randomize_location
        self.b_randomize_direction = b_randomize_direction

        self.training_output = training_output

        # Only the sequences are kept; the transcript identifiers are not needed.
        _, self.transcripts = read_fastm_file(self.fastm_file_path, uid=uid, sep=sep, ender=ender)

    def build_tensor(self, size):
        """
        Build an input tensor of simulated reads.

        One set of noise probabilities (each uniform between 0 and its upper limit)
        and one read length (uniform between min_read_length and tensor_length,
        rounded) are drawn per call and shared by all ``size`` reads.

        :param size: Number of samples to take
        :return: Result of reads_to_numpy for the sampled reads
        """
        noise = dict(
            n_prob=uniform(0, self.n_prob_upper_limit),
            mutation_prob=uniform(0, self.mutation_prob_upper_limit),
            delete_prob=uniform(0, self.delete_prob_upper_limit),
            insert_prob=uniform(0, self.insert_prob_upper_limit),
        )
        read_length = int(round(uniform(self.min_read_length, self.tensor_length), 0))
        reads = sample_from_transcripts(self.transcripts,
                                        target_length=read_length,
                                        num_samples=size,
                                        **noise)
        return reads_to_numpy(reads, self.tensor_length, self.b_randomize_location, self.b_randomize_direction)

    def build_output_tensor(self, size):
        """
        Build the truth tensor matching ``build_tensor(size)``.

        :param size: Number of samples (rows) in the output
        :return: Numpy array holding ``training_output`` repeated ``size`` times
        """
        labels = [self.training_output for _ in range(size)]
        return np.asarray(labels)