In [1]:
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from collections import Counter

with open('train.dat', 'r') as fh:
    train = fh.readlines()

with open('test.dat', 'r') as fh:
    test = fh.readlines()

In [2]:
# ytrain, = labels for the training set
ytr = [int(p.split('\t')[0]) for p in train]
tr = [p.split('\t')[1].strip() for p in train]
te = [p.strip() for p in test]

def kmer(p, k=2):
    return [p[i:i+k] for i in range(0, len(p)-k+1)]

def kmers(p, k=2):
    els = []
    for i in range(1,k):
        els.extend(kmer(p, k=i))
    return els

tr = [kmers(p, k=3) for p in tr]
te = [kmers(p, k=3) for p in te]

#Assign IDS and create dictionary. each peptide is represented as a vector with 436 features
# Now we put the count in that vector of how many times we find a particular feature
# Dense is bad, as it keeps all features including counts of 0
# Sparse is much faster as it removes the 0 values from the vector features

mp = {} # dictionary
for p in tr+te:
    for e in p:
        if e not in mp:
            mp[e] = len(mp)
#len(mp)

# for each of the peptides we create a count of their features. by default it is dense, but we can do work to remove the 0's to make it sparse.

# transforming peptide to vector
def dense(p):
    x  = np.zeros(len(mp))
    for e in p:
        x[mp[e]] += 1
    return x
#dense(tr[0])

# sparse dataset from all the peptides. # csr matrix only stores non-zeros. it does not store the zeros
def sparse(ds):
    nrows = len(ds) # number of rows
    ptr = np.zeros(nrows+1, dtype=int) #pointer array
    nnz = 0 # number of non-zeros
    for i, p in enumerate(ds): # figuring out size of each row
        ps = set(p)
        nnz += len(ps)
        ptr[i+1] = nnz # populate pointers of where the row starts and ends
    ind = np.zeros(nnz, dtype=int)
    val = np.zeros(nnz, dtype=float)
    nnz = 0
    for p in ds:
        ct = Counter(p).most_common()
        for e, c in ct:
            val[nnz] = c
            ind[nnz] = mp[e]
            nnz += 1
    return csr_matrix((val, ind, ptr), shape=(nrows, len(mp)))

sptr = sparse(tr)
spte = sparse(te)
X = sptr.toarray()
ytr_array = np.array(ytr)

In [3]:
positive_samples = X[ytr_array == 1]
negative_samples = X[ytr_array == -1]

# Count the instances
num_positive = positive_samples.shape[0]
num_negative = negative_samples.shape[0]

# Calculate how many positive samples to duplicate. Can change formula below to change oversampling of positives
num_to_duplicate = (num_negative - num_positive) // 2

# Randomly duplicate positive samples
duplicated_samples = positive_samples[np.random.randint(num_positive, size=num_to_duplicate)]

# Combine the original dataset with the duplicated samples
X_train_balanced = np.vstack((X, duplicated_samples))

y_train_balanced = np.hstack((ytr_array, np.ones(num_to_duplicate)))  # Label for positive class is 1

print("Positive labels:", num_positive)
print("Negative labels:", num_negative)
print("Labels to be duplicated:", len(duplicated_samples))
print("New 'balanced' size:", len(X_train_balanced))

Positive labels: 142
Negative labels: 1424
Labels to be duplicated: 641
New 'balanced' size: 2207


In [4]:
indices = np.arange(X_train_balanced.shape[0])
np.random.shuffle(indices)

# Use the shuffled indices to reorder both features and labels
X_train_balanced_shuffled = X_train_balanced[indices]
y_train_balanced_shuffled = y_train_balanced[indices]

# Split the dataset into two halves
split_index = X_train_balanced_shuffled.shape[0] // 2

# WORKING SETS
X_train_oversampled = X_train_balanced_shuffled
y_train_oversampled = y_train_balanced_shuffled
X_test_set = spte.toarray()

# SPLIT VALIDATION SETS
X_train_validation = X_train_balanced_shuffled[:split_index]
y_train_validation = y_train_balanced_shuffled[:split_index]

