#Import Data

In [None]:
import numpy as np
from pathlib import Path

# connect drive
from google.colab import drive
drive.mount('/content/gdrive/')

#path to folder
project_folder = Path('/content/gdrive/MyDrive/BIEN410_FinalProject')

Mounted at /content/gdrive/


In [None]:
input_file = project_folder/'input_file/infile.txt'

#look at the text file
with open(input_file, 'r') as f:
  data = f.read(1000) # first 1000 characters
  print(data)

>d1dlwa | 116
SLFEQLGGQAAVQAVTAQFYANIQADATVATFFNGIDMPNQTNKTAAFLCAALGGPNAWTGRNLKEVHANMGVSNAQFTTVIGHLRSALTGAGVAAALVEQTVAVAETVRGDVVTV
>d2bkma | 128
EQWQTLYEAIGGEETVAKLVEAFYRRVAAHPDLRPIFPDDLTETAHKQKQFLTQYLGGPPLYTAEHGHPMLRARHLRFEITPKRAEAWLACMRAAMDEIGLSGPAREQFYHRLVLTAHHMVNTPDHLD
>d1asha | 147
ANKTRELCMKSLEHAKVDTSNEARQDGIDLYKHMFENYPPLRKYFKSREEYTAEDVQNDPFFAKQGQKILLACHVLCATYDDRETFNAYTRELLDRHARDHVHMPPEVWTDFWKLFEEYLGKKTTLDEPTKQAWHEIGREFAKEINK
>d4hswa | 137
GFKQDIATIRGDLRTYAQDIFLAFLNKYPDERRYFKNYVGKSDQELKSMAKFGDHTEKVFNLMMEVADRATDCVPLASDANTLVQMKQHSSLTTGNFEKFFVALVEYMRASGQSFDSQSWDRFGKNLVSALSSAGMK
>d1x9fd | 140
ECLVTESLKVKLQWASAFGHAHERVAFGLELWRDIIDDHPEIKAPFSRVRGDNIYSPEFGAHSQRVLSGLDITISMLDTPDMLAAQLAHLKVQHVERNLKPEFFDIFLKHLLHVLGDRLGTHFDFGAWHDCVDQIIDGIK
>d1x9fc | 149
HEHCCSEEDHRIVQKQWDILWRDTESSKIKIGFGRLLLTKLAKDIPEVNDLFKRVDIEHAEGPKFSAHALRILNGLDLAINLLDDPPALDAALDHLAHQHEVREGVQKAHFKKFGEILATGLPQVLDDYDALAWKSCLKGILTKISSRL
>d1jl7a | 147
GLSAAQRQVVASTWKDIAGADNGAGVGKECLSKFISAHPEMAAVFGFSGASDPGVAELGAKVLAQIGVAVSHLGDEGKM

In [None]:
#split the file based on new lines
data = [x for x in open(input_file).read().splitlines()]
#keep only the even entries (sequences) 
seqs = data[1::2]
len(seqs)

5326

In [None]:
#get the targets/labels
label_file = project_folder/'training_data/labels.txt'

#look at the text file
with open(label_file, 'r') as f:
  data = f.read(1000) # first 1000 characters
  print(data)

>d1dlwa | 116
SLFEQLGGQAAVQAVTAQFYANIQADATVATFFNGIDMPNQTNKTAAFLCAALGGPNAWTGRNLKEVHANMGVSNAQFTTVIGHLRSALTGAGVAAALVEQTVAVAETVRGDVVTV
-HHHHH--HHHHHHHHHHHHHHHHH----HHHH----HHHHHHHHHHHHHHHH----------HHHHH------HHHHHHHHHHHHHHHHHH---HHHHHHHHHHHHHHHHHH---
>d2bkma | 128
EQWQTLYEAIGGEETVAKLVEAFYRRVAAHPDLRPIFPDDLTETAHKQKQFLTQYLGGPPLYTAEHGHPMLRARHLRFEITPKRAEAWLACMRAAMDEIGLSGPAREQFYHRLVLTAHHMVNTPDHLD
-----HHHHH-HHHHHHHHHHHHHHHHHH-----------HHHHHHHHHHHHHHHH----HHHHHH----HHHHHH-----HHHHHHHHHHHHHHHHHH----HHHHHHHHHHHHHHHHH--------
>d1asha | 147
ANKTRELCMKSLEHAKVDTSNEARQDGIDLYKHMFENYPPLRKYFKSREEYTAEDVQNDPFFAKQGQKILLACHVLCATYDDRETFNAYTRELLDRHARDHVHMPPEVWTDFWKLFEEYLGKKTTLDEPTKQAWHEIGREFAKEINK
-HHHHHHHHHHHHH------HHHHHHHHHHHHHHHHH-HHHHHH--------HHHHHH-HHHHHHHHHHHHHHHHHHHH---HHHHHHHHHHHHHHHHH------HHHHHHHHHHHHHHHHHH----HHHHHHHHHHHHHHHHHHH-
>d4hswa | 137
GFKQDIATIRGDLRTYAQDIFLAFLNKYPDERRYFKNYVGKSDQELKSMAKFGDHTEKVFNLMMEVADRATDCVPLASDANTLVQMKQHSSLTTGNFEKFFVALVEYMRASGQSFDSQSWDRFGKNLVSALSSAGMK
-HHHHHHHHHHHHHHHHH

