In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KernelDensity
from sklearn.metrics import r2_score

In [None]:
path = "/home/j.derks/mTRAQ_WC_THP1_mac_library_04202023.tsv"
df = pd.read_csv(path, delimiter='\t')
original_df = pd.DataFrame(df)
distinct_df = original_df.drop_duplicates(subset='transition_group_id')

In [None]:
peptide_sequence = "ACDEFGHIKLMNPQRSTVWYacdefghiklmnpqrstvwy"   # define peptide sequence as a string 
max_sequence_length = 30    # set the maximum sequence length
num_amino_acids = 20      # number of possible amino acids

# create a dictionary to map amino acids to their indices
amino_acid_to_index = {aa: i for i, aa in enumerate(peptide_sequence)}

def one_hot_encode_sequence(sequence):
    one_hot_sequence = np.zeros((max_sequence_length, num_amino_acids), dtype=np.float32)
    for i, aa in enumerate(sequence[:max_sequence_length]):
        aa_index = amino_acid_to_index.get(aa.upper(), None)
        if aa_index is not None:
            one_hot_sequence[i, aa_index] = 1.0
    return one_hot_sequence

X_peptide = [one_hot_encode_sequence(seq) for seq in distinct_df['PeptideSequence']]

# One-hot encode charge states (assumes a maximum charge state of 5)
max_charge_state = 5
X_charge = np.zeros((len(distinct_df), max_charge_state), dtype=np.float32)

for i, charge in enumerate(distinct_df['PrecursorCharge']):
    if 1 <= charge <= max_charge_state:
        X_charge[i, charge - 1] = 1.0

# concatenate peptide sequences and charge states to create the final input (X)
X = [np.concatenate((X_peptide[i], np.tile(X_charge[i], (max_sequence_length, 1))), axis=1) for i in range(len(X_peptide))]
Y = distinct_df['IonMobility'].values
weights = distinct_df['Ms1ProfileCorr'].values

# Split the data into training and testing sets
X_train, X_test, Y_train, Y_test, weights_train, weights_test = train_test_split(X, Y, weights, test_size=0.1, random_state=42)

In [None]:
def weighted_mean_absolute_error(y_true, y_pred, weights):
    """
    Weighted Mean Absolute Error custom loss function.

    :param y_true: True labels
    :param y_pred: Predicted labels
    :param weights: Weights for each sample (based on Ms1ProfileCorr)
    :return: Weighted MAE
    """
    # Calculate the absolute error
    error = tf.abs(y_pred - y_true)
    transformed_weights = weights+0.1

    # Apply weights
    weighted_error = error * transformed_weights
    
    # Return the mean of the weighted error
    return tf.reduce_mean(weighted_error)

In [None]:
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(max_sequence_length, num_amino_acids + max_charge_state)),
    tf.keras.layers.Dense(300, activation='swish'),
    tf.keras.layers.Dense(220, activation='swish'),
    tf.keras.layers.Dense(160, activation='swish'),
    tf.keras.layers.Dense(100, activation='swish'),
    tf.keras.layers.Dense(50, activation='swish'),
    tf.keras.layers.Dense(10, activation='swish'),
    tf.keras.layers.Dense(1, activation='linear') 
])

# compile
model.compile(optimizer='adam', loss=lambda y_true, y_pred: weighted_mean_absolute_error(y_true, y_pred, weights))
#model.compile(optimizer='adam', loss='mse')  # Customize loss function for your task

# train
model.fit(np.array(X_train), Y_train, epochs=10, batch_size=32, validation_split=0.1)

# predict
Y_pred = model.predict(np.array(X_test))
Y_pred = Y_pred.flatten()

# evaluate the model
mse = np.mean((Y_pred - Y_test) ** 2)
print("Mean Squared Error:", mse)

mae = np.mean(np.abs(Y_pred - Y_test))
print("Mean Absolute Error:", mae)

med_ae = np.median(np.abs(Y_pred - Y_test))
print("Median Absolute Error:", med_ae)

coefficient_of_dermination = r2_score(Y_test, Y_pred)
print("R-squared: ",coefficient_of_dermination)

model.save(f'IM_model')