X_test_validation = X_train_balanced_shuffled[split_index:]
y_test_validation = y_train_balanced_shuffled[split_index:]

print("Previous dataset size:", len(X))
print("Balanced dataset size:", len(X_train_oversampled))

Previous dataset size: 1566
Balanced dataset size: 2207


In [5]:
class NeuralLayer:
    def __init__(self):
        self.input_data = None
        self.output_data = None

    def forward(self, input):
        """
        Abstract method: overridden in subclasses.
        Method should implement the forward propagation logic specific to the layer type.
        """
        raise NotImplementedError

    def backward(self, error, learning_rate):
        """
        Abstract method: overridden in subclasses.
        Method should implement the backward propagation logic specific to the layer type,
        including the computation of gradients and updating any learnable parameters.
        """
        raise NotImplementedError

In [6]:
class DenseLayer(NeuralLayer):
    def __init__(self, neurons_input, neurons_output):
        self.weights = np.random.rand(neurons_input, neurons_output) - 0.5
        self.biases = np.random.rand(1, neurons_output) - 0.5

    def forward(self, input):
        self.input_data = input
        self.output_data = np.dot(self.input_data, self.weights) + self.biases
        return self.output_data

    def backward(self, error, learning_rate):
        error_input = np.dot(error, self.weights.T)
        error_weights = np.dot(self.input_data.T, error)

        self.weights -= learning_rate * error_weights
        self.biases -= learning_rate * error
        return error_input

In [7]:
class ActivationFunctionLayer(NeuralLayer):
    def __init__(self, function, function_derivative):
        self.function = function
        self.function_derivative = function_derivative

    def forward(self, input):
        self.input_data = input
        self.output_data = self.function(self.input_data)
        return self.output_data

    def backward(self, error, learning_rate):
        return self.function_derivative(self.input_data) * error

In [8]:
# Activation functions
def activation_relu(x):
    return np.maximum(0, x)

def activation_relu_derivative(x):
    return (x > 0).astype(float)

In [9]:
# Activation functions
def activation_sigmoid(x):
    return 1 / (1 + np.exp(-x))

def activation_sigmoid_derivative(x):
    s = sigmoid(x)
    return s * (1 - s)

In [10]:
# Activation functions
def activation_tanh(x):
    return np.tanh(x)

def activation_tanh_derivative(x):
    return 1 - np.tanh(x) ** 2

In [11]:
# Loss functions
def loss_mse(actual, predicted):
    return np.mean(np.power(actual - predicted, 2))

def loss_mse_derivative(actual, predicted):
    return 2 * (predicted - actual) / actual.size

In [12]:
class NeuralNetwork:
    def __init__(self):
        self.layers = []
        self.loss_function = None
        self.loss_derivative = None

    def add_layer(self, layer):
        self.layers.append(layer)

    def set_loss(self, loss_function, loss_derivative):
        self.loss_function = loss_function
        self.loss_derivative = loss_derivative

    def predict(self, input_data):
        samples_count = len(input_data)
        results = []

        for i in range(samples_count):
            output = input_data[i]
            for layer in self.layers:
                output = layer.forward(output)
            results.append(output)

        return results

    def train(self, inputs, targets, epochs, learning_rate):
        sample_count = len(inputs)

        for epoch in range(epochs):
            error_sum = 0
            for i in range(sample_count):
                output = inputs[i]
                for layer in self.layers:
                    output = layer.forward(output)

                error_sum += self.loss_function(targets[i], output)

                error = self.loss_derivative(targets[i], output)
                for layer in reversed(self.layers):
                    error = layer.backward(error, learning_rate)

            avg_error = error_sum / sample_count
            print(f"Epoch {epoch+1}/{epochs} - Error: {avg_error:.6f}")