In [None]:
#split the file based on new lines
label_data = [x for x in open(label_file).read().splitlines()] 
labels = label_data[2::3]
len(labels)

5326

#Preprocess data

In [None]:
class Preprocessor():
  def __init__(self):
    self.seq_dct = None
    self.label_dct = None
    self.flatX = None
    self.flatY = None
    self.flatYoh = None

  def encode(self, sequence_list:list, dct):
    """ Takes in a list of sequences and returns a 
    list of sequences after numerically encoding"""
    seqs2int = []
    #nested = any(isinstance(i, list) for i in sequence_list) # check if list is nested or not
    for seq in sequence_list:
      ints=[]
      for i in list(seq):
        ints.append(list(dct.values()).index(i))
      seqs2int.append(ints)
    return seqs2int
  
  def flatten(self, l:list):
    """Takes in a list of lists and flattens it"""
    return [item for sublist in l for item in sublist]
  
  def transformX(self, seqs:list, N=3):
    """Takes in a sequence string or list of strings and creates input for NB model"""
    aas = ['A', 'R', 'N', 'D', 'C', 'Q', 'E', 'G', 'H', 'I', 'L', 'K', 'M', 'F', 'P', 'S', 'T', 'W', 'Y', 'V']
    dct = dict(enumerate(aas)) # create a dictionary of ammino acids
    self.seq_dct = dct
    enc = self.encode(seqs, dct)
    flat = self.flatten(enc)
    self.flatX = flat
    new_seq = flat.copy()
    #pad with N-1 dummy tokens (20)
    new_seq.extend([20]*(N-1))
    toks = [new_seq[i:i+N] for i in range(len(new_seq)-N+1)]
    return np.array(toks)
  
  def labeldict(self, y:list):
    set_labels = set([])
    for l in y:
      for seq in l:
        for char in list(seq):
          set_labels.add(char.lower())
    label2idx = {l: i for i, l in enumerate(list(set_labels))}
    self.label_dct = label2idx
    return label2idx

  def encodeY(self, arr, dict):
    seqs2int = []
    for lst in arr:
      ints = []
      for i in list(lst):
        ints.append(dict[i.lower()])
      seqs2int.append(ints)
    return seqs2int
  
  def one_hot(self, np_arr, categories=2):
    cat_seqs = []
    for idx, label in np.ndenumerate(np_arr):
      cat_seqs.append(np.zeros(categories))
      cat_seqs[-1][label] = 1.0
    return np.array(cat_seqs)
  
  def transformY(self, labels:list):
    label_dict = self.labeldict(labels)
    enc_y = self.encodeY(labels, label_dict)
    flat_y = self.flatten(enc_y)
    self.flatY = flat_y
    oh_y = self.one_hot(np.array(flat_y))
    self.flatYoh = oh_y
    return oh_y

In [None]:
pre = Preprocessor()
X = pre.transformX(seqs)
print(X.shape)
y = pre.transformY(labels)
y.shape

(1147861, 3)


(1147861, 2)

In [None]:
from sklearn.model_selection import train_test_split
#create training and testing splits
X_train, X_test, y_train, y_test = train_test_split(X, y, 
                                                    test_size=0.2, random_state=42)

#CNB Classifier

In [None]:
class CNB():
  def __init__(self):
    self.log_probs = []
    self.prior_probs = None
    self.count_matrix = []
    self.n_features = None
    self.class_count = None
    self.log_probs = []
    self.classes = np.array([0,1])
    
  def fit(self, X, y, alpha=1e-3):
    self.n_features = X.shape[1]
    n_classes = y.shape[1]

    unique_categories = np.unique(X)

    for i in range(self.n_features):
      feature_count = []
      feature = X[:,i]
      for j in range(n_classes):
        mask = y[:,j].astype(bool)
        counts = np.bincount(feature[mask], minlength=len(unique_categories))
        feature_count.append(counts)
      self.count_matrix.append(np.array(feature_count))
      self.class_count = y.sum(axis=0)
    
    for i in range(self.n_features):
      num = self.count_matrix[i] + alpha
      den = num.sum(axis=1).reshape(-1,1)
      log_prob = np.log(num) - np.log(den)
      self.log_probs.append(log_prob)
    
    self.prior_probs = np.log(self.class_count) - np.log(self.class_count.sum())

  def predict(self, test_sample):
    probs = np.zeros((1,2))

    for i in range(self.n_features):
      category = test_sample[i]
      probs += self.log_probs[i][:, category]
    
    probs += self.prior_probs
    return self.classes[np.argmax(probs)]

In [None]:
cnb = CNB()
cnb.fit(X_train,y_train)

In [None]:
preds = []

for aa in X_test:
  preds.append(cnb.predict(aa))

In [None]:
print(f'Validation Accuracy: {(np.array(preds) == np.argmax(y_test, axis=1)).mean()}')

