# Reproducibility Test

## Import Libraries

In [1]:
import os
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.metrics import mean_squared_error, r2_score
from typing import List
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import KBinsDiscretizer, StandardScaler
from typing import Tuple, Optional

In [2]:
# Show the installed PyTorch version for this run.
torch.__version__

'2.3.0'

## Set Seed

In [3]:
# Notebook-wide configuration; SEED is the value handed to seed_everything.
CFG = dict(SEED=316)

In [4]:
# Fix the seeds
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch's
    global generator, and exports ``PYTHONHASHSEED``.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # PYTHONHASHSEED is read at interpreter start-up, so this only influences
    # Python processes launched after this point, not the running kernel.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    # Without this, torch-generated numbers (weight init, dropout masks, ...)
    # would still vary between runs.
    torch.manual_seed(seed)
    
seed_everything(CFG["SEED"]) # fix the seeds
# cuBLAS workspace setting, exported before deterministic algorithms are
# requested. NOTE(review): per the PyTorch reproducibility notes this matters
# for CUDA runs; this notebook pins the device to CPU below.
os.environ["CUBLAS_WORKSPACE_CONFIG"]=":16:8"
torch.use_deterministic_algorithms(True)
# GPU selection is left disabled; everything below runs on the CPU.
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = 'cpu'

## Set Hyperparameters Used

In [5]:
# retrieved from MLflow
# Only the values that shape the input features and the network architecture
# are consumed in this notebook; learning_rate and batch_size are not
# referenced by any cell shown here.
learning_rate = 0.00026100256506134736
is_fps2048_desc170 = True  # include the X_{train,test}.<feature_version>.npy features
num_layers = 3  # together with layer_ratio, sets the hidden layer widths
layer_ratio = 0.5300733288106988
batch_size = 128
is_infomax300 = True  # include the infomax300.{train,test}.npy features
dropout_rate = 0.5  # passed to every nn.Dropout in the MLP

## Load Data

