# import

In [None]:
!pip install ase



In [None]:
pip install rdkit-pypi ase



In [None]:
import os, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from ase.io import read

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, Dataset, DataLoader

from tqdm.auto import tqdm

np.set_printoptions(threshold=np.inf)

In [None]:
def seed_everything(seed):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)

seed_everything(42) # Seed 고정

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
train = read('경로/train.xyz', format='extxyz', index=':') # 전체 데이터 불러오기
test = read('경로/test.xyz', format='extxyz', index=':')
sample = pd.read_csv('경로/sample_submission.csv')

# 데이터 전처리

## 데이터 불러오기 (데이터프레임 생성)

In [None]:
def calculate_distance(atom1_coord, atom2_coord):
    return np.linalg.norm(np.array(atom1_coord)-np.array(atom2_coord))

def calculate_angle(coord1, coord2, coord3):
    ba = np.array(coord1) - np.array(coord2)
    bc = np.array(coord3) - np.array(coord2)

    cosine_angle = np.dot(ba,bc)/(np.linalg.norm(ba)*np.linalg.norm(bc))
    angle = np.arccos(cosine_angle)

    return angle

element_charges = {
    "Si": +4,  # 규소
    "N" : -3, # 질소
    "O": -2,   # 산소
    "In": +3,  # 인듐
    "Ga": +3,  # 갈륨
    "As": -3,  # 비소
    "Se": -2,  # 셀레늄
    "Cd": +2   # 카드뮴
}

In [None]:
def process_mole(mole, is_train=True):
    positions_x, positions_y, positions_z = [], [], []
    distances, masses, charges, angles, forces = [], [], [], [], []

    atoms = len(mole)  # 원자 개수 파악
    position = mole.get_positions()  # 원자 위치 정보
    force = mole.get_forces() if is_train else None  # label 1

    for j in range(len(mole)):  # 각 원자에 대해
        atom = mole[j]

        # 학습 데이터 셋일 경우 force 목적변수 저장
        if is_train:
            forces.append(force[j])

        # 각 원자의 위치
        positions_x.append(position[j][0])
        positions_y.append(position[j][1])
        positions_z.append(position[j][2])


        # 각 원자의 질량
        mass = atom.mass
        masses.append(mass)

        # 각 원자의 전하량
        element_symbol = atom.symbol  # 화학 요소 기호 (원자 식별)
        charge = element_charges.get(element_symbol, None)
        charges.append(charge)

        # 근접한 원자간의 거리
        atom1_coord = position[j]
        if j < len(mole) - 1:
            atom2_coord = position[j+1]
        else:
            atom2_coord = position[0]  # 마지막 원자에 대한 거리는 첫 번째 원자와의 거리로 설정
        distance = calculate_distance(atom1_coord, atom2_coord)
        distances.append(distance)

        # 근접한 원자간의 각도
        if 0 < j < len(mole) - 1:
            angle_atom3_coord=position[j+1]
            angle=calculate_angle(position[j-1],atom1_coord ,angle_atom3_coord)
            angles.append(angle)
        elif j == len(mole) - 1:
            angle_atom3_coord=position[0]
            angle=calculate_angle(position[j-1],atom1_coord ,angle_atom3_coord)
            angles.append(angle)
        else:
             angles.append(np.nan)

    return {
            'position_x': positions_x,
            'position_y': positions_y,
            'position_z': positions_z,
            'distance': distances,
            'angle': angles,
            'mass' : masses,
            'charge' : charges,
            'force': forces if is_train else None,
           }


def load_data(data_set):
    all_data=[]

    print('data load start')
    for i in tqdm(range(len(data_set))):
       mole = data_set[i]
       processed_mol = process_mole(mole,is_train=hasattr(mole,'get_forces'))
       all_data.append(pd.DataFrame(processed_mol))
    print('data load end')

    df=pd.concat(all_data).reset_index(drop=True)
    df['angle'] = df['angle'].fillna(df['angle'].mean())

    return df

In [None]:
energies = []
sequence_train, sequence_test = [], []