Validation Accuracy: 0.6640153676608312


#KNN

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

#normalize input and test input
X_train_norm = (X_train - np.mean(X_train, axis=0))/np.std(X_train, axis=0)
X_test_norm = (X_test - np.mean(X_test))/np.std(X_test)

y = np.argmax(y_train, axis=1)
y_ = np.argmax(y_test, axis=1)

In [None]:
neigh = KNeighborsClassifier(n_neighbors=90)
neigh.fit(X_train_norm, y)
preds = neigh.predict(X_test_norm)

In [None]:
print("Validation accuracy: ", str(accuracy_score(preds, y_)))

Validation accuracy:  0.6697521049949253


#HMM

Code reproduced from numpy ML library to train HMM: https://github.com/ddbourgin/numpy-ml

In [None]:
#numpy ml
class MultinomialHMM:
    def __init__(self, A=None, B=None, pi=None, eps=None):
        r"""
        A simple hidden Markov model with multinomial emission distribution.
        Parameters
        ----------
        A : :py:class:`ndarray <numpy.ndarray>` of shape `(N, N)` or None
            The transition matrix between latent states in the HMM. Index `i`,
            `j` gives the probability of transitioning from latent state `i` to
            latent state `j`. Default is None.
        B : :py:class:`ndarray <numpy.ndarray>` of shape `(N, V)` or None
            The emission matrix. Entry `i`, `j` gives the probability of latent
            state i emitting an observation of type `j`. Default is None.
        pi : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)` or None
            The prior probability of each latent state. If None, use a uniform
            prior over states. Default is None.
        eps : float or None
            Epsilon value to avoid :math:`\log(0)` errors. If None, defaults to
            the machine epsilon. Default is None.
        Attributes
        ----------
        A : :py:class:`ndarray <numpy.ndarray>` of shape `(N, N)`
            The transition matrix between latent states in the HMM. Index `i`,
            `j` gives the probability of transitioning from latent state `i` to
            latent state `j`.
        B : :py:class:`ndarray <numpy.ndarray>` of shape `(N, V)`
            The emission matrix. Entry `i`, `j` gives the probability of latent
            state `i` emitting an observation of type `j`.
        N : int
            The number of unique latent states
        V : int
            The number of unique observation types
        O : :py:class:`ndarray <numpy.ndarray>` of shape `(I, T)`
            The collection of observed training sequences.
        I : int
            The number of sequences in `O`.
        T : int
            The number of observations in each sequence in `O`.
        """
        eps = np.finfo(float).eps if eps is None else eps

        # prior probability of each latent state
        if pi is not None:
            pi[pi == 0] = eps

        # number of latent state types
        N = None
        if A is not None:
            N = A.shape[0]
            A[A == 0] = eps

        # number of observation types
        V = None
        if B is not None:
            V = B.shape[1]
            B[B == 0] = eps

        self.parameters = {
            "A": A,  # transition matrix
            "B": B,  # emission matrix
            "pi": pi,  # prior probability of each latent state
        }

        self.hyperparameters = {
            "eps": eps,  # epsilon
        }

        self.derived_variables = {
            "N": N,  # number of latent state types
            "V": V,  # number of observation types
        }

    def generate(self, n_steps, latent_state_types, obs_types):
        """
        Sample a sequence from the HMM.
        Parameters
        ----------
        n_steps : int
            The length of the generated sequence
        latent_state_types : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            A collection of labels for the latent states
        obs_types : :py:class:`ndarray <numpy.ndarray>` of shape `(V,)`
            A collection of labels for the observations
        Returns
        -------
        states : :py:class:`ndarray <numpy.ndarray>` of shape `(n_steps,)`
            The sampled latent states.
        emissions : :py:class:`ndarray <numpy.ndarray>` of shape `(n_steps,)`
            The sampled emissions.
        """
        P = self.parameters
        A, B, pi = P["A"], P["B"], P["pi"]

        # sample the initial latent state
        s = np.random.multinomial(1, pi).argmax()
        states = [latent_state_types[s]]

        # generate an emission given latent state
        v = np.random.multinomial(1, B[s, :]).argmax()
        emissions = [obs_types[v]]

        # sample a latent transition, rinse, and repeat
        for i in range(n_steps - 1):
            s = np.random.multinomial(1, A[s, :]).argmax()
            states.append(latent_state_types[s])

            v = np.random.multinomial(1, B[s, :]).argmax()
            emissions.append(obs_types[v])

        return np.array(states), np.array(emissions)

    def log_likelihood(self, O):
        r"""
        Given the HMM parameterized by :math:`(A`, B, \pi)` and an observation
        sequence `O`, compute the marginal likelihood of `O`,
        :math:`P(O \mid A,B,\pi)`, by marginalizing over latent states.
        Notes
        -----
        The log likelihood is computed efficiently via DP using the forward
        algorithm, which produces a 2D trellis, ``forward`` (sometimes referred
        to as `alpha` in the literature), where entry `i`, `j` represents the
        probability under the HMM of being in latent state `i` after seeing the
        first `j` observations:
        .. math::
            \mathtt{forward[i,j]} = P(o_1, \ldots, o_j, q_j=i \mid A, B, \pi)
        Here :math:`q_j = i` indicates that the hidden state at time `j` is of
        type `i`.
        The DP step is:
        .. math::
            \mathtt{forward[i,j]}
               &= \sum_{s'=1}^N \mathtt{forward[s',j-1]} \cdot
                   \mathtt{A[s',i]} \cdot \mathtt{B[i,o_j]} \\
               &= \sum_{s'=1}^N P(o_1, \ldots, o_{j-1}, q_{j-1}=s' \mid A, B, \pi)
                    P(q_j=i \mid q_{j-1}=s') P(o_j \mid q_j=i)
        In words, ``forward[i,j]`` is the weighted sum of the values computed on
        the previous timestep. The weight on each previous state value is the
        product of the probability of transitioning from that state to state `i`
        and the probability of emitting observation `j` in state `i`.
        Parameters
        ----------
        O : :py:class:`ndarray <numpy.ndarray>` of shape `(1, T)`
            A single set of observations.
        Returns
        -------
        likelihood : float
            The likelihood of the observations `O` under the HMM.
        """
        if O.ndim == 1:
            O = O.reshape(1, -1)  # noqa: E741

        I, T = O.shape  # noqa: E741

        if I != 1:  # noqa: E741
            raise ValueError("Likelihood only accepts a single sequence")

        forward = self._forward(O[0])
        log_likelihood = logsumexp(forward[:, T - 1])
        return log_likelihood

    def decode(self, O):
        r"""
        Given the HMM parameterized by :math:`(A, B, \pi)` and an observation
        sequence :math:`O = o_1, \ldots, o_T`, compute the most probable
        sequence of latent states, :math:`Q = q_1, \ldots, q_T`.
        Notes
        -----
        HMM decoding is done efficiently via DP using the Viterbi algorithm,
        which produces a 2D trellis, ``viterbi``, where entry `i`, `j` represents the
        probability under the HMM of being in state `i` at time `j` after having
        passed through the *most probable* state sequence :math:`q_1,\ldots,q_{j-1}`:
        .. math::
            \mathtt{viterbi[i,j]} =
                \max_{q_1, \ldots, q_{j-1}}
                    P(o_1, \ldots, o_j, q_1, \ldots, q_{j-1}, q_j=i \mid A, B, \pi)
        Here :math:`q_j = i` indicates that the hidden state at time `j` is of
        type `i`, and :math:`\max_{q_1,\ldots,q_{j-1}}` represents the maximum over
        all possible latent state sequences for the first `j-1` observations.
        The DP step is:
        .. math::
            \mathtt{viterbi[i,j]} &=
                \max_{s'=1}^N \mathtt{viterbi[s',j-1]} \cdot
                    \mathtt{A[s',i]} \cdot \mathtt{B[i,o_j]} \\
               &=  \max_{s'=1}^N
                   P(o_1,\ldots, o_j, q_1, \ldots, q_{j-1}, q_j=i \mid A, B, \pi)
                   P(q_j=i \mid q_{j-1}=s') P(o_j \mid q_j=i)
        In words, ``viterbi[i,j]`` is the weighted sum of the values computed
        on the previous timestep. The weight on each value is the product of
        the probability of transitioning from that state to state `i` and the
        probability of emitting observation `j` in state `i`.
        To compute the most probable state sequence we maintain a second
        trellis, ``back_pointer``, whose `i`, `j` entry contains the value of the
        latent state at timestep `j-1` that is most likely to lead to latent
        state `i` at timestep `j`.
        When we have completed the ``viterbi`` and ``back_pointer`` trellises for
        all `T` timseteps/observations, we greedily move backwards through the
        ``back_pointer`` trellis to construct the best path for the full
        sequence of observations.
        Parameters
        ----------
        O : :py:class:`ndarray <numpy.ndarray>` of shape `(T,)`
            An observation sequence of length `T`.
        Returns
        -------
        best_path : list of length `T`
            The most probable sequence of latent states for observations `O`.
        best_path_prob : float
            The probability of the latent state sequence in `best_path` under
            the HMM.
        """
        P = self.parameters
        N = self.derived_variables["N"]
        eps = self.hyperparameters["eps"]
        A, B, pi = P["A"], P["B"], P["pi"]

        if O.ndim == 1:
            O = O.reshape(1, -1)  # noqa: E741

        # number of observations in each sequence
        T = O.shape[1]

        # number of training sequences
        I = O.shape[0]  # noqa: E741
        if I != 1:  # noqa: E741
            raise ValueError("Can only decode a single sequence (O.shape[0] must be 1)")

        # initialize the viterbi and back_pointer matrices
        viterbi = np.zeros((N, T))
        back_pointer = np.zeros((N, T)).astype(int)

        ot = O[0, 0]
        for s in range(N):
            back_pointer[s, 0] = 0
            viterbi[s, 0] = np.log(pi[s] + eps) + np.log(B[s, ot] + eps)

        for t in range(1, T):
            ot = O[0, t]
            for s in range(N):
                seq_probs = [
                    viterbi[s_, t - 1] + np.log(A[s_, s] + eps) + np.log(B[s, ot] + eps)
                    for s_ in range(N)
                ]

                viterbi[s, t] = np.max(seq_probs)
                back_pointer[s, t] = np.argmax(seq_probs)

        best_path_log_prob = viterbi[:, T - 1].max()

        # backtrack through the trellis to get the most likely sequence of
        # latent states
        pointer = viterbi[:, T - 1].argmax()
        best_path = [pointer]
        for t in reversed(range(1, T)):
            pointer = back_pointer[pointer, t]
            best_path.append(pointer)
        best_path = best_path[::-1]

        return best_path, best_path_log_prob

    def _forward(self, Obs):
        r"""
        Computes the forward probability trellis for an HMM parameterized by
        :math:`(A, B, \pi)`.
        Notes
        -----
        The forward trellis (sometimes referred to as `alpha` in the HMM
        literature), is a 2D array where entry `i`, `j` represents the probability
        under the HMM of being in latent state `i` after seeing the first `j`
        observations:
        .. math::
            \mathtt{forward[i,j]} =
                P(o_1, \ldots, o_j, q_j=i \mid A, B, \pi)
        Here :math:`q_j = i` indicates that the hidden state at time `j` is of
        type `i`.
        The DP step is::
        .. math::
            forward[i,j] &=
                \sum_{s'=1}^N forward[s',j-1] \times A[s',i] \times B[i,o_j] \\
                &= \sum_{s'=1}^N P(o_1, \ldots, o_{j-1}, q_{j-1}=s' \mid A, B, \pi)
                    \times P(q_j=i \mid q_{j-1}=s') \times P(o_j \mid q_j=i)
        In words, ``forward[i,j]`` is the weighted sum of the values computed
        on the previous timestep. The weight on each previous state value is
        the product of the probability of transitioning from that state to
        state `i` and the probability of emitting observation `j` in state `i`.
        Parameters
        ----------
        Obs : :py:class:`ndarray <numpy.ndarray>` of shape `(T,)`
            An observation sequence of length `T`.
        Returns
        -------
        forward : :py:class:`ndarray <numpy.ndarray>` of shape `(N, T)`
            The forward trellis.
        """
        P = self.parameters
        N = self.derived_variables["N"]
        eps = self.hyperparameters["eps"]
        A, B, pi = P["A"], P["B"], P["pi"]

        T = Obs.shape[0]

        # initialize the forward probability matrix
        forward = np.zeros((N, T))

        ot = Obs[0]
        for s in range(N):
            forward[s, 0] = np.log(pi[s] + eps) + np.log(B[s, ot] + eps)

        for t in range(1, T):
            ot = Obs[t]
            for s in range(N):
                forward[s, t] = logsumexp(
                    [
                        forward[s_, t - 1]
                        + np.log(A[s_, s] + eps)
                        + np.log(B[s, ot] + eps)
                        for s_ in range(N)
                    ]  # noqa: C812
                )
        return forward

    def _backward(self, Obs):
        r"""
        Compute the backward probability trellis for an HMM parameterized by
        :math:`(A, B, \pi)`.
        Notes
        -----
        The backward trellis (sometimes referred to as `beta` in the HMM
        literature), is a 2D array where entry `i`,`j` represents the probability
        of seeing the observations from time `j+1` onward given that the HMM is
        in state `i` at time `j`
        .. math::
            \mathtt{backward[i,j]} = P(o_{j+1},o_{j+2},\ldots,o_T \mid q_j=i,A,B,\pi)
        Here :math:`q_j = i` indicates that the hidden state at time `j` is of type `i`.
        The DP step is::
            backward[i,j] &=
                \sum_{s'=1}^N backward[s',j+1] \times A[i, s'] \times B[s',o_{j+1}] \\
                &= \sum_{s'=1}^N P(o_{j+1}, o_{j+2}, \ldots, o_T \mid q_j=i, A, B, pi)
                    \times P(q_{j+1}=s' \mid q_{j}=i) \times P(o_{j+1} \mid q_{j+1}=s')
        In words, ``backward[i,j]`` is the weighted sum of the values computed
        on the following timestep. The weight on each state value from the
        `j+1`'th timestep is the product of the probability of transitioning from
        state i to that state and the probability of emitting observation `j+1`
        from that state.
        Parameters
        ----------
        Obs : :py:class:`ndarray <numpy.ndarray>` of shape `(T,)`
            A single observation sequence of length `T`.
        Returns
        -------
        backward : :py:class:`ndarray <numpy.ndarray>` of shape `(N, T)`
            The backward trellis.
        """
        P = self.parameters
        A, B = P["A"], P["B"]
        N = self.derived_variables["N"]
        eps = self.hyperparameters["eps"]

        T = Obs.shape[0]

        # initialize the backward trellis
        backward = np.zeros((N, T))

        for s in range(N):
            backward[s, T - 1] = 0

        for t in reversed(range(T - 1)):
            ot1 = Obs[t + 1]
            for s in range(N):
                backward[s, t] = logsumexp(
                    [
                        np.log(A[s, s_] + eps)
                        + np.log(B[s_, ot1] + eps)
                        + backward[s_, t + 1]
                        for s_ in range(N)
                    ]  # noqa: C812
                )
        return backward

    def _initialize_parameters(self):
        P = self.parameters
        A, B, pi = P["A"], P["B"], P["pi"]
        N, V = self.derived_variables["N"], self.derived_variables["V"]

        # Uniform initialization of prior over latent states
        if pi is None:
            pi = np.ones(N)
            pi = pi / pi.sum()

        # Uniform initialization of A
        if A is None:
            A = np.ones((N, N))
            A = A / A.sum(axis=1)[:, None]

        # Random initialization of B
        if B is None:
            B = np.random.rand(N, V)
            B = B / B.sum(axis=1)[:, None]

        P["A"], P["B"], P["pi"] = A, B, pi

    def fit(
        self,
        O,
        latent_state_types,
        observation_types,
        pi=None,
        tol=1e-5,
        verbose=False,
    ):
        """
        Given an observation sequence `O` and the set of possible latent states,
        learn the MLE HMM parameters `A` and `B`.
        Notes
        -----
        Model fitting is done iterativly using the Baum-Welch/Forward-Backward
        algorithm, a special case of the EM algorithm.
        We begin with an intial estimate for the transition (`A`) and emission
        (`B`) matrices and then use these to derive better and better estimates
        by computing the forward probability for an observation and then
        dividing that probability mass among all the paths that contributed to
        it.
        Parameters
        ----------
        O : :py:class:`ndarray <numpy.ndarray>` of shape `(I, T)`
            The set of `I` training observations, each of length `T`.
        latent_state_types : list of length `N`
            The collection of valid latent states.
        observation_types : list of length `V`
            The collection of valid observation states.
        pi : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The prior probability of each latent state. If None, assume each
            latent state is equally likely a priori. Default is None.
        tol : float
            The tolerance value. If the difference in log likelihood between
            two epochs is less than this value, terminate training. Default is
            1e-5.
        verbose : bool
            Print training stats after each epoch. Default is True.
        Returns
        -------
        A : :py:class:`ndarray <numpy.ndarray>` of shape `(N, N)`
            The estimated transition matrix.
        B : :py:class:`ndarray <numpy.ndarray>` of shape `(N, V)`
            The estimated emission matrix.
        pi : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The estimated prior probabilities of each latent state.
        """
        # observations
        if O.ndim == 1:
            O = O.reshape(1, -1)  # noqa: E741

        # number of training examples (I) and their lengths (T)
        I, T = O.shape

        # number of types of observation
        self.derived_variables["V"] = len(observation_types)

        # number of latent state types
        self.derived_variables["N"] = len(latent_state_types)

        self._initialize_parameters()

        P = self.parameters

        # iterate E and M steps until convergence criteria is met
        step, delta = 0, np.inf
        ll_prev = np.sum([self.log_likelihood(o) for o in O])

        while delta > tol:
            gamma, xi, phi = self._E_step(O)
            P["A"], P["B"], P["pi"] = self._M_step(O, gamma, xi, phi)
            ll = np.sum([self.log_likelihood(o) for o in O])
            delta = ll - ll_prev
            ll_prev = ll
            step += 1

            if verbose:
                fstr = "[Epoch {}] LL: {:.3f} Delta: {:.5f}"
                print(fstr.format(step, ll_prev, delta))

        #  return A, B, pi

    def _E_step(self, O):
        r"""
        Run a single E-step update for the Baum-Welch/Forward-Backward
        algorithm. This step estimates ``xi`` and ``gamma``, the excepted
        state-state transition counts and the expected state-occupancy counts,
        respectively.
        ``xi[i,j,k]`` gives the probability of being in state `i` at time `k`
        and state `j` at time `k+1` given the observed sequence `O` and the
        current estimates for transition (`A`) and emission (`B`) matrices::
        .. math::
            xi[i,j,k] &= P(q_k=i,q_{k+1}=j \mid O,A,B,pi) \\
                      &= \frac{
                            P(q_k=i,q_{k+1}=j,O \mid A,B,pi)
                         }{P(O \mid A,B,pi)} \\
                      &= \frac{
                            P(o_1,o_2,\ldots,o_k,q_k=i \mid A,B,pi) \times
                            P(q_{k+1}=j \mid q_k=i) \times
                            P(o_{k+1} \mid q_{k+1}=j) \times
                            P(o_{k+2},o_{k+3},\ldots,o_T \mid q_{k+1}=j,A,B,pi)
                         }{P(O \mid A,B,pi)} \\
                      &= \frac{
                            \mathtt{fwd[j, k] * self.A[j, i] *
                            self.B[i, o_{k+1}] * bwd[i, k + 1]}
                         }{\mathtt{fwd[:, T].sum()}}
        The expected number of transitions from state `i` to state `j` across the
        entire sequence is then the sum over all timesteps: ``xi[i,j,:].sum()``.
        ``gamma[i,j]`` gives the probability of being in state `i` at time `j`
        .. math:: \mathtt{gamma[i,j]} = P(q_j = i \mid O, A, B, \pi)
        Parameters
        ----------
        O : :py:class:`ndarray <numpy.ndarray>` of shape `(I, T)`
            The set of `I` training observations, each of length `T`.
        Returns
        -------
        gamma : :py:class:`ndarray <numpy.ndarray>` of shape `(I, N, T)`
            The estimated state-occupancy count matrix.
        xi : :py:class:`ndarray <numpy.ndarray>` of shape `(I, N, N, T)`
            The estimated state-state transition count matrix.
        phi : :py:class:`ndarray <numpy.ndarray>` of shape `(I, N)`
            The estimated prior counts for each latent state.
        """
        I, T = O.shape
        P = self.parameters
        A, B = P["A"], P["B"]
        N = self.derived_variables["N"]
        eps = self.hyperparameters["eps"]

        phi = np.zeros((I, N))
        gamma = np.zeros((I, N, T))
        xi = np.zeros((I, N, N, T))

        for i in range(I):
            Obs = O[i, :]
            fwd = self._forward(Obs)
            bwd = self._backward(Obs)
            log_likelihood = logsumexp(fwd[:, T - 1])

            t = T - 1
            for si in range(N):
                gamma[i, si, t] = fwd[si, t] + bwd[si, t] - log_likelihood
                phi[i, si] = fwd[si, 0] + bwd[si, 0] - log_likelihood

            for t in range(T - 1):
                ot1 = Obs[t + 1]
                for si in range(N):
                    gamma[i, si, t] = fwd[si, t] + bwd[si, t] - log_likelihood
                    for sj in range(N):
                        xi[i, si, sj, t] = (
                            fwd[si, t]
                            + np.log(A[si, sj] + eps)
                            + np.log(B[sj, ot1] + eps)
                            + bwd[sj, t + 1]
                            - log_likelihood
                        )

        return gamma, xi, phi

    def _M_step(self, O, gamma, xi, phi):
        """
        Run a single M-step update for the Baum-Welch/Forward-Backward
        algorithm.
        Parameters
        ----------
        O : :py:class:`ndarray <numpy.ndarray>` of shape `(I, T)`
            The set of `I` training observations, each of length `T`.
        gamma : :py:class:`ndarray <numpy.ndarray>` of shape `(I, N, T)`
            The estimated state-occupancy count matrix.
        xi : :py:class:`ndarray <numpy.ndarray>` of shape `(I, N, N, T)`
            The estimated state-state transition count matrix.
        phi : :py:class:`ndarray <numpy.ndarray>` of shape `(I, N)`
            The estimated starting count matrix for each latent state.
        Returns
        -------
        A : :py:class:`ndarray <numpy.ndarray>` of shape `(N, N)`
            The estimated transition matrix.
        B : :py:class:`ndarray <numpy.ndarray>` of shape `(N, V)`
            The estimated emission matrix.
        pi : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The estimated prior probabilities for each latent state.
        """
        I, T = O.shape
        P = self.parameters
        DV = self.derived_variables
        eps = self.hyperparameters["eps"]

        N, V = DV["N"], DV["V"]
        A, B, pi = P["A"], P["B"], P["pi"]

        # initialize the estimated transition (A) and emission (B) matrices
        A = np.zeros((N, N))
        B = np.zeros((N, V))
        pi = np.zeros(N)

        count_gamma = np.zeros((I, N, V))
        count_xi = np.zeros((I, N, N))

        for i in range(I):
            Obs = O[i, :]
            for si in range(N):
                for vk in range(V):
                    if not (Obs == vk).any():
                        count_gamma[i, si, vk] = np.log(eps)
                    else:
                        count_gamma[i, si, vk] = logsumexp(gamma[i, si, Obs == vk])

                for sj in range(N):
                    count_xi[i, si, sj] = logsumexp(xi[i, si, sj, :])

        pi = logsumexp(phi, axis=0) - np.log(I + eps)
        np.testing.assert_almost_equal(np.exp(pi).sum(), 1)

        for si in range(N):
            for vk in range(V):
                B[si, vk] = logsumexp(count_gamma[:, si, vk]) - logsumexp(
                    count_gamma[:, si, :]  # noqa: C812
                )

            for sj in range(N):
                A[si, sj] = logsumexp(count_xi[:, si, sj]) - logsumexp(
                    count_xi[:, si, :]  # noqa: C812
                )

            np.testing.assert_almost_equal(np.exp(A[si, :]).sum(), 1)
            np.testing.assert_almost_equal(np.exp(B[si, :]).sum(), 1)
        return np.exp(A), np.exp(B), np.exp(pi)

