### Concept Validation Experiment

In [5]:
import torch

# 检查CUDA是否可用
if torch.cuda.is_available():
    # 获取可用的GPU设备数量
    gpu_count = torch.cuda.device_count()
    print(f"发现 {gpu_count} 个可用的GPU 设备.")

    # 遍历并打印每个GPU设备的名称
    for i in range(gpu_count):
        gpu_name = torch.cuda.get_device_name(i)
        print(f"GPU {i + 1}: {gpu_name}")
else:
    print("未发现可用的GPU设备.")


发现 1 个可用的GPU 设备.
GPU 1: NVIDIA A100-SXM4-80GB


In [6]:
import copy
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge import Rouge
from bert_score import score
import statistics
from ast import literal_eval
import functools
import json
import os
import random
import wget
random.seed(8888)
torch.manual_seed(8888)
random.seed(8888)
np.random.seed(8888)

if torch.cuda.is_available():
    torch.cuda.manual_seed(8888)
    torch.cuda.manual_seed_all(8888)


import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F

from tqdm import tqdm
#from transformers_source.src.transformers.models.llama import LlamaForCausalLM, LlamaTokenizer
from transformers import LlamaForCausalLM, LlamaTokenizer

torch.set_grad_enabled(False)
tqdm.pandas()

import sys
sys.path.append("/root/Unlearn_Harry_Potter/dissecting_factual_predictions")

# Utilities
from utils import (
    ModelAndTokenizer,
    make_inputs,
    decode_tokens,
    find_token_range,
    predict_from_input,
)

In [7]:
torch.cuda.set_device(0)

base_model = '/root/autodl-tmp/transformers/llama2-7b-chat-hf' 
model = LlamaForCausalLM.from_pretrained(
            base_model,
            # load_in_8bit=load_8bit,
            # torch_dtype=torch.float16,
            # device_map="auto",
        ).to('cuda')

