## 코드 해설

In [2]:
import pandas as pd
import numpy as np
import os
import random
from rdkit import Chem
from rdkit.Chem import AllChem

import torch
import torch.nn.functional as F
from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool

In [3]:
# 시드 고정
seed = 5
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

### 전처리1. IC50값을 pIC50값으로 변환
> 로그(log)를 취해 pIC50으로 변환하면, 데이터의 분포가 정규분포와 유사하게 되어 모델이 훨씬 안정적으로 학습할 수 있습니다.

In [None]:
### IC50 to pIC50 변환 함수
def IC50_to_pIC50(ic50_nM):
    ic50_nM = np.clip(ic50_nM, 1e-10, None) #IC50 값이 0이거나 음수이면 로그 계산 시 오류가 발생하므로, 최소값을 아주 작은 양수(1e-10)로 강제하여 에러를 방지하는 안정장치입니다.
    return 9 - np.log10(ic50_nM) # IC50 값(nM 단위)을 pIC50으로 변환하는 수학 공식입니다.
                                 # pIC50 = -log10(IC50_Molar)

In [None]:
### 데이터 로드 및 전처리
chembl = pd.read_csv("/Users/skku_aws24/Desktop/AIdrug/open/ChEMBL_ASK1(IC50).csv", sep=';')
pubchem = pd.read_csv("/Users/skku_aws24/Desktop/AIdrug/open/Pubchem_ASK1.csv")

### 전처리2. chembl 처리

In [None]:
chembl.columns = chembl.columns.str.strip().str.replace('\"', '') # 모든 컬럼 이름에서 불필요한 공백(strip())과 따옴표(replace('\"', ''))를 제거합니다. 
chembl = chembl[chembl['Standard Type'] == 'IC50'] # 'Standard Type' 컬럼이 'IC50'인 데이터만 남기고 나머지는 버리는 필터링 과정입니다.

In [None]:
chembl = chembl[['Smiles', 'Standard Value'] # 수많은 컬럼 중에서 우리가 실제로 필요한 **분자 구조(Smiles)**와 활성값(Standard Value) 두 개의 컬럼만 선택합니다.
    ].rename(columns={'Smiles': 'smiles', 'Standard Value': 'ic50_nM'} # 컬럼 이름을 소문자인 smiles와 ic50_nM으로 통일합니다.
        ).dropna() # Smiles나 Standard Value 중 어느 하나라도 값이 비어있는(NA) 행은 분석에 사용할 수 없으므로 제거합니다.

chembl['ic50_nM'] = pd.to_numeric(chembl['ic50_nM'], errors='coerce') # IC50 값이 담긴 ic50_nM 컬럼의 데이터 타입을 문자열(text)에서 숫자(numeric)로 변환합니다.
                                                                      # 만약 숫자로 변환할 수 없는 값이 있다면, 해당 값을 강제로 결측치(NaN)로 만듭니다. 

### 전처리3. pubchem 처리

In [None]:
pubchem = pubchem[
    ['SMILES', 'Activity_Value']
    ].rename(
        columns={'SMILES': 'smiles', 'Activity_Value': 'ic50_nM'}
        ).dropna()
        
pubchem['ic50_nM'] = pd.to_numeric(pubchem['ic50_nM'], errors='coerce')

### 전처리4. chembl, pubchem 데이터 통합하여 'total' dataframe 생성

In [None]:
total = pd.concat([chembl, pubchem], ignore_index=True) # 정제된 chembl과 pubchem 데이터프레임을 위아래로 이어 붙여...
total = total.drop_duplicates(subset='smiles') # smiles 컬럼을 기준으로 중복된 행을 제거합니다.

