In [1]:
# Colab setup cell: mount Google Drive and switch into the project folder so
# that the relative paths used by later cells resolve inside it.
!echo "Mounting Google Drive..."
%cd /

from google.colab import drive
drive.mount('/content/drive')
# Folder that holds the training data
%cd /content/drive/MyDrive/Fine_Tune

Mounting Google Drive...
/
Mounted at /content/drive
/content/drive/.shortcut-targets-by-id/1sQLENaetXcCQadN7tIi2zQFA70T85QHN/Fine_Tune


In [2]:
import torch
# Model-related settings
# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
PRE_TRAINED_MODEL_NAME = 'microsoft/unixcoder-base'
# Local directory for the fine-tuned weights: the model id with every "/" replaced by "-".
FINE_TUNED_MODEL_PATH = './' + '-'.join(PRE_TRAINED_MODEL_NAME.split('/'))



In [3]:
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModel
from sklearn.model_selection import GroupShuffleSplit
import numpy as np
import pandas as pd
from tqdm import tqdm
import os
import json
import random

# Number of trailing hidden layers whose outputs are averaged to form the feature vector.
num_layers = 4
# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


class TripletDataset(Dataset):
    """
    Dataset of (query, positive code, hard-negative code) triplets used to
    fine-tune a dense retrieval model.
    """

    def __init__(self, data, code_id_to_code_map, tokenizer, max_length=512):
        """
        Store the raw records and the lookup used to resolve negative ids.

        :param data: list of records; each record is indexed with the keys
            'query', 'positive_code' and 'hard_negative_ids'.
        :param code_id_to_code_map: mapping from a code id to the code text;
            hard-negative ids are resolved through it.
        :param tokenizer: tokenizer object; only stored here, not called.
        :param max_length: maximum token length; only stored here.
        """
        self.data, self.code_id_to_code_map = data, code_id_to_code_map
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        """Return the number of records."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Return one triplet of raw strings: (query, positive code, negative code).

        The negative is drawn at random from the record's 'hard_negative_ids'
        on every access, so repeated reads of the same index may differ.
        """
        record = self.data[idx]
        # The query is the anchor; the positive is the code paired with it.
        query_text, matching_code = record['query'], record['positive_code']
        # Pick one of the pre-computed hard negatives and resolve its id to code text.
        sampled_id = random.choice(record['hard_negative_ids'])
        return query_text, matching_code, self.code_id_to_code_map[sampled_id]

def collate_fn(batch, tokenizer, max_length=512):
    """
    Tokenize a batch of triplets in three bulk tokenizer calls.

    :param batch: iterable of (anchor, positive, negative) string triplets.
    :param tokenizer: callable tokenizer; invoked with return_tensors='pt',
        truncation and padding to ``max_length``.
    :param max_length: fixed length every sequence is padded/truncated to.
    :return: dict with keys 'anchor', 'positive', 'negative'; each value is a
        dict of the tokenizer's tensors moved to DEVICE.
    """
    def _encode(texts):
        # One tokenizer call per column, then move every tensor to the target device.
        encoded = tokenizer(
            list(texts),
            return_tensors='pt',
            truncation=True,
            padding='max_length',
            max_length=max_length,
        )
        return {name: tensor.to(DEVICE) for name, tensor in encoded.items()}

    # Transpose the list of triplets into three parallel columns.
    anchor_texts, positive_texts, negative_texts = zip(*batch)
    return {
        'anchor': _encode(anchor_texts),
        'positive': _encode(positive_texts),
        'negative': _encode(negative_texts),
    }

def get_embedding(model, tokenizer, text, max_length=512):
    """
    Helper that encodes ``text`` and returns its pooled embedding on the CPU.

    The text is padded/truncated to ``max_length``, run through the model
    without gradient tracking, the last ``num_layers`` hidden states are
    averaged, and the result is mean-pooled over the sequence dimension.
    The pooling is an unweighted mean, so padded positions are included.

    :return: CPU tensor produced by the pooling described above.
    """
    encoded = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding='max_length',
        max_length=max_length,
    ).to(DEVICE)
    # Models exposing get_encoder() (encoder-decoder style) are run through the encoder only.
    encoder = model.get_encoder() if hasattr(model, 'get_encoder') else model
    with torch.no_grad():
        all_layers = encoder(**encoded, output_hidden_states=True).hidden_states
        # Average the selected layers first, then average across token positions.
        token_features = torch.stack(all_layers[-num_layers:]).mean(dim=0)
        pooled = token_features.mean(dim=1)
    return pooled.cpu()

