In [6]:
import pandas as pd
import random
import sklearn as sk
import numpy as np
from typing import List, Dict, Tuple, Union
from numpy.typing import ArrayLike
from nn import io
import re
from nn import preprocess

In [4]:
# read txt file of sequences
pos_seqs=io.read_text_file("./data/rap1-lieb-positives.txt")
neg_seqs=io.read_fasta_file("./data/yeast-upstream-1k-negative.fa")

In [17]:
# preprocessing functions

# splitting imbalanced class labels
def sample_seqs(seqs: List[str], labels: List[bool]) -> Tuple[List[str], List[bool]]:
    """
    This function should sample the given sequences to account for class imbalance. 
    Consider this a sampling scheme with replacement.
    
    Args:
        seqs: List[str]
            List of all sequences.
        labels: List[bool]
            List of positive/negative labels

    Returns:
        sampled_seqs: List[str]
            List of sampled sequences which reflect a balanced class size
        sampled_labels: List[bool]
            List of labels for the sampled sequences
    """
    # we can use the positive classes as they are (all of them) and just sample the same number of negative classes from the large pool.
    all_pos_seqs=[]
    all_pos_labels=[]
    all_neg_seqs=[]

    # isolate positive and negativeclasses
    for seq, label in zip(seqs, labels):
        if (label==True):
            all_pos_seqs.append(seq)
            all_pos_labels.append(label)
        else:
            all_neg_seqs.append(seq)


    # randomly sample negative classes in the same amount as there are positive classes
    neg_seqs_sample=random.sample(all_neg_seqs, len(all_pos_seqs)) 

    sampled_seqs=all_pos_seqs+neg_seqs_sample
    sampled_labels=all_pos_labels+[False]*len(neg_seqs_sample)

    return sampled_seqs, sampled_labels
    # pass

# one-hot encode
"""
encodings for base pairs

A -> [1, 0, 0, 0]
T -> [0, 1, 0, 0]
C -> [0, 0, 1, 0]
G -> [0, 0, 0, 1]
"""
def one_hot_encode_seqs(seq_arr):
    # define encodings # https://stackoverflow.com/questions/6116978/how-to-replace-multiple-substrings-of-a-string
    encodings={'A':'1000', 'T':'0100', 'C':'0010', 'G':'0001'}
    encodings=dict((re.escape(k), v) for k, v in encodings.items()) 
    pattern=re.compile("|".join(encodings.keys()))

    # encode over loop and store in new list; goal is to replace each base pair with a binary string then convert the entire binary string to a list to create the encodings
    encoded_seq_arr=[]
    for seq in seq_arr:
        alt_seq=pattern.sub(lambda bp: encodings[re.escape(bp.group(0))], seq)
        alt_seq=list(alt_seq)
        encoded_seq_arr.append(alt_seq)

    return encoded_seq_arr

In [5]:
# architecture and nn initialization
nn_arch=[{'input_dim': 64, 'output_dim': 32, 'activation': 'relu'}, {'input_dim': 32, 'output_dim': 8, 'activation': 'sigmoid'}]
global nn_arch

# Seed NumPy
seed=10
np.random.seed(seed)

# Define parameter dictionary
param_dict = {}

# Initialize each layer's weight matrices (W) and bias matrices (b)
for idx, layer in enumerate(nn_arch):
    layer_idx = idx + 1
    input_dim = layer['input_dim']
    output_dim = layer['output_dim']
    param_dict['W' + str(layer_idx)] = np.random.randn(output_dim, input_dim) * 0.1
    param_dict['b' + str(layer_idx)] = np.random.randn(output_dim, 1) * 0.1

# returns param_dict

In [6]:
# activation functions
def _sigmoid(Z):
    return 1/(1+np.exp(-Z))

def _relu(Z):
    # https://www.digitalocean.com/community/tutorials/relu-function-in-python
    return np.array([max(0.0, z) for z in Z])

In [7]:
# single forward pass private method
def _single_forward(W_curr, b_curr, A_prev, activation=None):
    Z_curr=np.dot(W_curr, A_prev)+b_curr

    # pass through activation function
    if (activation=='sigmoid'):
        A_curr=_sigmoid(Z_curr)
    elif (activation=='relu'):
        A_curr=_relu(Z_curr)

    return A_curr, Z_curr

