# Neural network for classification of contaminants with MAT

## 1. Formulate/outline the problem: classification

Simple neural network for classification of the contaminants using MAT transcriptomes


In [5]:
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn
from tensorflow import keras

In [None]:
file_name = "gene_counts_NN_training.csv"

In [None]:
data = pd.read_csv(file_name)

## 2. Identify inputs and outputs

In [None]:
data.head()

## 3. Prepare data

In [None]:
data_features = data.drop(columns=["sample"])
target = data["sample"]

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    data_features, target, test_size=0.2, random_state=0, shuffle=True, stratify=target
)

In [None]:
def to_normalize_DESeq2_style(data):
    # Ensure all values are non-negative
    data = data.applymap(lambda x: max(x, 0))

    # Take the log
    log_data = np.log1p(data)

    # Calculate the pseudo-reference sample for each gene
    log_data["pseudo_reference"] = log_data.mean(axis=1)

    # Filter out genes with -Inf as their average
    filtered_log_data = log_data[log_data["pseudo_reference"] != float("-inf")]

    # Subtract the gene pseudo-references from log counts
    ratio_data = filtered_log_data.iloc[:, :-1].sub(
        filtered_log_data["pseudo_reference"], axis=0
    )

    # Find the median of the ratios for each sample
    sample_medians = ratio_data.median(axis=0)

    # Convert medians to scaling factors
    scaling_factors = np.exp(sample_medians)

    # Divide the original counts by the scaling factors
    manually_normalized = data.div(scaling_factors)

    return manually_normalized


X_train = to_normalize_DESeq2_style(X_train)

X_test = to_normalize_DESeq2_style(X_test)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Initialize the LabelEncoder
label_encoder = LabelEncoder()

# Fit the encoder on the class names and transform them into integers
encoded_labels = label_encoder.fit_transform(y_train)

In [None]:
from sklearn.feature_selection import chi2, SelectKBest

# Define feature selection
fs = SelectKBest(score_func=chi2, k=300)

# Apply feature selection
X_train = fs.fit_transform(X_train, encoded_labels)

In [None]:
from sklearn.decomposition import PCA

n_components = 2
pca = PCA(n_components=n_components)
X_train_pca = pca.fit_transform(X_train)

# Scatter plot
plt.scatter(X_train_pca[:, 0], X_train_pca[:, 1], c=encoded_labels, cmap="viridis")
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.title("PCA Visualization of Selected Features")
plt.colorbar(label="Class Label")
plt.show()

In [None]:
# Transform the test feature matrix
X_test = fs.transform(X_test)

In [None]:
y_train = pd.get_dummies(y_train, dtype=int)
y_test = pd.get_dummies(y_test, dtype=int)

In [None]:
X_train.shape, X_test.shape

In [None]:
y_train.shape, y_test.shape

## 4. Build an architecture from scratch or choose a pretrained model

In [None]:
keras.backend.clear_session()
keras.utils.set_random_seed(2)

In [None]:
# # create our model
# def create_nn(learning_rate=0.001):
#     inputs = keras.Input(shape=X_train.shape[1])
#     x = keras.layers.UnitNormalization()(inputs)
#     x = keras.layers.Dropout(0.3)(x)
#     x = keras.layers.Dense(
#         256, activation="relu", kernel_regularizer=keras.regularizers.L2(0.01)
#     )(x)
#     x = keras.layers.BatchNormalization()(x)
#     x = keras.layers.Dropout(0.2)(x)
#     x = keras.layers.Dense(
#         128, activation="relu", kernel_regularizer=keras.regularizers.L2(0.01)
#     )(x)
#     x = keras.layers.Dropout(0.1)(x)
#     outputs = keras.layers.Dense(9, activation="softmax")(x)

#     model = keras.Model(inputs=inputs, outputs=outputs, name="small_NN")

#     optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

#     model.compile(
#         optimizer=optimizer,
#         loss=keras.losses.CategoricalCrossentropy(),
#         metrics=["accuracy", keras.metrics.AUC()],
#     )

#     return model


# model = create_nn()
# model.summary()

In [3]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, UnitNormalization, Activation