tokenizer = LlamaTokenizer.from_pretrained(base_model, legacy = True)
mt = ModelAndTokenizer(
    model_name=base_model,
    model=model,
    tokenizer=tokenizer,
    low_cpu_mem_usage=False,
    torch_dtype=None,
)
mt.model.eval()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(32000, 4096)
    (layers): ModuleList(
      (0-31): 32 x LlamaDecoderLayer(
        (self_attn): LlamaSdpaAttention(
          (q_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (k_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (v_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (o_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (rotary_emb): LlamaRotaryEmbedding()
        )
        (mlp): LlamaMLP(
          (gate_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (up_proj): Linear(in_features=4096, out_features=11008, bias=False)
          (down_proj): Linear(in_features=11008, out_features=4096, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): LlamaRMSNorm()
        (post_attention_layernorm): LlamaRMSNorm()
      )
    )
    (norm): LlamaRMSNorm()
  )
  (lm_head):

In [8]:
# Bleu Score Calculation
def calculate_bleu(reference, candidate):
    reference = [reference.split()]
    candidate = candidate.split()
    cc = SmoothingFunction()
    bleu_score = sentence_bleu(reference, candidate, smoothing_function=cc.method3)
    return bleu_score

# Rouge-L Score Calculation
def calculate_rouge_l(reference, candidate):
    rouge = Rouge()
    scores = rouge.get_scores(candidate, reference)
    rouge_l_score = scores[0]['rouge-l']['f']
    return rouge_l_score

# BLEURT-20 Score Calculation
def calculate_bleurt(references, candidates, model, tokenizer, config):

    model.eval()
    with torch.no_grad():
        inputs = tokenizer(references, candidates, padding='longest', return_tensors='pt')
        res = model(**inputs).logits.flatten().tolist()
    print(res)
    return res

    
# Bert-F Score Calculation
def calculate_bert_f(reference, candidate):
    _, _, bert_scores = score([candidate], [reference], lang="en", model_type="bert-base-uncased")
    bert_f_score = bert_scores[0]  # Extracting the F1 score
    return bert_f_score

def add_noise(location, noise_scale = 0):
    # Create Gaussian noise
    mean = 0
    std = noise_scale
    shape = (4096,)
    
    noise = torch.normal(mean, std, size=shape).to('cuda')
    dimension, layer = location[0], location[1]
    global new_params
    new_params[f'model.layers.{layer}.mlp.down_proj.weight'].T[dimension,:] += noise 

def answers_generate(Questions, Questions_unrelated, location, noise = 0):
    answers_list = []
    unrelated_answers_list = []

    if noise != 0:
        add_noise(location = location, noise_scale = noise)
        global old_params, new_params
        mt.model.load_state_dict(new_params)

    len_questions = len(Questions)
    for idx, question in enumerate(Questions + Questions_unrelated):
        inputs = mt.tokenizer(f"Question: {question}\n Answer:", return_tensors="pt")
        input_ids = inputs["input_ids"].to('cuda')
        
        with torch.no_grad():
            generation_output = mt.model.generate(  #mt.model
                input_ids=input_ids,
                return_dict_in_generate=True,
                do_sample = False,
                max_new_tokens=100,
            )
        s = generation_output.sequences[0]
        output = tokenizer.decode(s)
        # Find the index of "Answer:"
        answer_index = output.find("Answer:")
        if answer_index != -1:
            # Extract the text after "Answer:"
            answer_text = output[answer_index + len("Answer:"):].strip()
            if idx < len_questions:
                answers_list.append(answer_text)
            else:  
                unrelated_answers_list.append(answer_text)
            
        else:
            print("Answer not found.")
            
    if noise != 0:
        new_params = old_params
        mt.model.load_state_dict(old_params)

    return answers_list, unrelated_answers_list
    


In [9]:
# Copy model params
global old_params, new_params

old_params = copy.deepcopy(mt.model.state_dict())
new_params = copy.deepcopy(mt.model.state_dict())
#new_params

In [10]:
def random_select_except(lst, n, exclude_index):
    # 排除指定位置的元素
    candidates = [elem for i, elem in enumerate(lst) if i != exclude_index]
    # 从候选元素中随机选择 n 个
    selected = random.sample(candidates, n)
    return selected

In [13]:
from openpyxl import Workbook
import re

# 创建一个工作簿
wb = Workbook()
ws = wb.active
# 设置列名
ws.append(["id", "Concept", "bleu_score", "unrelated_QA_bleu_score", "rouge_l_score","unrelated_QA_rouge_l_score","bert_f_score", "unrelated_bert_f_score","original_answers","perturbed_answers", "original_unrelated_answers","perturbed_unrelated_answers"])

In [21]:
with open("/root/Unlearn_Harry_Potter/test_data/llama2-7b_Concepts_list_QA.json", "r", encoding="utf-8") as file:
    concepts_list = json.load(file)

#concepts = ["Amazon Alexa", "Harry Potter", "The Lord of the Rings", "Super Mario", "Star Wars", "William Shakespeare", "Sherlock Holmes", "Netflix", "Blockchain", "Satan"]
#concepts = ["William Shakespeare", "Netflix",  "Satan"]
#concepts_for_random_sample = ["William Shakespeare", "Netflix",  "Satan"]

list(dict.values())[0]

for idx,concept in enumerate(concepts_list):
    bleu_scores = []
    rouge_l_scores = []
    bert_f_scores = []
    unrelated_bleu_scores = []
    unrelated_rouge_l_scores = []
    unrelated_bert_f_scores = []
    print("idx: ",idx)
    
    
    concept_name = list(concept.keys())[0]
    
    concept_content = concept[concept_name]
    dimension, layer = concept_content['dimension'], concept_content['layer']
    Questions = concept['QA']

    # concepts_for_random_sample.remove(concept)
    # random_selection = random.sample(concepts_for_random_sample, 2)
    # concepts_for_random_sample.append(concept)

    Questions_unrelated = []

    random_selection = random_select_except(concepts_list, 5, idx)
    
    for selection in random_selection:
        Questions_unrelated += selection['QA']
     
    original_answers, original_unrelated_answers = answers_generate(Questions, Questions_unrelated, noise = 0, location = [dimension, layer])
    perturbed_answers, perturbed_unrelated_answers = answers_generate(Questions, Questions_unrelated, noise = 0.1, location = [dimension, layer])
    #perturbed_answers_big = answers_generate(Questions, noise = 0.3)

    for perturbed_answer, original_answer in zip(perturbed_answers, original_answers):
        bleu_scores.append(calculate_bleu(perturbed_answer, original_answer))
        rouge_l_scores.append(calculate_rouge_l(perturbed_answer, original_answer))
        bert_f_scores.append(calculate_bert_f(perturbed_answer, original_answer))

   
    for perturbed_unrelated_answer, original_unrelated_answer in zip(perturbed_unrelated_answers, original_unrelated_answers):
        unrelated_bleu_scores.append(calculate_bleu(perturbed_unrelated_answer, original_unrelated_answer))
        unrelated_rouge_l_scores.append(calculate_rouge_l(perturbed_unrelated_answer, original_unrelated_answer))
        unrelated_bert_f_scores.append(calculate_bert_f(perturbed_unrelated_answer, original_unrelated_answer))
    

    bleu_score = statistics.mean(bleu_scores)  
    rouge_l_score = statistics.mean(rouge_l_scores)  
    
    bert_f_scores = [tensor.item() for tensor in bert_f_scores]
    bert_f_score = statistics.mean(bert_f_scores)  

    unrelated_bleu_score = statistics.mean(unrelated_bleu_scores)  
    unrelated_rouge_l_score = statistics.mean(unrelated_rouge_l_scores) 

    unrelated_bert_f_scores = [tensor.item() for tensor in unrelated_bert_f_scores]
    unrelated_bert_f_score = statistics.mean(unrelated_bert_f_scores) 
    
    print(f"Concept: {concept_name} Validation: ", f"bleu_score: {bleu_score} ", f"rouge_l_score: {rouge_l_score} ", f"bert_f_score: {bert_f_score} ")
    print(f"Concept: {concept_name} Validation: ", f"unrelated_bleu_score: {unrelated_bleu_score} ", f"unrelated_rouge_l_score: {unrelated_rouge_l_score} ", f"unrelated_bert_f_score: {unrelated_bert_f_score} ")
    #print(f"Concept: {concept_name} Validation: ", f"bleu_score: {bleu_score} ", f"rouge_l_score: {rouge_l_score} ")
    #print(f"Concept: {concept_name} Validation: ", f"unrelated_bleu_score: {unrelated_bleu_score} ", f"unrelated_rouge_l_score: {unrelated_rouge_l_score} ")
    row_data = [idx, concept_name, bleu_score, unrelated_bleu_score, rouge_l_score, unrelated_rouge_l_score, bert_f_score, unrelated_bert_f_score,str(original_answers),str(perturbed_answers), str(original_unrelated_answers),str(perturbed_unrelated_answers)]
    ws.append(row_data)    
 
wb.save("llama2_7b_validation_full.xlsx")    

idx:  0
Concept: Amazon Alexa Validation:  bleu_score: 0.1545177128161785  rouge_l_score: 0.43027458890197817  bert_f_score: 0.6305558323860169 
Concept: Amazon Alexa Validation:  unrelated_bleu_score: 0.9642955384110983  unrelated_rouge_l_score: 0.9760154929625888  unrelated_bert_f_score: 0.9894608133114301 
idx:  1
Concept: Harry Potter Validation:  bleu_score: 0.08904823809125229  rouge_l_score: 0.33614462588990623  bert_f_score: 0.5187995582818985 
Concept: Harry Potter Validation:  unrelated_bleu_score: 0.9862258774186325  unrelated_rouge_l_score: 0.9908610234516092  unrelated_bert_f_score: 0.9964558704226625 
idx:  2
Concept: The Lord of the Rings Validation:  bleu_score: 0.16832159356046844  rouge_l_score: 0.37594557180689797  bert_f_score: 0.5487277895212174 
Concept: The Lord of the Rings Validation:  unrelated_bleu_score: 0.9462932044284492  unrelated_rouge_l_score: 0.963798416356625  unrelated_bert_f_score: 0.9807807385921479 
idx:  3
Concept: Super Mario Validation:  bleu_s

### Plots for Concepts Validation

In [12]:
import random

def random_select_except(lst, n, exclude_index):
    # 排除指定位置的元素
    candidates = [elem for i, elem in enumerate(lst) if i != exclude_index]
    # 从候选元素中随机选择 n 个
    selected = random.sample(candidates, n)
    return selected

# 示例用法
my_list = [1, 2, 3, 4, 5]
str(
exclude_index = 2  # 排除第三个位置的元素
selected_elements = random_select_except(my_list, 2, exclude_index)
print(selected_elements)


[5, 2]


In [20]:
my_list = [1, 2, 3, 4, 5]
str(my_list)

'[1, 2, 3, 4, 5]'