In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# 기본 데이터 처리 및 파일 시스템 라이브러리
import os
import random
import numpy as np
import pandas as pd
from pathlib import Path

# PyTorch 및 Transformer 라이브러리
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModelForMaskedLM

In [None]:
def set_seed(seed=42):
    """재현성 확보를 위한 시드(Seed) 설정"""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
set_seed(42)

In [None]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
use_amp = (DEVICE == "cuda") # 자동 혼합 정밀도(AMP) 사용 여부
print(f"Device: {DEVICE}")

Device: cuda


In [None]:
# 데이터 경로
data_path = '/content/drive/MyDrive/gene/'
TRAIN_DATA_PATH = data_path + 'human_genome_train.csv'


In [None]:
# --- 공통 설정 ---
BATCH_SIZE = 64                        # 추론 시 배치 사이즈
NUM_WORKERS = 2

# --- Fine-Tuning 설정 ---
FT_BATCH_SIZE = 4                    # Fine-Tuning 시 배치 사이즈 (메모리 부족 시 4 또는 2로 조정)
NUM_EPOCHS = 3                         # 학습 에포크 수
LEARNING_RATE = 2e-5                   # 학습률
OUTPUT_DIR = './ft_model'              # 파인 튜닝된 모델 저장 경로
GRAD_ACC_STEPS = 8                     # Gradient Accumulation 단계 (논리적 배치 크기 = FT_BATCH_SIZE * 4)
MODEL_ID = "InstaDeepAI/nucleotide-transformer-v2-500m-multi-species"

