## Imports

In [64]:
import pandas as pd
import numpy as np
import time 

from sklearn.model_selection import train_test_split
from itertools import zip_longest
from Bio.Data import CodonTable
from Bio.Seq import translate, IUPAC

## Tested Functions

In [32]:
def normalize(X):
    """
    Input: X is an array-like exposing .mean() and .std() (e.g. a numpy array)
    Output: X with its mean subtracted and then divided by its standard deviation
    """
    centre = X.mean()
    spread = X.std()
    return (X - centre) / spread

def get_wild_type_dna_sequence(gfp_data_path = "./data/gfp_data.csv"):
    """
    Input: gfp_data_path is the path of the gfp csv file; it must contain a "nucSequence" column
    Output: the "nucSequence" value of the first data row, which the rest of this notebook
            treats as the wild type dna sequence
    """
    # Only the first data row is needed, so do not parse the whole file on every call.
    first_row = pd.read_csv(gfp_data_path, nrows = 1)
    return first_row["nucSequence"].values[0]

def dna_to_amino_acid(dna_seq):
    """
    Input: dna_seq is a nucleotide sequence accepted by Bio.Seq.translate
    Output: the result of translate(dna_seq) called with its default arguments
    """
    amino_acid_seq = translate(dna_seq)
    return amino_acid_seq

def get_all_amino_acids():
    """
    Output: the alphabet used for amino acid sequences, i.e. "*" followed by IUPAC.protein.letters
    """
    # NOTE(review): "*" is presumably the symbol translate emits for stop codons - confirm
    protein_letters = IUPAC.protein.letters
    return "".join(["*", protein_letters])
    
def get_wild_type_amino_acid_sequence():
    """
    Output: the wild type dna sequence after conversion by dna_to_amino_acid
    """
    wild_type_dna = get_wild_type_dna_sequence()
    return dna_to_amino_acid(wild_type_dna)

def count_substring_mismatch(s1, s2):
    """
    Input: s1 and s2 are two sequences (typically strings)
    Output: the number of positions at which s1 and s2 differ; when one is longer,
            each of its extra positions counts as a mismatch
    """
    mismatches = 0
    # zip_longest pads the shorter sequence with None, which never equals a character
    for left, right in zip_longest(s1, s2):
        if left != right:
            mismatches += 1
    return mismatches

def generate_random_gfp_data_mutations(num_of_mutations_lst, num_per_mutation_count = 1000): 
    """
    Input: num_of_mutations_lst is a list defining the count of mutations from the base sequence we want, 
           num_per_mutation_count is an int defining how many of each mutation count do we want. Often set to the test size
    Output: a pandas dataframe with one row per generated mutant and three columns:
            mutation_count, mutated_dna_sequence and mutated_amino_acid_sequence.
            Rows are grouped by mutation count, in the order given by num_of_mutations_lst
    Raises: AssertionError if 200000 or more mutants are requested
    Side effect: prints the elapsed generation time
    """
    start_time = time.time()
    total_data_points = len(num_of_mutations_lst) * num_per_mutation_count
    assert total_data_points < 200000, "refusing to generate 200000 or more mutated sequences"
    wild_type_sequence = get_wild_type_dna_sequence()
    wild_type_lst = list(wild_type_sequence)
    # One row per mutant, every row starting out as a copy of the wild type bases.
    mutation_lst = np.tile(np.array(wild_type_lst), (total_data_points, 1))
    # [c0, c0, ..., c1, c1, ...]: each requested count repeated num_per_mutation_count times.
    mutation_count_lst = np.repeat(num_of_mutations_lst, num_per_mutation_count)

    # Sanity checks that hold for any input (not only for lists starting with 1, 2).
    assert mutation_lst.shape == (total_data_points, len(wild_type_lst))
    assert len(mutation_count_lst) == total_data_points

    bases = "ACTG"
    base_index_map = {base: k for k, base in enumerate(bases)}

    for i, mutation_count in enumerate(mutation_count_lst):
        # Distinct positions, so exactly mutation_count bases end up changed.
        mutation_index = np.random.choice(len(wild_type_sequence), mutation_count, replace = False).tolist()
        for j in mutation_index:
            k = base_index_map[mutation_lst[i, j]]
            # A shift of 1..3 modulo 4 always lands on a base different from the current one.
            new_index = (k + np.random.randint(1, 4)) % 4
            mutation_lst[i, j] = bases[new_index]
    mutation_sequence_lst = np.array(["".join(row) for row in mutation_lst])
    mutated_df = pd.DataFrame.from_dict({'mutation_count' : mutation_count_lst, 'mutated_dna_sequence' : mutation_sequence_lst})
    mutated_df["mutated_amino_acid_sequence"] = mutated_df["mutated_dna_sequence"].apply(dna_to_amino_acid)
    print(time.time() - start_time, "seconds to generate the mutated df")
    return mutated_df