# define forward method for entire nn; will be similar to initialization but we use activation functions to derive the next A and Z matrices
def forward(X):

    # define cache as param_dict as we are going to update it
    cache={}

    # set A_curr as the input batches but transpose to have the batches along the columns and features along the rows; this matches it to the W matrix, which contains the weights for the current layer in each row
    A_curr=X.T
    cache['A0']=A_curr # add as initial layer

    # go through each layer and call _single_forward
    for idx, layer in enumerate(nn_arch):
        A_prev=A_curr
        layer_idx=idx+1
        A_curr, Z_curr=_single_forward(param_dict['W' + str(layer_idx)], param_dict['b' + str(layer_idx)], A_prev, layer['activation'])

        # add the A_curr and Z_curr to cache (on the last run, we get the final output layer)
        cache['A' + str(layer_idx)]=A_curr # has dimensions output_dim x batch_size
        cache['Z' + str(layer_idx)]=Z_curr # has dimensions output_dim x batch_size

    return A_curr, cache

In [8]:
def _sigmoid_backprop(dA_curr, Z_curr):
    pass

def _relu_backprop(dA_curr, Z_curr):
    pass

def _binary_cross_entropy_backprop(y, y_hat):
    pass

def _mean_squared_error_backprop(y, y_hat):
    pass


# define arbitrary loss function
_loss_func='bce' # or mse


# backprop functions
def _single_backprop(W_curr, b_curr, Z_curr, A_prev, dA_curr, activation_curr):
    # dA_curr is the dC/dA_last, which is multiplied in the backpropagation step with dA/dZ, the output of this multiplication is dZ_curr, which represents dC/dZ -> we compute this first
    # dA_curr has dimensions output_dim x batch_size, as does Z_curr and dZ_curr
    if (activation_curr=='sigmoid'):
        dZ_curr=_sigmoid_backprop(dA_curr, Z_curr)
    elif (activation_curr=='relu'):
        dZ_curr=_relu_backprop(dA_curr, Z_curr)

    # dA_prev, represents dC/dA_prev, can be thought of as expanding the chain rule using dZcurr (dC/dZ) and dZ/dA_prev; the derivative of Z_curr with respect to A_prev is simply W_curr
    # this means, taking the dot product of dZ_curr and W_curr will give us dA_prev
    dA_prev=np.dot(W_curr.T, dZ_curr)

    # dZ/dW, the derivative of the linear combination with respect to weights, is simply A_prev, which has dimensions of previous layer, i.e. input_layer x batch_size
    # W_curr, has dimensions output_dim x input_dim (A_prev length); so dW_curr must also be this dimension, and we can take the dot product of dZ_curr and A_prev.T
    # Note, dW_curr represents dC/dW, which is the full chain multiplication of the subcomponents
    dW_curr=np.dot(dZ_curr, A_prev.T)/dZ_curr.shape[1] # normalize by number of samples in batch (batch_size)

    #
    db_curr=None

    return dA_prev, dW_curr, db_curr

def backprop(self, y: ArrayLike, y_hat: ArrayLike, cache: Dict[str, ArrayLike]):
    # first compute dA_curr given the loss_function, y, and y_hat
    if (_loss_func=="bce"):
        dA_curr=_binary_cross_entropy_backprop(y, y_hat)
    elif (_loss_func=="mse"):
        dA_curr=_mean_squared_error_backprop(y, y_hat)


    # initialize gradient dictionary
    gradient_dict={}
    
    # the cache and param_dict is indexed such that the input layer is A0 and the final layer is AN where N is the number of layers. So with a three layer system, we would have A0, A1, and A2; The weights and biases are labeled for A1 and A2.
    # iterate and enumerate the list in reverse to get the components that are fed into  
    # https://stackoverflow.com/questions/529424/traverse-a-list-in-reverse-order-in-python
    for idx, layer in reversed(list(enumerate(nn_arch))):
        layer_idx=idx+1
        dA_prev, dW_curr, db_curr=_single_backprop(param_dict['W'+str(layer_idx)], param_dict['b'+str(layer_idx)], cache['Z'+str(layer_idx)], cache['A'+str(idx)], dA_curr, layer['activation'])
        dA_curr=dA_prev # update dA_curr as the previous dA since we are going backward

        # update gradient dicts with the same labels as the param_dict
        gradient_dict['W'+str(layer_idx)]=dW_curr
        gradient_dict['b'+str(layer_idx)]=db_curr

    return gradient_dict
    # pass




In [39]:
# fit function with minibatching
batch_size=4 # set arbitrary batch size