In [None]:
def reduce_mem_usage(df):
    """
    데이터프레임의 메모리 사용량을 줄이는 함수 (대용량 데이터 대비)\n
    숫자 컬럼의 데이터 타입을 더 작은 크기로 변환합니다.
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print(f'Original Memory usage: {start_mem:.2f} MB')

    # int 및 float 컬럼의 데이터 타입을 더 작은 크기로 변환하는 로직
    for col in df.columns:
        col_type = df[col].dtype
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32)
                else: df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32)
                else: df[col] = df[col].astype(np.float64)

    end_mem = df.memory_usage().sum() / 1024**2
    print(f'Optimized Memory usage: {end_mem:.2f} MB. Reduced by {100 * (start_mem - end_mem) / start_mem:.1f}%')
    return df

# [Sparse Loading] 필요한 컬럼(ID, seq)만 로드
df_train = pd.read_csv(TRAIN_DATA_PATH, usecols=['ID', 'seq'])
df = pd.read_csv(data_path + 'test.csv', usecols=['ID', 'seq'])

# 메모리 최적화 적용
df_train = reduce_mem_usage(df_train)
df = reduce_mem_usage(df)

# 데이터 최대 길이 계산 (토크나이저 설정에 사용)
max_seq_len_train = df_train["seq"].str.len().max()
max_seq_len_test = df["seq"].str.len().max()
max_seq_len = max(max_seq_len_train, max_seq_len_test)
print(f"✅ Max sequence length = {max_seq_len}")

Original Memory usage: 3.05 MB
Optimized Memory usage: 3.05 MB. Reduced by 0.0%
Original Memory usage: 0.21 MB
Optimized Memory usage: 0.21 MB. Reduced by 0.0%
✅ Max sequence length = 1024


In [None]:
# 사전 학습된 Nucleotide Transformer 로드
# AutoTokenizer를 통해 토크나이저를 로드합니다.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
# AutoModelForMaskedLM을 통해 Masked Language Model(MLM) 구조의 모델을 로드합니다.
model = AutoModelForMaskedLM.from_pretrained(MODEL_ID, trust_remote_code=True)

model = model.to(DEVICE) # 모델을 GPU/CPU로 이동합니다.

# 모델이 처리 가능한 최대 길이와 데이터의 최대 길이 비교
MODEL_CAP = tokenizer.model_max_length
# 토크나이징 시 사용할 최종 최대 길이 (모델 CAP과 데이터 길이 중 더 작은 값)
EFFECTIVE_MAX_LEN = min(MODEL_CAP, max_seq_len)
print(f"✅ Model Max Length: {MODEL_CAP}, Effective Max Length: {EFFECTIVE_MAX_LEN}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/129 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/101 [00:00<?, ?B/s]

config.json: 0.00B [00:00, ?B/s]

esm_config.py: 0.00B [00:00, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/InstaDeepAI/nucleotide-transformer-v2-500m-multi-species:
- esm_config.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


modeling_esm.py: 0.00B [00:00, ?B/s]

A new version of the following files was downloaded from https://huggingface.co/InstaDeepAI/nucleotide-transformer-v2-500m-multi-species:
- modeling_esm.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


model.safetensors:   0%|          | 0.00/1.99G [00:00<?, ?B/s]

✅ Model Max Length: 2048, Effective Max Length: 1024


In [None]:
# N 결측값을 [MASK] 토큰으로 치환하여 Fine-Tuning 시 강건성 확보
MASK_TOKEN = tokenizer.mask_token

if MASK_TOKEN is None:
    print("⚠️ Warning: Tokenizer does not have a defined [MASK] token. Skipping N replacement.")
else:
    print(f"✅ 'N' 결측값을 '{MASK_TOKEN}' 토큰으로 치환합니다.")

    # 훈련 데이터 및 테스트 데이터에 동일하게 적용
    # N을 MASK_TOKEN으로 치환합니다. regex=False로 설정하여 문자열 치환 속도를 높입니다.
    df_train['seq'] = df_train['seq'].str.replace('N', MASK_TOKEN, regex=False)
    df['seq'] = df['seq'].str.replace('N', MASK_TOKEN, regex=False)

    print("✅ 데이터 전처리 (N -> [MASK]) 완료.")

✅ 'N' 결측값을 '<mask>' 토큰으로 치환합니다.
✅ 데이터 전처리 (N -> [MASK]) 완료.


In [None]:
class SeqDataset(Dataset):
    """PyTorch Dataset 클래스: ID와 Sequence를 관리"""
    def __init__(self, df, tokenizer, max_len):
        self.ids  = df["ID"].tolist()
        self.seqs = df["seq"].tolist()
        self.tok  = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, idx):
        return {"ID": self.ids[idx], "seq": self.seqs[idx]}

def collate_fn(batch, tok=tokenizer, max_len=EFFECTIVE_MAX_LEN):
    """배치 데이터를 토크나이징하고 패딩 및 텐서화하는 함수"""
    ids  = [b["ID"] for b in batch]
    seqs = [b["seq"] for b in batch]
    enc  = tok.batch_encode_plus(
        seqs,
        return_tensors="pt",
        padding="longest",
        truncation=True,
        max_length=max_len
    )
    return {
        "ids": ids,
        "input_ids": enc["input_ids"],
        "attention_mask": enc["attention_mask"]
    }

# [Fine-Tuning Dataloader] 학습 시 사용, Shuffle=True
train_dataset = SeqDataset(df_train, tokenizer, EFFECTIVE_MAX_LEN)
train_loader  = DataLoader(
    train_dataset,
    batch_size=FT_BATCH_SIZE,
    shuffle=True, # 학습 시에는 데이터를 섞어줍니다.
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn
)
print("✅ Fine-Tuning Dataloader ready.")

# [Test Dataloader] 추론 시 사용, Shuffle=False
test_dataset = SeqDataset(df, tokenizer, EFFECTIVE_MAX_LEN)
test_loader  = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False, # 추론 시에는 순서를 유지합니다.
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn
)
print("✅ Test Dataloader ready.")

✅ Fine-Tuning Dataloader ready.
✅ Test Dataloader ready.


In [None]:
## 옵티마이저 및 학습 설정
# AdamW는 Transformer 모델에 적합한 옵티마이저입니다.
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# 모델을 훈련 모드(train)로 전환 (Dropout 등이 활성화됩니다)
model.train()

print(f"🚀 Starting Fine-Tuning for {NUM_EPOCHS} epochs...")

for epoch in range(NUM_EPOCHS):
    total_loss = 0

    # tqdm을 사용하여 학습 진행률을 시각적으로 표시합니다.
    for step, batch in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}")):

        input_ids = batch["input_ids"].to(DEVICE)
        attn_mask = batch["attention_mask"].to(DEVICE)
        labels = input_ids.clone() # MLM Loss 계산을 위해 input_ids를 label로 사용합니다.

        # Gradient Accumulation 초기화 (누적 스텝 시작 시 optimizer.step() 직전에 초기화)
        if step % GRAD_ACC_STEPS == 0:
            optimizer.zero_grad()

        # 학습 실행 (AMP 적용: 메모리 절약 및 속도 향상)
        with torch.autocast(device_type="cuda", dtype=torch.float16, enabled=use_amp):
            outputs = model(
                input_ids,
                attention_mask=attn_mask,
                labels=labels # labels를 제공하면 모델이 내부적으로 MLM loss를 계산합니다.
            )

        # Loss 정규화 및 역전파 (누적 스텝 수로 loss를 나눕니다.)
        loss = outputs.loss / GRAD_ACC_STEPS
        loss.backward()

        # Gradient Accumulation 업데이트 (누적 스텝에 도달하거나 마지막 스텝인 경우)
        if (step + 1) % GRAD_ACC_STEPS == 0 or (step + 1) == len(train_loader):
            optimizer.step() # 가중치 업데이트

        total_loss += loss.item() * GRAD_ACC_STEPS # 실제 Loss 크기를 복원하여 총 로스에 합산합니다.

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1} completed. Average Loss: {avg_loss:.4f}")

# --- Fine-Tuning 완료 후 모델 저장 ---
Path(OUTPUT_DIR).mkdir(exist_ok=True)
model.save_pretrained(OUTPUT_DIR)
tokenizer.save_pretrained(OUTPUT_DIR)
print(f"✅ Fine-Tuning completed. Model saved to {OUTPUT_DIR}")

🚀 Starting Fine-Tuning for 3 epochs...


Epoch 1:   0%|          | 0/50000 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# Fine-Tuned된 모델을 저장 경로에서 다시 로드
MODEL_PATH = OUTPUT_DIR

print(f"Loading Fine-Tuned Model Structure from: {MODEL_ID}")
print(f"Loading Fine-Tuned Weights from: {MODEL_PATH}")

# 1. 모델 구조와 토크나이저는 원본 ID에서 가져옵니다.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
model = AutoModelForMaskedLM.from_pretrained(MODEL_ID, trust_remote_code=True)

# 2. 저장된 가중치 파일 확인 (bin 또는 safetensors)
weights_path_bin = Path(MODEL_PATH) / "pytorch_model.bin"
weights_path_safe = Path(MODEL_PATH) / "model.safetensors"

if weights_path_bin.exists():
    print("✅ Found 'pytorch_model.bin'. Loading weights...")
    model.load_state_dict(torch.load(weights_path_bin, map_location=DEVICE))
    print("✅ Fine-Tuned 가중치 로드 완료.")

elif weights_path_safe.exists():
    print("✅ Found 'model.safetensors'. Loading weights...")
    # safetensors 라이브러리를 사용하여 로드
    from safetensors.torch import load_file
    state_dict = load_file(weights_path_safe)
    model.load_state_dict(state_dict)
    print("✅ Fine-Tuned 가중치(Safetensors) 로드 완료.")

else:
    print("❌ Critical Warning: 저장된 가중치 파일을 찾을 수 없습니다!")
    print(f"Checked: {weights_path_bin} and {weights_path_safe}")
    print("Fine-Tuning이 제대로 완료되지 않았거나 파일이 저장되지 않았을 수 있습니다.")

# 추론 준비
model = model.to(DEVICE).eval()

all_ids = []
all_embs = []

with torch.no_grad():
    for batch in tqdm(test_loader, desc="Extracting Embeddings"):
        input_ids = batch["input_ids"].to(DEVICE)
        attn_mask = batch["attention_mask"].to(DEVICE)

        with torch.autocast(device_type="cuda", dtype=torch.float16, enabled=use_amp):
            outs = model(
                input_ids,
                attention_mask=attn_mask,
                output_hidden_states=True,
            )

            # Layer Mixing & Pooling
            hidden_states = outs.hidden_states
            last_4_layers = hidden_states[-4:]
            token_embeddings = torch.stack(last_4_layers, dim=0).mean(dim=0)

            mask_exp = attn_mask.unsqueeze(-1)
            summed = (token_embeddings * mask_exp).sum(dim=1)
            counts = mask_exp.sum(dim=1).clamp(min=1)
            seq_emb = summed / counts

        all_ids.extend(batch["ids"])
        all_embs.append(seq_emb.detach().cpu())

emb = torch.vstack(all_embs).float()
N, H = emb.shape
print(f"✅ Final Embedding shape = {N} x {H}")

In [None]:
# 샘플 제출 파일을 로드하여 ID 컬럼을 가져옵니다.
sample_submission = pd.read_csv(data_path + 'sample_submission.csv')

emb_np = emb.numpy()
# 임베딩 컬럼명 생성 (emb_0000, emb_0001, ...)
emb_cols = [f"emb_{i:04d}" for i in range(emb_np.shape[1])]
emb_df = pd.DataFrame(emb_np, columns=emb_cols)

# ID와 임베딩 데이터프레임을 결합
submission = pd.concat([sample_submission['ID'], emb_df], axis=1)

# 최종 CSV 파일 저장
submission.to_csv('ft_enhanced_submission.csv', index=False)
print("✅ Submission file saved as 'ft_enhanced_submission.csv'")