def get_mutation(string, num_mutations, alphabet): 
    """
    Input: string is the sequence to mutate; each character at a mutated position must be in alphabet,
           num_mutations is the number of distinct positions of string to change,
           alphabet is the collection of characters a position may hold
    Output: a copy of string in which num_mutations randomly chosen positions each hold a
            randomly chosen character of alphabet other than the one at that alphabet index
    Raises: ValueError if a mutation is requested while alphabet has fewer than two characters
            (there would be nothing to mutate to), or if num_mutations exceeds len(string);
            KeyError if a chosen position holds a character missing from alphabet
    """
    letters = list(alphabet)
    num_characters = len(letters)
    if num_mutations > 0 and num_characters < 2:
        # Without this guard the resampling loop below would never terminate.
        raise ValueError("alphabet needs at least two characters to mutate a position")
    characters_to_index = dict(zip(letters, range(num_characters)))
    mutation = list(string)
    indexes = np.random.choice(len(string), num_mutations, replace=False)
    for i in indexes:
        original_index = characters_to_index[string[i]]
        new_index = np.random.randint(0, num_characters)
        # Redraw until the replacement differs from the original character.
        while new_index == original_index: 
            new_index = np.random.randint(0, num_characters)
        mutation[i] = letters[new_index]
    return "".join(mutation)

def save_mutated_gfp_data(mutated_df, path = "./data/mutated_df.csv"):
    """
    Input: mutated_df is the dataframe to store, path is the csv file to write
    Output: None; mutated_df is written to path as csv with index = None
    """
    csv_options = {"index" : None}
    mutated_df.to_csv(path, **csv_options)
    
def load_saved_mutated_gfp_data(path = "./data/mutated_df.csv"):
    """
    Input: path is the csv file to read
    Output: the contents of path as a pandas dataframe, read with the default read_csv settings
    """
    mutated_df = pd.read_csv(path)
    return mutated_df

def get_gfp_data(amino_acid = False, gfp_data_path = "./data/gfp_data.csv", x_feature = "nucSequence", y_feature = "medianBrightness", normalize_y = True, test_size = 0.2, shuffle = False):
    """
    Input: amino_acid selects whether the sequences are converted with dna_to_amino_acid,
           gfp_data_path is the csv file to read (its first column is used as the index),
           x_feature / y_feature are the names of the sequence and target columns,
           normalize_y selects whether the targets are passed through normalize,
           test_size and shuffle are forwarded to train_test_split
    Output: the result of train_test_split on the sequences and targets,
            i.e. X_train, X_test, y_train, y_test
    """
    gfp_df = pd.read_csv(gfp_data_path, index_col = 0)
    sequences = gfp_df[x_feature]
    if amino_acid:
        sequences = sequences.apply(dna_to_amino_acid)
    targets = gfp_df[y_feature].values
    if normalize_y:
        targets = normalize(targets)
    return train_test_split(sequences.values, targets, test_size = test_size, shuffle = shuffle)

