In [1]:
import numpy as np
import pandas as pd
import os
from pathlib import Path
# from itertools import product
# import matplotlib.pyplot as plt

results_path = "./KNN_Pipeline_results"
os.makedirs(results_path, exist_ok=True)

**Data**

In [2]:
# Read CSV files into pandas DataFrames
df_train  = pd.read_csv("../random_split/train.csv", index_col=0)
df_dev  = pd.read_csv("../random_split/dev.csv", index_col=0)
df_test  = pd.read_csv("../random_split/test.csv", index_col=0)

# Factorize the 'family_accession' column and create a new 'label_numeric' column with numeric labels
df_train['label_numeric'] = pd.factorize(df_train['family_accession'], sort=True)[0]
df_dev['label_numeric'] = pd.factorize(df_dev['family_accession'], sort=True)[0]
df_test['label_numeric'] = pd.factorize(df_test['family_accession'], sort=True)[0]

# Convert label columns to integer lists
y_train = df_train['label_numeric'].astype(int).tolist()
y_dev = df_dev['label_numeric'].astype(int).tolist()
y_test = df_test['label_numeric'].astype(int).tolist()

# Count the number of different classes
num_of_classes = len(list(set(y_train))) # counts the number different classes

# Extract sequence as string lists
X_train = df_train['sequence'].astype(str).tolist()
X_dev = df_dev['sequence'].astype(str).tolist()
X_test = df_test['sequence'].astype(str).tolist()

**Extracting a subset of the training**

The dataset being very heavy, we will try to subset the training set to build a model on incrementally larger portion of the training set to see how much of the data is required to achieve optimal performances.

In [3]:
from sklearn.model_selection import train_test_split
from collections import Counter

# % of the training set we use
subset_size = 5e-2

# Find the classes represented only once
class_counts = Counter(y_train)
classes_to_remove = [cls for cls, count in class_counts.items() if count == 1]
print(f'Classes removed (N={len(classes_to_remove)}) : {classes_to_remove}')

# Filter out instances with classes represented only once
## In training set
train_mask = ~np.isin(np.array(y_train), classes_to_remove)
X_train_filtered = [X_train[i] for i in range(len(X_train)) if train_mask[i]]
y_train_filtered = [y_train[i] for i in range(len(y_train)) if train_mask[i]]
print(f'X_train_filtered length : {len(X_train_filtered)}')
print(f'y_test_filtered length : {len(y_train_filtered)}')
## In dev set
dev_mask = ~np.isin(np.array(y_dev), classes_to_remove)
X_dev_filtered = [X_dev[i] for i in range(len(X_dev)) if dev_mask[i]]
y_dev_filtered = [y_dev[i] for i in range(len(y_dev)) if dev_mask[i]]
print(f'X_dev_filtered length : {len(X_dev_filtered)}')
print(f'y_test_filtered length : {len(y_dev_filtered)}')
## In test set
test_mask = ~np.isin(np.array(y_test), classes_to_remove)
X_test_filtered = [X_test[i] for i in range(len(X_test)) if test_mask[i]]
y_test_filtered = [y_test[i] for i in range(len(y_test)) if test_mask[i]]
print(f'X_test_filtered length : {len(X_test_filtered)}')
print(f'y_test_filtered length : {len(y_test_filtered)}')

# Subsetting the training set
_, X_train_subset, _, y_train_subset = train_test_split(
    X_train_filtered, y_train_filtered, test_size=subset_size, stratify=y_train_filtered, random_state=42
)
print(f'X_train_subset.len : {len(X_train_subset)}')
print(f'y_train_subset.len : {len(y_train_subset)}')

