# 2024 생명연구자원 AI활용 경진대회 : 인공지능 활용 부문
#### https://dacon.io/competitions/official/236355/overview/description
#### LLM 이용 Classification
#### 사전학습모델: 

In [1]:
import pandas as pd
import numpy as np
train = pd.read_csv('./train.csv')
test = pd.read_csv('./test.csv')

In [2]:
# 결과를 저장할 리스트
data = []

# 각 row를 순회하면서 'WT'가 아닌 변이들만 추출
for index, row in train.iterrows():
    subclass = row['SUBCLASS']
    mutations = []

    # 'ID'와 'SUBCLASS'를 제외한 나머지 컬럼에서 'WT'가 아닌 값을 추출
    for col in train.columns[2:]:
        mutation = row[col]

        # 변이가 'WT'가 아닐 때만 (R195R, 1499_1500HL>HL 등은 제외)
        if mutation != "WT" and not (
            mutation.endswith(mutation[0]) or  # 예: R195R
            ">" in mutation and mutation.split(">")[0] == mutation.split(">")[1]  # 예: 1499_1500HL>HL
        ):
            mutations.append(f"({col},{mutation})")

    # 변이가 있을 경우 리스트에 추가
    if mutations:
        data.append({
            "subclass": subclass,
            "mutations": " ".join(mutations)
        })


In [3]:
import re
import torch
from transformers import BertForSequenceClassification, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split


# Custom Tokenizer 함수
def mutation_tokenizer(mutation_data):
    tokenized_data = []
    
    for entry in mutation_data:
        subclass = entry['subclass']
        mutations = entry['mutations']
        
        # 변이별로 유전자와 변이 패턴을 추출
        mutation_list = re.findall(r'\((\w+),([A-Za-z0-9\*\>fs]+)\)', mutations)
        tokenized_mutations = []
        
        for gene, mutation in mutation_list:
            tokenized_mutation = mutation_tokenize(gene, mutation)
            tokenized_mutations.append(tokenized_mutation)
        
        # 토큰화된 변이를 추가
        tokenized_data.append({
            "subclass": subclass,
            "tokenized_mutations": tokenized_mutations
        })
    
    return tokenized_data

In [4]:
# 개별 변이를 처리하는 함수
def mutation_tokenize(gene, mutation):
    patterns = [
        (r"([A-Z])(\d+)([A-Z])", lambda m: [gene, m.group(1), m.group(2), m.group(3)]),
        (r"([A-Z])(\d+)(\*)", lambda m: [gene, m.group(1), m.group(2), '*']),
        (r"([A-Z])(\d+)(fs)", lambda m: [gene, m.group(1), m.group(2), 'fs']),
        (r"([A-Z])(\d+)del", lambda m: [gene, m.group(1), m.group(2), 'del']),
        (r"([A-Z])(\d+)_([A-Z])(\d+)ins([A-Z]+)", lambda m: [gene, m.group(1), m.group(2), '_', m.group(3), m.group(4), 'ins', m.group(5)]),
        (r"(\d+)_(\d+)([A-Z]+)>([A-Z]+)", lambda m: [gene, m.group(1), '_', m.group(2), m.group(3), '>', m.group(4)])
    ]
    
    for pattern, handler in patterns:
        match = re.search(pattern, mutation)
        if match:
            return handler(match)
    
    return [gene, mutation]

In [5]:
# Custom Tokenizer로 변환된 데이터를 단순 문자열로 변환
def convert_tokenized_data_to_text(tokenized_data):
    texts = []
    labels = []
    
    for entry in tokenized_data:
        subclass = entry['subclass']
        mutations = " ".join([" ".join(mutation) for mutation in entry['tokenized_mutations']])
        texts.append(mutations)
        labels.append(subclass)
    
    return texts, labels

In [6]:
# 데이터 토큰화 및 문자열 변환
tokenized_data = mutation_tokenizer(data)
texts, labels = convert_tokenized_data_to_text(tokenized_data)

