<a href="https://colab.research.google.com/github/rlaaudrb1104/Ai/blob/WOOK/GraphCodeBERT_%ED%8C%8C%EC%9D%B4%ED%86%A0%EC%B9%98.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install tokenizers
!pip install pandas
!pip install torch
!pip install tqdm
!pip install scikit-learn

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_curand_cu12-10.3.2.106-py3-none-manylinux1_x86_64.whl (56.5 MB)
Collectin

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer
from tqdm.auto import tqdm
import logging
from torch.utils.data import DataLoader, SequentialSampler
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, RandomSampler
from transformers import AdamW, get_linear_schedule_with_warmup
import os
from tqdm import tqdm
import argparse
from transformers import RobertaTokenizer, RobertaModel
import pickle


In [None]:
# 로깅 설정
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [None]:
# 인자 설정 클래스
class Args:
    train_data_file = '/content/drive/MyDrive/final_train2.csv'
    eval_data_file = '/content/drive/MyDrive/final_val.csv'
    test_data_file = '/content/drive/MyDrive/final_train2.csv'
    output_dir = '/content/drive/My Drive/output'
    model_name_or_path = 'microsoft/graphcodebert-base'
    tokenizer_name = 'microsoft/graphcodebert-base'
    block_size = 512

args = Args()

In [None]:
# 토크나이저 설정
tokenizer = RobertaTokenizer.from_pretrained(args.tokenizer_name)

In [None]:
# InputFeatures 클래스 정의
class InputFeatures:
    """데이터의 한 세트의 특성을 정의합니다."""
    def __init__(self, input_ids, attention_mask, cwe_type_label):
        self.input_ids = input_ids
        self.attention_mask = attention_mask
        self.cwe_type_label = cwe_type_label


In [None]:
# 특성 변환 함수
def convert_examples_to_features(func, cwe_type_label, tokenizer, max_length):
    """코드 스니펫을 모델 입력에 적합한 특성으로 변환합니다."""
    encoding = tokenizer.encode_plus(
        text=func,
        add_special_tokens=True,
        max_length=max_length,
        pad_to_max_length=True,
        return_attention_mask=True,
        return_tensors='pt',
        truncation=True
    )
    return InputFeatures(
        input_ids=encoding['input_ids'].flatten(),
        attention_mask=encoding['attention_mask'].flatten(),
        cwe_type_label=cwe_type_label
    )


In [None]:
# TextDataset 클래스 정의
class TextDataset(Dataset):
    def __init__(self, tokenizer, args, cwe_label_map, file_type="train"):
        file_path = getattr(args, f"{file_type}_data_file")
        self.examples = []
        df = pd.read_csv(file_path)
        funcs = df["CODE"].tolist()
        cwe_type_labels = df["CWE ID"].tolist()

        for i in tqdm(range(len(funcs))):
            cwe_type_label = cwe_label_map.get(cwe_type_labels[i], 0)
            features = convert_examples_to_features(funcs[i], cwe_type_label, tokenizer, args.block_size)
            self.examples.append(features)

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        return {
            'input_ids': torch.tensor(self.examples[idx].input_ids),
            'attention_mask': torch.tensor(self.examples[idx].attention_mask),
            'labels': torch.tensor(self.examples[idx].cwe_type_label)
        }

In [None]:
cwe_label_map = {
    "CWE-20": 2,
    "CWE-119": 1,
    "CWE-78": 3,
    "CWE-122": 4,
    "CWE-121": 5,
    "CWE-415": 6,
    "CWE-399": 7,
    "CWE-190": 8,
    "CWE-125": 9,
    "CWE-416": 10
    # 여기에 더 많은 CWE ID와 인덱스 매핑을 추가할 수 있습니다.
}

In [None]:
# 데이터셋과 데이터 로더 생성
train_dataset = TextDataset(tokenizer, args, cwe_label_map, file_type='train')
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

  df = pd.read_csv(file_path)
100%|██████████| 27982/27982 [01:14<00:00, 376.71it/s]


In [None]:


def compute_adjustment(tau, args, cwe_label_map):
    """클래스 빈도에 따라 로짓 조정 값을 계산합니다.

    Args:
        tau (float): 조정 계수, 클래스 빈도의 지수화에 사용됩니다.
        args (Args class): 설정 정보를 담고 있는 클래스 인스턴스.
        cwe_label_map (dict): 각 CWE ID에 대한 [index, one_hot, frequency] 정보를 담은 사전.

    Returns:
        torch.Tensor: 조정된 로짓 값들이 담긴 텐서.
    """
    # 빈도 수집: 사전 순서대로 빈도를 배열에 저장
    freq = [v[2] for k, v in sorted(cwe_label_map.items(), key=lambda item: item[1][0])]

    # 레이블 빈도를 PyTorch 텐서로 변환하고 정규화
    label_freq_tensor = torch.tensor(freq, dtype=torch.float32, device=args.device)
    label_freq_tensor /= label_freq_tensor.sum()

    # 로짓 조정 계산
    adjustments = torch.log(torch.pow(label_freq_tensor, tau) + 1e-12)

    return adjustments