In [29]:
#get the prior probabilities of the 2 states ('-' -> 1 or 'H' -> 0)
dct = pre.label_dct
hidden_states = pre.flatY

count_zero = hidden_states.count(0)
count_one = hidden_states.count(1)

pi = np.array([count_zero/len(hidden_states), count_one/len(hidden_states)])
pi

array([0.39728068, 0.60271932])

In [30]:
def transition_matrix(arr):
    M = np.zeros(shape=(max(arr) + 1, max(arr) + 1))
    for (i, j) in zip(arr, arr[1:]):
        M[i, j] += 1
    return (M.T / M.sum(axis=1)).T

In [32]:
A = transition_matrix(np.array(pre.flatY))
A

array([[0.90451578, 0.09548422],
       [0.06293824, 0.93706176]])

In [None]:
#calculate the observable emission matrix

#determine the indices for helix and non-helix
h_inds = np.where(np.array(pre.flatY) == 0)[0]
nh_inds = np.where(np.array(pre.flatY) == 1)[0]

#get all the corresponding 'observations' or AAs for when in a helix or not
obs_h = np.array(pre.flatX)[h_inds]
obs_nh = np.array(pre.flatX)[nh_inds]

#get the counts for each of these amino acids in both states
aa_counts_h = np.bincount(obs_h)
aa_counts_nh = np.bincount(obs_nh)

