# 1. Downloading Llama & Preparing LoRA

First we check the GPU version available in the environment and install specific dependencies that are compatible with the detected GPU to prevent version conflicts.

In [1]:
%%capture
import torch
major_version, minor_version = torch.cuda.get_device_capability()
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
if major_version >= 8:
    !pip install --no-deps packaging ninja einops flash-attn xformers trl peft accelerate bitsandbytes
else:
    !pip install --no-deps xformers trl peft accelerate bitsandbytes
pass

Next we need to prepare to load a range of quantized language models, including a new 15 trillion token LLama-3 model, optimized for memory efficiency with 4-bit quantization.


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
from unsloth import FastLanguageModel
import torch
max_seq_length = 2048 # Choose any! Llama 3 is up to 8k
dtype = None
load_in_4bit = True # Use 4bit quantization to reduce memory usage. Can be False.

fourbit_models = [
    "unsloth/mistral-7b-bnb-4bit",
    "unsloth/mistral-7b-instruct-v0.2-bnb-4bit",
    "unsloth/llama-2-7b-bnb-4bit",
    "unsloth/gemma-7b-bnb-4bit",
    "unsloth/gemma-7b-it-bnb-4bit",
    "unsloth/gemma-2b-bnb-4bit",
    "unsloth/gemma-2b-it-bnb-4bit",
    "unsloth/llama-3-8b-bnb-4bit",
]

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/llama-3-8b-bnb-4bit", # Llama-3 70b also works (just change the model name)
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
    # token = "hf_...", # use one if using gated models like meta-llama/Llama-2-7b-hf
)

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.
🦥 Unsloth Zoo will now patch everything to make training faster!
==((====))==  Unsloth 2024.12.4: Fast Llama patching. Transformers:4.46.3.
   \\   /|    GPU: Tesla T4. Max memory: 14.748 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.5.1+cu121. CUDA: 7.5. CUDA Toolkit: 12.1. Triton: 3.1.0
\        /    Bfloat16 = FALSE. FA [Xformers = 0.0.28.post3. FA2 = False]
 "-____-"     Free Apache license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


In [4]:
model = FastLanguageModel.get_peft_model(
    model,
    r = 16, # Choose any number > 0 ! Suggested 8, 16, 32, 64, 128
    target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                      "gate_proj", "up_proj", "down_proj",],
    lora_alpha = 16,
    lora_dropout = 0,
    bias = "none",
    use_gradient_checkpointing = "unsloth",
    random_state = 3407,
    use_rslora = False,
    loftq_config = None,
)

Unsloth 2024.12.4 patched 32 layers with 32 QKV layers, 32 O layers and 32 MLP layers.


### Understanding Model

In [5]:
# def print_model_structure(model, indent=0):
#     """
#     모델의 내부 구조를 계층적으로 출력하는 함수

#     Args:
#         model: PyTorch 모델
#         indent: 들여쓰기 레벨
#     """
#     tab = '  ' * indent

#     for name, module in model.named_children():
#         print(f"{tab}{name}: ({module.__class__.__name__})")

#         if "Attention" in module.__class__.__name__:
#             print(f"{tab}  → Attention Layer Found!")
#             if hasattr(module, 'num_heads'):
#                 print(f"{tab}    - Number of heads: {module.num_heads}")
#             if hasattr(module, 'head_dim'):
#                 print(f"{tab}    - Head dimension: {module.head_dim}")

#         elif "Transformer" in module.__class__.__name__:
#             print(f"{tab}  → Transformer Block Found!")

#         if len(list(module.children())) > 0:
#             print_model_structure(module, indent + 1)

# def analyze_lora_layers(model):
#     """
#     LoRA 레이어의 상세 정보를 분석하는 함수
#     """
#     print("\n=== LoRA Layer Analysis ===")
#     for name, module in model.named_modules():
#         if hasattr(module, 'lora_A'):
#             print(f"\nLayer: {name}")

#             # LoRA A 행렬 정보
#             if isinstance(module.lora_A, dict):
#                 for adapter_name, lora_A in module.lora_A.items():
#                     print(f"Adapter: {adapter_name}")
#                     if hasattr(lora_A, 'weight'):
#                         shape = lora_A.weight.shape
#                         print(f"  Shape (A): {shape}")
#             else:
#                 if hasattr(module.lora_A, 'weight'):
#                     shape = module.lora_A.weight.shape
#                     print(f"  Shape (A): {shape}")

#             # 기본 레이어 정보
#             if hasattr(module, 'in_features'):
#                 print(f"  Input features: {module.in_features}")
#             if hasattr(module, 'out_features'):
#                 print(f"  Output features: {module.out_features}")

# # 기본 모델 정보 출력
# print("=== Model Information ===")
# print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")
# print(f"Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

# # 모델 구조 출력
# print("\n=== Model Structure ===")
# print_model_structure(model)

# # LoRA 레이어 분석
# analyze_lora_layers(model)

### Adding Attention Layer

In [6]:
# import torch
# from torch import nn
# import math

# class CustomAttention(nn.Module):
#     def __init__(self, config):
#         super().__init__()
#         self.hidden_size = config.hidden_size
#         self.num_heads = config.num_attention_heads
#         self.head_dim = config.hidden_size // config.num_attention_heads
#         self.scaling = self.head_dim ** -0.5

#         # 주요 프로젝션 레이어들
#         self.q_proj = nn.Linear(config.hidden_size, config.hidden_size)
#         self.k_proj = nn.Linear(config.hidden_size, config.hidden_size)
#         self.v_proj = nn.Linear(config.hidden_size, config.hidden_size)
#         self.o_proj = nn.Linear(config.hidden_size, config.hidden_size)

#         self.rotary_emb = model.base_model.model.model.layers[0].self_attn.rotary_emb

#     def forward(self, hidden_states, attention_mask=None, position_ids=None):
#         batch_size, seq_length, _ = hidden_states.shape

#         # 프로젝션 수행
#         query_states = self.q_proj(hidden_states)
#         key_states = self.k_proj(hidden_states)
#         value_states = self.v_proj(hidden_states)

#         # 헤드 차원으로 재구성
#         query_states = query_states.view(batch_size, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
#         key_states = key_states.view(batch_size, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
#         value_states = value_states.view(batch_size, seq_length, self.num_heads, self.head_dim).transpose(1, 2)

#         # RoPE (Rotary Position Embedding) 적용
#         query_states, key_states = self.rotary_emb(query_states, key_states, position_ids)

#         # Attention 계산
#         attention_scores = torch.matmul(query_states, key_states.transpose(2, 3)) * self.scaling

#         if attention_mask is not None:
#             attention_scores = attention_scores + attention_mask

#         attention_probs = torch.softmax(attention_scores, dim=-1)

#         # Value와 결합하여 최종 출력 계산
#         hidden_states = torch.matmul(attention_probs, value_states)
#         hidden_states = hidden_states.transpose(1, 2).contiguous()
#         hidden_states = hidden_states.view(batch_size, seq_length, self.hidden_size)

#         # 최종 프로젝션
#         hidden_states = self.o_proj(hidden_states)

#         return hidden_states

# def add_attention_layer(model):
#     """
#     모델의 앞부분에 새로운 attention layer를 추가합니다.
#     """
#     config = model.base_model.model.model.config
#     new_attention = CustomAttention(config)

#     # 모델의 기존 레이어들을 임시 저장
#     original_layers = model.base_model.model.model.layers

#     # 새로운 ModuleList 생성
#     new_layers = nn.ModuleList([new_attention])
#     new_layers.extend(original_layers)

#     # 모델의 layers를 새로운 ModuleList로 교체
#     model.base_model.model.model.layers = new_layers

#     return model

# # 사용 예시:
# # model = add_attention_layer(model)

# 2. Dataset Preprocessing

In [None]:
# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
import json
import os
import random
import nltk
from datasets import Dataset, DatasetDict
from nltk.corpus import wordnet

EOS_TOKEN = tokenizer.eos_token # do not forget this part!

# Download required NLTK data
nltk.download('wordnet')

def clean_text(text):
    """Clean vertically split text if present"""
    if isinstance(text, list):
        text = '\n'.join(text)
    if '\n' in text and all(len(line.strip()) == 1 for line in text.split('\n') if line.strip()):
        return ''.join(c for c in text if not c.isspace())
    return text

def get_synonyms(word):
    """Get list of synonyms for a word using WordNet"""
    synonyms = []
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            if lemma.name() != word:
                synonyms.append(lemma.name())
    return list(set(synonyms))

def augment_scene(scene):
    """Augment scene text by replacing words with synonyms"""
    words = scene.split()
    augmented = []
    for word in words:
        if random.random() < 0.1:  # 10% probability of synonym replacement
            synonyms = get_synonyms(word)
            if synonyms:
                augmented.append(random.choice(synonyms))
            else:
                augmented.append(word)
        else:
            augmented.append(word)
    return " ".join(augmented)

def create_context(session_data, p_scene=0.8, p_attr=0.7, p_relations=0.6):
    """Create context with probabilistic inclusion of different components"""
    context = ""

    if random.random() < p_scene:
        context += f"Scene:\n{clean_text(session_data['scene'])}\n\n"

    if random.random() < p_attr:
        context += "Character Information:\n"
        for speaker in session_data["speakers"]:
            if speaker in session_data["attributes"]:
                attrs = session_data["attributes"][speaker]
                selected_attrs = random.sample(list(attrs.items()),
                                            k=random.randint(2, len(attrs)))
                context += f"{speaker}:\n"
                for key, value in selected_attrs:
                    if value and value != "None":
                        context += f"- {key}: {value}\n"

    if random.random() < p_relations:
        relations = session_data.get("relations with Harry", {})
        if relations:
            context += "\nRelations:\n"
            for person, rel in relations.items():
                selected_rels = random.sample(list(rel.items()),
                                           k=random.randint(1, len(rel)))
                for key, value in selected_rels:
                    if isinstance(value, (int, float)) and value != 0:
                        context += f"{person} - {key}: {value}\n"

    return context

def create_dialogue_variations(dialogues, max_history=3):
    """Create variations of dialogue history"""
    examples = []
    for i in range(len(dialogues) - 1):
        history_length = random.randint(1, min(i+1, max_history))
        selected_history = dialogues[max(0, i-history_length+1):i+1]

        example = {
            "previous_dialogue": "\n".join(selected_history),
            "next_dialogue": dialogues[i + 1]
        }
        examples.append(example)
    return examples

def create_augmented_examples(session_data):
    """Create augmented examples from session data"""
    examples = []
    base_dialogues = session_data["dialogue"]

    for _ in range(3):  # Create 3 variations per session
        context = create_context(session_data)
        augmented_scene = augment_scene(clean_text(session_data["scene"]))
        dialogue_variations = create_dialogue_variations(base_dialogues)

        for variation in dialogue_variations:
            example = {
                "instruction": f"Given the following context and previous dialogue, "
                             f"generate the next line of dialogue:",
                "input": context + "\nScene:\n" + augmented_scene +
                        "\n\nPrevious Dialogue:\n" + variation["previous_dialogue"],
                "output": variation["next_dialogue"]
            }
            examples.append(example)

    return examples

def convert_to_alpaca(json_data):
    """Convert dataset to Alpaca format with augmentation"""
    alpaca_data = []

    if isinstance(json_data, dict):
        for session_key, session_data in json_data.items():
            session_examples = create_augmented_examples(session_data)
            alpaca_data.extend(session_examples)
    elif isinstance(json_data, list):
        for session_data in json_data:
            session_examples = create_augmented_examples(session_data)
            alpaca_data.extend(session_examples)

    return alpaca_data

def formatting_prompts_func(examples):
    """Format examples in Alpaca prompt style"""
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

    instructions = examples["instruction"]
    inputs = examples["input"]
    outputs = examples["output"]
    texts = []

    for instruction, input, output in zip(instructions, inputs, outputs):
        text = alpaca_prompt.format(instruction, input, output) + EOS_TOKEN
        texts.append(text)

    return {"text": texts}