* '1 SMILES = 1 IC50' 원칙을 가장 단순한 방식으로 구현한 것
* 단점
    * 이 코드는 chembl과 pubchem을 합친 데이터에서 smiles가 중복될 경우, 단순히 먼저 나타난 행 하나만 남기고 나머지는 모두 버립니다.
    * 즉, chembl에 있던 데이터가 남고 pubchem에 있던 중복 데이터는 버려지게 됩니다.
    * pubchem에 있는 데이터가 더 정확하거나 의미 있는 값일 수 있는데도, 순서가 뒤에 있다는 이유만으로 정보가 손실됩니다.

In [None]:
total['pIC50'] = IC50_to_pIC50(total['ic50_nM']) # 중복이 제거된 total 데이터의 ic50_nM 컬럼에 우리가 맨 처음 살펴봤던 IC50_to_pIC50 함수를 적용하여, 모델이 최종적으로 예측할 목표인 pIC50 컬럼을 생성
total = total[total['ic50_nM'] > 0 # IC50 값이 0 이하인, 의미 없는 데이터를 제거
    ].dropna(subset=['smiles', 'pIC50']) # 혹시라도 smiles나 pIC50 값에 결측치가 있다면 최종적으로 제거하여 모델 학습에 완벽하게 준비된 데이터만 남깁니다.

### 전처리5. 유효한 SMILES만 필터링 

In [None]:
# 유효한 SMILES만 필터링
total['mol'] = total['smiles'].apply(Chem.MolFromSmiles) # total 데이터프레임의 smiles 컬럼에 있는 모든 SMILES 문자열을 RDKit의 **'분자 객체(Molecule Object)'**로 변환하여, mol이라는 새로운 컬럼에 저장
total = total.dropna(subset=['mol'] # mol 컬럼을 기준으로 결측치(None은 결측치로 취급됨)가 있는 행을 삭제합니다. 결과적으로 유효한 분자 객체를 가진 행들만 살아남게 됩니다.
    ).reset_index(drop=True) # 일부 행이 삭제되면서 중간중간 이가 빠진 것처럼 변한 인덱스(행 번호)를 0부터 다시 순서대로 깔끔하게 정리해주는 역할을 합니다.

### 함수 정의
* 이 함수는 분자 정보를 바탕으로 노드 특징, 엣지 정보 등을 계산하여 PyTorch Geometric의 Data 객체 하나를 만들어 graph 변수에 저장합니다.

In [None]:
### RDKit Mol -> PyG Graph 변환 함수
def mol_to_graph_data_obj(mol, y=None):
    # 원자 수
    num_atoms = mol.GetNumAtoms()

    # 노드 특성: 원자 번호 (atomic number)
    # 각 원자(atom)를 그래프의 노드(node)로 정의
    atom_features = []
    for atom in mol.GetAtoms(): # 이 코드에서는 각 원자의 원자 번호(Atomic Number) 단 하나만을 특징으로 사용
        atom_features.append([atom.GetAtomicNum()])
    x = torch.tensor(atom_features, dtype=torch.float) # x는 **노드 특징 행렬(Node Feature Matrix)**을 의미합니다.

    # 엣지 (결합) 정보 추출
    # 분자 내 원자들을 연결하는 화학 결합(bond)을 그래프의 엣지(edge)로 정의
    edge_index = [] # 어떤 노드들이 서로 연결되어 있는지를 나타내는 연결 리스트
    edge_attr = [] # **엣지 특징 행렬(Edge Feature Matrix)**을 의미
    for bond in mol.GetBonds():
        i = bond.GetBeginAtomIdx()
        j = bond.GetEndAtomIdx()

        # 무방향 그래프이므로 양쪽 다 추가
        # 원자 i와 j 사이의 결합이 양방향임을 명시
        edge_index.append([i, j])
        edge_index.append([j, i])

        # 결합 타입 원-핫 인코딩 (예: 단일, 이중, 삼중, 방향족)
        # 예를 들어, 이중 결합은 [0, 1, 0, 0] 이라는 벡터로 변환됩니다.
        bond_type = bond.GetBondType()
        bond_feat = [
            int(bond_type == Chem.rdchem.BondType.SINGLE),
            int(bond_type == Chem.rdchem.BondType.DOUBLE),
            int(bond_type == Chem.rdchem.BondType.TRIPLE),
            int(bond_type == Chem.rdchem.BondType.AROMATIC),
        ]
        edge_attr.append(bond_feat) 
        edge_attr.append(bond_feat)

    edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()  # shape [2, num_edges]
    edge_attr = torch.tensor(edge_attr, dtype=torch.float)

    # 지금까지 만든 x(노드 특징), edge_index(연결 정보), edge_attr(엣지 특징), 그리고 y(예측 목표인 pIC50 값)를
    # PyTorch Geometric의 Data라는 표준 객체에 담아 하나로 포장하는 과정
    if y is not None:
        y = torch.tensor([y], dtype=torch.float)

    data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y)

    return data