def save_gfp_data(X_train, X_test, y_train, y_test, gfp_data_path):
    """
    Input: X_train, X_test, y_train, y_test are the arrays to store,
           gfp_data_path is a path prefix; each file name is this prefix plus a fixed suffix
    Output: None; writes <prefix>X_train.npy, <prefix>X_test.npy, <prefix>y_train.npy and <prefix>y_test.npy
    """
    arrays_by_suffix = (
        ("X_train.npy", X_train),
        ("X_test.npy", X_test),
        ("y_train.npy", y_train),
        ("y_test.npy", y_test),
    )
    for suffix, array in arrays_by_suffix:
        np.save(gfp_data_path + suffix, array)

def load_gfp_data(gfp_data_path):
    """
    Input: gfp_data_path is the path prefix that was passed to save_gfp_data
    Output: X_train, X_test, y_train, y_test loaded from <prefix>X_train.npy, <prefix>X_test.npy,
            <prefix>y_train.npy and <prefix>y_test.npy
    """
    # Sequence arrays taken from a dataframe column have object dtype, which np.save stores
    # via pickle; np.load rejects such files unless allow_pickle is enabled.
    # NOTE: unpickling can execute arbitrary code - only load files this notebook wrote itself.
    suffixes = ("X_train.npy", "X_test.npy", "y_train.npy", "y_test.npy")
    X_train, X_test, y_train, y_test = [
        np.load(gfp_data_path + suffix, allow_pickle = True) for suffix in suffixes
    ]
    return X_train, X_test, y_train, y_test

  
def one_hot_encode_dna_sequence(X):
    """
    Input: X is a non-empty list of DNA sequences written with the bases A, C, T, G.
        All sequences must have the same length
    Output: float matrix with one row per sequence and four columns per position;
        inside each group of four the columns stand for A, C, T, G in that order
    Example: one_hot_encode(["ACT", "ACG"]) = [[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0], 
                                              [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]]
    """
    assert(len(X) > 0)
    sequence_len = len(X[0])
    assert(all([len(X[row]) == sequence_len for row in range(len(X))]))
    column_of_base = {"A": 0, "C": 1, "T": 2, "G": 3}
    bases_per_position = len(column_of_base)
    encoded = np.zeros((len(X), bases_per_position * sequence_len))
    for row, sequence in enumerate(X):
        for position, base in enumerate(sequence):
            # An unknown base raises KeyError from the lookup.
            encoded[row, bases_per_position * position + column_of_base[base]] = 1.0
    return encoded


def one_hot_decode_dna_sequence(X): 
    """
    Input: X is a non-empty list of one hot encoded DNA sequences, four entries per position
        standing for A, C, T, G in that order. All rows must have the same length
    Output: numpy array holding one decoded dna sequence per row. For each group of four the
        first truthy entry decides the base; a group without any truthy entry adds no base
    Example: one_hot_decode([[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0], 
                            [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]]) = ["ACT", "ACG"]
    """
    assert(len(X) > 0)
    row_len = len(X[0])
    assert(all([len(X[row]) == row_len for row in range(len(X))]))
    base_order = "ACTG"
    decoded = []
    for encoded_row in X:
        bases = []
        for group_start in range(0, len(encoded_row), 4):
            for offset, base in enumerate(base_order):
                if encoded_row[group_start + offset]:
                    bases.append(base)
                    break
        decoded.append("".join(bases))
    return np.array(decoded)