# Main execution
def process_dataset(train_path, test_path, output_train_path, output_test_path, formatted_dataset_path):
    # Process training data
    print(f"Processing training data from: {train_path}")
    with open(train_path, 'r', encoding='utf-8') as f:
        train_data = json.load(f)
    train_formatted = convert_to_alpaca(train_data)

    # Process test data
    print(f"Processing test data from: {test_path}")
    with open(test_path, 'r', encoding='utf-8') as f:
        test_data = json.load(f)
    test_formatted = convert_to_alpaca(test_data)

    # Save intermediate results
    print(f"Saving training data to: {output_train_path}")
    with open(output_train_path, 'w', encoding='utf-8') as f:
        json.dump(train_formatted, f, ensure_ascii=False, indent=2)

    print(f"Saving test data to: {output_test_path}")
    with open(output_test_path, 'w', encoding='utf-8') as f:
        json.dump(test_formatted, f, ensure_ascii=False, indent=2)

    # Create and save formatted dataset
    dataset = DatasetDict({
        'train': Dataset.from_list(train_formatted),
        'test': Dataset.from_list(test_formatted)
    })

    formatted_dataset = dataset.map(
        formatting_prompts_func,
        batched=True,
        remove_columns=dataset['train'].column_names
    )

    # Remove existing directory if it exists
    if os.path.exists(formatted_dataset_path):
        shutil.rmtree(formatted_dataset_path)

    # Save formatted dataset
    formatted_dataset.save_to_disk(formatted_dataset_path)

    print(f"Augmented dataset conversion complete! Training entries: {len(train_formatted)}, Test entries: {len(test_formatted)}")

    return formatted_dataset

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [6]:
# File paths
train_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Dialogue Dataset/en_train_set.json'
test_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Dialogue Dataset/en_test_set.json'
output_train_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_train_set.json'
output_test_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_test_set.json'
formatted_dataset_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_formatted_dataset'

# Process the dataset
formatted_dataset = process_dataset(
    train_path,
    test_path,
    output_train_path,
    output_test_path,
    formatted_dataset_path
)

Processing training data from: /content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Dialogue Dataset/en_train_set.json
Processing test data from: /content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Dialogue Dataset/en_test_set.json
Saving training data to: /content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_train_set.json
Saving test data to: /content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_test_set.json


Map:   0%|          | 0/44166 [00:00<?, ? examples/s]

Map:   0%|          | 0/2601 [00:00<?, ? examples/s]

Saving the dataset (0/2 shards):   0%|          | 0/44166 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/2601 [00:00<?, ? examples/s]

Augmented dataset conversion complete! Training entries: 44166, Test entries: 2601


In [7]:
print(formatted_dataset["train"][0]["text"])

Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
Given the following context and previous dialogue, generate the next line of dialogue:

### Input:
Scene:
“Up! Get up! Now!”
Harry woke with a start. His aunt rapped on the door again.
“Up!” she screeched. Harry heard her walking toward the kitchen and then the sound of the frying pan being put on the stove. He rolled onto his back and tried to remember the dream he had been having. It had been a good one. There had been a flying motorcycle in it. He had a funny feeling he’d had the same dream before.
His aunt was back outside the door.
“Are you up yet?” she demanded.
“Nearly,” said Harry.
“Well, get a move on, I want you to look after the bacon. And don’t you dare let it burn, I want everything perfect on Duddy’s birthday.”
Harry groaned.
“What did you say?” his aunt snapped through the door.
“Nothing, nothing .

---

# 3. Fine-Tuning the Model

In [8]:
import json

output_train_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_train_set.json'
output_test_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_test_set.json'
formatted_dataset_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_formatted_dataset'

with open(output_train_path, 'r', encoding='utf-8') as f:
    train_data = json.load(f)

with open(output_test_path, 'r', encoding='utf-8') as f:
    test_data = json.load(f)

# 나중에 데이터셋을 다시 로드할 때는 다음과 같이 사용할 수 있습니다:
from datasets import load_from_disk
formatted_dataset = load_from_disk(formatted_dataset_path)

In [9]:
# from trl import SFTTrainer
# from transformers import TrainingArguments
# from datasets import Dataset

# # Training Arguments 설정
# training_args = TrainingArguments(
#     per_device_train_batch_size=2,
#     per_device_eval_batch_size=2,  # 평가를 위한 batch size 추가
#     gradient_accumulation_steps=4,
#     warmup_steps=10,
#     max_steps=100, # increase this to make the model learn "better"
#     learning_rate=2e-4,
#     fp16=not torch.cuda.is_bf16_supported(),
#     bf16=torch.cuda.is_bf16_supported(),
#     logging_steps=1,
#     optim="adamw_8bit",
#     weight_decay=0.01,
#     lr_scheduler_type="linear",
#     seed=3407,
#     output_dir="outputs",
#     # 평가 관련 설정 추가
#     evaluation_strategy="steps",    # "steps" 또는 "epoch"
#     eval_steps=20,                 # 20 스텝마다 평가
#     save_strategy="steps",
#     save_steps=20,                 # 20 스텝마다 모델 저장
#     save_total_limit=3,           # 최대 3개의 체크포인트만 저장
#     load_best_model_at_end=True,  # 학습 완료 후 가장 좋은 모델 로드
#     metric_for_best_model="eval_loss",  # 어떤 메트릭으로 best model을 결정할지
# )

# # train 데이터셋만 가져와서 분할
# train_eval_dataset = formatted_dataset['train']

# # train_test_split으로 분할 (예: 80% train, 20% evaluation)
# split_dataset = train_eval_dataset.train_test_split(
#     test_size=0.2,  # 20%를 evaluation set으로 사용
#     shuffle=True,   # 데이터 섞기
#     seed=3407      # 재현성을 위한 시드 설정
# )

# # Trainer 설정
# trainer = SFTTrainer(
#     model=model,
#     tokenizer=tokenizer,
#     train_dataset=split_dataset['train'],      # 분할된 train set
#     eval_dataset=split_dataset['test'],        # 분할된 evaluation set
#     dataset_text_field="text",
#     max_seq_length=max_seq_length,
#     dataset_num_proc=2,
#     packing=False,
#     args=training_args,
# )


from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import Dataset

# 전체 데이터셋에서 더 작은 평가 세트를 만듦
train_eval_dataset = formatted_dataset['train']
split_dataset = train_eval_dataset.train_test_split(
    test_size=0.2,
    shuffle=True,
    seed=3407
)

# 평가 데이터셋의 크기를 제한
max_eval_samples = 500  # 평가에 사용할 최대 샘플 수
eval_dataset = split_dataset['test'].select(range(min(len(split_dataset['test']), max_eval_samples)))

# Training Arguments 설정
training_args = TrainingArguments(
    per_device_train_batch_size=2,
    per_device_eval_batch_size=4,  # 평가 배치 크기를 더 크게 설정
    gradient_accumulation_steps=4,
    warmup_steps=10,
    max_steps=100,
    learning_rate=2e-4,
    fp16=not torch.cuda.is_bf16_supported(),
    bf16=torch.cuda.is_bf16_supported(),
    logging_steps=1,
    optim="adamw_8bit",
    weight_decay=0.01,
    lr_scheduler_type="linear",
    seed=3407,
    output_dir="outputs",
    # 평가 관련 설정 수정
    evaluation_strategy="steps",
    eval_steps=50,  # 평가 빈도를 줄임 (20 -> 50)
    save_strategy="steps",
    save_steps=50,  # 저장 빈도도 함께 조정
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
)

# Trainer 설정
trainer = SFTTrainer(
    model=model,
    tokenizer=tokenizer,
    train_dataset=split_dataset['train'],
    eval_dataset=eval_dataset,  # 크기가 제한된 평가 데이터셋 사용
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    dataset_num_proc=2,
    packing=False,
    args=training_args,
)



Map (num_proc=2):   0%|          | 0/500 [00:00<?, ? examples/s]

max_steps is given, it will override any value given in num_train_epochs


In [7]:
#@title Show current memory stats
gpu_stats = torch.cuda.get_device_properties(0)
start_gpu_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
max_memory = round(gpu_stats.total_memory / 1024 / 1024 / 1024, 3)
print(f"GPU = {gpu_stats.name}. Max memory = {max_memory} GB.")
print(f"{start_gpu_memory} GB of memory reserved.")

GPU = Tesla T4. Max memory = 14.748 GB.
5.605 GB of memory reserved.


In [None]:
# # We're now kicking off the actual training of our model, which will spit out some statistics showing us how well it learns
# trainer_stats = trainer.train()

In [10]:
# 학습 시작
trainer_stats = trainer.train()

# 학습 완료 후 평가
eval_results = trainer.evaluate()
print(f"Evaluation results: {eval_results}")

# 모델 저장
trainer.save_model("final_model")

==((====))==  Unsloth - 2x faster free finetuning | Num GPUs = 1
   \\   /|    Num examples = 35,332 | Num Epochs = 1