#turn into emission probabilities matrix (probabilities of each amino acid given a helix or not)
p_aa_h = aa_counts_h/len(h_inds)
p_aa_nh = aa_counts_nh/len(nh_inds)

theta = np.array([p_aa_h, p_aa_nh])
theta

In [None]:
model = MultinomialHMM(A=A, B=theta, pi=pi)

In [None]:
obs, prob = model.decode(np.array(pre.flatX))

In [None]:
print(f'Training Accuracy: {(np.array(obs) == np.array(pre.flatY)).mean()}')

#Preprocessing for LSTM

In [33]:
ngrams = np.array([[seq[i:i+3] for i in range(len(seq))] for seq in seqs], dtype=object) # creat ngrams from input training data

def tokenize(arr):
  set_ngrams = set([])
  for lst in arr:
    for ngram in lst:
      set_ngrams.add(ngram.lower())

  ngram2idx = {ng: i+1 for i, ng in enumerate(list(set_ngrams))}
  return ngram2idx

ngram_dict = tokenize(ngrams)

def transform(arr, dict):
  seqs2int = []
  for lst in arr:
    ints = []
    for i in lst:
      ints.append(dict[i.lower()])
    seqs2int.append(ints)
  return seqs2int

train_X = transform(ngrams, ngram_dict) # now all the ngrams are tokenized