def fit(
    self,
    X_train: ArrayLike,
    y_train: ArrayLike,
    X_val: ArrayLike,
    y_val: ArrayLike
) -> Tuple[List[float], List[float]]:
    """
    This function trains the neural network by backpropagation for the number of epochs defined at
    the initialization of this class instance.

    Args:
        X_train: ArrayLike
            Input features of training set.
        y_train: ArrayLike
            Labels for training set.
        X_val: ArrayLike
            Input features of validation set.
        y_val: ArrayLike
            Labels for validation set.

    Returns:
        per_epoch_loss_train: List[float]
            List of per epoch loss for training set.
        per_epoch_loss_val: List[float]
            List of per epoch loss for validation set.
    """
    # intialize loss lists
    per_epoch_loss_train=[]
    per_epoch_loss_val=[]

    # split training data into batches # https://www.geeksforgeeks.org/break-list-chunks-size-n-python/
    X_train_batches=[X_train[i:i + batch_size] for i in range(0, len(X_train), batch_size)]  
    y_train_batches=[y_train[i:i + batch_size] for i in range(0, len(y_train), batch_size)]  

    # iterate and train wrt number of epoch
    for e in range(self._epochs):
        # all of the training is done with the batches iteratively and each time the loss is stored in the loss_train_list, which is averaged to give you the loss_train for the epoch
        loss_train_list=[]
        for X_train, y_train in zip(X_train_batches, y_train_batches):
            # first train via forward alg. and get training loss
            y_hat_train, cache_train=self.forward(X_train)
            if (self._loss_func=="bce"): 
                loss_train=self._binary_cross_entropy_backprop(y_train, y_hat_train)
            elif (self._loss_func=="mse"): 
                loss_train=self._mean_squared_error_backprop(y_train, y_hat_train)

            loss_train_list.append(loss_train)

            # update weights via backprop
            grad_dict=self.backprop(y_train, y_hat_train, cache_train)
            self._update_params(grad_dict)

        # weighted average of the training losses where the weights are the length of each batch. If the batches are balanced in size, then the weighting does nothing and it is simply equal to the unweighted average
        loss_train=(loss_train_list*[len(b) for b in X_train_batches])/len(X_train_batches)
        per_epoch_loss_train.append(loss_train) # store training loss

        # run validation on val data and store validation loss
        y_hat_val, cache_val=self.forward(X_val)
        if (self._loss_func=="bce"): 
            loss_val=self._binary_cross_entropy_backprop(y_val, y_hat_val)
        elif (self._loss_func=="mse"): 
            loss_val=self._mean_squared_error_backprop(y_val, y_hat_val)
        per_epoch_loss_val.append(loss_val) # store validation loss

    return per_epoch_loss_train, per_epoch_loss_val
    # pass


In [41]:
# https://www.geeksforgeeks.org/break-list-chunks-size-n-python/
arr=np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])  
batch_size=6
arr_split=[arr[i:i + batch_size] for i in range(0, len(arr), batch_size)]  


In [46]:

arr_split_lens=[len(b) for b in arr_split]


In [49]:
arr_split_lens

[6, 5]

In [47]:
arr_loss=np.array([1,2])

In [48]:
arr_loss*arr_split_lens

array([ 6, 10])

In [16]:
# https://stackoverflow.com/questions/529424/traverse-a-list-in-reverse-order-in-python
for idx, layer in reversed(list(enumerate(nn_arch))):
    layer_idx=idx+1
    print(layer_idx, layer['activation'])

2 sigmoid
1 relu


In [13]:
nn_arch

[{'input_dim': 64, 'output_dim': 32, 'activation': 'relu'},
 {'input_dim': 32, 'output_dim': 8, 'activation': 'sigmoid'}]

In [49]:
nn_arch[0]['input_dim']

64

In [25]:
param_dict['W1'].shape

(32, 64)

In [26]:
param_dict['b1'].shape

(32, 1)

In [27]:
W_t=np.array([[2,3], [3,4]])
b_t=np.array([5,6])
Z_t=np.array([7,8])

In [None]:
# scratch testing
Z=np.array([[-1,1,-1,1,1,-1], [-1,1,-1,1,1,-1], [-1,1,-1,1,1,-1]])
np.array([np.maximum(0,z) for z in Z.T])
np.maximum(0,Z)
np.where(Z>0, 1, 0)
np.mean(Z, axis=1)


np.array([np.maximum(0,z) for z in Z])

z=np.array([-1,1,-1,1,1,-1])
np.maximum(0,z)

# loss_train_list=[2,3,4]
# X_train_batches=[[4,3,2],[4,3,2],[4,3,2]]
# (np.sum(np.array(loss_train_list)*np.array([len(b) for b in X_train_batches])))/len(9)

# layer_idx=1
# update=nn._lr*nn.grad_dict['b'+str(layer_idx)]

# nn._param_dict['b'+str(layer_idx)]-=nn._lr*nn.grad_dict['b'+str(layer_idx)]