In [None]:
for i in tqdm(range(len(train))):
    atoms = len(train[i]) # 원자 개수 파악
    energies.append(train[i].get_total_energy())
    sequence_train.append(atoms)

for i in tqdm(range(len(test))):
    atoms = len(test[i]) # 원자 개수 파악
    sequence_test.append(atoms)

  0%|          | 0/22510 [00:00<?, ?it/s]

  0%|          | 0/4101 [00:00<?, ?it/s]

In [None]:
train_df = load_data(train)
test_df = load_data(test)

data load start


  0%|          | 0/22510 [00:00<?, ?it/s]

  angle = np.arccos(cosine_angle)
  angle = np.arccos(cosine_angle)


data load end
data load start


  0%|          | 0/4101 [00:00<?, ?it/s]

data load end


## 데이터 표준화

-> 연산이 있는 부분이라 그런지 표준화는 꽝

In [None]:
# # Train 데이터와 Test 데이터를 각각 train, test에 저장, force 열을 제외한 모든 feature 선택
# train = train_df.drop(columns=['force'])
# test = test_df.drop(columns=['force'])

# # StandardScaler 객체 생성
# scaler = StandardScaler()

# # Train 데이터를 기반으로 평균과 표준 편차를 계산, Train 및 Test 데이터에 대해 표준화 수행
# train_std = scaler.fit_transform(train)
# test_std = scaler.transform(test)

# # NumPy 배열을 Pandas DataFrame으로 변환
# train_std_df = pd.DataFrame(train_std, columns=train.columns)
# test_std_df = pd.DataFrame(test_std, columns=test.columns)

# # force 열 추가
# train_std_df['force'] = train_df['force']
# test_std_df['force'] = test_df['force']

## 데이터 분포 확인

In [None]:
print(train_df.head())
print(test_df.head())

In [None]:
# 각 데이터프레임의 크기 확인
print("Train DataFrame Shape: ", train_df.shape)
print("Test DataFrame Shape: ", test_df.shape)

# 각 데이터프레임의 상위 5개 항목 출력
print(train_df.head())
print(test_df.head())

# 각 열의 정보와 결측치 유무 확인
train_df.info()
test_df.info()

In [None]:
# 기초 통계량 확인 (평균, 중앙값, 최대값, 최소값 등)
train_df.describe()
# test_df.describe()

In [None]:
# train_df force와 energy
df_force = pd.DataFrame(train_df['force'].tolist(), columns=['force_x', 'force_y', 'force_z'], index=train_df.index)
df_energies = pd.DataFrame(energies, columns=['energy'])

print(df_force.head())
print(df_energies.head())

## 텐서 데이터셋 생성

In [None]:
class ForceDataset(Dataset):
    def __init__(self, df, mode='test'):
        self.df = df
        self.mode = mode

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        pos_x = self.df.loc[idx, 'position_x']
        pos_y = self.df.loc[idx, 'position_y']
        pos_z = self.df.loc[idx, 'position_z']
        distance = self.df.loc[idx, 'distance']
        angle = self.df.loc[idx, 'angle']
        mass = self.df.loc[idx, 'mass']
        charge = self.df.loc[idx, 'charge']

        inputs = torch.tensor([pos_x, pos_y, pos_z, distance, angle, mass, charge], dtype=torch.float32) # 입력값 텐서로 변환
        # inputs = torch.tensor([pos_x, pos_y, pos_z, mass, charge], dtype=torch.float32) # 입력값 텐서로 변환

        if not self.mode == 'test':
            label = torch.tensor(self.df.loc[idx, 'force'], dtype=torch.float32) # 라벨 텐서로 변환
            return inputs, label
        else:
            return inputs # test일 경우 라벨을 제외하고 텐서로 변환

In [None]:
# 하이퍼파라미터
input_size = 7  # feature 개수
hidden_size = 256 # 은닉층의 뉴런 개수
output_size = 3 # target 개수
num_epochs = 5 # 에폭 수
batch_size =256 # 미니 배치 크기
learning_rate = 0.01 # 학습률 (가중치 업데이트 스텝의 크기)