def create_nn(
    hidden_units, dropout_rates, num_layers, activation, optimizer, weights_limit
):
    model = Sequential(name="small_NN")
    model.add(UnitNormalization())
    model.add(
        Dense(hidden_units, input_dim=300),
    )
    model.add(Activation(activation))

    for _ in range(num_layers - 1):
        model.add(
            Dense(hidden_units, kernel_regularizer=keras.regularizers.L2(weights_limit))
        )
        model.add(Activation(activation))
        model.add(Dropout(dropout_rates))

    model.add(Dense(9, activation="softmax"))

    model.compile(
        optimizer=optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy", keras.metrics.AUC(), keras.metrics.Recall()],
    )
    return model

2023-08-29 16:25:10.961137: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-29 16:25:11.004303: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [6]:
early_stop = keras.callbacks.EarlyStopping(monitor="loss", patience=3)

In [7]:
def learning_rate_schedule(epoch, lr):
    if epoch < 50:
        return lr
    else:
        return lr * 0.9


lr_scheduler = keras.callbacks.LearningRateScheduler(learning_rate_schedule)

## 5. Choose a loss function and optimizer

In [8]:
param_grid = {
    "hidden_units": [8, 16, 24, 56],
    "dropout_rates": [0.2],
    "num_layers": [2, 3, 4],
    "batch_size": [5, 10, 15],
    "optimizer": ["adam"],
    "activation": ["relu", "tanh", "swish", "gelu"],
    "weights_limit": [0.01, 0.001],
}

In [9]:
# Create a KerasClassifier for grid search
keras_clf = keras.wrappers.scikit_learn.KerasClassifier(
    build_fn=create_nn, epochs=300, verbose=1
)

# Create a GridSearchCV instance
grid_search = sklearn.model_selection.GridSearchCV(
    estimator=keras_clf, param_grid=param_grid, cv=3, scoring="f1_micro", verbose=1
)

# Perform grid search
grid_result = grid_search.fit(
    X_train, encoded_labels, callbacks=[lr_scheduler, early_stop]
)

# Print best parameters and best score
print("Best Parameters: ", grid_result.best_params_)
print("Best Score: ", grid_result.best_score_)

AttributeError: module 'keras.api._v2.keras' has no attribute 'wrappers'

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, BatchNormalization, Activation
from keras.callbacks import LearningRateScheduler


def create_nn(hidden_units, dropout_rates, num_layers, activation, optimizer):
    model = Sequential()
    model.add(BatchNormalization())
    model.add(Dense(hidden_units, input_dim=300))
    model.add(Activation(activation))
    model.add(Dropout(dropout_rates))

    for _ in range(num_layers - 1):
        model.add(Dense(hidden_units))
        model.add(Activation(activation))
        model.add(Dropout(dropout_rates))

    model.add(Dense(9, activation="softmax"))

    model.compile(
        optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"]
    )
    return model


def learning_rate_schedule(epoch, lr):
    if epoch < 50:
        return lr
    else:
        return lr * 0.9


lr_scheduler = LearningRateScheduler(learning_rate_schedule)

# Create a KerasClassifier for grid search
keras_clf = keras.wrappers.scikit_learn.KerasClassifier(
    build_fn=create_nn, epochs=300, verbose=0
)

# Create a GridSearchCV instance
grid_search = sklearn.model_selection.GridSearchCV(
    estimator=keras_clf, param_grid=param_grid, cv=3, scoring="accuracy", verbose=0
)

# Perform grid search
grid_result = grid_search.fit(X_train, y_train, callbacks=[lr_scheduler])

# Print best parameters and best score
print("Best Parameters: ", grid_result.best_params_)
print("Best Score: ", grid_result.best_score_)

## 6. Train model

In [None]:
history = model.fit(
    X_train, y_train, epochs=300, callbacks=[callback], validation_data=(X_test, y_test)
)

## 8. Measure performance

In [None]:
def plot_history(history, metrics):
    history_df = pd.DataFrame.from_dict(history.history)
    sns.lineplot(data=history_df[metrics])
    plt.xlabel("epochs")
    plt.ylabel("metric")

In [None]:
history_df = pd.DataFrame.from_dict(history.history)
history_df.columns

In [None]:
plot_history(history, ["accuracy", "val_accuracy"])

In [None]:
plot_history(history, ["loss", "val_loss"])

In [None]:
plot_history(history, ["auc", "val_auc"])

The AUC (Area under the curve) of the ROC (Receiver operating characteristic; default) or PR (Precision Recall) curves are quality measures of binary classifiers. 