In [34]:
# function to pad or truncate list to specific length
pad = lambda a,i : a[0:i] if len(a) > i else a + [0] * (i-len(a))

padded = []

for l in train_X:
  pl = pad(l, 1419)
  padded.append(pl)

np_train_X = np.array(padded)

In [35]:
def char_tokenizer(lst): # pass in a list of the labels
  set_labels = set([])
  for l in lst:
    for seq in l:
      for char in list(seq):
        set_labels.add(char.lower())
  label2idx = {l: i+1 for i, l in enumerate(list(set_labels))}
  return label2idx 

label_dict = char_tokenizer(labels) # fit the labels

train_y = transform(labels, label_dict) #labels are now tokenized

padded_y = []
for l in train_y:
  pl = pad(l, 1419)
  padded_y.append(pl)

np_train_y = np.array(padded_y)

In [36]:
oh_y_train = np.eye(3)[np_train_y] # 3 categories since '-, H, 0' for the padding

In [37]:
#create training and testing splits
X_train, X_test, y_train, y_test = train_test_split(np_train_X, oh_y_train, 
                                                    test_size=0.25, random_state=42)

In [38]:
#append missing value
ngram_dict['mwm'] = 8413

In [39]:
X_train.shape, y_train.shape

((3994, 1419), (3994, 1419, 3))