In [None]:
# 텐서 데이터 셋 생성
train_dataset = ForceDataset(train_df, 'train')
test_dataset = ForceDataset(test_df, 'test')

In [None]:

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
for inputs, labels in train_loader:
    print(inputs.shape)
    print(labels.shape)
    break

torch.Size([256, 7])
torch.Size([256, 3])


In [None]:
for inputs in test_loader:
    print(inputs.shape)
    break

torch.Size([256, 7])


# 모델 학습

In [None]:
#  ForceModel 클래스는 주어진 입력(x)에 대해 여러 개의 선형 변환과 활성화 함수가 적용된 다층 신경망을 통해 출력(y) 값을 예측하는 역할

class ForceModel(nn.Module): # nn.Module 클래스 상속
    def __init__(self, input_size, hidden_size): # 입력 크기, 은닉층 크기
        super(ForceModel, self).__init__()

        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.BatchNorm1d(hidden_size),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(hidden_size, 128),
            nn.BatchNorm1d(128),
            nn.LeakyReLU(0.01),
            nn.Dropout(0.5),

            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(64, 3)
        )

    def forward(self, x):
        y = self.layers(x)
        return y

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"current device is {device}")
model = ForceModel(input_size, hidden_size).to(device) # 모델 정의
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

current device is cuda


In [None]:
print("Training Start!")

model.train()
for epoch in range(num_epochs):
    print(f"{epoch+1}/{num_epochs} epoch..")
    i = 1
    for inputs, labels in tqdm(train_loader):
        optimizer.zero_grad() # 그래디언트값 초기화

        inputs = inputs.to(device)
        labels = labels.to(device) # 값 GPU로 전송

        outputs = model(inputs) # 모델에 입력값 전달 후 예측 수행

        force_rmse_x = torch.sum((outputs[:, 0] - labels[:, 0])**2)
        force_rmse_y = torch.sum((outputs[:, 1] - labels[:, 1])**2)
        force_rmse_z = torch.sum((outputs[:, 2] - labels[:, 2])**2)
        loss = torch.sqrt((force_rmse_x + force_rmse_y + force_rmse_z) / (3 * len(inputs)))

        loss.backward() # 역전파를 통한 손실에 대한 그래디언트 계산
        optimizer.step() # 계산된 그래디언트 정보를 활용하여 파라미터 업데이트 (최소값을 찾아내도록 도와줌)

        if i % 6000 == 0:
             print(f"loss: {loss.item():>7f}")
        i+=1

print("Training Complete!")

Training Start!
1/5 epoch..


  0%|          | 0/5020 [00:00<?, ?it/s]

2/5 epoch..


  0%|          | 0/5020 [00:00<?, ?it/s]

3/5 epoch..


  0%|          | 0/5020 [00:00<?, ?it/s]

4/5 epoch..


  0%|          | 0/5020 [00:00<?, ?it/s]

5/5 epoch..


  0%|          | 0/5020 [00:00<?, ?it/s]

Training Complete!


In [None]:
print("Inference Start!") # 추론 시작

model.eval() # 평가 모드로 설정

preds = []
with torch.no_grad():
    for inputs in tqdm(test_loader):
        inputs = inputs.to(device) # input값 GPU로 이동
        outputs = model(inputs) # 예측 수행

        pred = outputs.detach().cpu().numpy() # 예측값에서 그래디언트 연결, CPU로 이동 ,numpy array로 변환하여 변수 저장
        preds.extend(pred) # 값 리스트에 추가

print("Inference Complete!")
len(preds)

Inference Start!


  0%|          | 0/1154 [00:00<?, ?it/s]

Inference Complete!


295234

In [None]:
test_df['force'] = preds # 예측 결과 저장

In [None]:
# 한 분자가 몇 개의 원자로 이루어져 있는지에 따라 범위를 생성
bundles_train, bundles_test = [], []

flag = 0
for size in sequence_train:
    bundles_train.append((flag, flag+size)) # 시작점, 끝점-1
    flag += size

flag = 0
for size in sequence_test:
    bundles_test.append((flag, flag+size)) # 시작점, 끝점-1
    flag += size