In [6]:
class DataHandler:
    """Load feature matrices and targets from ``path_data`` and scale them.

    Training features and targets are read from ``.npy`` files when the
    handler is constructed; test features are read on demand by
    ``load_test_data``. Targets on disk are pIC50 values and are stored in
    ``y_train`` as ``log(10 ** (9 - pIC50))``.
    """

    def __init__(self, feature_version, path_data):
        """Remember the data location and eagerly load the training arrays.

        Args:
            feature_version: Suffix used in the ``X_train``/``y_train``/
                ``X_test`` file names (``X_train.<feature_version>.npy``).
            path_data: Directory that contains the ``.npy`` files.
        """
        self.feature_version = feature_version
        self.path_data = path_data
        self.fps2048_desc170 = None
        self.infomax300 = None
        self.bindaff = None
        self.y_train = None
        # Initialise before loading so the attribute always exists, even if
        # loading fails part-way.
        self.scaler = None
        self.load_all_data()

    def _path(self, file_name):
        """Return the full path of ``file_name`` inside ``path_data``."""
        return os.path.join(self.path_data, file_name)

    @staticmethod
    def _select_features(fps2048_desc170, infomax300, is_infomax300, is_fps2048_desc170):
        """Pick or horizontally stack the requested feature matrices."""
        if is_infomax300 and is_fps2048_desc170:
            return np.hstack((fps2048_desc170, infomax300))
        if is_infomax300:
            return infomax300
        if is_fps2048_desc170:
            return fps2048_desc170
        raise ValueError("At least one of is_infomax300 or is_fps2048_desc170 must be True")

    def load_all_data(self):
        """Load both training feature matrices and the training targets."""
        # Use the instance's feature version rather than a same-named global,
        # so the handler works regardless of what the caller's module defines.
        self.fps2048_desc170 = np.load(self._path(f"X_train.{self.feature_version}.npy"))
        self.infomax300 = np.load(self._path("infomax300.train.npy"))
        self.load_y()

    def load_y(self):
        """Load pIC50 targets and store them as natural-log IC50 in ``y_train``."""
        y = np.load(self._path(f"y_train.{self.feature_version}.npy"))
        self.y_train = np.log(self.pIC50_to_IC50(y))

    @staticmethod
    def pIC50_to_IC50(pic50_values):
        """Convert pIC50 values to IC50 via ``10 ** (9 - pIC50)``."""
        return 10 ** (9 - pic50_values)

    def load_data(self, is_infomax300: bool, is_fps2048_desc170: bool) -> Tuple[np.ndarray, np.ndarray]:
        """Return the selected training features together with ``y_train``.

        Args:
            is_infomax300: Include the infomax300 features.
            is_fps2048_desc170: Include the fps2048_desc170 features.

        Returns:
            ``(X, y_train)``; when both flags are set, the fps2048_desc170
            columns come first, followed by the infomax300 columns.

        Raises:
            ValueError: If both flags are False.
        """
        X = self._select_features(
            self.fps2048_desc170, self.infomax300, is_infomax300, is_fps2048_desc170
        )
        return X, self.y_train

    def load_test_data(self, is_infomax300: bool, is_fps2048_desc170: bool) -> np.ndarray:
        """Load the test feature files and return the selected features.

        Both test files are read from disk on every call, whichever flags are
        set. Column order matches ``load_data``.

        Raises:
            ValueError: If both flags are False.
        """
        fps2048_desc170_test = np.load(self._path(f"X_test.{self.feature_version}.npy"))
        infomax300_test = np.load(self._path("infomax300.test.npy"))
        return self._select_features(
            fps2048_desc170_test, infomax300_test, is_infomax300, is_fps2048_desc170
        )

    def fit_scaler(self, X: np.ndarray):
        """Fit a ``StandardScaler`` on ``X`` and keep it in ``self.scaler``."""
        self.scaler = StandardScaler().fit(X)

    def preprocess_data(self, X: np.ndarray, y: Optional[np.ndarray] = None) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """Scale ``X`` with the fitted scaler and convert the inputs to tensors.

        Args:
            X: Feature matrix to transform with ``self.scaler``.
            y: Optional targets, converted to a tensor without scaling.

        Returns:
            ``(X_tensor, y_tensor)``, where ``y_tensor`` is None when ``y`` is None.

        Raises:
            RuntimeError: If ``fit_scaler`` has not been called yet.
        """
        if self.scaler is None:
            raise RuntimeError("fit_scaler must be called before preprocess_data")
        X_tensor = torch.FloatTensor(self.scaler.transform(X))
        y_tensor = torch.FloatTensor(y) if y is not None else None
        return X_tensor, y_tensor

In [7]:
# Feature-file suffix and data directory used to build the .npy file names.
feature_version = "feat_v0"
path_data = "./data"

# Constructing the handler loads the training features and targets from disk.
data_handler = DataHandler(feature_version, path_data)

# Select the feature groups chosen in the hyperparameter cell.
X, y = data_handler.load_data(is_infomax300, is_fps2048_desc170)

In [8]:
# Show the shapes of the feature matrix and the target vector.
X.shape, y.shape

((1952, 2518), (1952,))

## Load Model