O^O/ \_/ \    Batch size per device = 2 | Gradient Accumulation steps = 4
\        /    Total batch size = 8 | Total steps = 100
 "-____-"     Number of trainable parameters = 41,943,040
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Currently logged in as: [33moverjoy1008[0m ([33moverjoy1008-korea-university[0m). Use [1m`wandb login --relogin`[0m to force relogin


Step,Training Loss,Validation Loss
50,1.4562,1.3662
100,1.2395,1.282975


Evaluation results: {'eval_loss': 1.2829750776290894, 'eval_runtime': 1013.767, 'eval_samples_per_second': 0.493, 'eval_steps_per_second': 0.123, 'epoch': 0.022642363862787274}


In [None]:
# model.save_pretrained("lora_model") # Local saving
model.push_to_hub("Overjoy1008/harry_potter_llama3_lora_100", token = "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") # Online saving

#4. Prompt Tuning

In [None]:
from transformers import AutoTokenizer, PreTrainedModel, TrainingArguments
from trl import SFTTrainer
import torch
import torch.nn as nn
from dataclasses import dataclass
from typing import Optional, Tuple
from transformers.trainer_utils import get_last_checkpoint
import os

# train 데이터셋만 가져와서 분할
train_eval_dataset = formatted_dataset['train']

# train_test_split으로 분할 (예: 80% train, 20% evaluation)
split_dataset = train_eval_dataset.train_test_split(
    test_size=0.2,  # 20%를 evaluation set으로 사용
    shuffle=True,   # 데이터 섞기
    seed=3407      # 재현성을 위한 시드 설정
)

import warnings
warnings.filterwarnings("ignore")

@dataclass
class PromptTuningConfig:
    num_virtual_tokens: int = 20
    initialization_method: str = "random"
    prompt_tuning_init_text: Optional[str] = None

class PromptEmbedding(nn.Module):
    def __init__(self, config: PromptTuningConfig, model: PreTrainedModel, tokenizer: AutoTokenizer):
        super().__init__()
        self.config = config
        self.embedding_dim = model.config.hidden_size
        self.device = next(model.parameters()).device

        # Get the base LlamaModel
        llama_model = model
        while not hasattr(llama_model, 'embed_tokens'):
            if hasattr(llama_model, 'base_model'):
                llama_model = llama_model.base_model
            else:
                raise AttributeError("Could not find embed_tokens in model")

        self.llama_model = llama_model

        if config.initialization_method == "random":
            self.prompt_embeddings = nn.Parameter(
                torch.randn(config.num_virtual_tokens, self.embedding_dim).to(self.device)
            )
        elif config.initialization_method == "vocabulary":
            init_text = config.prompt_tuning_init_text
            if init_text is None:
                init_text = "This is a story about Harry Potter:"

            tokens = tokenizer(init_text, return_tensors="pt").input_ids.to(self.device)
            token_embeddings = self.llama_model.embed_tokens(tokens)

            if token_embeddings.size(1) > config.num_virtual_tokens:
                self.prompt_embeddings = nn.Parameter(
                    token_embeddings[0, :config.num_virtual_tokens, :]
                )
            else:
                padding = torch.randn(
                    config.num_virtual_tokens - token_embeddings.size(1),
                    self.embedding_dim,
                    device=self.device
                )
                self.prompt_embeddings = nn.Parameter(
                    torch.cat([token_embeddings[0], padding], dim=0)
                )

    def forward(self, batch_size: int):
        return self.prompt_embeddings.repeat(batch_size, 1, 1)

class PromptTunedModel(nn.Module):
    def __init__(self, base_model: PreTrainedModel, prompt_embedding: PromptEmbedding):
        super().__init__()
        self.base_model = base_model
        self.prompt_embedding = prompt_embedding
        self.config = base_model.config

        # Freeze base model parameters
        for param in self.base_model.parameters():
            param.requires_grad = False

        self.device = next(base_model.parameters()).device

    def forward(self, input_ids=None, attention_mask=None, labels=None, inputs_embeds=None, **kwargs):
        # Handle input embeddings
        if input_ids is not None and inputs_embeds is None:
            input_ids = input_ids.to(self.device)
            batch_size = input_ids.size(0)
            prompt_embeds = self.prompt_embedding(batch_size)
            inputs_embeds = self.prompt_embedding.llama_model.embed_tokens(input_ids)
            combined_embeds = torch.cat([prompt_embeds, inputs_embeds], dim=1)
        elif inputs_embeds is not None:
            combined_embeds = inputs_embeds
            batch_size = inputs_embeds.size(0)
        else:
            raise ValueError("Either input_ids or inputs_embeds should be provided")

        # Handle attention mask
        if attention_mask is not None:
            attention_mask = attention_mask.to(self.device)
            prompt_attention_mask = torch.ones(
                batch_size,
                self.prompt_embedding.config.num_virtual_tokens,
                device=self.device
            )
            combined_attention_mask = torch.cat(
                [prompt_attention_mask, attention_mask], dim=1
            )
        else:
            combined_attention_mask = None

        # Handle labels by adding padding for prompt tokens
        if labels is not None:
            labels = labels.to(self.device)
            # Create padding labels for the prompt tokens (using -100 to ignore in loss calculation)
            prompt_labels = torch.full(
                (batch_size, self.prompt_embedding.config.num_virtual_tokens),
                -100,
                device=self.device,
                dtype=labels.dtype
            )
            # Concatenate the padding labels with the actual labels
            labels = torch.cat([prompt_labels, labels], dim=1)

        # Forward all arguments to the base model
        model_kwargs = {
            'inputs_embeds': combined_embeds,
            'attention_mask': combined_attention_mask,
            'labels': labels,
            **kwargs  # Pass through any additional kwargs
        }

        # Remove None values
        model_kwargs = {k: v for k, v in model_kwargs.items() if v is not None}

        outputs = self.base_model(**model_kwargs)

        return outputs

    def get_input_embeddings(self):
        """Return the base model's input embeddings layer"""
        return self.prompt_embedding.llama_model.embed_tokens

    def set_input_embeddings(self, value):
        """Set the base model's input embeddings layer"""
        self.prompt_embedding.llama_model.embed_tokens = value

    def get_output_embeddings(self):
        """Return the base model's output embeddings layer"""
        return self.base_model.get_output_embeddings()

    def prepare_inputs_for_generation(self, *args, **kwargs):
        """Prepare inputs for generation"""
        return self.base_model.prepare_inputs_for_generation(*args, **kwargs)

# 모델 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

prompt_config = PromptTuningConfig(
    num_virtual_tokens=20,
    initialization_method="vocabulary",
    prompt_tuning_init_text="This is a story about Harry Potter:"
)

prompt_embedding = PromptEmbedding(prompt_config, model, tokenizer)
prompt_tuned_model = PromptTunedModel(model, prompt_embedding)
prompt_tuned_model = prompt_tuned_model.to(device)

# Training Arguments 설정
training_args = TrainingArguments(
    per_device_train_batch_size=2,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=4,
    warmup_steps=10,
    max_steps=100,
    learning_rate=1e-3,
    fp16=not torch.cuda.is_bf16_supported(),
    bf16=torch.cuda.is_bf16_supported(),
    logging_steps=1,
    optim="adamw_8bit",
    weight_decay=0.00,
    lr_scheduler_type="linear",
    seed=3407,
    output_dir="prompt_tuning_outputs",
    evaluation_strategy="steps",
    eval_steps=50,
    save_strategy="steps",
    save_steps=50,
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
)

# Trainer 설정
trainer = SFTTrainer(
    model=prompt_tuned_model,
    tokenizer=tokenizer,
    train_dataset=split_dataset['train'],
    eval_dataset=split_dataset['test'],
    dataset_text_field="text",
    max_seq_length=max_seq_length,
    dataset_num_proc=2,
    packing=False,
    args=training_args,
)

# 학습 시작
trainer.train()

max_steps is given, it will override any value given in num_train_epochs
==((====))==  Unsloth - 2x faster free finetuning | Num GPUs = 1
   \\   /|    Num examples = 35,332 | Num Epochs = 1
O^O/ \_/ \    Batch size per device = 2 | Gradient Accumulation steps = 4
\        /    Total batch size = 8 | Total steps = 100
 "-____-"     Number of trainable parameters = 81,920


Step,Training Loss,Validation Loss


In [None]:
# 모델 구조 더 자세히 확인
print("\nDetailed model inspection:")
model_to_check = model.base_model.base_model
print(f"Type: {type(model_to_check)}")
print(f"Available attributes: {dir(model_to_check)}")


Detailed model inspection:
Type: <class 'transformers.models.llama.modeling_llama.LlamaModel'>


In [None]:
#@title Show final memory and time stats
used_memory = round(torch.cuda.max_memory_reserved() / 1024 / 1024 / 1024, 3)
used_memory_for_lora = round(used_memory - start_gpu_memory, 3)
used_percentage = round(used_memory         /max_memory*100, 3)
lora_percentage = round(used_memory_for_lora/max_memory*100, 3)
print(f"{trainer_stats.metrics['train_runtime']} seconds used for training.")
print(f"{round(trainer_stats.metrics['train_runtime']/60, 2)} minutes used for training.")
print(f"Peak reserved memory = {used_memory} GB.")
print(f"Peak reserved memory for training = {used_memory_for_lora} GB.")
print(f"Peak reserved memory % of max memory = {used_percentage} %.")
print(f"Peak reserved memory for training % of max memory = {lora_percentage} %.")

4770.1562 seconds used for training.
79.5 minutes used for training.
Peak reserved memory = 8.439 GB.
Peak reserved memory for training = 2.834 GB.
Peak reserved memory % of max memory = 57.221 %.
Peak reserved memory for training % of max memory = 19.216 %.


# 4. Using, Saving, Loading the LoRA Model

In [None]:
if True:
    from unsloth import FastLanguageModel
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name = "lora_model", # YOUR MODEL YOU USED FOR TRAINING
        max_seq_length = max_seq_length,
        dtype = dtype,
        load_in_4bit = load_in_4bit,
    )
    FastLanguageModel.for_inference(model)

In [13]:
# 저장할 경로 설정
save_directory = "/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Harry Potter LoRA 100"

# 모델 저장
model.save_pretrained(save_directory)

# 토크나이저 저장
tokenizer.save_pretrained(save_directory)

# 학습 상태(config) 저장 - 선택사항
trainer.save_state()

import os
print("저장된 파일들:")
for file in os.listdir(save_directory):
    print(f"- {file}")

저장된 파일들:
- README.md
- adapter_model.safetensors
- adapter_config.json
- tokenizer_config.json
- special_tokens_map.json
- tokenizer.json


In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# 저장된 경로
save_directory = "/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Harry Potter LoRA"

# 모델과 토크나이저 불러오기
model = AutoModelForCausalLM.from_pretrained(save_directory)
tokenizer = AutoTokenizer.from_pretrained(save_directory)

# GPU로 모델 이동 (if available)
model = model.to("cuda")

<a name="Inference"></a>
### Inference
Let's run the model! You can change the instruction and input - leave the output blank!

 You can also use a `TextStreamer` for continuous inference - so you can see the generation token by token, instead of waiting the whole time!

In [None]:
test_cases = [
  {
    "instruction": "Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Harry and Snape:",
    "input": "Scene:\nThe dungeon was darker than usual, filled with weird-colored smoke from everyone's cauldrons. Snape's lip curled when he saw Harry's watery potion.\n\"Potter, what is this supposed to be?\"\n\nCharacter Information:\nSnape:\n- name: Severus Snape\n- title: Potions Master\n- character: Sarcastic, strict, bitter\n- affiliation: Hogwarts Professor\n\nHarry:\n- name: Harry Potter\n- age: 11\n- affiliation: Gryffindor student\n\nPrevious Dialogue:\nSnape: Potter, what is this supposed to be?",
    "expected_output": "Harry: The Draught of Living Death, sir."
  },
  {
    "instruction": "Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Hermione and Ron:",
    "input": "Scene:\nThe library was quiet except for the rustling of pages. Hermione was surrounded by a tower of books about House-elves' rights, while Ron looked on with disbelief.\n\"Honestly, Hermione, they like working!\"\n\nCharacter Information:\nHermione:\n- name: Hermione Granger\n- character: Passionate about justice, intelligent\n- affiliation: S.P.E.W. founder\n\nRon:\n- name: Ron Weasley\n- character: Practical, sometimes insensitive\n\nPrevious Dialogue:\nRon: Honestly, Hermione, they like working!",
    "expected_output": "Hermione: That's because they've been conditioned to accept their oppression, Ron!"
  },
  {
    "instruction": "Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Voldemort and Dumbledore:",
    "input": "Scene:\nThe Ministry's Atrium was silent. Broken glass littered the floor from the magical battle. Voldemort and Dumbledore faced each other, wands raised.\n\"You do not seek to kill me, Dumbledore?\"\n\nCharacter Information:\nVoldemort:\n- name: Lord Voldemort\n- character: Ruthless, powerful, fears death\n- affiliation: Dark Lord\n\nDumbledore:\n- name: Albus Dumbledore\n- character: Wise, powerful, compassionate\n- affiliation: Hogwarts Headmaster\n\nPrevious Dialogue:\nVoldemort: You do not seek to kill me, Dumbledore?",
    "expected_output": "Dumbledore: We both know there are other ways of destroying a man, Tom."
  },
  {
    "instruction": "Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Sirius and Harry:",
    "input": "Scene:\nThe cave near Hogsmeade was cold and dark. Buckbeak lay in the corner, while Sirius tore apart a chicken leg. Harry sat across from his godfather, worried about the Triwizard Tournament.\n\"I don't know if I can do this, Sirius.\"\n\nCharacter Information:\nSirius:\n- name: Sirius Black\n- character: Brave, protective, reckless\n- relation: Harry's godfather\n\nHarry:\n- name: Harry Potter\n- age: 14\n- character: Worried but determined\n\nPrevious Dialogue:\nHarry: I don't know if I can do this, Sirius.",
    "expected_output": "Sirius: You're your father's son, Harry. James would've laughed in the face of danger too."
  },
  {
    "instruction": "Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Luna and Neville:",
    "input": "Scene:\nThe Room of Requirement was filled with students practicing defensive spells. Luna watched as Neville finally mastered a particularly difficult Shield Charm.\n\"I've never seen anyone improve so quickly, Neville.\"\n\nCharacter Information:\nLuna:\n- name: Luna Lovegood\n- character: Dreamy, honest, perceptive\n- affiliation: Dumbledore's Army\n\nNeville:\n- name: Neville Longbottom\n- character: Growing confidence, determined\n- affiliation: Dumbledore's Army\n\nPrevious Dialogue:\nLuna: I've never seen anyone improve so quickly, Neville.",
    "expected_output": "Neville: Thanks Luna. I suppose we all have to step up now that Harry's teaching us."
  }
]

In [None]:
test_cases[1]['instruction']

'Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Hermione and Ron:'

In [None]:
output_train_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_train_set.json'
output_test_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_test_set.json'
formatted_dataset_path = '/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/Dataset Station/Harry Potter Alpaca/hpa_formatted_dataset'

with open(output_train_path, 'r', encoding='utf-8') as f:
    train_data = json.load(f)

with open(output_test_path, 'r', encoding='utf-8') as f:
    test_data = json.load(f)

# 나중에 데이터셋을 다시 로드할 때는 다음과 같이 사용할 수 있습니다:
from datasets import load_from_disk
formatted_dataset = load_from_disk(formatted_dataset_path)

In [None]:
print(formatted_dataset['train']['text'][0])

In [None]:
print(train_json[3]["instruction"])

Given the following scene, character attributes, and previous dialogue, generate the next line of dialogue between Petunia and Vernon and Harry:


In [None]:
print(train_json[3]["input"])

Scene:
“Bad news, Vernon,” she said. “Mrs. Figg’s broken her leg. She can’t take him.” She jerked her head in Harry’s direction.
Dudley’s mouth fell open in horror, but Harry’s heart gave a leap. Every year on Dudley’s birthday, his parents took him and a friend out for the day, to adventure parks, hamburger restaurants, or the movies. Every year, Harry was left behind with Mrs. Figg, a mad old lady who lived two streets away. Harry hated it there. The whole house smelled of cabbage and Mrs. Figg made him look at photographs of all the cats she’d ever owned.
“Now what?” said Aunt Petunia, looking furiously at Harry as though he’d planned this. Harry knew he ought to feel sorry that Mrs. Figg had broken her leg, but it wasn’t easy when he reminded himself it would be a whole year before he had to look at Tibbles, Snowy, Mr. Paws, and Tufty again.
“We could phone Marge,” Uncle Vernon suggested.
“Don’t be silly, Vernon, she hates the boy.”
The Dursleys often spoke about Harry like this, a

In [None]:
print(train_json[3]["output"])

Vernon: We could phone Marge,


In [7]:
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from nltk.lm.preprocessing import padded_everygram_pipeline
from nltk.lm import MLE
import torch
from torch.nn import CrossEntropyLoss
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from scipy.spatial.distance import cosine
from sentence_transformers import SentenceTransformer
import nltk

# Download required NLTK data
nltk.download('punkt')
nltk.download('wordnet')

def calculate_bleu(reference, hypothesis):
    """
    Calculate BLEU score between reference and hypothesis

    Args:
        reference (str): The reference/ground truth text
        hypothesis (str): The generated/hypothesis text

    Returns:
        float: BLEU score
    """
    # Tokenize the sentences
    ref_tokens = nltk.word_tokenize(reference.lower())
    hyp_tokens = nltk.word_tokenize(hypothesis.lower())

    # Calculate BLEU score with smoothing
    smoothing = SmoothingFunction().method1
    return sentence_bleu([ref_tokens], hyp_tokens, smoothing_function=smoothing)

def calculate_meteor(reference, hypothesis):
    """
    Calculate METEOR score between reference and hypothesis

    Args:
        reference (str): The reference/ground truth text
        hypothesis (str): The generated/hypothesis text

    Returns:
        float: METEOR score
    """
    return meteor_score([reference.split()], hypothesis.split())

def calculate_perplexity(text, model_name='gpt2'):
    """
    Calculate perplexity using GPT-2

    Args:
        text (str): Input text to calculate perplexity for
        model_name (str): Name of the pretrained model to use

    Returns:
        float: Perplexity score
    """
    # Load model and tokenizer
    model = GPT2LMHeadModel.from_pretrained(model_name)
    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    model.eval()

    # Encode text
    encodings = tokenizer(text, return_tensors='pt')

    # Calculate perplexity
    max_length = model.config.n_positions
    stride = 512
    seq_len = encodings.input_ids.size(1)

    nlls = []
    prev_end_loc = 0
    for begin_loc in range(0, seq_len, stride):
        end_loc = min(begin_loc + max_length, seq_len)
        trg_len = end_loc - prev_end_loc
        input_ids = encodings.input_ids[:, begin_loc:end_loc]
        target_ids = input_ids.clone()
        target_ids[:, :-trg_len] = -100

        with torch.no_grad():
            outputs = model(input_ids, labels=target_ids)
            neg_log_likelihood = outputs.loss

        nlls.append(neg_log_likelihood)
        prev_end_loc = end_loc
        if end_loc == seq_len:
            break

    ppl = torch.exp(torch.stack(nlls).mean())
    return ppl.item()

def calculate_simile(reference, hypothesis):
    """
    Calculate SIMILE (Semantic Similarity) score using sentence transformers

    Args:
        reference (str): The reference/ground truth text
        hypothesis (str): The generated/hypothesis text

    Returns:
        float: SIMILE score (cosine similarity between sentence embeddings)
    """
    # Load sentence transformer model
    model = SentenceTransformer('all-MiniLM-L6-v2')

    # Get embeddings
    ref_embedding = model.encode([reference])[0]
    hyp_embedding = model.encode([hypothesis])[0]

    # Calculate cosine similarity
    similarity = 1 - cosine(ref_embedding, hyp_embedding)
    return similarity

def evaluate_responses(references, hypotheses):
    """
    Calculate all metrics for a list of reference and hypothesis texts

    Args:
        references (list): List of reference texts
        hypotheses (list): List of hypothesis texts

    Returns:
        dict: Dictionary containing average scores for all metrics
    """
    scores = {
        'bleu': [],
        'meteor': [],
        'perplexity': [],
        'simile': []
    }

    for ref, hyp in zip(references, hypotheses):
        scores['bleu'].append(calculate_bleu(ref, hyp))
        scores['meteor'].append(calculate_meteor(ref, hyp))
        scores['perplexity'].append(calculate_perplexity(hyp))
        scores['simile'].append(calculate_simile(ref, hyp))

    # Calculate averages
    return {metric: np.mean(values) for metric, values in scores.items()}

# Example usage
if __name__ == "__main__":
    # Sample data
    references = [
        "The quick brown fox jumps over the lazy dog.",
        "Machine learning is a subset of artificial intelligence."
    ]
    hypotheses = [
        "The fast brown fox leaps over the sleeping dog.",
        "Machine learning is a branch of artificial intelligence."
    ]

    # Install required packages if not already installed
    !pip install -q transformers torch sentence-transformers nltk

    # Calculate scores
    scores = evaluate_responses(references, hypotheses)

    # Print results
    print("\nEvaluation Metrics:")
    print(f"BLEU Score: {scores['bleu']:.4f}")
    print(f"METEOR Score: {scores['meteor']:.4f}")
    print(f"Perplexity: {scores['perplexity']:.4f}")
    print(f"SIMILE Score: {scores['simile']:.4f}")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


LookupError: 
**********************************************************************
  Resource [93mpunkt_tab[0m not found.
  Please use the NLTK Downloader to obtain the resource:

  [31m>>> import nltk
  >>> nltk.download('punkt_tab')
  [0m
  For more information see: https://www.nltk.org/data.html

  Attempted to load [93mtokenizers/punkt_tab/english/[0m

  Searched in:
    - '/root/nltk_data'
    - '/usr/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/local/lib/nltk_data'
**********************************************************************


In [8]:
# Install required packages
!pip install -q transformers torch sentence-transformers nltk

import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from nltk.lm.preprocessing import padded_everygram_pipeline
from nltk.lm import MLE
import torch
from torch.nn import CrossEntropyLoss
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from scipy.spatial.distance import cosine
from sentence_transformers import SentenceTransformer
import nltk

# Download ALL required NLTK data at the start
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger')

def calculate_bleu(reference, hypothesis):
    """
    Calculate BLEU score between reference and hypothesis

    Args:
        reference (str): The reference/ground truth text
        hypothesis (str): The generated/hypothesis text

    Returns:
        float: BLEU score
    """
    # Tokenize the sentences
    ref_tokens = nltk.word_tokenize(reference.lower())
    hyp_tokens = nltk.word_tokenize(hypothesis.lower())

    # Calculate BLEU score with smoothing
    smoothing = SmoothingFunction().method1
    return sentence_bleu([ref_tokens], hyp_tokens, smoothing_function=smoothing)

def calculate_meteor(reference, hypothesis):
    """
    Calculate METEOR score between reference and hypothesis

    Args:
        reference (str): The reference/ground truth text
        hypothesis (str): The generated/hypothesis text

    Returns:
        float: METEOR score
    """
    return meteor_score([reference.split()], hypothesis.split())

def calculate_perplexity(text, model_name='gpt2'):
    """
    Calculate perplexity using GPT-2

    Args:
        text (str): Input text to calculate perplexity for
        model_name (str): Name of the pretrained model to use

    Returns:
        float: Perplexity score
    """
    # Load model and tokenizer
    model = GPT2LMHeadModel.from_pretrained(model_name)
    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    model.eval()

    # Encode text
    encodings = tokenizer(text, return_tensors='pt')

    # Calculate perplexity
    max_length = model.config.n_positions
    stride = 512
    seq_len = encodings.input_ids.size(1)

    nlls = []
    prev_end_loc = 0
    for begin_loc in range(0, seq_len, stride):
        end_loc = min(begin_loc + max_length, seq_len)
        trg_len = end_loc - prev_end_loc
        input_ids = encodings.input_ids[:, begin_loc:end_loc]
        target_ids = input_ids.clone()
        target_ids[:, :-trg_len] = -100

        with torch.no_grad():
            outputs = model(input_ids, labels=target_ids)
            neg_log_likelihood = outputs.loss

        nlls.append(neg_log_likelihood)
        prev_end_loc = end_loc
        if end_loc == seq_len:
            break

    ppl = torch.exp(torch.stack(nlls).mean())
    return ppl.item()

def calculate_simile(reference, hypothesis):
    """
    Calculate SIMILE (Semantic Similarity) score using sentence transformers

    Args:
        reference (str): The reference/ground truth text
        hypothesis (str): The generated/hypothesis text

    Returns:
        float: SIMILE score (cosine similarity between sentence embeddings)
    """
    # Load sentence transformer model
    model = SentenceTransformer('all-MiniLM-L6-v2')

    # Get embeddings
    ref_embedding = model.encode([reference])[0]
    hyp_embedding = model.encode([hypothesis])[0]

    # Calculate cosine similarity
    similarity = 1 - cosine(ref_embedding, hyp_embedding)
    return similarity

def evaluate_responses(references, hypotheses):
    """
    Calculate all metrics for a list of reference and hypothesis texts

    Args:
        references (list): List of reference texts
        hypotheses (list): List of hypothesis texts

    Returns:
        dict: Dictionary containing average scores for all metrics
    """
    scores = {
        'bleu': [],
        'meteor': [],
        'perplexity': [],
        'simile': []
    }

    for ref, hyp in zip(references, hypotheses):
        scores['bleu'].append(calculate_bleu(ref, hyp))
        scores['meteor'].append(calculate_meteor(ref, hyp))
        scores['perplexity'].append(calculate_perplexity(hyp))
        scores['simile'].append(calculate_simile(ref, hyp))

    # Calculate averages
    return {metric: np.mean(values) for metric, values in scores.items()}

# Example usage
if __name__ == "__main__":
    # Sample data
    references = [
        "The quick brown fox jumps over the lazy dog.",
        "Machine learning is a subset of artificial intelligence."
    ]
    hypotheses = [
        "The fast brown fox leaps over the sleeping dog.",
        "Machine learning is a branch of artificial intelligence."
    ]

    # Calculate scores
    scores = evaluate_responses(references, hypotheses)

    # Print results
    print("\nEvaluation Metrics:")
    print(f"BLEU Score: {scores['bleu']:.4f}")
    print(f"METEOR Score: {scores['meteor']:.4f}")
    print(f"Perplexity: {scores['perplexity']:.4f}")
    print(f"SIMILE Score: {scores['simile']:.4f}")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]


Evaluation Metrics:
BLEU Score: 0.3386
METEOR Score: 0.8734
Perplexity: 166.2554
SIMILE Score: 0.8484


In [None]:
test_n = 3

In [None]:
alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

# ------------------------------------------------------------------------

# 모델을 inference 모드로 설정
FastLanguageModel.for_inference(model)

# 테스트할 예시 프롬프트 생성
test_instruction = train_json[test_n]['instruction']
test_input = train_json[test_n]['input']

# 입력 텍스트 생성
inputs = tokenizer(
    [
        alpaca_prompt.format(
            test_instruction,  # instruction
            test_input,       # input
            "",              # output - 생성을 위해 비워둠
        )
    ],
    return_tensors="pt"
).to("cuda")

# 텍스트 생성을 위한 streamer 설정
from transformers import TextStreamer
text_streamer = TextStreamer(tokenizer)

# 생성 파라미터 설정
generation_params = {
    "max_new_tokens": 256,      # 더 긴 대화를 위해 증가
    "temperature": 0.2,         # 창의성 조절 (0.7)
    "top_p": 0.2,              # 다양성 조절 (0.9)
    "do_sample": True,         # 다양한 응답 생성 가능
    "streamer": text_streamer,
    "pad_token_id": tokenizer.pad_token_id,
    "eos_token_id": tokenizer.eos_token_id,
}

# 텍스트 생성
print("Generating dialogue...")
outputs = model.generate(**inputs, **generation_params)

# 생성된 텍스트 디코딩 (streamer를 사용하지 않을 경우)
# generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
# print(generated_text)

# 여러 다른 상황에서 테스트하기 위한 함수
def generate_dialogue(instruction, scene, characters):
    input_text = f"Scene:\n{scene}\n\nCharacter Information:\n{characters}"

    inputs = tokenizer(
        [alpaca_prompt.format(instruction, input_text, "")],
        return_tensors="pt"
    ).to("cuda")

    outputs = model.generate(**inputs, **generation_params)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

print("### Correct Dialogue:")
print(train_json[test_n]['output'], "<|end_of_text|>")

test_n += 1

<a name="Save"></a>
### Saving, loading finetuned models
To save the final model as LoRA adapters, either use Huggingface's `push_to_hub` for an online save or `save_pretrained` for a local save.

**[NOTE]** This ONLY saves the LoRA adapters, and not the full model. To save to 16bit or GGUF, scroll down!

Now if you want to load the LoRA adapters we just saved for inference, set `False` to `True`:

In [None]:
# if False:
#     from unsloth import FastLanguageModel
#     model, tokenizer = FastLanguageModel.from_pretrained(
#         model_name = "lora_model", # YOUR MODEL YOU USED FOR TRAINING
#         max_seq_length = max_seq_length,
#         dtype = dtype,
#         load_in_4bit = load_in_4bit,
#     )
#     FastLanguageModel.for_inference(model)

# # alpaca_prompt = You MUST run cells from above!

# inputs = tokenizer(
# [
#     alpaca_prompt.format(
#         "What is a famous tall tower in Paris?", # instruction
#         "", # input
#         "", # output - leave this blank for generation!
#     )
# ], return_tensors = "pt").to("cuda")

# outputs = model.generate(**inputs, max_new_tokens = 64, use_cache = True)
# tokenizer.batch_decode(outputs)

["<|begin_of_text|>Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.\n\n### Instruction:\nWhat is a famous tall tower in Paris?\n\n### Input:\n\n\n### Response:\nThe Eiffel Tower is a famous tall tower located in Paris, France. It is 324 meters (1,063 feet) tall, making it the tallest structure in Paris. The Eiffel Tower was built in 1889 as the entrance arch for the 1889 World's Fair and was designed by"]

In [None]:
import torch
from typing import Dict, List, Optional

class DialoguePromptGenerator:
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.base_prompt_template = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
Generate the next line of dialogue that matches the character's personality, relationships, and the scene context.

### Input:
Scene Description:
{scene}

Character Information:
{character_info}

Previous Dialogue:
{dialogue_history}

Speaking Character: {speaker}

### Response:
"""

    def format_character_info(self, attributes: Dict, relations: Optional[Dict] = None) -> str:
        """Format character attributes and relationships into a structured string"""
        info_str = ""

        # Format basic attributes
        for key, value in attributes.items():
            if value and value != "None":
                info_str += f"- {key}: {value}\n"

        # Add relationships if available
        if relations:
            info_str += "\nRelations with Harry:\n"
            for key, value in relations.items():
                if isinstance(value, (int, float)) and value != 0:
                    info_str += f"- {key}: {value}\n"

        return info_str

    def format_dialogue_history(self, history: List[str]) -> str:
        """Format dialogue history with clear speaker indicators"""
        return "\n".join(history)

    def generate_prompt(self,
                       scene: str,
                       character_attributes: Dict,
                       dialogue_history: List[str],
                       speaker: str,
                       relations: Optional[Dict] = None) -> str:
        """Generate a complete prompt for dialogue generation"""

        # Format character information
        character_info = self.format_character_info(character_attributes, relations)

        # Format dialogue history
        formatted_history = self.format_dialogue_history(dialogue_history)

        # Fill template
        prompt = self.base_prompt_template.format(
            scene=scene,
            character_info=character_info,
            dialogue_history=formatted_history,
            speaker=speaker
        )

        return prompt

    def tokenize_prompt(self, prompt: str) -> torch.Tensor:
        """Tokenize the prompt for model input"""
        return self.tokenizer(
            prompt,
            truncation=True,
            max_length=512,
            padding="max_length",
            return_tensors="pt"
        )

    def generate_dialogue(self, model, prompt: str, max_length: int = 1000) -> str:
        """Generate dialogue using the model"""
        inputs = self.tokenize_prompt(prompt)

        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=max_length,
            num_return_sequences=1,
            temperature=0.7,
            top_p=0.9,
            do_sample=True
        )

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

In [None]:
prompt_generator = DialoguePromptGenerator(tokenizer)

In [None]:
# Example data
scene = "In the Great Hall during breakfast"
character_attributes = {
    "personality": "brave, loyal",
    "house": "Gryffindor",
    "year": "3rd year"
}
relations = {
    "friendship": 0.9,
    "trust": 0.8
}
dialogue_history = [
    "Harry: Did you see the notice about Hogsmeade?",
    "Ron: Yeah, can't wait to visit Honeydukes!"
]
speaker = "Hermione"

# Generate prompt
prompt = prompt_generator.generate_prompt(
    scene=scene,
    character_attributes=character_attributes,
    dialogue_history=dialogue_history,
    speaker=speaker,
    relations=relations
)

FastLanguageModel.for_inference(model)

# Generate dialogue
response = prompt_generator.generate_dialogue(model, prompt)

In [None]:
print(response)

Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
Generate the next line of dialogue that matches the character's personality, relationships, and the scene context.

### Input:
Scene Description:
In the Great Hall during breakfast

Character Information:
- personality: brave, loyal
- house: Gryffindor
- year: 3rd year

Relations with Harry:
- friendship: 0.9
- trust: 0.8


Previous Dialogue:
Harry: Did you see the notice about Hogsmeade?
Ron: Yeah, can't wait to visit Honeydukes!

Speaking Character: Hermione

### Response:
Hermione: I can't wait to visit Honeydukes!


In [None]:
# For more consistent responses
prompt_generator.generate_dialogue(model, prompt, temperature=0.5, top_p=0.95)

# For more creative responses
prompt_generator.generate_dialogue(model, prompt, temperature=0.8, top_p=0.9)

In [None]:
character_attributes.update({
    "mood": "excited",
    "current_goals": "preparing for exams",
    "recent_events": "just learned a new spell"
})

In [None]:
!git clone https://github.com/mlvlab/ProMetaR.git
%cd ProMetaR/

!git clone https://github.com/KaiyangZhou/Dassl.pytorch.git
%cd Dassl.pytorch/

# Install dependencies
!pip install -r requirements.txt
!cp -r dassl ../
# Install this library (no need to re-build if the source code is modified)
# !python setup.py develop
%cd ..

!pip install -r requirements.txt

%mkdir outputs
%mkdir data

%cd data
%mkdir eurosat
!wget http://madm.dfki.de/files/sentinel/EuroSAT.zip -O EuroSAT.zip

!unzip -o EuroSAT.zip -d eurosat/
%cd eurosat
!gdown 1Ip7yaCWFi0eaOFUGga0lUdVi_DDQth1o

%cd ../../

import os.path as osp
from collections import OrderedDict
import math
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.cuda.amp import GradScaler, autocast
from PIL import Image
import torchvision.transforms as transforms
import torch
from clip import clip
from clip.simple_tokenizer import SimpleTokenizer as _Tokenizer
import time
from tqdm import tqdm
import datetime
import argparse
from dassl.utils import setup_logger, set_random_seed, collect_env_info
from dassl.config import get_cfg_default
from dassl.engine import build_trainer
from dassl.engine import TRAINER_REGISTRY, TrainerX
from dassl.metrics import compute_accuracy
from dassl.utils import load_pretrained_weights, load_checkpoint
from dassl.optim import build_optimizer, build_lr_scheduler

# custom
import datasets.oxford_pets
import datasets.oxford_flowers
import datasets.fgvc_aircraft
import datasets.dtd
import datasets.eurosat
import datasets.stanford_cars
import datasets.food101
import datasets.sun397
import datasets.caltech101
import datasets.ucf101
import datasets.imagenet
import datasets.imagenet_sketch
import datasets.imagenetv2
import datasets.imagenet_a
import datasets.imagenet_r

def print_args(args, cfg):
    print("***************")
    print("** Arguments **")
    print("***************")
    optkeys = list(args.__dict__.keys())
    optkeys.sort()
    for key in optkeys:
        print("{}: {}".format(key, args.__dict__[key]))
    print("************")
    print("** Config **")
    print("************")
    print(cfg)

def reset_cfg(cfg, args):
    if args.root:
        cfg.DATASET.ROOT = args.root
    if args.output_dir:
        cfg.OUTPUT_DIR = args.output_dir
    if args.seed:
        cfg.SEED = args.seed
    if args.trainer:
        cfg.TRAINER.NAME = args.trainer
    cfg.DATASET.NUM_SHOTS = 16
    cfg.DATASET.SUBSAMPLE_CLASSES = args.subsample_classes
    cfg.DATALOADER.TRAIN_X.BATCH_SIZE = args.train_batch_size
    cfg.OPTIM.MAX_EPOCH = args.epoch

def extend_cfg(cfg):
    """
    Add new config variables.
    """
    from yacs.config import CfgNode as CN
    cfg.TRAINER.COOP = CN()
    cfg.TRAINER.COOP.N_CTX = 16  # number of context vectors
    cfg.TRAINER.COOP.CSC = False  # class-specific context
    cfg.TRAINER.COOP.CTX_INIT = ""  # initialization words
    cfg.TRAINER.COOP.PREC = "fp16"  # fp16, fp32, amp
    cfg.TRAINER.COOP.CLASS_TOKEN_POSITION = "end"  # 'middle' or 'end' or 'front'
    cfg.TRAINER.COCOOP = CN()
    cfg.TRAINER.COCOOP.N_CTX = 4  # number of context vectors
    cfg.TRAINER.COCOOP.CTX_INIT = "a photo of a"  # initialization words
    cfg.TRAINER.COCOOP.PREC = "fp16"  # fp16, fp32, amp
    cfg.TRAINER.PROMETAR = CN()
    cfg.TRAINER.PROMETAR.N_CTX_VISION = 4  # number of context vectors at the vision branch
    cfg.TRAINER.PROMETAR.N_CTX_TEXT = 4  # number of context vectors at the language branch
    cfg.TRAINER.PROMETAR.CTX_INIT = "a photo of a"  # initialization words
    cfg.TRAINER.PROMETAR.PREC = "fp16"  # fp16, fp32, amp
    cfg.TRAINER.PROMETAR.PROMPT_DEPTH_VISION = 9  # Max 12, minimum 0, for 0 it will be using shallow IVLP prompting (J=1)
    cfg.TRAINER.PROMETAR.PROMPT_DEPTH_TEXT = 9  # Max 12, minimum 0, for 0 it will be using shallow IVLP prompting (J=1)
    cfg.DATASET.SUBSAMPLE_CLASSES = "all"  # all, base or new
    cfg.TRAINER.PROMETAR.ADAPT_LR = 0.0005
    cfg.TRAINER.PROMETAR.LR_RATIO = 0.0005
    cfg.TRAINER.PROMETAR.FAST_ADAPTATION = False
    cfg.TRAINER.PROMETAR.MIXUP_ALPHA = 0.5
    cfg.TRAINER.PROMETAR.MIXUP_BETA = 0.5
    cfg.TRAINER.PROMETAR.DIM_RATE=8
    cfg.OPTIM_VNET = CN()
    cfg.OPTIM_VNET.NAME = "adam"
    cfg.OPTIM_VNET.LR = 0.0003
    cfg.OPTIM_VNET.WEIGHT_DECAY = 5e-4
    cfg.OPTIM_VNET.MOMENTUM = 0.9
    cfg.OPTIM_VNET.SGD_DAMPNING = 0
    cfg.OPTIM_VNET.SGD_NESTEROV = False
    cfg.OPTIM_VNET.RMSPROP_ALPHA = 0.99
    cfg.OPTIM_VNET.ADAM_BETA1 = 0.9
    cfg.OPTIM_VNET.ADAM_BETA2 = 0.999
    cfg.OPTIM_VNET.STAGED_LR = False
    cfg.OPTIM_VNET.NEW_LAYERS = ()
    cfg.OPTIM_VNET.BASE_LR_MULT = 0.1
    # Learning rate scheduler
    cfg.OPTIM_VNET.LR_SCHEDULER = "single_step"
    # -1 or 0 means the stepsize is equal to max_epoch
    cfg.OPTIM_VNET.STEPSIZE = (-1, )
    cfg.OPTIM_VNET.GAMMA = 0.1
    cfg.OPTIM_VNET.MAX_EPOCH = 10
    # Set WARMUP_EPOCH larger than 0 to activate warmup training
    cfg.OPTIM_VNET.WARMUP_EPOCH = -1
    # Either linear or constant
    cfg.OPTIM_VNET.WARMUP_TYPE = "linear"
    # Constant learning rate when type=constant
    cfg.OPTIM_VNET.WARMUP_CONS_LR = 1e-5
    # Minimum learning rate when type=linear
    cfg.OPTIM_VNET.WARMUP_MIN_LR = 1e-5
    # Recount epoch for the next scheduler (last_epoch=-1)
    # Otherwise last_epoch=warmup_epoch
    cfg.OPTIM_VNET.WARMUP_RECOUNT = True

def setup_cfg(args):
    cfg = get_cfg_default()
    extend_cfg(cfg)
    # 1. From the dataset config file
    if args.dataset_config_file:
        cfg.merge_from_file(args.dataset_config_file)
    # 2. From the method config file
    if args.config_file:
        cfg.merge_from_file(args.config_file)
    # 3. From input arguments
    reset_cfg(cfg, args)
    cfg.freeze()
    return cfg

_tokenizer = _Tokenizer()

def load_clip_to_cpu(cfg): # Load CLIP
    backbone_name = cfg.MODEL.BACKBONE.NAME
    url = clip._MODELS[backbone_name]
    model_path = clip._download(url)

    try:
        # loading JIT archive
        model = torch.jit.load(model_path, map_location="cpu").eval()
        state_dict = None

    except RuntimeError:
        state_dict = torch.load(model_path, map_location="cpu")

    if cfg.TRAINER.NAME == "":
      design_trainer = "CoOp"
    else:
      design_trainer = cfg.TRAINER.NAME
    design_details = {"trainer": design_trainer,
                      "vision_depth": 0,
                      "language_depth": 0, "vision_ctx": 0,
                      "language_ctx": 0}
    model = clip.build_model(state_dict or model.state_dict(), design_details)

    return model

from dassl.config import get_cfg_default
cfg = get_cfg_default()
cfg.MODEL.BACKBONE.NAME = "ViT-B/16" # Set the vision encoder backbone of CLIP to ViT.
clip_model = load_clip_to_cpu(cfg)



class TextEncoder(nn.Module):
    def __init__(self, clip_model): # 초기화 하는 함수
        super().__init__()
        self.transformer = clip_model.transformer
        self.positional_embedding = clip_model.positional_embedding
        self.ln_final = clip_model.ln_final
        self.text_projection = clip_model.text_projection
        self.dtype = clip_model.dtype

    def forward(self, prompts, tokenized_prompts): # 모델 호출
        x = prompts + self.positional_embedding.type(self.dtype)
        x = x.permute(1, 0, 2)  # NLD -> LND
        x = self.transformer(x)
        x = x.permute(1, 0, 2)  # LND -> NLD
        x = self.ln_final(x).type(self.dtype)

        # x.shape = [batch_size, n_ctx, transformer.width]
        # take features from the eot embedding (eot_token is the highest number in each sequence)
        x = x[torch.arange(x.shape[0]), tokenized_prompts.argmax(dim=-1)] @ self.text_projection

        return x


@TRAINER_REGISTRY.register(force=True)
class CoCoOp(TrainerX):
    def check_cfg(self, cfg):
        assert cfg.TRAINER.COCOOP.PREC in ["fp16", "fp32", "amp"]

    def build_model(self):
        cfg = self.cfg
        classnames = self.dm.dataset.classnames
        print(f"Loading CLIP (backbone: {cfg.MODEL.BACKBONE.NAME})")
        clip_model = load_clip_to_cpu(cfg)

        if cfg.TRAINER.COCOOP.PREC == "fp32" or cfg.TRAINER.COCOOP.PREC == "amp":
            # CLIP's default precision is fp16
            clip_model.float()

        print("Building custom CLIP")
        self.model = CoCoOpCustomCLIP(cfg, classnames, clip_model)

        print("Turning off gradients in both the image and the text encoder")
        name_to_update = "prompt_learner"

        for name, param in self.model.named_parameters():
            if name_to_update not in name:
                param.requires_grad_(False)

        # Double check
        enabled = set()
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                enabled.add(name)
        print(f"Parameters to be updated: {enabled}")

        if cfg.MODEL.INIT_WEIGHTS:
            load_pretrained_weights(self.model.prompt_learner, cfg.MODEL.INIT_WEIGHTS)

        self.model.to(self.device)
        # NOTE: only give prompt_learner to the optimizer
        self.optim = build_optimizer(self.model.prompt_learner, cfg.OPTIM)
        self.sched = build_lr_scheduler(self.optim, cfg.OPTIM)
        self.register_model("prompt_learner", self.model.prompt_learner, self.optim, self.sched)

        self.scaler = GradScaler() if cfg.TRAINER.COCOOP.PREC == "amp" else None

        # Note that multi-gpu training could be slow because CLIP's size is
        # big, which slows down the copy operation in DataParallel
        device_count = torch.cuda.device_count()
        if device_count > 1:
            print(f"Multiple GPUs detected (n_gpus={device_count}), use all of them!")
            self.model = nn.DataParallel(self.model)

    def before_train(self):
        directory = self.cfg.OUTPUT_DIR
        if self.cfg.RESUME:
            directory = self.cfg.RESUME
        self.start_epoch = self.resume_model_if_exist(directory)

        # Remember the starting time (for computing the elapsed time)
        self.time_start = time.time()


    def forward_backward(self, batch):
        image, label = self.parse_batch_train(batch)

        model = self.model
        optim = self.optim
        scaler = self.scaler

        prec = self.cfg.TRAINER.COCOOP.PREC
        loss = model(image, label) # Input image 모델 통과
        optim.zero_grad()
        loss.backward() # Backward (역전파)
        optim.step() # 모델 parameter update

        loss_summary = {"loss": loss.item()}

        if (self.batch_idx + 1) == self.num_batches:
            self.update_lr()

        return loss_summary

    def parse_batch_train(self, batch):
        input = batch["img"]
        label = batch["label"]
        input = input.to(self.device)
        label = label.to(self.device)
        return input, label

    def load_model(self, directory, epoch=None):
        if not directory:
            print("Note that load_model() is skipped as no pretrained model is given")
            return

        names = self.get_model_names()

        # By default, the best model is loaded
        model_file = "model-best.pth.tar"

        if epoch is not None:
            model_file = "model.pth.tar-" + str(epoch)

        for name in names:
            model_path = osp.join(directory, name, model_file)

            if not osp.exists(model_path):
                raise FileNotFoundError('Model not found at "{}"'.format(model_path))

            checkpoint = load_checkpoint(model_path)
            state_dict = checkpoint["state_dict"]
            epoch = checkpoint["epoch"]

            # Ignore fixed token vectors
            if "token_prefix" in state_dict:
                del state_dict["token_prefix"]

            if "token_suffix" in state_dict:
                del state_dict["token_suffix"]

            print("Loading weights to {} " 'from "{}" (epoch = {})'.format(name, model_path, epoch))
            # set strict=False
            self._models[name].load_state_dict(state_dict, strict=False)

    def after_train(self):
      print("Finish training")

      do_test = not self.cfg.TEST.NO_TEST
      if do_test:
          if self.cfg.TEST.FINAL_MODEL == "best_val":
              print("Deploy the model with the best val performance")
              self.load_model(self.output_dir)
          else:
              print("Deploy the last-epoch model")
          acc = self.test()

      # Show elapsed time
      elapsed = round(time.time() - self.time_start)
      elapsed = str(datetime.timedelta(seconds=elapsed))
      print(f"Elapsed: {elapsed}")

      # Close writer
      self.close_writer()
      return acc

    def train(self):
        """Generic training loops."""
        self.before_train()
        for self.epoch in range(self.start_epoch, self.max_epoch):
            self.before_epoch()
            self.run_epoch()
            self.after_epoch()
        acc = self.after_train()
        return acc

parser = argparse.ArgumentParser()
parser.add_argument("--root", type=str, default="data/", help="path to dataset")
parser.add_argument("--output-dir", type=str, default="outputs/cocoop3", help="output directory")
parser.add_argument(
    "--seed", type=int, default=1, help="only positive value enables a fixed seed"
)
parser.add_argument(
    "--config-file", type=str, default="configs/trainers/ProMetaR/vit_b16_c2_ep10_batch4_4+4ctx.yaml", help="path to config file"
)
parser.add_argument(
    "--dataset-config-file",
    type=str,
    default="configs/datasets/eurosat.yaml",
    help="path to config file for dataset setup",
)
parser.add_argument("--trainer", type=str, default="CoOp", help="name of trainer")
parser.add_argument("--eval-only", action="store_true", help="evaluation only")
parser.add_argument(
    "--model-dir",
    type=str,
    default="",
    help="load model from this directory for eval-only mode",
)
parser.add_argument("--train-batch-size", type=int, default=4)
parser.add_argument("--epoch", type=int, default=10)
parser.add_argument("--subsample-classes", type=str, default="base")
parser.add_argument(
    "--load-epoch", type=int, default=0, help="load model weights at this epoch for evaluation"
)
args = parser.parse_args([])

def main(args):
    cfg = setup_cfg(args)
    if cfg.SEED >= 0:
        set_random_seed(cfg.SEED)

    if torch.cuda.is_available() and cfg.USE_CUDA:
        torch.backends.cudnn.benchmark = True

    trainer = build_trainer(cfg)
    if args.eval_only:
        trainer.load_model(args.model_dir, epoch=args.load_epoch)
        acc = trainer.test()
        return acc

    acc = trainer.train()
    return acc

In [None]:
from trl import SFTTrainer
import torch
import torch.nn as nn
from collections import OrderedDict


class DialoguePromptLearner(nn.Module):
    def __init__(self, model, tokenizer, cfg):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer

        # Configuration for prompt learning
        self.n_ctx = cfg.get('n_ctx', 8)  # Number of context tokens to learn
        self.ctx_init = cfg.get('ctx_init', '')  # Optional initialization text
        self.ctx_dim = model.config.hidden_size

        # Initialize context vectors
        if self.ctx_init:
            # Initialize with given words
            ctx_init = self.ctx_init.replace('_', ' ')
            n_ctx = len(ctx_init.split())
            prompt_tokens = tokenizer(ctx_init, return_tensors='pt')
            with torch.no_grad():
                embedding = model.get_input_embeddings()(prompt_tokens['input_ids'][0, :n_ctx])
            self.ctx = nn.Parameter(embedding)
        else:
            # Random initialization
            self.ctx = nn.Parameter(torch.randn(self.n_ctx, self.ctx_dim))

        # Meta network for instance-specific prompts
        self.meta_net = nn.Sequential(OrderedDict([
            ("linear1", nn.Linear(self.ctx_dim, self.ctx_dim // 16)),
            ("relu", nn.ReLU(inplace=True)),
            ("linear2", nn.Linear(self.ctx_dim // 16, self.ctx_dim))
        ]))

    def forward(self, inputs):
        batch_size = inputs['input_ids'].size(0)

        # Get scene embeddings (using [CLS] token output)
        with torch.no_grad():
            scene_features = self.model.get_encoder()(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask']
            ).last_hidden_state[:, 0]  # Use [CLS] token

        # Generate instance-specific bias
        bias = self.meta_net(scene_features)  # (batch, ctx_dim)
        bias = bias.unsqueeze(1)  # (batch, 1, ctx_dim)

        # Apply bias to context tokens
        ctx = self.ctx.unsqueeze(0).expand(batch_size, -1, -1)  # (batch, n_ctx, ctx_dim)
        ctx_shifted = ctx + bias  # (batch, n_ctx, ctx_dim)

        # Prepare prompted input
        prompted_embeddings = self.model.get_input_embeddings()(inputs['input_ids'])

        # Insert learned context tokens after instruction/before input
        instruction_end = (inputs['input_ids'] == self.tokenizer.encode("### Input:", add_special_tokens=False)[0]).nonzero()[:, 1]
        for i in range(batch_size):
            idx = instruction_end[i]
            prompted_embeddings[i] = torch.cat([
                prompted_embeddings[i, :idx],
                ctx_shifted[i],
                prompted_embeddings[i, idx:]
            ], dim=0)

        return prompted_embeddings

class CoCoOpSFTTrainer(SFTTrainer):
    def __init__(self, *args, prompt_tuning_config=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.prompt_learner = DialoguePromptLearner(
            self.model,
            self.tokenizer,
            prompt_tuning_config or {}
        )

    def compute_loss(self, model, inputs, return_outputs=False):
        # Get prompted embeddings
        prompted_embeddings = self.prompt_learner(inputs)

        # Replace the standard embeddings with prompted ones
        original_forward = self.model.forward

        def prompted_forward(*args, **kwargs):
            kwargs['inputs_embeds'] = prompted_embeddings
            kwargs['input_ids'] = None
            return original_forward(*args, **kwargs)

        self.model.forward = prompted_forward

        # Compute loss using parent class
        loss = super().compute_loss(model, inputs, return_outputs)

        # Restore original forward
        self.model.forward = original_forward

        return loss

In [None]:
def generate_dialogue_with_prompts(instruction, scene, characters):
    input_text = f"Scene:\n{scene}\n\nCharacter Information:\n{characters}"
    inputs = tokenizer(
        [alpaca_prompt.format(instruction, input_text, "")],
        return_tensors="pt"
    ).to("cuda")

    # Get prompted embeddings
    prompted_embeddings = trainer.prompt_learner(inputs)

    # Generate with prompted embeddings
    outputs = model.generate(
        inputs_embeds=prompted_embeddings,
        **generation_params
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

In [None]:
import torch
import torch.nn as nn
from collections import OrderedDict
from transformers import TextStreamer

class DialoguePromptLearner(nn.Module):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer

        # Configuration
        self.n_ctx = 8  # 학습할 컨텍스트 토큰 수
        self.ctx_dim = model.config.hidden_size

        # 컨텍스트 벡터 초기화 (랜덤)
        self.ctx = nn.Parameter(torch.randn(self.n_ctx, self.ctx_dim))

        # 메타 네트워크 정의
        self.meta_net = nn.Sequential(OrderedDict([
            ("linear1", nn.Linear(self.ctx_dim, self.ctx_dim // 16)),
            ("relu", nn.ReLU(inplace=True)),
            ("linear2", nn.Linear(self.ctx_dim // 16, self.ctx_dim))
        ])).to(model.device)

    def forward(self, inputs):
        batch_size = inputs['input_ids'].size(0)

        # 씬 임베딩 추출 ([CLS] 토큰 출력 사용)
        with torch.no_grad():
            outputs = self.model.get_encoder()(
                input_ids=inputs['input_ids'].to(self.model.device),
                attention_mask=inputs['attention_mask'].to(self.model.device)
            )
            scene_features = outputs.last_hidden_state[:, 0]

        # 인스턴스별 바이어스 생성
        bias = self.meta_net(scene_features)
        bias = bias.unsqueeze(1)

        # 컨텍스트 토큰에 바이어스 적용
        ctx = self.ctx.unsqueeze(0).expand(batch_size, -1, -1)
        ctx_shifted = ctx + bias

        # 프롬프트된 입력 준비
        prompted_embeddings = self.model.get_input_embeddings()(inputs['input_ids'].to(self.model.device))

        # instruction과 input 사이에 학습된 컨텍스트 토큰 삽입
        instruction_marker = "### Input:"
        marker_ids = self.tokenizer.encode(instruction_marker, add_special_tokens=False)[0]
        instruction_end = (inputs['input_ids'] == marker_ids).nonzero()[:, 1]

        new_embeddings = []
        for i in range(batch_size):
            idx = instruction_end[i]
            new_emb = torch.cat([
                prompted_embeddings[i, :idx],
                ctx_shifted[i],
                prompted_embeddings[i, idx:]
            ], dim=0)
            new_embeddings.append(new_emb)

        return torch.stack(new_embeddings)

# CoCoOp을 사용한 인퍼런스 함수
def generate_dialogue_with_cocoop(model, tokenizer, instruction, input_text, prompt_learner):
    # 입력 텍스트 준비
    inputs = tokenizer(
        [alpaca_prompt.format(instruction, input_text, "")],
        return_tensors="pt"
    )

    # 프롬프트된 임베딩 생성
    prompted_embeddings = prompt_learner(inputs)

    # 텍스트 생성을 위한 streamer 설정
    text_streamer = TextStreamer(tokenizer)

    # 생성 파라미터
    generation_params = {
        "max_new_tokens": 256,
        "temperature": 0.2,
        "top_p": 0.2,
        "do_sample": True,
        "streamer": text_streamer,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }

    # 텍스트 생성
    outputs = model.generate(
        inputs_embeds=prompted_embeddings,
        **generation_params
    )

    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# 프롬프트 러너 초기화 및 인퍼런스 실행
prompt_learner = DialoguePromptLearner(model, tokenizer)

# 테스트 예시로 인퍼런스 실행
def test_cocoop_inference(test_n):
    print("Generating dialogue with CoCoOp...")

    test_instruction = train_json[test_n]['instruction']
    test_input = train_json[test_n]['input']

    # CoCoOp을 사용한 대화 생성
    generated_text = generate_dialogue_with_cocoop(
        model,
        tokenizer,
        test_instruction,
        test_input,
        prompt_learner
    )

    print("\n=== Generated Response ===")
    print(generated_text)

    print("\n=== Correct Response ===")
    print(train_json[test_n]['output'], "<|end_of_text|>")

AttributeError: 'DialogueCoCoOpModel' object has no attribute 'config'

In [None]:
# 특정 테스트 케이스로 실행
test_cocoop_inference(test_n=0)  # 또는 다른 인덱스

NameError: name 'test_cocoop_inference' is not defined

In [None]:
import torch
import torch.nn as nn
from collections import OrderedDict
from transformers import TextStreamer

class DialoguePromptLearner(nn.Module):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.base_model = model  # FastLanguageModel 인스턴스
        self.tokenizer = tokenizer

        # FastLanguageModel의 구조에 맞춰 설정
        self.n_ctx = 8  # 학습할 컨텍스트 토큰 수
        # model.config.hidden_size로 직접 접근
        self.ctx_dim = model.config.hidden_size
        self.device = next(model.parameters()).device

        # 컨텍스트 벡터 초기화 (랜덤)
        self.ctx = nn.Parameter(torch.randn(self.n_ctx, self.ctx_dim, device=self.device))

        # 메타 네트워크 정의
        self.meta_net = nn.Sequential(OrderedDict([
            ("linear1", nn.Linear(self.ctx_dim, self.ctx_dim // 16)),
            ("relu", nn.ReLU(inplace=True)),
            ("linear2", nn.Linear(self.ctx_dim // 16, self.ctx_dim))
        ])).to(self.device)

    def forward(self, inputs):
        batch_size = inputs['input_ids'].size(0)

        # 씬 임베딩 추출
        with torch.no_grad():
            input_ids = inputs['input_ids'].to(self.device)
            attention_mask = inputs['attention_mask'].to(self.device)

            # FastLanguageModel의 임베딩 레이어 직접 접근
            embeddings = self.base_model.embed_tokens(input_ids)
            scene_features = embeddings[:, 0]

        # 인스턴스별 바이어스 생성
        bias = self.meta_net(scene_features)
        bias = bias.unsqueeze(1)

        # 컨텍스트 토큰에 바이어스 적용
        ctx = self.ctx.unsqueeze(0).expand(batch_size, -1, -1)
        ctx_shifted = ctx + bias

        # 프롬프트된 입력 준비
        prompted_embeddings = self.base_model.embed_tokens(input_ids)

        # instruction과 input 사이에 학습된 컨텍스트 토큰 삽입
        instruction_marker = "### Input:"
        marker_ids = self.tokenizer.encode(instruction_marker, add_special_tokens=False)[0]
        instruction_end = (input_ids == marker_ids).nonzero()[:, 1]

        new_embeddings = []
        for i in range(batch_size):
            idx = instruction_end[i]
            new_emb = torch.cat([
                prompted_embeddings[i, :idx],
                ctx_shifted[i],
                prompted_embeddings[i, idx:]
            ], dim=0)
            new_embeddings.append(new_emb)

        return torch.stack(new_embeddings)

def generate_dialogue_with_cocoop(model, tokenizer, instruction, input_text, prompt_learner):
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

    inputs = tokenizer(
        [alpaca_prompt.format(instruction, input_text, "")],
        return_tensors="pt"
    )

    # 프롬프트된 임베딩 생성
    prompted_embeddings = prompt_learner(inputs)

    # 텍스트 생성을 위한 streamer 설정
    text_streamer = TextStreamer(tokenizer)

    # 생성 파라미터
    generation_params = {
        "max_new_tokens": 256,
        "temperature": 0.2,
        "top_p": 0.2,
        "do_sample": True,
        "streamer": text_streamer,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }

    # FastLanguageModel generate 호출
    outputs = model.generate(
        inputs_embeds=prompted_embeddings,
        attention_mask=torch.ones_like(inputs['input_ids']),
        **generation_params
    )

    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# 테스트 함수
def test_cocoop_inference(model, tokenizer, train_json, test_n=0):
    print("Initializing CoCoOp prompt learner...")
    prompt_learner = DialoguePromptLearner(model, tokenizer)

    print("\nGenerating dialogue with CoCoOp...")
    test_instruction = train_json[test_n]['instruction']
    test_input = train_json[test_n]['input']

    generated_text = generate_dialogue_with_cocoop(
        model,
        tokenizer,
        test_instruction,
        test_input,
        prompt_learner
    )

    print("\n=== Generated Response ===")
    print(generated_text)

    print("\n=== Correct Response ===")
    print(train_json[test_n]['output'], "<|end_of_text|>")

In [None]:
# FastLanguageModel이 이미 준비되어 있다고 가정
test_cocoop_inference(model, tokenizer, train_data, test_n=0)

Initializing CoCoOp prompt learner...


AttributeError: 'DialogueCoCoOpModel' object has no attribute 'config'

In [None]:
print("Model type:", type(model))
print("Available attributes:", [attr for attr in dir(model) if not attr.startswith('_')])

# 모델의 파라미터 구조도 확인
for name, param in model.named_parameters():
    print(f"Layer: {name}, Shape: {param.shape}")

Model type: <class '__main__.DialogueCoCoOpModel'>
Available attributes: ['T_destination', 'add_module', 'apply', 'base_model', 'bfloat16', 'buffers', 'call_super_init', 'children', 'compile', 'context_encoder', 'cpu', 'cuda', 'dialogue_encoder', 'double', 'dump_patches', 'eval', 'extra_repr', 'float', 'forward', 'get_buffer', 'get_extra_state', 'get_parameter', 'get_submodule', 'half', 'ipu', 'load_state_dict', 'modules', 'mtia', 'named_buffers', 'named_children', 'named_modules', 'named_parameters', 'parameters', 'prompt_learner', 'register_backward_hook', 'register_buffer', 'register_forward_hook', 'register_forward_pre_hook', 'register_full_backward_hook', 'register_full_backward_pre_hook', 'register_load_state_dict_post_hook', 'register_load_state_dict_pre_hook', 'register_module', 'register_parameter', 'register_state_dict_post_hook', 'register_state_dict_pre_hook', 'requires_grad_', 'set_extra_state', 'set_submodule', 'share_memory', 'state_dict', 'to', 'to_empty', 'train', 'tra

In [None]:
import torch
import torch.nn as nn
from collections import OrderedDict
from transformers import TextStreamer

class DialoguePromptLearner(nn.Module):
    def __init__(self, model, tokenizer):
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer

        # 실제 모델의 hidden size 가져오기
        self.ctx_dim = 4096  # 모델 출력에서 확인된 크기
        self.n_ctx = 8  # 학습할 컨텍스트 토큰 수

        # device 가져오기
        self.device = next(model.parameters()).device

        # 컨텍스트 벡터 초기화 (랜덤)
        self.ctx = nn.Parameter(torch.randn(self.n_ctx, self.ctx_dim, device=self.device))

        # 메타 네트워크 정의
        self.meta_net = nn.Sequential(OrderedDict([
            ("linear1", nn.Linear(self.ctx_dim, self.ctx_dim // 16)),
            ("relu", nn.ReLU(inplace=True)),
            ("linear2", nn.Linear(self.ctx_dim // 16, self.ctx_dim))
        ])).to(self.device)

    def forward(self, inputs):
        batch_size = inputs['input_ids'].size(0)

        # 입력을 장치로 이동
        input_ids = inputs['input_ids'].to(self.device)
        attention_mask = inputs['attention_mask'].to(self.device)

        # 기본 모델에서 임베딩 추출
        # base_model 경로 단순화 및 device 확인
        base_model = self.model.base_model
        while hasattr(base_model, 'base_model'):
            base_model = base_model.base_model
        base_model = base_model.model

        with torch.no_grad():
            # 모델의 임베딩 레이어를 사용하여 입력 임베딩
            embeddings = base_model.model.embed_tokens(input_ids)
            scene_features = embeddings[:, 0]  # [CLS] 토큰 임베딩 사용

        # 컨텍스트 조정을 위한 메타 네트워크
        bias = self.meta_net(scene_features)  # (batch, ctx_dim)
        bias = bias.unsqueeze(1)  # (batch, 1, ctx_dim)

        # 컨텍스트 토큰 확장 및 바이어스 적용
        ctx = self.ctx.unsqueeze(0).expand(batch_size, -1, -1)  # (batch, n_ctx, ctx_dim)
        ctx_shifted = ctx + bias  # (batch, n_ctx, ctx_dim)

        # 프롬프트된 임베딩 준비
        prompted_embeddings = base_model.model.embed_tokens(input_ids)

        # instruction과 input 사이에 학습된 컨텍스트 토큰 삽입
        instruction_marker = "### Input:"
        marker_ids = torch.tensor(self.tokenizer.encode(instruction_marker, add_special_tokens=False)[0]).to(self.device)
        instruction_end = (input_ids == marker_ids).nonzero()[:, 1]

        new_embeddings = []
        for i in range(batch_size):
            idx = instruction_end[i]
            new_emb = torch.cat([
                prompted_embeddings[i, :idx],
                ctx_shifted[i],
                prompted_embeddings[i, idx:]
            ], dim=0)
            new_embeddings.append(new_emb)

        return torch.stack(new_embeddings)

def generate_dialogue_with_cocoop(model, tokenizer, instruction, input_text, prompt_learner):
    # 입력 텍스트 준비
    alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

    # 입력을 GPU로 이동
    inputs = tokenizer(
        [alpaca_prompt.format(instruction, input_text, "")],
        return_tensors="pt"
    ).to(prompt_learner.device)

    # 프롬프트된 임베딩 생성
    prompted_embeddings = prompt_learner(inputs)

    # 텍스트 생성을 위한 streamer 설정
    text_streamer = TextStreamer(tokenizer)

    # 생성 파라미터
    generation_params = {
        "max_new_tokens": 256,
        "temperature": 0.2,
        "top_p": 0.2,
        "do_sample": True,
        "streamer": text_streamer,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }

    # FastLanguageModel에 맞춘 생성 프로세스
    outputs = model.generate(
        inputs_embeds=prompted_embeddings,
        attention_mask=torch.ones_like(inputs['input_ids']).to(prompt_learner.device),
        **generation_params
    )

    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# 테스트 실행 함수
def test_cocoop_inference(model, tokenizer, train_json, test_n=0):
    print("Initializing CoCoOp prompt learner...")
    prompt_learner = DialoguePromptLearner(model, tokenizer)

    print("\nGenerating dialogue with CoCoOp...")
    test_instruction = train_json[test_n]['instruction']
    test_input = train_json[test_n]['input']

    generated_text = generate_dialogue_with_cocoop(
        model,
        tokenizer,
        test_instruction,
        test_input,
        prompt_learner
    )

    print("\n=== Generated Response ===")
    print(generated_text)

    print("\n=== Correct Response ===")
    print(train_json[test_n]['output'], "<|end_of_text|>")

In [None]:
test_cocoop_inference(model, tokenizer, train_data)

Initializing CoCoOp prompt learner...

Generating dialogue with CoCoOp...


KeyboardInterrupt: 

### Eval

In [None]:
# import json

# output_train_path = '/content/drive/MyDrive/Colab Notebooks/Harry Potter Alpaca/Harry Potter Alpaca/hpa_train_set.json'
# output_test_path = '/content/drive/MyDrive/Colab Notebooks/Harry Potter Alpaca/Harry Potter Alpaca/hpa_test_set.json'
# formatted_dataset_path = '/content/drive/MyDrive/Colab Notebooks/Harry Potter Alpaca/Harry Potter Alpaca/hpa_formatted_dataset'

# with open(output_train_path, 'r', encoding='utf-8') as f:
#     train_data = json.load(f)

# with open(output_test_path, 'r', encoding='utf-8') as f:
#     test_data = json.load(f)

# # 나중에 데이터셋을 다시 로드할 때는 다음과 같이 사용할 수 있습니다:
# from datasets import load_from_disk
# formatted_dataset = load_from_disk(formatted_dataset_path)

In [37]:
alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

# 모델을 inference 모드로 설정
FastLanguageModel.for_inference(model)

test_result = []

# ------------------------------------------------------------------------
for test_n in range(200):
    # 테스트할 예시 프롬프트 생성
    test_instruction = test_data[test_n]['instruction']
    test_input = test_data[test_n]['input']

    # 입력 텍스트 생성
    inputs = tokenizer(
        [
            alpaca_prompt.format(
                test_instruction,  # instruction
                test_input,       # input
                "",              # output - 생성을 위해 비워둠
            )
        ],
        return_tensors="pt"
    ).to("cuda")

    # 텍스트 생성을 위한 streamer 설정
    from transformers import TextStreamer
    text_streamer = TextStreamer(tokenizer)

    # 생성 파라미터 설정
    generation_params = {
        "max_new_tokens": 256,      # 더 긴 대화를 위해 증가
        "temperature": 0.2,         # 창의성 조절 (0.7)
        "top_p": 0.2,              # 다양성 조절 (0.9)
        "do_sample": True,         # 다양한 응답 생성 가능
        "streamer": text_streamer,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }

    # 텍스트 생성
    print("Data", test_n)
    print("Generating dialogue...")
    outputs = model.generate(**inputs, **generation_params)

    print("### Correct Dialogue:")
    print(test_data[test_n]['output'], "<|end_of_text|>")

    prediction = tokenizer.decode(outputs[0], skip_special_tokens=True).split("### Response:")[1].strip()
    ground_truth = test_data[test_n]['output'] + "<|end_of_text|>"
    test_result.append({
        "prediction": prediction,
        "ground_truth": ground_truth
    })

Data 100
Generating dialogue...
<|begin_of_text|>Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
Given the following context and previous dialogue, generate the next line of dialogue:

### Input:
Scene:
Harry wakes up and realizes that his encounter with Hagrid and the news of being a wizard was not a dream. Aunt Petunia knocks on the door, but Harry is too happy to care. An owl delivers a newspaper and demands payment, which Hagrid instructs Harry to give. Hagrid tells Harry that they need to go to Gringotts, the wizard bank, to get money for his school supplies. Hagrid also mentions that he has important business with Dumbledore.

Character Information:
Harry:
- looks: Very thin, black hair, emerald green eyes, wearing glasses, knife injury with lightning shape at the forehead
- nickname: The boy who lived
Hagrid:
- age: Adult
- lineage: wizard
- affiliation

In [43]:
import json
import os

# 저장할 경로 설정
save_path = '/content/drive/MyDrive/Colab Notebooks/test_results(lora).json'

# JSON 형식으로 저장
with open(save_path, 'w', encoding='utf-8') as f:
    json.dump(test_result, f, ensure_ascii=False, indent=2)

print(f'Results saved to: {save_path}')

Results saved to: /content/drive/MyDrive/Colab Notebooks/test_results(lora).json


# Evaluation

In [None]:
# Install required packages
!pip install -q transformers torch sentence-transformers nltk

import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from nltk.lm.preprocessing import padded_everygram_pipeline
from nltk.lm import MLE
import torch
from torch.nn import CrossEntropyLoss
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from scipy.spatial.distance import cosine
from sentence_transformers import SentenceTransformer
import nltk

# Download ALL required NLTK data at the start
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('punkt_tab')
nltk.download('averaged_perceptron_tagger')

def calculate_bleu(reference, hypothesis):
    """
    Calculate BLEU score between reference and hypothesis
    """
    # Tokenize the sentences
    ref_tokens = nltk.word_tokenize(reference.lower())
    hyp_tokens = nltk.word_tokenize(hypothesis.lower())

    # Calculate BLEU score with smoothing
    smoothing = SmoothingFunction().method1
    return sentence_bleu([ref_tokens], hyp_tokens, smoothing_function=smoothing)

def calculate_meteor(reference, hypothesis):
    """
    Calculate METEOR score between reference and hypothesis
    """
    return meteor_score([reference.split()], hypothesis.split())

def calculate_perplexity(text, model_name='gpt2'):
    """
    Calculate perplexity using GPT-2
    """
    # Load model and tokenizer
    model = GPT2LMHeadModel.from_pretrained(model_name)
    tokenizer = GPT2Tokenizer.from_pretrained(model_name)
    model.eval()

    # Encode text
    encodings = tokenizer(text, return_tensors='pt')

    # Calculate perplexity
    max_length = model.config.n_positions
    stride = 512
    seq_len = encodings.input_ids.size(1)

    nlls = []
    prev_end_loc = 0
    for begin_loc in range(0, seq_len, stride):
        end_loc = min(begin_loc + max_length, seq_len)
        trg_len = end_loc - prev_end_loc
        input_ids = encodings.input_ids[:, begin_loc:end_loc]
        target_ids = input_ids.clone()
        target_ids[:, :-trg_len] = -100

        with torch.no_grad():
            outputs = model(input_ids, labels=target_ids)
            neg_log_likelihood = outputs.loss

        nlls.append(neg_log_likelihood)
        prev_end_loc = end_loc
        if end_loc == seq_len:
            break

    ppl = torch.exp(torch.stack(nlls).mean())
    return ppl.item()

def calculate_simile(reference, hypothesis):
    """
    Calculate SIMILE (Semantic Similarity) score using sentence transformers
    """
    # Load sentence transformer model
    model = SentenceTransformer('all-MiniLM-L6-v2')

    # Get embeddings
    ref_embedding = model.encode([reference])[0]
    hyp_embedding = model.encode([hypothesis])[0]

    # Calculate cosine similarity
    similarity = 1 - cosine(ref_embedding, hyp_embedding)
    return similarity

def evaluate_json_responses(test_result):
    """
    Calculate metrics for JSON list containing predictions and ground truths

    Args:
        test_result (list): List of dictionaries with 'prediction' and 'ground_truth' keys

    Returns:
        dict: Dictionary containing average scores for all metrics
    """
    scores = {
        'bleu': [],
        'meteor': [],
        'perplexity': [],
        'simile': []
    }

    for item in test_result:
        # Extract prediction and ground truth
        prediction = item['prediction']
        ground_truth = item['ground_truth']

        # Remove special tokens if present
        ground_truth = ground_truth.replace('<|end_of_text|>', '').strip()

        # Calculate scores
        scores['bleu'].append(calculate_bleu(ground_truth, prediction))
        scores['meteor'].append(calculate_meteor(ground_truth, prediction))
        scores['perplexity'].append(calculate_perplexity(prediction))
        scores['simile'].append(calculate_simile(ground_truth, prediction))

    # Calculate averages
    return {metric: np.mean(values) for metric, values in scores.items()}

In [49]:
import json

# JSON 파일 불러오기
with open('/content/drive/MyDrive/Colab Notebooks/딥러닝 프로젝트/test_results(lora).json', 'r', encoding='utf-8') as f:
    loaded_result = json.load(f)

scores = evaluate_json_responses(loaded_result)


Average Metrics:
Average BLEU Score: 0.2191
Average METEOR Score: 0.2675
Average Perplexity: 26.4606
Average SIMILE Score: 0.9512