# Feed the anchor / positive / negative token batches through the model and
# average several hidden layers into one vector per sample.
def get_layerwise_embeddings(model, batch_inputs, num_layers=num_layers):
    """
    Compute one embedding per sample from the last ``num_layers`` hidden states.

    Gradients are not disabled here, so the result can be used in a training loss.
    The pooling is an unweighted mean over the sequence dimension, so padded
    positions are included.

    :param model: model called with ``output_hidden_states=True``; when it
        exposes ``get_encoder()`` only the encoder is run.
    :param batch_inputs: one of batch['anchor'] / batch['positive'] /
        batch['negative']; must provide 'input_ids' and 'attention_mask'.
    :param num_layers: how many of the final layers to average.
    :return: tensor of pooled embeddings, one row per sample.
    """
    if not hasattr(model, 'get_encoder'):
        result = model(**batch_inputs, output_hidden_states=True)
    else:
        # Encoder-decoder style model: run the encoder on ids and mask only.
        result = model.get_encoder()(
            input_ids=batch_inputs['input_ids'],
            attention_mask=batch_inputs['attention_mask'],
            output_hidden_states=True,
        )

    # Stack the selected layers and collapse the layer axis by averaging.
    selected_layers = result.hidden_states[-num_layers:]
    per_token = torch.stack(selected_layers).mean(dim=0)

    # Mean-pool over token positions to get a single vector per sample.
    return per_token.mean(dim=1)


def evaluate_recall(model, tokenizer, val_df, corpus_df, cached_corpus_embeddings=None):
    """
    Compute Recall@10 of ``model`` on ``val_df`` against the code in ``corpus_df``.

    A query counts as a hit when its ground-truth code string (``row['code']``)
    appears among the 10 corpus rows with the highest cosine similarity.

    :param model: encoder used for both queries and corpus; switched to eval mode.
    :param tokenizer: tokenizer matching ``model``.
    :param val_df: DataFrame with 'query' and 'code' columns.
    :param corpus_df: DataFrame with a 'code' column; rows are the retrieval candidates.
    :param cached_corpus_embeddings: optional pre-computed corpus embeddings
        (row-aligned with ``corpus_df``); when given, the corpus is not re-encoded.
    :return: tuple ``(recall_at_10, corpus_embeddings)``; recall is 0.0 when
        ``val_df`` is empty.
    """
    top_k = 10
    model.eval()

    # Embed the whole corpus first, unless the caller supplied a cache.
    if cached_corpus_embeddings is None:
        # NOTE: the message names code_snippets.csv, but the corpus embedded
        # here is whatever ``corpus_df`` the caller passes in.
        print("\nCreating cached embeddings for the corpus (code_snippets.csv)...")
        all_codes = list(corpus_df['code'])
        # Use the same encoder routing as get_embedding() so that corpus and
        # query vectors come from the same forward path (encoder-decoder
        # models are run through their encoder only).
        encoder = model.get_encoder() if hasattr(model, 'get_encoder') else model
        corpus_chunks = []
        batch_size = 32
        for i in tqdm(range(0, len(all_codes), batch_size), desc="Corpus Embeddings"):
            batch_codes = all_codes[i:i + batch_size]
            inputs = tokenizer(batch_codes, return_tensors='pt', truncation=True, padding='max_length', max_length=512).to(DEVICE)
            with torch.no_grad():
                outputs = encoder(**inputs, output_hidden_states=True)
                # Average the last num_layers hidden states, then mean-pool over tokens.
                stacked_layers = torch.stack(outputs.hidden_states[-num_layers:])
                embeddings = stacked_layers.mean(dim=0).mean(dim=1)
            corpus_chunks.append(embeddings.cpu())
        corpus_embeddings = torch.cat(corpus_chunks, dim=0)
    else:
        corpus_embeddings = cached_corpus_embeddings

    num_queries = len(val_df)
    if num_queries == 0:
        # Nothing to score; avoid dividing by zero below.
        return 0.0, corpus_embeddings

    hits = 0
    for _, row in tqdm(val_df.iterrows(), total=num_queries, desc="Evaluating Recall@10"):
        query_embedding = get_embedding(model, tokenizer, row['query'])

        # Cosine similarity between the query and every corpus row.
        scores = torch.nn.functional.cosine_similarity(query_embedding, corpus_embeddings)
        # Plain Python ints are the safest positional indexer for DataFrame.iloc.
        top_k_positions = torch.argsort(scores, descending=True)[:top_k].tolist()
        top_k_codes = corpus_df.iloc[top_k_positions]['code'].values
        if row['code'] in top_k_codes:
            hits += 1
    return hits / num_queries, corpus_embeddings