In [11]:
# 아미노산 종류, 변이, 특수 기호 등을 포함한 기본 vocab
vocab = {
    # 아미노산 종류
    'A': 1, 'R': 2, 'N': 3, 'D': 4, 'C': 5,
    'E': 6, 'Q': 7, 'G': 8, 'H': 9, 'I': 10,
    'L': 11, 'K': 12, 'M': 13, 'F': 14, 'P': 15,
    'S': 16, 'T': 17, 'W': 18, 'Y': 19, 'V': 20,
    
    # 특수 변이 기호
    'fs': 21,   # 프레임시프트 변이
    'del': 22,  # 결실 변이
    'ins': 23,  # 삽입 변이
    '*': 24,    # 종결 코돈
    '>': 25,    # 변이를 나타내는 기호 (A>G 등)
    '_': 26,    # 위치 구분 (A35_L36 등)
    
    # 숫자 자릿수 인코딩
    '0': 27, '1': 28, '2': 29, '3': 30, '4': 31,
    '5': 32, '6': 33, '7': 34, '8': 35, '9': 36,
}

# 유전자 이름 예시 (유전자 이름도 포함하여 처리)
gene_names = train.columns[2:]

# 유전자 이름을 vocab에 추가 
for i, gene in enumerate(gene_names, start=len(vocab)+1):
    vocab[gene] = i

# 학습준비

In [8]:
import torch

# 텍스트(변이 정보)를 토큰화하고, vocab을 사용해 숫자 인덱스로 변환하는 함수
def tokenize_and_encode(text, vocab, max_length=512):
    tokens = text.split()  # 공백을 기준으로 문자열을 토큰화
    token_ids = [vocab.get(token, 0) for token in tokens]  # 토큰을 vocab에서 인덱스로 변환
    
    # 패딩 및 트렁케이션
    if len(token_ids) < max_length:
        token_ids += [0] * (max_length - len(token_ids))  # 부족한 부분을 0으로 패딩
    else:
        token_ids = token_ids[:max_length]  # 너무 긴 부분은 잘라냄
    
    return torch.tensor(token_ids)


# 각 변이 데이터를 숫자 인덱스로 변환
max_length = 512  # 고정된 시퀀스 길이
encoded_mutations = [tokenize_and_encode(text, vocab, max_length) for text in texts]

In [9]:
# 라벨 리스트 (총 26개 클래스)
cancer_type = ['KIPAN', 'SARC', 'SKCM', 'KIRC', 'GBMLGG', 'STES', 'BRCA', 'THCA',
          'LIHC', 'HNSC', 'PAAD', 'OV', 'PRAD', 'UCEC', 'LAML', 'COAD',
          'ACC', 'LGG', 'LUSC', 'LUAD', 'CESC', 'PCPG', 'THYM', 'BLCA',
          'TGCT', 'DLBC']

# 라벨을 인덱스로 매핑하는 사전 생성
label_to_id = {label: idx for idx, label in enumerate(cancer_type)}
id_to_label = {idx: label for label, idx in label_to_id.items()}

In [27]:
# 텍스트(변이 정보)를 Custom Tokenizer로 토큰화하고, vocab을 사용해 숫자 인덱스로 변환하는 함수
def custom_tokenize_and_encode(text, vocab, max_length=512):
    tokens = text.split()  # 공백을 기준으로 문자열을 Custom Tokenizer로 토큰화
    token_ids = [vocab.get(token, 0) for token in tokens]  # Custom Tokenizer를 이용해 토큰을 인덱스로 변환

    # 패딩 및 트렁케이션
    attention_mask = [1] * len(token_ids)  # 실제 토큰은 1로 표시
    if len(token_ids) < max_length:
        padding_length = max_length - len(token_ids)
        token_ids += [0] * padding_length  # 부족한 부분을 0으로 패딩
        attention_mask += [0] * padding_length  # 패딩 부분은 0으로 표시
    else:
        token_ids = token_ids[:max_length]  # 너무 긴 부분은 자름
        attention_mask = attention_mask[:max_length]
    
    return torch.tensor(token_ids), torch.tensor(attention_mask)