def one_hot_encode(X, alphabet):
    """
    Input: X is a non-empty list of sequences whose letters all come from alphabet.
        All sequences must have the same length
    Output: float matrix with one row per sequence and len(alphabet) columns per position;
        inside each group the columns follow the order of alphabet
    Raises: KeyError if a sequence contains a letter that is not in alphabet
    Example: one_hot_encode(["ACT", "ACG"], "ACTG") = [[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0], 
                                              [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]]
    """
    assert(len(X) > 0)
    sequence_len = len(X[0])
    assert(all([len(X[row]) == sequence_len for row in range(len(X))]))
    width = len(alphabet)
    column_of_letter = {letter: column for column, letter in enumerate(alphabet)}
    encoded = np.zeros((len(X), width * sequence_len))
    for row, sequence in enumerate(X):
        group_start = 0
        for letter in sequence:
            if letter not in alphabet:
                raise KeyError("letter not in alphabet")
            encoded[row, group_start + column_of_letter[letter]] = 1.0
            group_start += width
    return encoded

def one_hot_decode(X, alphabet):
    """
    Input: X is a non-empty list of one hot encoded sequences with len(alphabet) entries per
        position, ordered like alphabet. All rows must have the same length
    Output: list with one decoded string per row; each group of len(alphabet) entries becomes
        the letter of alphabet found at the position of the group's largest entry
    Example: one_hot_decode([[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0], 
                            [1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]], "ACTG") = ["ACT", "ACG"]
    """
    assert(len(X) > 0)
    row_len = len(X[0])
    assert(all([len(X[row]) == row_len for row in range(len(X))]))
    width = len(alphabet)
    decoded = []
    for encoded_row in X:
        letters = [
            alphabet[np.argmax(encoded_row[group_start:group_start + width])]
            for group_start in range(0, len(encoded_row), width)
        ]
        decoded.append("".join(letters))
    return decoded


In [30]:
# Scratch cell: load the whole "nucSequence" column of the gfp csv as an array.
# `x` is not referenced by any other cell shown in this notebook.
x = pd.read_csv("./data/gfp_data.csv")["nucSequence"].values

## Tests

In [40]:
def test_mutated_df(save_mutated_df=False): 
    """
    Generates mutants for the mutation counts 1..9 and 10, 20, ..., 50 and checks every row:
    the dna sequence differs from the wild type in exactly mutation_count positions, the amino
    acid sequence differs in at most that many positions, and it matches the converted dna.
    Input: save_mutated_df selects whether the dataframe is also saved, reloaded and compared
    """
    single_digit_counts = list(range(1, 10))
    multiples_of_ten = [10 * k for k in range(1, 6)]
    mutated_df = generate_random_gfp_data_mutations(
        num_of_mutations_lst=single_digit_counts + multiples_of_ten,
        num_per_mutation_count=11684,  # test size = len(X_test)
    )
    wild_type_dna_seq = get_wild_type_dna_sequence()
    wild_type_amino_acid_seq = get_wild_type_amino_acid_sequence()
    column_names = ("mutation_count", "mutated_dna_sequence", "mutated_amino_acid_sequence")
    rows = zip(*(mutated_df[name].values for name in column_names))
    for mutation_count, dna_seq, amino_acid_seq in rows:
        dna_mismatches = count_substring_mismatch(wild_type_dna_seq, dna_seq)
        assert(mutation_count == dna_mismatches)
        amino_acid_mismatches = count_substring_mismatch(wild_type_amino_acid_seq, amino_acid_seq)
        assert(mutation_count >= amino_acid_mismatches)
        assert(dna_to_amino_acid(dna_seq) == amino_acid_seq)
    if not save_mutated_df:
        return
    save_mutated_gfp_data(mutated_df=mutated_df)
    load_mutated_df = load_saved_mutated_gfp_data()
    assert(load_mutated_df.equals(mutated_df))