class DenseRetriever:
    """Dense retriever: embeds every document once and ranks them by cosine similarity to a query."""

    def __init__(self, documents, model_name_or_path='microsoft/codebert-base', batch_size=32):
        """
        Load the model/tokenizer and pre-compute the document embeddings.

        :param documents: DataFrame with a 'code' column; kept as ``self.documents``.
        :param model_name_or_path: hub id or local directory passed to ``from_pretrained``.
        :param batch_size: number of documents encoded per forward pass.
        """
        self.documents = documents
        # Load the pre-trained model and its tokenizer.
        self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
        self.model = AutoModel.from_pretrained(model_name_or_path)
        self.model.to(DEVICE)
        self.model.eval()  # inference only
        self.batch_size = batch_size
        # Embed all documents up front (batched).
        self.doc_embeddings = self._create_doc_embeddings()

    def _create_doc_embeddings(self):
        """Embed every document in batches and return the embeddings as one stacked NumPy array."""
        all_codes = list(self.documents['code'])
        embeddings = []
        for i in tqdm(range(0, len(all_codes), self.batch_size), desc="Creating document embeddings"):
            batch_codes = all_codes[i:i + self.batch_size]
            inputs = self.tokenizer(batch_codes, return_tensors='pt', truncation=True, padding='max_length', max_length=512).to(DEVICE)
            with torch.no_grad():
                # Encoder-decoder style models are run through their encoder only.
                if hasattr(self.model, 'get_encoder'):
                    outputs = self.model.get_encoder()(input_ids=inputs['input_ids'], attention_mask=inputs['attention_mask'], output_hidden_states=True)
                else:
                    outputs = self.model(**inputs, output_hidden_states=True)
                # Average the last num_layers hidden states, then mean-pool over tokens.
                stacked_layers = torch.stack(outputs.hidden_states[-num_layers:])
                batch_embeddings = stacked_layers.mean(dim=0).mean(dim=1)
            embeddings.append(batch_embeddings.cpu().numpy())
        return np.vstack(embeddings)

    @staticmethod
    def _cosine_scores(doc_embeddings, query_embedding, eps=1e-8):
        """
        Cosine similarity between each row of ``doc_embeddings`` and the query vector.

        :param doc_embeddings: 2-D array, one document per row.
        :param query_embedding: query vector; any shape that flattens to one vector.
        :param eps: lower bound on the norm product, so zero vectors score 0 instead of NaN.
        :return: 1-D array with one score per document.
        """
        doc_matrix = np.asarray(doc_embeddings)
        query_vec = np.asarray(query_embedding).reshape(-1)
        dot_products = doc_matrix @ query_vec
        norm_products = np.linalg.norm(doc_matrix, axis=1) * np.linalg.norm(query_vec)
        return dot_products / np.maximum(norm_products, eps)

    def retrieve(self, query, k=10):
        """
        Return the ``k`` best-matching documents for ``query``.

        Scores are true cosine similarities. A raw dot product would favour
        documents with large embedding norms and would not match the cosine
        ranking used by evaluate_recall() during validation.

        :param query: query text.
        :param k: number of results to return.
        :return: tuple ``(top_k_indices, top_k_scores)``; indices are row
            positions into ``self.documents``, ordered by decreasing score.
        """
        # Encode the single query.
        query_embedding = get_embedding(self.model, self.tokenizer, query).cpu().numpy()
        scores = self._cosine_scores(self.doc_embeddings, query_embedding)
        top_k_indices = np.argsort(scores)[::-1][:k]
        top_k_scores = scores[top_k_indices]
        return top_k_indices, top_k_scores