In [9]:
# Model definition
class MLP(nn.Module):
    """Fully connected regressor that emits one value per input row.

    Every entry of ``hidden_dims`` contributes a Linear -> ReLU -> Dropout
    stage; a final Linear layer maps the last width to a single output.
    """

    def __init__(self, input_dim: int, hidden_dims: List[int], dropout_rate: float):
        """Build the layer stack.

        Args:
            input_dim: Number of input features.
            hidden_dims: Width of each hidden layer, in order.
            dropout_rate: Probability used by every Dropout layer.
        """
        # Reset torch's global RNG so the initial weights are identical on
        # every construction.
        torch.manual_seed(42)
        super().__init__()

        widths = [input_dim, *hidden_dims]
        stages = []
        for n_in, n_out in zip(widths, widths[1:]):
            stages += [nn.Linear(n_in, n_out), nn.ReLU(), nn.Dropout(dropout_rate)]
        stages.append(nn.Linear(widths[-1], 1))
        self.model = nn.Sequential(*stages)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the sequential stack and return its output."""
        return self.model(x)

In [10]:
input_dim = X.shape[1]
# num_layers - 1 hidden widths, shrinking linearly from input_dim towards
# layer_ratio * input_dim (truncated to int).
hidden_dims = [int(input_dim * (1 - (1 - layer_ratio) * i / num_layers)) 
            for i in range(1, num_layers)]

model = MLP(input_dim = input_dim, hidden_dims=hidden_dims, dropout_rate=dropout_rate)

# Restore the submitted weights onto the configured device.
# NOTE(review): weights_only=False lets torch.load unpickle arbitrary objects.
# If the checkpoint is a plain state_dict, weights_only=True should be enough
# and is safer -- confirm against the saved file before changing.
model.load_state_dict(torch.load(
    "./submitted/submisstion_model.pth",
    map_location = device,
    weights_only=False,
))

# Inference mode: disables dropout so predictions are deterministic.
model.eval()

MLP(
  (model): Sequential(
    (0): Linear(in_features=2518, out_features=2123, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=2123, out_features=1729, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.5, inplace=False)
    (6): Linear(in_features=1729, out_features=1, bias=True)
  )
)

## Predict

In [11]:
n_fold = 5
fold_idx = 3 # trial 7 fold 4

# The target is continuous, so it is binned into n_fold quantile bins and the
# bin labels are what StratifiedKFold stratifies on.
kfold = StratifiedKFold(n_splits=n_fold, shuffle=True, random_state=42)
y_binned = KBinsDiscretizer(n_bins=n_fold, encode='ordinal', strategy='quantile').fit_transform(y.reshape(-1, 1))

# Materialise all splits and keep only the (0-based) fold selected above.
train_idx, val_idx = list(kfold.split(X, y_binned))[fold_idx]

X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]

In [12]:
# Normalize X_test with the scaler fitted on the training fold
data_handler.fit_scaler(X_train)
X_test = data_handler.load_test_data(is_infomax300, is_fps2048_desc170)

X_test_scaled, _ = data_handler.preprocess_data(X_test)
# Flatten the model output to a 1-D NumPy array of predictions.
y_pred = model(X_test_scaled).detach().cpu().numpy().reshape(-1,)

df_submit = pd.read_csv("./sample_submission.csv")
# Targets were log-transformed in DataHandler.load_y, so exp() maps the
# predictions back to IC50.
df_submit['IC50_nM'] = np.exp(y_pred)

In [13]:
# Show the predicted IC50 column of the reproduced submission.
df_submit['IC50_nM'] 

0      207.308990
1       17.576488
2       23.412319
3       67.716873
4       45.156281
          ...    
108     11.402204
109      7.063308
110     32.437210
111     41.173954
112     40.979328
Name: IC50_nM, Length: 113, dtype: float32

In [14]:
# Leaderboard submission file
final_submission = pd.read_csv('./submitted/final_submisstion.csv')
final_submission

Unnamed: 0,ID,IC50_nM
0,TEST_000,207.308990
1,TEST_001,17.576488
2,TEST_002,23.412320
3,TEST_003,67.716870
4,TEST_004,45.156280
...,...,...
108,TEST_108,11.402204
109,TEST_109,7.063308
110,TEST_110,32.437210
111,TEST_111,41.173954


## Compare

In [15]:
# discrepancy
# Per-row difference between the reproduced and the submitted predictions.
prediction_diff = df_submit.IC50_nM - final_submission.IC50_nM

# Signed sum, kept so the originally reported figure can still be compared.
# Positive and negative per-row errors can cancel here, so a value near zero
# does not by itself show that the two prediction sets match.
discrepancy = prediction_diff.sum()
print("Discrepancy =", discrepancy, "~=", discrepancy.round(4))

# Largest absolute per-row difference: cannot be hidden by cancellation.
max_abs_discrepancy = prediction_diff.abs().max()
print("Max |discrepancy| =", max_abs_discrepancy)

Discrepancy = -1.7569335964751076e-05 ~= -0.0