### 전처리된 데이터셋을 GNN 모델에 넣을 수 있는 형태로 변환

In [None]:
### 전체 데이터셋 PyG 형식으로 변환
graph_list = []
for idx, row in total.iterrows():
    graph = mol_to_graph_data_obj(row['mol'], row['pIC50']) # 현재 처리 중인 데이터프레임의 한 줄(row)에서 **RDKit 분자 객체(row['mol'])**와 **정답 값(row['pIC50'])**을 꺼내,
                                                            # 우리가 바로 이전에 했던 함수에 입력으로 넣어줍니다.
    graph_list.append(graph)

* graph_list는 PyTorch Geometric의 Data 객체들로 가득 찬 리스트가 됩니다.
* 각 Data 객체는 훈련 데이터셋에 있던 분자 하나하나를 의미합니다.

### 데이터셋을 training set, validation set으로 나누기

In [None]:
### 데이터셋 분할 (간단하게 랜덤으로)
num_total = len(graph_list)
num_train = int(num_total * 0.8)
num_valid = num_total - num_train

In [None]:
random.shuffle(graph_list)
train_dataset = graph_list[:num_train]
valid_dataset = graph_list[num_train:]

In [None]:
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)

### GCN 모델

In [None]:
### GNN 모델 정의
class GCN(torch.nn.Module):
    def __init__(self, hidden_channels=64):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(1, hidden_channels)   # input feature dim = 1 (atomic number) # 첫 번째 그래프 합성곱(Graph Convolution) 레이어
        self.conv2 = GCNConv(hidden_channels, hidden_channels) # 두 번째 그래프 합성곱 레이어
        self.lin = torch.nn.Linear(hidden_channels, 1)  # output regression # 최종 예측을 위한 일반적인 선형(Linear) 레이어

    def forward(self, data):
        x, edge_index = data.x, data.edge_index

        # 1. 첫 번째 GCN 레이어 통과
        x = self.conv1(x, edge_index) 
        x = F.relu(x)

        # 2. 두 번째 GCN 레이어 통과
        x = self.conv2(x, edge_index)
        x = F.relu(x)

        # 3. 그래프 풀링 (Graph Pooling)
        x = global_mean_pool(x, data.batch)  # graph-level representation

        # 4. 최종 선형 레이어 통과
        x = self.lin(x)

        return x.squeeze()

In [None]:
### 모델, 옵티마이저, 손실함수
model = GCN().to(device) # 실제 모델 객체를 생성. GPU가 사용 가능하다면 GPU로, 그렇지 않다면 CPU로
optimizer = torch.optim.Adam( # Adam은 현재 가장 널리 쓰이는 효과적인 옵티마이저 중 하나입니다.
    model.parameters(), # 옵티마이저에게 "이 모델(model)이 가진 모든 학습 가능한 파라미터들을 네가 책임지고 업데이트해줘" 라고 알려주는 것입니다.
    lr=0.001 # **학습률(Learning Rate)**을 의미합니다.
    ) 
criterion = torch.nn.MSELoss() # 손실 함수 정의