def split_data(train_queries_df):
    """
    Split the query/code pairs into a training part and a validation part.

    One group-aware shuffle split is drawn with test_size=0.1 and a fixed
    random_state of 42. Rows are grouped by their 'code' value, so all rows
    sharing the same code end up on the same side of the split.

    :param train_queries_df: DataFrame with a 'code' column.
    :return: ``(train_df, val_df)``, both with a fresh 0..n-1 index.
    """
    splitter = GroupShuffleSplit(n_splits=1, test_size=0.1, random_state=42)
    # Each distinct code string acts as one group.
    train_rows, val_rows = next(
        splitter.split(train_queries_df, groups=train_queries_df['code'])
    )

    def _subset(row_positions):
        return train_queries_df.iloc[row_positions].reset_index(drop=True)

    return _subset(train_rows), _subset(val_rows)


def fine_tune_with_hard_negatives(model, tokenizer, train_data, code_id_to_code_map, epochs=3, lr=2e-5, batch_size=8):
    """
    Fine-tune ``model`` with a triplet loss over (query, positive, hard negative) samples.

    :param model: model to train; moved to DEVICE and updated in place.
    :param tokenizer: tokenizer used to encode each batch.
    :param train_data: records consumed by TripletDataset.
    :param code_id_to_code_map: mapping used to resolve hard-negative ids to code text.
    :param epochs: number of passes over the data.
    :param lr: AdamW learning rate.
    :param batch_size: triplets per optimisation step.
    :return: the same ``model`` object after training.
    """
    model.to(DEVICE)

    def _collate(samples):
        # Bind the tokenizer so the DataLoader can call collate_fn with a single argument.
        return collate_fn(samples, tokenizer)

    # Each batch holds the anchor / positive / negative tensors for its samples.
    loader = DataLoader(
        TripletDataset(train_data, code_id_to_code_map, tokenizer),
        batch_size=batch_size,
        shuffle=True,
        collate_fn=_collate,
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    # Triplet loss: the anchor (query) should be closer to the positive than
    # to the negative by at least the margin.
    triplet_loss = torch.nn.TripletMarginLoss(margin=1.0)

    for epoch_no in range(1, epochs + 1):
        model.train()
        running_loss = 0
        for batch in tqdm(loader, desc=f"Epoch {epoch_no}/{epochs}"):
            optimizer.zero_grad()

            # Three forward passes, in anchor -> positive -> negative order.
            anchor_vecs = get_layerwise_embeddings(model, batch['anchor'])
            positive_vecs = get_layerwise_embeddings(model, batch['positive'])
            negative_vecs = get_layerwise_embeddings(model, batch['negative'])

            step_loss = triplet_loss(anchor_vecs, positive_vecs, negative_vecs)
            step_loss.backward()
            optimizer.step()
            running_loss += step_loss.item()

        print(f"Epoch: {epoch_no}, Average Loss: {running_loss / len(loader):.4f}")

    return model

if __name__ == '__main__':
    # --- 0. Reproducibility ---
    # Negative sampling uses `random`, and DataLoader shuffling and dropout use
    # torch's RNG; seed them all so a re-run starts from the same random state.
    # 42 matches the random_state used by split_data().
    SEED = 42
    random.seed(SEED)
    np.random.seed(SEED)
    torch.manual_seed(SEED)

    # --- 1. Prepare data ---
    print("--- Preparing Data for Hard Negative Mining ---")

    with open('train_data_with_negatives.json', 'r', encoding='utf-8') as f:
        train_data = json.load(f)

    # Lookup table code_id -> code text, used to resolve hard-negative ids.
    code_snippets_df = pd.read_csv('code_snippets.csv')
    code_id_to_code_map = pd.Series(code_snippets_df.code.values, index=code_snippets_df.code_id).to_dict()

    print(f"Training on {len(train_data)} samples with pre-computed hard negatives.")

    # --- 2. Initialise the model ---
    print("\n--- Initializing Model ---")
    model_name = PRE_TRAINED_MODEL_NAME
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)

    # --- 3. Fine-tune ---
    print("\n--- Fine-tuning model with Hard Negative Mining ---")
    fine_tuned_model = fine_tune_with_hard_negatives(model, tokenizer, train_data, code_id_to_code_map, epochs=3, lr=2e-5, batch_size=8)

    # --- 4. Save the model and tokenizer side by side ---
    output_dir = FINE_TUNED_MODEL_PATH
    os.makedirs(output_dir, exist_ok=True)
    print(f"\nSaving the final model to {output_dir}...")
    fine_tuned_model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

    print("\n--- Model training with hard negatives complete. ---")


