In [1]:
import re
from typing import Union, List, NamedTuple

import numpy as np
import pandas as pd
from scipy.io import loadmat
from skbio.sequence import Sequence

# Data pre-processing

In [2]:
sequences = \
  pd.read_table("data/3U_sequences_final.txt",
                header=None,
                names=["id", "sequence"],
                dtype={"id": str},
                converters={"sequence": lambda string: string.upper()})

In [3]:
deg_rate_a_plus = pd.read_table("data/3U.models.3U.40A.seq1022_param.txt",
                                header=None,
                                names=["id", "log2_deg_rate", "log2_x0", "onset_time"],
                                dtype={"id": str})
deg_rate_a_minus = pd.read_table("data/3U.models.3U.00A.seq1022_param.txt",
                                 header=None,
                                 names=["id", "log2_deg_rate", "log2_x0", "onset_time"],
                                 dtype={"id": str})
assert (deg_rate_a_plus["id"] == deg_rate_a_minus["id"]).all()

In [5]:
def occurrences(string: Union[str, pd.Series, pd.Index], sub: str) -> np.ndarray:
    """
    Count the overlapping occurrences of a string in another string.
    """
    if isinstance(string, str):
        string = pd.Series([string])
    # https://stackoverflow.com/a/11706065/851560
    return string.str.count(f"(?={re.escape(sub)})").to_numpy()

# Linear Regression

In [6]:
class Model(NamedTuple):
    coefficients: np.ndarray
    intercept: np.float
    kmers: List[str]

    def predict(self, sequences: pd.Series) -> np.ndarray:
        X = np.zeros((len(sequences), len(self.coefficients)), dtype=int)

        # Count the occurrences of each feature k-mer in each of the sequences
        # we are predicting.
        for column, kmer in enumerate(self.kmers):
            X[:, column] = occurrences(sequences, kmer)

        prediction = X @ self.coefficients + self.intercept

        return prediction

def load_model(filename: str) -> Model:
    raw_model = loadmat(filename)
    kmers = [element[0][0] for element in raw_model["Krows"]]

    return Model(raw_model["B0"], raw_model["b0"][0][0], kmers)

In [7]:
model_a_plus = load_model("data/run_linear_3U_40A_dg_BEST.out.mat")
model_a_minus = load_model("data/run_linear_3U_00Am1_dg_BEST.out.mat")

In [8]:
prediction_a_minus = np.log2(model_a_minus.predict(sequences["sequence"]))

  """Entry point for launching an IPython kernel.


In [9]:
prediction_a_plus = np.log2(model_a_plus.predict(sequences["sequence"]))

  """Entry point for launching an IPython kernel.


In [10]:
prediction_df = pd.DataFrame({"id": sequences["id"],
                              "a_minus": prediction_a_minus.T[0],
                              "a_plus": prediction_a_plus.T[0]})
prediction_df.sort_values(by=["id"], inplace=True)

In [11]:
compare_df = \
  pd.read_table("data/models_full_dg.txt",
                header=None,
                names=["id", "a_plus", "a_minus"])
compare_df.sort_values(by=["id"], inplace=True)

assert compare_df.loc[0]["id"] == "EMPTY"
a_minus_clip = compare_df.loc[0]["a_minus"]
a_plus_clip = compare_df.loc[0]["a_plus"]

compare_df.drop(0, inplace=True)

In [12]:
prediction_df["a_minus"].fillna(a_minus_clip, inplace=True)
prediction_df["a_minus"].clip(lower=a_minus_clip, inplace=True)

prediction_df["a_plus"].fillna(a_plus_clip, inplace=True)
prediction_df["a_plus"].clip(lower=a_plus_clip, inplace=True)

In [13]:
np.allclose(prediction_df["a_minus"].to_numpy(), compare_df["a_minus"].to_numpy(), atol=1e-4, rtol=0, equal_nan=True)

True

In [14]:
np.allclose(prediction_df["a_plus"].to_numpy(), compare_df["a_plus"].to_numpy(), atol=1e-4, rtol=0, equal_nan=True)

True