In [None]:
### 학습 함수
def train():
    model.train() # 1. 학습 모드 설정
    total_loss = 0
    for data in train_loader: # 2. 데이터 공급
        data = data.to(device)
        optimizer.zero_grad() # 3. 기울기 초기화
        output = model(data) # 4. 예측
        loss = criterion(output, data.y.view(-1)) # 5. 손실 계산
        loss.backward() # 6. 역전파 (기울기 계산)
        optimizer.step() # 7. 파라미터 업데이트
        total_loss += loss.item() * data.num_graphs
    return total_loss / len(train_loader.dataset) # 에포크가 끝난 후, 누적된 총 손실을 전체 훈련 데이터셋의 크기로 나누어 훈련 데이터셋 전체에 대한 평균 손실을 계산하고 반환

In [None]:
### 검증 함수
def evaluate(loader):
    model.eval() # 1. 평가 모드 설정
    total_loss = 0
    with torch.no_grad(): # 2. 기울기 계산 비활성화
        for data in loader: # 3. 데이터 공급 (검증용)
            data = data.to(device)
            output = model(data) # 4. 예측
            loss = criterion(output, data.y.view(-1)) # 5. 손실 계산
            total_loss += loss.item() * data.num_graphs
    return total_loss / len(loader.dataset)

### 학습 시작

In [None]:
### 학습 루프
epochs = 100
for epoch in range(1, epochs+1):
    train_loss = train()
    valid_loss = evaluate(valid_loader)
    print(f'Epoch {epoch:03d}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}')


### 테스트 데이터에 대한 추론

In [None]:
### 테스트 데이터 예측 (테스트 데이터도 graph로 변환 필요)
test_df = pd.read_csv("/Users/skku_aws24/Desktop/AIdrug/open/test.csv")
test_df['mol'] = test_df['Smiles'].apply(Chem.MolFromSmiles) # 훈련데이터와 동일하게 변환
test_df = test_df.dropna(subset=['mol']).reset_index(drop=True)
test_graphs = [mol_to_graph_data_obj(mol) for mol in test_df['mol']]
test_loader = DataLoader(test_graphs, batch_size=64, shuffle=False)

In [None]:
model.eval() # 모델을 **'평가 모드'**로 설정
predictions = []
with torch.no_grad(): # 불필요한 기울기 계산을 비활성화
    for data in test_loader: # 예측 루프 (Prediction Loop)
        data = data.to(device) # 데이터를 GPU로 보냅니다.
        output = model(data) # 학습이 완료된 모델에 테스트 데이터를 입력하여 pIC50 예측값을 얻습니다.
        predictions.extend(output.cpu().numpy()) # 변환된 Numpy 배열의 값들을 predictions 리스트에 하나씩 추가합니다.

### 최종 결과 생산

In [None]:
# 모델이 예측한 pIC50 값을 대회에서 요구하는 최종 형태인 IC50 값으로 되돌리는 과정
def pIC50_to_IC50_pred(pIC50):
    return 10 ** (9 - pIC50)

In [None]:
test_df['pIC50_pred'] = predictions # predictions 리스트에 담아두었던 pIC50 값들을 test_df 데이터프레임에 pIC50_pred 라는 새로운 컬럼으로 추가합니다. 
test_df['ASK1_IC50_nM'] = test_df['pIC50_pred'].apply(pIC50_to_IC50_pred) # 방금 만든 pIC50_pred 컬럼의 각 값에 pIC50_to_IC50_pred 함수를 적용(apply)하여, 최종 예측값인 IC50 값을 계산하고, 이를 ASK1_IC50_nM 이라는 새로운 컬럼에 저장합니다. 

In [None]:
submission = pd.read_csv("/Users/skku_aws24/Desktop/AIdrug/open/sample_submission.csv")
submission['ASK1_IC50_nM'] = test_df['ASK1_IC50_nM']
submission.to_csv("/Users/skku_aws24/Desktop/AIdrug/output/gnn_pytorch.csv", index=False)

print("Submission file created successfully with PyTorch Geometric GNN model!")