--- Preparing Data for Hard Negative Mining ---
Training on 500 samples with pre-computed hard negatives.

--- Initializing Model ---


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/691 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/504M [00:00<?, ?B/s]


--- Fine-tuning model with Hard Negative Mining ---


Epoch 1/3:   0%|          | 0/63 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/504M [00:00<?, ?B/s]

Epoch 1/3: 100%|██████████| 63/63 [02:06<00:00,  2.00s/it]


Epoch: 1, Average Loss: 0.0943


Epoch 2/3: 100%|██████████| 63/63 [02:11<00:00,  2.09s/it]


Epoch: 2, Average Loss: 0.0448


Epoch 3/3: 100%|██████████| 63/63 [02:11<00:00,  2.09s/it]


Epoch: 3, Average Loss: 0.0155

Saving the final model to ./microsoft-unixcoder-base...

--- Model training with hard negatives complete. ---


In [4]:

# dense_retrieval
import torch
from transformers import AutoTokenizer, AutoModel
import pandas as pd


# Choose which model(s) to evaluate
try_prtrained_model = False
try_fine_tuned_model = True


if __name__ == '__main__':
    # 1. Load the data and build the validation set
    print("--- 1. 載入資料並準備驗證集 ---")
    # Note: the retrieval corpus used for this evaluation is train_queries.csv itself.
    train_queries_df = pd.read_csv('train_queries.csv')

    # Only the validation part of the split is needed here.
    _, val_df = split_data(train_queries_df)
    print(f"已載入 {len(val_df)} 筆樣本用於驗證。")

    # 2. Evaluate the pre-trained model
    if try_prtrained_model:
        print("\n--- 2. 評估預訓練模型 ---")
        print(f"模型: {PRE_TRAINED_MODEL_NAME}")
        pretrained_tokenizer = AutoTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
        pretrained_model = AutoModel.from_pretrained(PRE_TRAINED_MODEL_NAME).to(DEVICE)

        pretrained_recall, corpus_embeddings_pretrained = evaluate_recall(pretrained_model, pretrained_tokenizer, val_df, train_queries_df)
        print(f"\n預訓練模型 Recall@10: {pretrained_recall:.4f}")

    # 3. Evaluate the fine-tuned model
    if try_fine_tuned_model:
        print("\n--- 3. 評估微調後的模型 ---")
        print(f"模型: {FINE_TUNED_MODEL_PATH}")
        # Only the loading step is guarded: an OSError raised later, during
        # evaluation, must not be reported as "model not found".
        try:
            finetuned_tokenizer = AutoTokenizer.from_pretrained(FINE_TUNED_MODEL_PATH)
            finetuned_model = AutoModel.from_pretrained(FINE_TUNED_MODEL_PATH).to(DEVICE)
        except OSError:
            print(f"錯誤: 在 '{FINE_TUNED_MODEL_PATH}' 找不到微調後的模型。")
            print("請先執行 'fine_tune_model.py' 來訓練並儲存模型。")
        else:
            # No cache is passed, so the corpus is re-embedded with the fine-tuned weights.
            finetuned_recall, _ = evaluate_recall(finetuned_model, finetuned_tokenizer, val_df, train_queries_df)
            print(f"\n微調後模型 Recall@10: {finetuned_recall:.4f}")

    print("\n--- 評估完成 ---")