#LSTM Model

In [40]:
from keras.models import Sequential
from keras.layers import Dense, Embedding, LSTM, TimeDistributed
from keras.optimizers import Adam

In [41]:
n_words = len(ngram_dict) + 1 #plus 1 for the padding token 0
max_length = 1419

model = Sequential()
model.add(Embedding(input_shape=(max_length,), input_dim=n_words, output_dim=4))
model.add(LSTM(4, return_sequences=True))
model.add(TimeDistributed(Dense(3, activation='softmax')))

In [42]:
#we need to calculate accuracy that ignores the padding
import tensorflow as tf
from keras import backend as K

def no_pad_acc(y_true, y_pred):
  y = tf.argmax(y_true, axis=-1)
  y_ = tf.argmax(y_pred, axis=-1)
  mask = tf.greater(y, 0)
  return K.cast(K.equal(tf.boolean_mask(y, mask), tf.boolean_mask(y_, mask)), K.floatx())

In [43]:
model.compile(loss="categorical_crossentropy", optimizer=Adam(0.08),
              metrics=['accuracy', no_pad_acc])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       (None, 1419, 4)           33656     
                                                                 
 lstm (LSTM)                 (None, 1419, 4)           144       
                                                                 
 time_distributed (TimeDistr  (None, 1419, 3)          15        
 ibuted)                                                         
                                                                 
Total params: 33,815
Trainable params: 33,815
Non-trainable params: 0
_________________________________________________________________


In [None]:
# use GPU for this
model.fit(X_train,y_train,epochs=6, validation_data=(X_test, y_test), verbose=1)