Classes removed (N=515) : [10920, 15732, 2691, 15451, 4913, 16756, 15919, 5011, 6054, 10261, 16125, 4818, 3422, 16651, 8423, 14984, 6817, 15878, 16626, 10873, 15308, 17161, 16652, 16688, 11391, 16645, 16737, 10088, 17357, 10601, 16665, 16684, 16692, 7656, 16625, 11392, 4415, 16697, 1461, 17413, 5437, 2597, 15771, 10710, 6049, 5415, 16611, 8621, 16649, 14727, 15332, 10783, 15501, 7449, 8772, 12102, 5324, 16613, 11559, 5164, 12128, 8501, 8715, 7817, 6139, 5020, 16535, 16658, 7467, 9709, 16765, 16655, 8896, 5137, 10637, 12192, 7236, 10345, 6868, 16702, 11555, 2141, 11298, 3172, 17777, 17042, 10823, 6654, 3131, 8597, 10082, 10320, 12297, 6196, 13437, 16638, 10821, 15053, 6752, 10452, 9708, 12311, 2831, 15026, 1371, 12373, 11601, 10824, 2943, 8927, 811, 2330, 6083, 16762, 3346, 11209, 3614, 10696, 4968, 16639, 8064, 16757, 3846, 4505, 7564, 16653, 1452, 8440, 17412, 6085, 16671, 1633, 861, 16654, 15774, 17055, 16764, 5162, 16596, 16773, 2223, 17578, 5857, 16936, 1904, 15165, 15185, 17331, 4

Subsetting dev and test (for quicker results)

In [4]:
# Find the classes represented only once in dev
class_counts = Counter(y_dev_filtered)
classes_to_remove = [cls for cls, count in class_counts.items() if count == 1]
mask = ~np.isin(np.array(y_dev_filtered), classes_to_remove)
X_dev_filtered = [X_dev_filtered[i] for i in range(len(X_dev_filtered)) if mask[i]]
y_dev_filtered = [y_dev_filtered[i] for i in range(len(y_dev_filtered)) if mask[i]]

# Subsetting the dev set
_, X_dev_subset, _, y_dev_subset = train_test_split(
    X_dev_filtered, y_dev_filtered, test_size=0.1, stratify=y_dev_filtered, random_state=42
)
print(f'X_dev_subset.len : {len(X_dev_subset)}')
print(f'y_dev_subset.len : {len(y_dev_subset)}')

# Find the classes represented only once in test
class_counts = Counter(y_test_filtered)
classes_to_remove = [cls for cls, count in class_counts.items() if count == 1]
mask = ~np.isin(np.array(y_test_filtered), classes_to_remove)
X_test_filtered = [X_test_filtered[i] for i in range(len(X_test_filtered)) if mask[i]]
y_test_filtered = [y_test_filtered[i] for i in range(len(y_test_filtered)) if mask[i]]

# Subsetting the dev set
_, X_test_subset, _, y_test_subset = train_test_split(
    X_test_filtered, y_test_filtered, test_size=0.1, stratify=y_test_filtered, random_state=42
)
print(f'X_test_subset.len : {len(X_test_subset)}')
print(f'y_test_subset.len : {len(y_test_subset)}')

X_dev_subset.len : 12026
y_dev_subset.len : 12026
X_test_subset.len : 12027
y_test_subset.len : 12027


**KNN tuning**

Here we create a KNN classifier based on hamming distance.

In [5]:
# Creating folder to save the results
os.makedirs(Path(results_path, f'knn_{subset_size}_metric_boxplots'), exist_ok=True)

Calculating the distances on train and validation datasets.

In [6]:
import numpy as np
from tqdm import tqdm

from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report

# Function to calculate Hamming distance between two sequences
def hamming_distance(seq1, seq2):
    return sum(c1 != c2 for c1, c2 in zip(seq1, seq2))

# Function to calculate distances between training and testing sequences
def calculate_distances(X_train, X_test):
    n_train = len(X_train)
    n_test = len(X_test)
    distances = np.zeros((n_test, n_train))

    with tqdm(total=n_test, desc="Calculating Distances", unit="sequence") as pbar:
        for i in range(n_test):
            for j in range(n_train):
                distances[i, j] = hamming_distance(X_test[i], X_train[j])
            pbar.update(1)
    
    return distances

# Calculate distances for training set
if Path(results_path, f'knn_{subset_size}_metric_boxplots', 'train_distances.csv').exists():
    print("Importing train distances...")
    distances_train = pd.read_csv(Path(results_path, f'knn_{subset_size}_metric_boxplots', 'train_distances.csv'), index_col=0).values
else:
    print("Calculating train distances...")
    distances_train = calculate_distances(X_train_subset, X_train_subset)
    pd.DataFrame(distances_train).to_csv(Path(results_path, f'knn_{subset_size}_metric_boxplots', 'train_distances.csv'))

# Calculate distances for testing set
if Path(results_path, f'knn_{subset_size}_metric_boxplots', 'dev_distances.csv').exists():
    print("Importing dev distances...")
    distances_dev_train = pd.read_csv(Path(results_path, f'knn_{subset_size}_metric_boxplots', 'dev_distances.csv'), index_col=0).values
else:
    print("Calculating dev-train distances...")
    distances_dev_train = calculate_distances(X_train_subset, X_dev_subset)
    pd.DataFrame(distances_dev_train).to_csv(Path(results_path, f'knn_{subset_size}_metric_boxplots', 'dev_distances.csv'))

Importing train distances...


MemoryError: 

In [None]:
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import make_scorer, accuracy_score

# Grid for k grid search
param_grid = {'n_neighbors': np.arange(5,100, 5)}

# Initiliazing a KNN classifier
knn_classifier = KNeighborsClassifier(metric='precomputed')

# Create the GridSearchCV object
grid_search = GridSearchCV(knn_classifier, param_grid, scoring=make_scorer(accuracy_score), cv=5)
grid_search.fit(distances_train, y_train_subset)

# Get the best parameters and the corresponding accuracy
best_params = grid_search.best_params_
best_accuracy = grid_search.best_score_

print(f"Best Parameters: {best_params}")
print(f"Best Accuracy: {best_accuracy}")

# Get the results of the grid search
results = grid_search.cv_results_

# Extract the values of k and corresponding mean test scores
k_values = results['param_n_neighbors'].data.astype(int)
mean_test_scores = results['mean_test_score']

# Plot the results of the grid search
plt.figure(figsize=(10, 6))
plt.plot(k_values, mean_test_scores, marker='o')
plt.title('Cross-validated Accuracy vs. Number of Neighbors (k)')
plt.xlabel('Number of Neighbors (k)')
plt.ylabel('Mean Test Accuracy')
plt.grid(True)
plt.savefig(Path(results_path, f'knn_{subset_size}_metric_boxplots', 'crossvalidated_accuracy_plot.png'))
plt.show()

Building KNN classifier for optimal value of k

In [None]:
# Initialize and train KNN classifier
print("Building KNN classifier...")
knn_classifier = KNeighborsClassifier(n_neighbors=best_params['n_neighbors'], metric='precomputed')
knn_classifier.fit(distances_train, y_train_subset)

# Make predictions
print("Predicting dev labels...")
y_pred = knn_classifier.predict(distances_dev_train)

Building KNN classifier...
Predicting dev labels...


Evaluating final classifier and saving results

In [None]:
import pandas as pd
import seaborn as sns

# Evaluate the classifier
print("Calculating dev predictions...")
accuracy = accuracy_score(y_dev_subset, y_pred)
report = classification_report(y_dev_subset, y_pred)
report_dict = classification_report(y_dev_subset, y_pred, output_dict=True)

print(f"Accuracy: {accuracy}")
print("Classification Report:")
print(report)

# Save the DataFrame to a CSV file
report_df = pd.DataFrame(data=report_dict)
report_df.to_csv(Path(results_path, f'classification_report_{subset_size}.csv'), index=False)

# Extract metrics from the DataFrame
report_df = report_df.iloc[:-1,:-3].T
metrics = report_df.columns

# Create boxplots for each metric and save as PNG files
for metric in metrics:
    plt.figure(figsize=(8, 6))
    sns.boxplot(x=metric, data=report_df, color='skyblue')
    plt.title(f'Boxplot for {metric}')
    plt.savefig(Path(results_path, f'knn_{subset_size}_metric_boxplots', f'boxplot_{metric}.png'))  # Save the boxplot as a PNG file
    plt.close()

Calculating dev predictions...
Accuracy: 8.315316813570597e-05
Classification Report:
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         1
           2       0.00      0.00      0.00         8
           3       0.00      0.00      0.00         2
           4       0.00      0.00      0.00         1
           5       0.00      0.00      0.00         5
           7       0.00      0.00      0.00         1
           8       0.00      0.00      0.00         1
           9       0.00      0.00      0.00         1
          12       0.00      0.00      0.00         9
          13       0.00      0.00      0.00         1
          15       0.00      0.00      0.00         1
          16       0.00      0.00      0.00         1
          17       0.00      0.00      0.00         4
          18       0.00      0.00      0.00         6
          19       0.00      0.00      0.00         0
          21       0.00      0.00      0.00      

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
  if pd.api.types.is_categorical_dtype(vector):