In [None]:
preds_force = []

for start, end in bundles_test:
    preds_force.append(np.vstack(preds[start:end])) # 2차원 array로 분자 내의 원자의 예측값 저장

sample['force'] = preds_force
sample

Unnamed: 0,ID,energy,force
0,TEST_0000,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
1,TEST_0001,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
2,TEST_0002,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
3,TEST_0003,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4,TEST_0004,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
...,...,...,...
4096,TEST_4096,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4097,TEST_4097,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4098,TEST_4098,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4099,TEST_4099,0,"[[0.0037456483, -0.03332818, -0.006307662], [0..."


In [None]:
# 'force' 컬럼의 값을 분해하여 각각의 행으로 만듦
force_df = train_df['force'].apply(pd.Series)
force_df.columns = [f'force_{i}' for i in range(3)]

# 분해한 'force' 컬럼을 추가
train_df = train_df.drop('force', axis=1).join(force_df)

# 'force' 컬럼의 값을 분해하여 각각의 행으로 만듦
force_df = test_df['force'].apply(pd.Series)
force_df.columns = [f'force_{i}' for i in range(3)]

# 분해한 'force' 컬럼을 추가
test_df = test_df.drop('force', axis=1).join(force_df)
test_df.head()

Unnamed: 0,position_x,position_y,position_z,distance,angle,mass,charge,force_0,force_1,force_2
0,9.671275,8.734431,6.151755,10.338275,1.05752,28.085,4,0.003746,-0.033328,-0.006308
1,1.676806,2.238918,5.27045,9.864149,0.496928,28.085,4,0.003746,-0.033328,-0.006308
2,10.358608,4.824889,9.174357,6.048477,0.635066,28.085,4,0.003746,-0.033328,-0.006308
3,4.37062,5.391541,9.812298,5.410387,2.026667,28.085,4,0.003746,-0.033328,-0.006308
4,2.453404,10.449967,9.906622,5.256397,0.958819,28.085,4,0.003746,-0.033328,-0.006308


In [None]:
# 데이터프레임에서 분자별로 정보 자르기
sequences_train = [train_df.iloc[start:end].values for start, end in bundles_train]
sequences_test = [test_df.iloc[start:end].values for start, end in bundles_test]

In [None]:
input_size = 10  # feature 개수
hidden_size = 512
output_size = 1 # target 개수
num_epochs = 10
batch_size = 128
learning_rate = 0.01

In [None]:
# 분자당 원자 수가 다를 것을 방지한 패딩 수행

# 패딩을 사용하여 모든 시퀀스의 길이를 동일하게 만듦
max_len = max(seq.shape[0] for seq in sequences_train)
padded_sequences = [np.vstack([seq, np.zeros((max_len - seq.shape[0], 10))]) for seq in sequences_train]

# 패딩된 시퀀스를 2차원 배열로 변환 (수직 방향으로 쌓음)
padded_array_train = np.stack(padded_sequences)
X_tensor_train = torch.tensor(padded_array_train, dtype=torch.float32)
y_tensor_train = torch.tensor(energies, dtype=torch.float32).view(-1, 1)
train_dataset = TensorDataset(X_tensor_train, y_tensor_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# 패딩을 사용하여 모든 시퀀스의 길이를 동일하게 만듦
max_len = max(seq.shape[0] for seq in sequences_test)
padded_sequences = [np.vstack([seq, np.zeros((max_len - seq.shape[0], 10))]) for seq in sequences_test]

# 패딩된 시퀀스를 2차원 배열로 변환 (수직 방향으로 쌓음)
padded_array_test = np.stack(padded_sequences)
X_tensor_test = torch.tensor(padded_array_test, dtype=torch.float32)
test_dataset = TensorDataset(X_tensor_test)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# BiLSTM 모델 정의 <- 에너지의 값이 분자 내 원자들 간의 순서와 관련이 있는 "시퀀스 데이터"
# 이 외에도 GRU GNN, Set 등을 사용해볼 수 있음

class EnergyModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1, dropout_rate=0.5):
        super(EnergyModel, self).__init__()

        # Bidirectional LSTM with Dropout
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True,
                            dropout=dropout_rate,
                            bidirectional=True)

        # Bidirectional LSTM이므로 hidden_size 조정
        self.linear = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_size),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_size, 1)

        )

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        energy = self.linear(lstm_out[:, -1, :])
        return energy