def _check_get_save_load_gfp(amino_acid, gfp_data_path, wild_type_sequence):
    """
    Shared body of the two get/save/load tests.
    Input: amino_acid is forwarded to get_gfp_data,
           gfp_data_path is the path prefix used for saving and loading,
           wild_type_sequence is the reference sequence in the same representation as the data
    Checks: the split survives a save/load round trip unchanged, the first training sequence is
            the wild type, the first test sequence differs from it in exactly 2 positions, and
            the splits have the expected sizes and element types
    """
    X_train, X_test, y_train, y_test = get_gfp_data(amino_acid = amino_acid)
    save_gfp_data(X_train, X_test, y_train, y_test, gfp_data_path)
    train_X, test_X, train_y, test_y = load_gfp_data(gfp_data_path)
    np.testing.assert_array_equal(X_train, train_X)
    np.testing.assert_array_equal(X_test, test_X)
    np.testing.assert_array_equal(y_train, train_y)
    np.testing.assert_array_equal(y_test, test_y)
    assert(X_train[0] == wild_type_sequence)
    assert(count_substring_mismatch(X_test[0], wild_type_sequence) == 2)
    # Split sizes this notebook expects from the gfp csv with the default test_size.
    train_size, test_size = 46733, 11684
    assert(X_train.shape == (train_size,))
    assert(X_test.shape == (test_size,))
    assert(y_train.shape == (train_size,))
    assert(y_test.shape == (test_size,))
    assert(isinstance(X_train[-1], str))
    assert(isinstance(X_test[-1], str))
    assert(isinstance(y_train[-1], float))
    assert(isinstance(y_test[-1], float))

def test_get_save_load_gfp_dna_seq():
    """Get/save/load round trip for the dna representation of the gfp data."""
    _check_get_save_load_gfp(False, "./data/gfp_dna_", get_wild_type_dna_sequence())

def test_get_save_load_gfp_amino_acid_seq():
    """Get/save/load round trip for the amino acid representation of the gfp data."""
    _check_get_save_load_gfp(True, "./data/gfp_amino_acid_", get_wild_type_amino_acid_sequence())

def test_encode_decode():
    """
    For both saved data sets (amino acid and dna) checks that one_hot_decode undoes
    one_hot_encode on the train and the test sequences.
    """
    for data_dir in ["./data/gfp_amino_acid_", "./data/gfp_dna_"]:
        train_X, test_X, _, _ = load_gfp_data(data_dir)
        # The path prefix tells which alphabet the stored sequences use.
        alphabet = "ACTG" if "dna" in data_dir else get_all_amino_acids()
        one_hot_X_train = one_hot_encode(train_X, alphabet=alphabet)
        one_hot_X_test = one_hot_encode(test_X, alphabet=alphabet)
        X_train = one_hot_decode(one_hot_X_train, alphabet=alphabet)
        X_test = one_hot_decode(one_hot_X_test, alphabet=alphabet)
        np.testing.assert_array_equal(train_X, X_train)
        np.testing.assert_array_equal(test_X, X_test)


In [90]:
# Run all tests. test_mutated_df(True) also saves the mutated dataframe and reloads it.
# The two get/save/load tests run before test_encode_decode because that test reads the
# .npy files they write under ./data/gfp_dna_ and ./data/gfp_amino_acid_.
test_mutated_df(True)
test_get_save_load_gfp_dna_seq()
test_get_save_load_gfp_amino_acid_seq()
test_encode_decode()

225.9694128036499 seconds to generate the mutated df


## Code

In [63]:
# Time how long it takes to one hot encode the saved amino acid training sequences.
start_time = time.time()
# Requires the .npy files written earlier by save_gfp_data under this prefix.
X_train, X_test, y_train, y_test = load_gfp_data("./data/gfp_amino_acid_")
# Rebuild the training sequences as a fresh numpy array (x[:] and X_train[0:] select everything).
data = np.array([x[:] for x in X_train[0:]])
one_hot_X_train = one_hot_encode(data, get_all_amino_acids())
# Sanity check: decoding the first encoded row must give back the wild type amino acid sequence.
z = one_hot_decode([one_hot_X_train[0]], get_all_amino_acids())[0]
assert(z == get_wild_type_amino_acid_sequence())
print("Finished encoding data in {:.2f} seconds".format(time.time() - start_time))

Finished encoding data in 8.30 seconds