In [None]:
def compute_adjustment(tau, args, cwe_label_map):
    """클래스 빈도에 따라 로짓 조정 값을 계산합니다.

    Args:
        tau (float): 조정 계수, 클래스 빈도의 지수화에 사용됩니다.
        args (Args class): 설정 정보를 담고 있는 클래스 인스턴스.
        cwe_label_map (dict): 각 CWE ID에 대한 [index, one_hot, frequency] 정보를 담은 사전.

    Returns:
        torch.Tensor: 조정된 로짓 값들이 담긴 텐서.
    """
    # 빈도 수집: 사전 순서대로 빈도를 배열에 저장
    freq = [v[2] for k, v in sorted(cwe_label_map.items(), key=lambda item: item[1][0])]

    # 레이블 빈도를 PyTorch 텐서로 변환하고 정규화
    label_freq_tensor = torch.tensor(freq, dtype=torch.float32, device=args.device)
    label_freq_tensor /= label_freq_tensor.sum()

    # 로짓 조정 계산
    adjustments = torch.log(torch.pow(label_freq_tensor, tau) + 1e-12)

    return adjustments

In [None]:
import random

def set_seed(args):
    """
    모든 난수 생성기의 시드를 설정하여 실험의 재현성을 보장합니다.

    Args:
    - args (Args class): 시드와 GPU 사용 여부를 포함하는 설정 클래스 인스턴스.

    이 함수는 Python의 내장 난수 생성기, NumPy, 그리고 PyTorch에 대해 동일한 시드를 설정합니다.
    또한, 여러 GPU를 사용하는 경우 각 디바이스의 CUDA 난수 생성기에 대해서도 시드를 설정합니다.
    """
    random.seed(args.seed)  # Python 내장 난수 생성기의 시드 설정
    np.random.seed(args.seed)  # NumPy 난수 생성기의 시드 설정
    torch.manual_seed(args.seed)  # PyTorch 난수 생성기의 시드 설정

    # CUDA 난수 생성기의 시드 설정 (GPU 사용 시)
    if args.n_gpu > 0:
        torch.cuda.manual_seed_all(args.seed)

# 예시로 Args 클래스를 정의하고 seed와 n_gpu를 설정합니다.
class Args:
    seed = 42
    n_gpu = 1  # 이 값을 Colab에서 사용할 GPU 수에 맞춰 설정하세요.

args = Args()
set_seed(args)  # 시드 설정 함수 호출


In [None]:

def train(args, train_dataset, model, tokenizer, eval_dataset, cwe_label_map):
    """ 모델을 훈련합니다. """
    # 데이터 로더 구성
    train_sampler = RandomSampler(train_dataset)
    train_dataloader = DataLoader(train_dataset, sampler=train_sampler, batch_size=args.train_batch_size, num_workers=0)

    # 로짓 조정 사용 여부 확인
    if args.use_logit_adjustment:
        logit_adjustment = compute_adjustment(tau=args.tau, args=args, cwe_label_map=cwe_label_map)
    else:
        logit_adjustment = None

    # 최적화 도구 및 스케줄러 설정
    optimizer = AdamW(model.parameters(), lr=args.learning_rate, eps=args.adam_epsilon)
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=args.warmup_steps, num_training_steps=args.max_steps)

    model.to(args.device)
    model.train()

    # 훈련 루프
    for epoch in range(args.epochs):
        total_loss = 0
        for batch in tqdm(train_dataloader, desc=f"Epoch {epoch + 1}"):
            batch = {k: v.to(args.device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs.loss

            if args.n_gpu > 1:
                loss = loss.mean()  # multi-gpu 지원
            if args.gradient_accumulation_steps > 1:
                loss = loss / args.gradient_accumulation_steps

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.max_grad_norm)

            total_loss += loss.item()
            if (step + 1) % args.gradient_accumulation_steps == 0:
                optimizer.step()
                scheduler.step()
                optimizer.zero_grad()

        # 에폭 종료 후 평가
        if eval_dataset is not None:
            eval_loss = evaluate(args, model, tokenizer, eval_dataset)
            print(f"Epoch {epoch + 1} Evaluation Loss: {eval_loss:.4f}")

        # 로스 감소 시 체크포인트 저장
        if eval_loss < best_loss:
            best_loss = eval_loss
            save_checkpoint(model, args.output_dir, 'best_model.pt')

def save_checkpoint(model, save_path, filename):
    """ 모델 상태를 파일로 저장합니다. """
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    torch.save(model.state_dict(), os.path.join(save_path, filename))


In [None]:

logger = logging.getLogger(__name__)

def evaluate(args, model, tokenizer, eval_dataset, eval_when_training=False):
    # DataLoader 생성
    eval_sampler = SequentialSampler(eval_dataset)
    eval_dataloader = DataLoader(eval_dataset, sampler=eval_sampler, batch_size=args.eval_batch_size, num_workers=0)

    # Multi-GPU 설정
    if args.n_gpu > 1 and not eval_when_training:
        original_model = model
        model = torch.nn.DataParallel(model)

    # 모델 평가 모드 설정
    model.eval()
    total_loss = 0
    total_examples = 0

    # 평가 진행
    logger.info("***** Running evaluation *****")
    logger.info("Num examples = %d", len(eval_dataset))
    logger.info("Batch size = %d", args.eval_batch_size)
    for batch in eval_dataloader:
        batch = {k: v.to(args.device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
            loss = outputs.loss
            total_loss += loss.item()
            total_examples += batch['input_ids'].size(0)

    # 평균 손실 계산
    avg_loss = total_loss / total_examples

    # Multi-GPU 상태를 원래대로 복원
    if args.n_gpu > 1 and not eval_when_training:
        model = original_model

    # 결과 로깅
    logger.info("***** Eval results *****")
    logger.info("Total loss = %.4f", avg_loss)

    # 결과 반환
    result = {
        "eval_total_loss": avg_loss
    }
    return result


In [None]:


logger = logging.getLogger(__name__)

def test(args, model, tokenizer, test_dataset, best_threshold=0.5):
    # DataLoader 구성
    test_sampler = SequentialSampler(test_dataset)
    test_dataloader = DataLoader(test_dataset, sampler=test_sampler, batch_size=args.eval_batch_size, num_workers=0)

    # 다중 GPU 설정
    if args.n_gpu > 1:
        model = torch.nn.DataParallel(model)

    # 모델 평가 모드 설정
    model.eval()
    logger.info("***** Running Test *****")
    logger.info("Num examples = %d", len(test_dataset))
    logger.info("Batch size = %d", args.eval_batch_size)

    # 예측 및 실제 레이블 저장용 리스트
    cwe_type_preds = []
    cwe_type_trues = []

    # 테스트 데이터에 대한 평가
    for batch in test_dataloader:
        batch = {k: v.to(args.device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
            logits = outputs.logits
            predictions = torch.argmax(logits, dim=-1)
            cwe_type_preds.extend(predictions.cpu().numpy())
            cwe_type_trues.extend(batch['labels'].cpu().numpy())

    # 정확도 계산
    cwe_type_acc = accuracy_score(cwe_type_trues, cwe_type_preds)
    result_cwe = {"CWE Type Accuracy": cwe_type_acc}

    # 결과 로깅
    logger.info("***** CWE Type Classification Test Results *****")
    for key, value in result_cwe.items():
        logger.info(f"{key} = {value:.4f}")

    return result_cwe


In [None]:

def main():
    # Argument parsing
    parser = argparse.ArgumentParser(description="Train and evaluate the BERT model on CWE identification task")
    # Required parameters
    parser.add_argument("--use_logit_adjustment", action='store_true', help="Whether to use logit adjustment")
    # 여기에 다른 arguments 정의

    args = parser.parse_args()

    # Setup CUDA, GPU
    args.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    args.n_gpu = torch.cuda.device_count()
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info(f"Device: {args.device}, Number of GPUs: {args.n_gpu}")

    # Random seed 설정
    set_seed(args)

    # Load tokenizers and models
    tokenizer = RobertaTokenizer.from_pretrained(args.tokenizer_name)
    model = BERT(roberta=RobertaModel.from_pretrained(args.model_name_or_path), tokenizer=tokenizer, num_cwe_types=10, args=args)
    model.to(args.device)

    # 데이터셋 로딩
    train_dataset = TextDataset(args.train_data_file, tokenizer)
    eval_dataset = TextDataset(args.eval_data_file, tokenizer)

    # 훈련과 평가 실행
    if args.do_train:
        logger.info("Starting training...")
        train(args, model, train_dataset, eval_dataset)

    if args.do_eval:
        logger.info("Starting evaluation...")
        evaluate(args, model, eval_dataset)

    if args.do_test:
        logger.info("Starting testing...")
        test(args, model, tokenizer, test_dataset)

if __name__ == "__main__":
    main()


usage: colab_kernel_launcher.py [-h]
colab_kernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-b26f8e3a-14b2-4dd9-8f34-1721d5d576f9.json


SystemExit: 2