In [None]:
# 모델, 손실 함수, 옵티마이저 초기화
model = EnergyModel(input_size, hidden_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)



In [None]:
print("Training Start!!")

# 학습
model.train()
for epoch in range(num_epochs):
    print(f"{epoch+1}/{num_epochs} epoch..")

    i = 1

    for inputs, labels in tqdm(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        num_atoms = len(inputs[0])

        energy_rmse = torch.sqrt(torch.mean(((outputs[:, 0] - labels[:, 0])/num_atoms)**2)) # per-atom energy RMSE
        loss = energy_rmse

        loss.backward()
        optimizer.step()

        if i % 20 == 0:
             print(f"loss: {loss.item():>7f}")
        i+=1

print("Training Complete!")

Training Start!!
1/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.806539
loss: 0.729107
loss: 0.789554
loss: 0.366138
loss: 0.644325
loss: 0.639617
loss: 0.287848
loss: 0.252289
2/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.479367
loss: 0.233452
loss: 0.302706
loss: 0.178754
loss: 0.273765
loss: 0.434639
loss: 0.476252
loss: 0.221493
3/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.239054
loss: 0.165884
loss: 0.332586
loss: 0.185294
loss: 0.231519
loss: 0.148733
loss: 0.235971
loss: 0.209051
4/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.131409
loss: 0.392862
loss: 0.192386
loss: 0.366161
loss: 0.330489
loss: 0.265132
loss: 0.426293
loss: 0.194908
5/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.478570
loss: 0.375564
loss: 0.197955
loss: 0.360065
loss: 0.400107
loss: 0.292945
loss: 0.296453
loss: 0.487948
6/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.234852
loss: 0.261420
loss: 0.376910
loss: 0.446730
loss: 0.431501
loss: 0.210437
loss: 0.445306
loss: 0.360781
7/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.295627
loss: 0.327470
loss: 0.135929
loss: 0.447727
loss: 0.268358
loss: 0.148253
loss: 0.356400
loss: 0.235743
8/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.248788
loss: 0.300960
loss: 0.173123
loss: 0.173857
loss: 0.145157
loss: 0.182604
loss: 0.272480
loss: 0.187457
9/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.198021
loss: 0.203772
loss: 0.144830
loss: 0.192333
loss: 0.113423
loss: 0.264586
loss: 0.344553
loss: 0.159564
10/10 epoch..


  0%|          | 0/176 [00:00<?, ?it/s]

loss: 0.180552
loss: 0.208826
loss: 0.168289
loss: 0.183871
loss: 0.181674
loss: 0.141504
loss: 0.151734
loss: 0.165306
Training Complete!


In [None]:
print("Inference Start!")

model.eval()

preds = []
with torch.no_grad():
    for inputs in tqdm(test_loader):
        inputs = inputs[0].to(device)

        outputs = model(inputs)
        pred = outputs.detach().cpu().numpy()

        preds.extend(pred)

print("Inference Complete!")
len(preds)

Inference Start!


  0%|          | 0/33 [00:00<?, ?it/s]

Inference Complete!


4101

In [None]:
preds = [pred.item() for pred in preds]
sample['energy'] = preds
sample

Unnamed: 0,ID,energy,force
0,TEST_0000,389.364044,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
1,TEST_0001,411.816895,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
2,TEST_0002,409.761597,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
3,TEST_0003,406.687622,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4,TEST_0004,412.988129,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
...,...,...,...
4096,TEST_4096,637.186462,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4097,TEST_4097,634.899902,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4098,TEST_4098,610.392944,"[[0.0037456483, -0.03332818, -0.006307662], [0..."
4099,TEST_4099,614.720947,"[[0.0037456483, -0.03332818, -0.006307662], [0..."


In [None]:
sample.to_csv('경로/파일명.csv', index=False)