In [13]:
def classify(X_train, y_train, X_test):
    # Expand dims of sets
    X_train_3d = np.expand_dims(X_train, axis=1)
    y_train_3d = np.expand_dims(y_train, axis=1)
    X_test_3d = np.expand_dims(X_test, axis=1)
    
    # neural net create
    neural_net = NeuralNetwork()
    neural_net.add_layer(DenseLayer(436, 64)) # Input layer
    neural_net.add_layer(ActivationFunctionLayer(activation_tanh, activation_tanh_derivative)) 
    neural_net.add_layer(DenseLayer(64, 32)) # Hiddden layer 1
    neural_net.add_layer(ActivationFunctionLayer(activation_tanh, activation_tanh_derivative)) 
    neural_net.add_layer(DenseLayer(32, 16)) # Hidden layer 2
    neural_net.add_layer(ActivationFunctionLayer(activation_tanh, activation_tanh_derivative))
    neural_net.add_layer(DenseLayer(16, 1)) # Output layer
    neural_net.add_layer(ActivationFunctionLayer(activation_tanh, activation_tanh_derivative))

    # training
    neural_net.set_loss(loss_mse, loss_mse_derivative)
    neural_net.train(X_train_3d, y_train_3d, epochs=200, learning_rate=0.01)
    
    # predict
    predictions = neural_net.predict(X_test_3d)
    
    predictions_array = np.array([item[0][0] for item in predictions]) # Strip dimensions
    
    threshold = 0.5
    predictions_classified = np.where(predictions_array >= threshold, 1, -1)
    
    # Write predicted labels to file
    with open('test.txt', 'w') as fh:
        fh.write("\n".join(map(str, predictions_classified)))
    
    return predictions_classified



#out_classified = classify(X_train_oversampled, y_train_oversampled, X_test_set)

In [14]:
out_classified = classify(X_train_validation, y_train_validation, X_test_validation)

print(out_classified)

print(np.array(ytr_array)) # for viewing predicted labels vs true labels

Epoch 1/200 - Error: 0.295088
Epoch 2/200 - Error: 0.138415
Epoch 3/200 - Error: 0.064585
Epoch 4/200 - Error: 0.034820
Epoch 5/200 - Error: 0.027940
Epoch 6/200 - Error: 0.030739
Epoch 7/200 - Error: 0.022494
Epoch 8/200 - Error: 0.020473
Epoch 9/200 - Error: 0.018819
Epoch 10/200 - Error: 0.018662
Epoch 11/200 - Error: 0.018567
Epoch 12/200 - Error: 0.018501
Epoch 13/200 - Error: 0.018451
Epoch 14/200 - Error: 0.018413
Epoch 15/200 - Error: 0.018382
Epoch 16/200 - Error: 0.018356
Epoch 17/200 - Error: 0.018335
Epoch 18/200 - Error: 0.018316
Epoch 19/200 - Error: 0.018300
Epoch 20/200 - Error: 0.018285
Epoch 21/200 - Error: 0.018271
Epoch 22/200 - Error: 0.018259
Epoch 23/200 - Error: 0.018246
Epoch 24/200 - Error: 0.018233
Epoch 25/200 - Error: 0.018218
Epoch 26/200 - Error: 0.018200
Epoch 27/200 - Error: 0.018170
Epoch 28/200 - Error: 0.018106
Epoch 29/200 - Error: 0.017797
Epoch 30/200 - Error: 0.012546
Epoch 31/200 - Error: 0.005752
Epoch 32/200 - Error: 0.004705
Epoch 33/200 - Er

In [15]:
# Calculate Matthew's Correlation Coefficient
def calculate_mcc(actual, predicted):
    tp = np.sum((actual == 1) & (predicted == 1))
    tn = np.sum((actual == -1) & (predicted == -1))
    fp = np.sum((actual == -1) & (predicted == 1))
    fn = np.sum((actual == 1) & (predicted == -1))

    total_samples = len(actual)
    
    # Normalize the counts to prevent overflow
    tp /= total_samples
    tn /= total_samples
    fp /= total_samples
    fn /= total_samples

    mcc_numerator = (tp * tn) - (fp * fn)
    mcc_denominator = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))

    # Handling division by zero
    if mcc_denominator == 0:
        return 0

    mcc = mcc_numerator / mcc_denominator
    return mcc

mcc_score = calculate_mcc(np.array(y_test_validation), out_classified)
print("Matthew's Correlation Coefficient:", mcc_score)


Matthew's Correlation Coefficient: 0.9739832430540253