--- 1. 載入資料並準備驗證集 ---
已載入 50 筆樣本用於驗證。

--- 3. 評估微調後的模型 ---
模型: ./microsoft-unixcoder-base

Creating cached embeddings for the corpus (code_snippets.csv)...


Corpus Embeddings: 100%|██████████| 16/16 [00:13<00:00,  1.23it/s]
Evaluating Recall@10: 100%|██████████| 50/50 [00:01<00:00, 30.38it/s]


微調後模型 Recall@10: 0.9600

--- 評估完成 ---





In [5]:
# 生成dense model的submission
import pandas as pd
import os
from tqdm import tqdm
def generate_submission(retriever, test_df, output_path, query_expansion=False):
    """
    Write a submission CSV holding the top-10 retrieved code ids for every test query.

    :param retriever: object exposing ``retrieve(query, k)`` and a ``documents``
        DataFrame with a 'code_id' column.
    :param test_df: DataFrame with 'query_id' and 'query' columns.
    :param output_path: destination CSV path; also used as the progress-bar label.
    :param query_expansion: accepted but not used by this function.
    """
    print(f"Generating submission for {output_path}...")

    def _ranked_code_ids(query_text):
        # Map the retrieved row positions back to code ids via the retriever's own documents.
        positions, _scores = retriever.retrieve(query_text, k=10)
        code_ids = retriever.documents.iloc[positions]['code_id'].tolist()
        return ' '.join(str(code_id) for code_id in code_ids)

    # tqdm shows a progress bar while the queries are processed.
    progress = tqdm(test_df.iterrows(), total=test_df.shape[0], desc=output_path)
    records = [
        {'query_id': record['query_id'], 'code_id': _ranked_code_ids(record['query'])}
        for _, record in progress
    ]

    pd.DataFrame(records).to_csv(output_path, index=False)
    print(f"Submission file saved to {output_path}")

if __name__ == '__main__':
    # --- Load data ---
    print("Loading data...")
    code_snippets_df = pd.read_csv('code_snippets.csv')
    test_queries_df = pd.read_csv('test_queries.csv')

    # --- Dense models ---
    finetuned_model_path = FINE_TUNED_MODEL_PATH

    # Baseline: retriever built on the pre-trained checkpoint.
    print("\nInitializing pre-trained dense model...")
    pretrained_retriever = DenseRetriever(code_snippets_df, model_name_or_path=PRE_TRAINED_MODEL_NAME)
    generate_submission(pretrained_retriever, test_queries_df, 'submission_pretrained.csv')

    # The fine-tuned retriever is built only when its saved directory exists.
    if os.path.exists(finetuned_model_path):
        print("\nInitializing fine-tuned dense model...")
        finetuned_retriever = DenseRetriever(code_snippets_df, model_name_or_path=finetuned_model_path)
        generate_submission(finetuned_retriever, test_queries_df, 'submission_finetuned.csv')
    else:
        print(f"\nFine-tuned model not found at '{finetuned_model_path}'.")
        print("Skipping submission generation for the fine-tuned model.")

    print("\nAll submission files have been generated.")

Loading data...

Initializing pre-trained dense model...


Creating document embeddings: 100%|██████████| 16/16 [00:13<00:00,  1.18it/s]


Generating submission for submission_pretrained.csv...


submission_pretrained.csv: 100%|██████████| 500/500 [00:14<00:00, 34.14it/s]


Submission file saved to submission_pretrained.csv

Initializing fine-tuned dense model...


Creating document embeddings: 100%|██████████| 16/16 [00:13<00:00,  1.21it/s]


Generating submission for submission_finetuned.csv...


submission_finetuned.csv: 100%|██████████| 500/500 [00:14<00:00, 33.89it/s]


Submission file saved to submission_finetuned.csv

All submission files have been generated.
