In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '4'




In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModel, AutoTokenizer





In [None]:
# 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_name = "neulab/codebert-cpp"

# 모델 정의
class CodeBERTDPR(nn.Module):
    def __init__(self):
        super(CodeBERTDPR, self).__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask):
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.pooler_output  # [CLS] 토큰의 출력 사용

# # 데이터셋 정의
# class CodePairsDataset(Dataset):
#     def __init__(self, tokenizer, code_pairs, labels):
#         self.tokenizer = tokenizer
#         self.code_pairs = code_pairs
#         self.labels = labels

#     def __len__(self):
#         return len(self.code_pairs)

#     def __getitem__(self, idx):
#         code_pair = self.code_pairs[idx]
#         label = self.labels[idx]

#         # 토큰화 및 인코딩
#         encoding = self.tokenizer(code_pair[0], code_pair[1], return_tensors='pt', padding='max_length', truncation=True, max_length=512)
#         input_ids = encoding['input_ids'].squeeze(0)
#         attention_mask = encoding['attention_mask'].squeeze(0)

#         return input_ids, attention_mask, label



In [None]:
class CodePairsDataset(Dataset):
    def __init__(self, root_dir, tokenizer):
        self.tokenizer = tokenizer
        self.samples = []
        self.root_dir = root_dir
        self._prepare_dataset()

    def _prepare_dataset(self):
        problem_dirs = [os.path.join(self.root_dir, d) for d in os.listdir(self.root_dir) if os.path.isdir(os.path.join(self.root_dir, d))]
        problem_files = {}

        for dir in problem_dirs:
            problem_id = os.path.basename(dir)
            cpp_files = [os.path.join(dir, "cpp", f) for f in os.listdir(os.path.join(dir, "cpp")) if f.endswith('.cpp')]
            problem_files[problem_id] = cpp_files

        for problem_id, files in problem_files.items():
            # 긍정적인 샘플 추가
            for i in range(len(files)):
                for j in range(i + 1, len(files)):
                    self.samples.append((files[i], files[j], 1))  # 긍정적인 쌍

            # 부정적인 샘플 추가
            other_problems = list(set(problem_files.keys()) - {problem_id})
            for file in files:
                other_problem_id = random.choice(other_problems)
                other_file = random.choice(problem_files[other_problem_id])
                self.samples.append((file, other_file, 0))  # 부정적인 쌍

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        file1, file2, label = self.samples[idx]

        with open(file1, 'r', encoding='utf-8') as f:
            text1 = f.read()
        
        with open(file2, 'r', encoding='utf-8') as f:
            text2 = f.read()

        # 토큰화 및 인코딩
        encoding = self.tokenizer(text1, text2, return_tensors='pt', padding='max_length', truncation=True, max_length=512)
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        return input_ids, attention_mask, label

In [None]:
# 데이터 로더 준비 (가정된 예시 데이터)
code_pairs_example = [
    ("def sum(a, b): return a + b", "def add(x, y): return x + y"),  # 긍정적 예시
    ("def sum(a, b): return a + b", "def subtract(x, y): return x - y")  # 부정적 예시
]
labels_example = [1, 0]  # 1: 긍정적, 0: 부정적



In [None]:
root_dir = "/home/leadawon5/decs_jupyter_lab/gitfiles/DACON/code_similarity/bigdata/train_code" 
dataset = CodePairsDataset(root_dir, tokenizer)

In [None]:
tokenizer = AutoTokenizer.from_pretrained(model_name)
# dataset = CodePairsDataset(tokenizer, code_pairs_example, labels_example)
data_loader = DataLoader(dataset, batch_size=2, shuffle=True)

# 모델 학습
model = CodeBERTDPR().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)



In [None]:
model.train()


In [None]:
for epoch in range(1):  # 예시에서는 1 에포크로 제한
    for input_ids, attention_mask, labels in data_loader:
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)

        # 모델을 통해 각 코드 스니펫 인코딩
        embeddings = model(input_ids, attention_mask)

        # 유사도 계산
        similarities = torch.matmul(embeddings, embeddings.T)

        # 손실 계산 (여기서는 간단화를 위해 실제 DPR 손실과 다를 수 있음)
        loss = F.cross_entropy(similarities, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f"Loss: {loss.item()}")



In [None]:
# 추론
with torch.no_grad():
    model.eval()
    # 두 개의 코드 스니펫을 준비합니다 (예시로 첫 번째와 두 번째 스니펫을 사용)
    input_ids_1, attention_mask_1, _ = dataset[0]  # 첫 번째 예시
    input_ids_2, attention_mask_2, _ = dataset[1]  # 두 번째 예시
    
    # 각 코드 스니펫에 대해 임베딩을 계산
    input_ids_1 = input_ids_1.unsqueeze(0).to(device)  # 배치 차원 추가
    attention_mask_1 = attention_mask_1.unsqueeze(0).to(device)
    embeddings_1 = model(input_ids_1, attention_mask_1)
    
    input_ids_2 = input_ids_2.unsqueeze(0).to(device)
    attention_mask_2 = attention_mask_2.unsqueeze(0).to(device)
    embeddings_2 = model(input_ids_2, attention_mask_2)
    
    # 두 임베딩 간의 내적을 계산하여 유사도 점수를 얻음
    similarity_score = torch.matmul(embeddings_1, embeddings_2.T)
    
    print(f"Similarity Score: {similarity_score.item()}")