# 학습

In [13]:
from torch.utils.data import Dataset, DataLoader

class MutationCustomDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_length=512):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        
        # Custom Tokenizer로 토큰화 및 인코딩
        input_ids, attention_mask = custom_tokenize_and_encode(text, self.vocab, self.max_length)

        # input_ids와 attention_mask, 그리고 label을 반환
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': torch.tensor(label, dtype=torch.long)
        }


In [22]:
# 데이터 준비
train_labels = [label_to_id[label] for label in labels]  # 라벨을 숫자로 변환

# Dataset 생성
train_dataset = MutationCustomDataset(texts, train_labels, vocab)

# DataLoader 생성
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

In [24]:
from transformers import BertForSequenceClassification, AdamW

# BERT 모델 불러오기
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=26)  # 26개 클래스

# AdamW 옵티마이저 설정
optimizer = AdamW(model.parameters(), lr=5e-5)

# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(30522, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12,

In [43]:
from collections import Counter
import torch
import torch.nn as nn

# 라벨 데이터에서 각 클래스의 샘플 개수 계산
class_sample_counts = Counter(train_labels)  # train_labels는 학습 데이터의 실제 라벨 리스트
total_samples = len(train_labels)

# 클래스별 가중치 계산
class_weights = [1.0 / class_sample_counts[i] for i in range(len(class_sample_counts))]
class_weights_tensor = torch.tensor(class_weights).to(device)
# 3. CrossEntropyLoss에서 가중치 설정
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)

# 4. 모델 학습 코드 (손실 함수 사용)
epochs = 1
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()

        # 모델에 데이터 입력
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = criterion(outputs.logits, labels)  # CrossEntropyLoss 사용
        
        # 역전파 및 최적화
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")

# 모델 저장 경로 설정
model_save_path = "bert_cancer_model.pth"

# 학습한 모델 저장
torch.save(model.state_dict(), model_save_path)

print(f"Model saved to {model_save_path}")

Epoch 1/1, Loss: 3.2644


In [46]:
# F1 스코어 계산을 위한 예측
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in train_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        # 모델 예측
        outputs = model(input_ids, attention_mask=attention_mask)
        preds = torch.argmax(outputs.logits, dim=1)
        
        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())
f1 = f1_score(all_labels, all_preds, average='macro')
print(f"F1 Score: {f1:.4f}")

NameError: name 'f1_score' is not defined

## test.csv 예측

In [None]:
# 모델과 토크나이저 로드 (Fine-tuned model)
model = AutoModelForSequenceClassification.from_pretrained("/content/results/checkpoint-2444")
tokenizer = AutoTokenizer.from_pretrained("dmis-lab/biobert-v1.1")

In [None]:
# 예측할 클래스 저장할 리스트
predictions = []

# test.csv 전처리 및 예측
for _, row in test.iterrows():
    # 입력 데이터 포맷팅
    new_input = " ".join([f"{col}:{row[col]}" for col in test.columns if col != 'ID' and str(row[col]) != 'WT'])  # WT가 아닌 유전체 관련 컬럼만 결합
    formatted_input = f"SUBCLASS: [UNKNOWN] [SEP] {new_input}"  # [UNKNOWN]은 예측할 클래스가 없음

    # 입력 데이터 토크나이즈
    inputs = tokenizer(formatted_input, return_tensors="pt", padding="max_length", truncation=True, max_length=512)

    # 모델 평가 모드로 전환
    model.eval()

    # 예측
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        predicted_class = labels[torch.argmax(logits, dim=1).item()]

    # 예측된 클래스 저장
    predictions.append(predicted_class)
    if (_ + 1) % 100 == 0:
      print(_+1)



In [None]:
# sample_submission.csv 파일 로드
submission_df = pd.read_csv('sample_submission.csv')

# 예측 결과를 sample_submission DataFrame에 덮어쓰기
submission_df['SUBCLASS'] = predictions

# 결과를 csv 파일로 저장
submission_df.to_csv('submission_0929.csv', index=False)