# prerequisite
- [Register LLaMA / HuggingFace LLaMA](https://huggingface.co/blog/llama2)
    - Add LLaMA access token to *DRIVE_PATH*/hf/token.txt
- Place confounders.parquet under *DRIVE_PATH*/annotation
- Run [image2text.py](https://github.com/HireTheHero/MemesModalityEvaluation/blob/main/script/blip2/image2text.py) to get BLIP2 captions and place the resulting hm_captions.parquet under *DRIVE_PATH*/annotation
- meta_result.csv should be placed under *DRIVE_PATH*/hf for reproducing meta learning analysis

- All set!

## reference
- [LLaMAForCausalLM](https://huggingface.co/blog/how-to-generate)
- [pipeline](https://huggingface.co/blog/llama2)

# init

## google drive

In [None]:
# Mount Google Drive at /content/drive (Colab only); force_remount=True asks for a fresh mount
# even when the drive is already mounted.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
# variables
# token_path = "/content/drive/MyDrive/vilio/token.txt"
# Text file on Drive whose first line is the Hugging Face access token (read by load_token).
hf_token_path = "/content/drive/MyDrive/hf/token.txt"

In [None]:
class TokenNotLoadedException(Exception):
    """Raised when no usable token could be read from the token file."""


def load_token(path):
    """Read an access token from the first line of a text file.

    Args:
        path: Path of the file that holds the token on its first line.

    Returns:
        The first line of the file with surrounding whitespace (including
        the trailing newline) removed.

    Raises:
        TokenNotLoadedException: If the first line is empty or blank.
        OSError: If the file cannot be opened.
    """
    with open(path, "r") as f:
        # readline() returns "" (never None) at end of file and keeps the
        # trailing newline, so strip first and test for emptiness.
        token = f.readline().strip()
    if not token:
        raise TokenNotLoadedException("Check your token and/or path")
    print("Loaded.\nNote that the verification for actual token is not implemented for security reason. You're on your own for that.")
    return token

In [None]:
# token = load_token(token_path)
# Read the Hugging Face token from Drive; it is passed to `huggingface-cli login` below.
hf_token = load_token(hf_token_path)

## env setup

In [None]:
%%writefile requirements.txt
accelerate==0.21.0
einops==0.6.0
sentencepiece==0.1.99
transformers==4.31.0
xformers==0.0.20
optuna==3.3.0

In [None]:
# Install the pinned packages listed in requirements.txt (written by the %%writefile cell above).
!pip install -r requirements.txt

In [None]:
# Log in to the Hugging Face Hub; IPython substitutes $hf_token with the Python variable's value.
# NOTE(review): the token is passed on the command line — avoid sharing this cell's output or shell history.
!huggingface-cli login --token=$hf_token

## github

In [None]:
# Fetch the clone-anonymous-github helper and install its own requirements;
# it is used below to download the anonymized project repository.
!git clone https://github.com/fedebotu/clone-anonymous-github
!cd clone-anonymous-github; pip install -r requirements.txt

In [None]:
# SCRIPT_ROOT = "/content"
# URL = "https://anonymous.4open.science/r/MemesModalityEvaluation-2540"
# Local checkout of the project repository; later cells run its scripts via $SCRIPT_PATH.
SCRIPT_PATH = "/content/MemesModalityEvaluation-2540"

In [None]:
# Delete any earlier checkout so the download below starts from a clean directory.
!rm -rf $SCRIPT_PATH

In [None]:
# Download the anonymized repository into /content using the helper's src/download.py.
!cd clone-anonymous-github; python src/download.py \
    --url https://anonymous.4open.science/r/MemesModalityEvaluation-2540 \
    --save_dir /content

In [None]:
# Run the repository's shell/hf_overwrite_scripts.sh from the checkout root.
# NOTE(review): what the script changes is not visible from this notebook — confirm before relying on it.
!cd $SCRIPT_PATH; bash shell/hf_overwrite_scripts.sh

## modules and variables

In [None]:
import os
import shutil
import time

from IPython.display import clear_output
import numpy as np
import pandas as pd
from PIL import Image
import torch
from transformers import AutoTokenizer, LlamaForCausalLM
import transformers

In [None]:
# Hugging Face model ids of the Llama-2 chat checkpoints.
MODEL = "meta-llama/Llama-2-13b-chat-hf"
# The constant was originally misspelled; keep the old name as an alias so
# any existing reference to MDOEL keeps working.
MDOEL = MODEL
LARGE = "meta-llama/Llama-2-70b-chat-hf"

In [None]:
# Google Drive root (persistent storage) and Colab-local working directories.
DRIVE_PATH = "/content/drive/MyDrive"
DATA_DIR = "/content/hm"
PROMPT_DIR = "/content/prompts"
RESULT_DIR = "/content/results"
# Must live inside the repository checkout (SCRIPT_PATH,
# "/content/MemesModalityEvaluation-2540"); the previous value pointed at
# "/content/MemesModalityEvaluation", which the download step never creates.
CONFIG_DIR = "/content/MemesModalityEvaluation-2540/script/llama/hf"
# Final outputs are written to Drive so they survive the Colab session.
SAVE_DIR_FIN = f"{DRIVE_PATH}/hf"
IMAGE_DIR = f"{DATA_DIR}/hateful_memes/img"

## load data

In [None]:
!mkdir $DATA_DIR

In [None]:
# blip2 captions / confounders
!cp $DRIVE_PATH/annotation/confounders.parquet $DATA_DIR
!cp $DRIVE_PATH/annotation/hm_captions.parquet $DATA_DIR

In [None]:
%%capture
# hateful memes from MyDrive
# Expects the archive at $DRIVE_PATH/vilio/hateful_memes.zip and extracts it into $DATA_DIR;
# %%capture suppresses the cell output.
!unzip $DRIVE_PATH/vilio/hateful_memes.zip -d $DATA_DIR

# run scripts

In [None]:
# data preparation
!rm -rf $PROMPT_DIR
!mkdir $PROMPT_DIR
!python $SCRIPT_PATH/script/llama/prompt_extraction.py \
    --caption_dir $DATA_DIR \
    --meme_dir $DATA_DIR/hateful_memes \
    --conf_dir $DATA_DIR \
    --save_dir $PROMPT_DIR

In [None]:
# experiment
!rm -rf $RESULT_DIR
!mkdir $RESULT_DIR
!python $SCRIPT_PATH/script/llama/hf/few_shot_generation.py \
    --save_path $RESULT_DIR \
    --prompts_path $PROMPT_DIR \
    --config_path $CONFIG_DIR \
    --max_seq_len 2000

In [None]:
# collect result
!python $SCRIPT_PATH/script/llama/hf/result_collection.py \
    --result_path $RESULT_DIR \
    --prompt_path $PROMPT_DIR \
    --save_path $SAVE_DIR_FIN

In [None]:
# meta-learning analysis
!rm -rf $SAVE_DIR_FIN/atts
!mkdir $SAVE_DIR_FIN/atts
!python $SCRIPT_PATH/script/llama/hf/meta_gradient.py \
    --result_path $RESULT_DIR \
    --prompt_path $PROMPT_DIR \
    --save_path $SAVE_DIR_FIN

# evaluation

## check content

In [None]:
# Load the collected results and order them by image id, then by number of shots.
result_csv = f"{SAVE_DIR_FIN}/extracted_info.csv"
df_result = pd.read_csv(result_csv)
df_result = df_result.sort_values(by=["image_id", "few_shot_num"])
df_result = df_result.reset_index(drop=True)
df_result.head(10)

In [None]:
# Number of rows in the collected result table.
print(len(df_result))

In [None]:
# Show the prompt, the model's extracted answer, and the picture for one meme.
image_id = "07653"
# The image_id column holds "<id>_<extra info>"; compare on the part before the first underscore.
image_ids = df_result["image_id"].apply(lambda x: x.split("_")[0])
df_img = df_result[image_ids == image_id].copy()
image_info = df_img["image_id"].values[0]
print(image_info)
im = Image.open(f"{IMAGE_DIR}/{image_id}.png")
# Read from the filtered frame: row 0 of df_result belongs to whichever image
# sorts first, not necessarily to the selected image_id.
print(df_img["prompt"].values[0])
print("=======================")
print(df_img["extracted_info"].values[0])
print("=======================")
display(im)

# annotation

## init

In [None]:
# import os
# import time

# from IPython.display import clear_output
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score

In [None]:
# Same values as in the "modules and variables" section; presumably repeated so the
# annotation section can be run on its own.
DRIVE_PATH = "/content/drive/MyDrive"
SAVE_DIR_FIN = f"{DRIVE_PATH}/hf"

## annotation

In [None]:
# version,version_next="v2_10","v2_11"

In [None]:
# df_result = pd.read_csv(f"{SAVE_DIR_FIN}/extracted_info.csv").sort_values(by=["image_id", "few_shot_num"], ascending=True).reset_index(drop=True)
# df_result.head()

In [None]:
# n_samples = len(df_result)
# out_name = f"{SAVE_DIR_FIN}/df_meta_{version}.csv"
# cols = ["image_id", "few_shot_num", "ans_num", "is_functional", "is_formatted", "is_hateful", "is_sarcastic", "is_benign", "ground_truth", "image_info", "prompt", "extracted_info", "generated_text"]
# if os.path.isfile(out_name):
#     df_labeled = pd.read_csv(out_name)[cols]
#     is_first = 0
#     label_dict = df_labeled.to_dict("list")
#     completed = (df_labeled["image_id"].astype(str).str.zfill(5)+"_"+df_labeled["few_shot_num"].astype(str)).tolist()
# else:
#     is_first = 1
#     label_dict = {col: [] for col in cols}
#     processed_id_shots = []
# # for idx in range(n_samples):
# for idx in range(n_samples):
#     # metadata extraction
#     image_info = df_result["image_id"].values[idx]
#     few_shot_num = df_result["few_shot_num"].values[idx]
#     image_id = image_info.split("_")[0]
#     if not is_first and f"{image_id}_{few_shot_num}" in completed:
#         continue
#     print(f"Image id with info {image_info} shot #{few_shot_num}: Sample #{idx} out of {n_samples}")
#     pos_idx = image_info.split("_pos_")[0].split("_")[1:]
#     max_idx = image_info.split("_max_")[-1]
#     # text extraction
#     prompt = df_result["prompt"].values[idx].replace("\n\n\n", "")
#     extracted = df_result["extracted_info"].values[idx].replace("\n\n\n", "")
#     generated = df_result["generated_text"].values[idx].replace("\n\n\n", "")
#     print(f"Prompt: \n{prompt}")
#     print("===========================")
#     print(f"Extracted: \n{extracted}")
#     print("===========================")
#     is_functional = int(input("Is properly answered?: "))
#     if is_functional:
#         is_formatted = int(input("Is formatted like 'Most likely xx sample is...'?: "))
#     for idx2 in range(int(max_idx)+1):
#         print([idx2, pos_idx, str(idx2) in pos_idx])
#         print(f"Image-caption pair #{idx2}")
#         # basic info
#         label_dict["image_id"].append(image_id)
#         label_dict["few_shot_num"].append(few_shot_num)
#         label_dict["ans_num"].append(idx2)
#         label_dict["image_info"].append(image_info)
#         label_dict["prompt"].append(prompt)
#         label_dict["extracted_info"].append(extracted)
#         label_dict["generated_text"].append(generated)
#         # label detection
#         if str(idx2) in pos_idx:
#             label_dict["ground_truth"].append(1)
#             print("Ground-truth label should be hateful")
#         else:
#             label_dict["ground_truth"].append(0)
#             print("Ground-truth label should be benign")
#         # annotation
#         if not is_functional:
#             # auto-label 0
#             label_dict["is_functional"].append(0)
#             label_dict["is_formatted"].append(0)
#             label_dict["is_hateful"].append(0)
#             label_dict["is_sarcastic"].append(0)
#             label_dict["is_benign"].append(0)
#         else:
#             # manual annotation
#             is_hateful = int(input("Is labeled hateful?: "))
#             if not is_hateful:
#                 is_sarcastic = int(input("Is labeled sarcastic?: "))
#             else:
#                 is_sarcastic = 0
#             label_dict["is_functional"].append(is_functional)
#             label_dict["is_formatted"].append(is_formatted)
#             label_dict["is_hateful"].append(is_hateful)
#             label_dict["is_sarcastic"].append(is_sarcastic)
#             label_dict["is_benign"].append(int(not (is_hateful or is_sarcastic)))
#     time.sleep(5)
#     clear_output(True)


In [None]:
# df_labeled = pd.DataFrame(label_dict)
# df_labeled["image_id"] = df_labeled["image_id"].astype(str).str.zfill(5)
# df_labeled.head()

In [None]:
# out_name = f"{SAVE_DIR_FIN}/df_meta_{version_next}.csv"
# df_labeled = pd.DataFrame(label_dict)
# df_labeled.to_csv(out_name, index=False)

# explain by attention weights

## init

In [None]:
import os

import lightgbm as lgb
import matplotlib.pyplot as plt
import numpy as np
import optuna.integration.lightgbm as opt_lgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, roc_auc_score
import torch
import torch.nn.functional as F

%matplotlib inline

In [None]:
# Drive locations, repeated here (presumably so this section can be run on its own).
DRIVE_PATH = "/content/drive/MyDrive"
SAVE_DIR_FIN = f"{DRIVE_PATH}/hf"
# Root directory that load_attentions reads from: one sub-directory per image_info value.
ATTS_PATH = f"{SAVE_DIR_FIN}/atts"
# version = "v2_11"
# out_name = f"{SAVE_DIR_FIN}/df_meta_{version}.csv"
# Annotated result table that is loaded into df_labeled below.
out_name = f"{SAVE_DIR_FIN}/meta_result.csv"
# Candidate target columns; the one that is modelled is picked into `col` further down.
gt_cols = ['is_functional', 'is_formatted', 'ground_truth']
# Random seed reused for the data splits, LightGBM, and the Optuna tuner.
RS = 1991

## data prep

In [None]:
# Load the annotated results (meta_result.csv on Drive) and preview the first rows.
df_labeled = pd.read_csv(out_name)
df_labeled.head()

In [None]:
# Derived 0/1 indicator columns:
#   is_hateful_or_sarcastic - 1 when either is_hateful or is_sarcastic is set
#   is_few_shot             - 1 when few_shot_num is not 0
hateful_or_sarcastic = np.logical_or(df_labeled["is_hateful"], df_labeled["is_sarcastic"])
df_labeled["is_hateful_or_sarcastic"] = hateful_or_sarcastic.astype(int)
df_labeled["is_few_shot"] = (df_labeled["few_shot_num"] != 0).astype(int)

In [None]:
# Restrict the analysis to rows with 0, 1, or 2 shots; the previous index is kept
# as an "index" column because drop=False.
in_scope = df_labeled["few_shot_num"].isin(range(3))
df_scope = df_labeled[in_scope].reset_index(drop=False)
print(len(df_scope))

In [None]:
def preprocess_att(att_file, modality, mx_dim):
    """Load a saved attention tensor and zero-pad it to a fixed length.

    Args:
        att_file: Path of a ``.pt`` file containing a tensor.
        modality: Key into ``mx_dim`` selecting the target length.
        mx_dim: Mapping from modality name to the padded length.

    Returns:
        The loaded tensor, on CPU, right-padded with zeros along its last
        dimension to ``mx_dim[modality]`` entries.
    """
    # Load onto CPU so the result can be converted with .numpy() downstream
    # even when the tensor was saved from a GPU.
    att = torch.load(att_file, map_location="cpu")
    # F.pad pads the last dimension, so measure that dimension (identical to
    # len(att) for the 1-D tensors this is used with).
    # NOTE(review): a tensor longer than the target yields a negative pad,
    # which F.pad treats as cropping — confirm truncation is acceptable.
    pad_right = mx_dim[modality] - att.shape[-1]
    att_filled = F.pad(
        input=att,
        pad=(0, pad_right),
        mode='constant', value=0
    )
    return att_filled

def load_attentions(df,
                    atts_path = ATTS_PATH,
                    mx_dim = None,
                    mx_shots = 2,
                    modalities = None):
    """Build one flat feature vector of padded attention values per row of ``df``.

    For every value in ``df['image_info']`` the directory
    ``{atts_path}/{image_info}`` is read. For each modality, in order, the
    file ``{modality}_zsl.pt`` is loaded, followed by
    ``{modality}_delta_{k}.pt`` for ``k = 1 .. mx_shots - 1``. Each tensor is
    padded by ``preprocess_att`` and all of them are concatenated.

    Args:
        df: DataFrame with an ``image_info`` column.
        atts_path: Root directory holding one sub-directory per image_info.
        mx_dim: Mapping modality -> padded length. Defaults to
            ``{"caption": 100, "image": 172, "cross": 212}``.
        mx_shots: Number of tensors per modality is ``mx_shots`` (one
            zero-shot file plus ``mx_shots - 1`` delta files).
        modalities: Modality names, in concatenation order. Defaults to
            ``["caption", "image", "cross"]``.

    Returns:
        A tensor with one row per row of ``df``.

    Raises:
        FileNotFoundError: If the directory for an image_info is missing.
    """
    # Defaults are created per call instead of sharing mutable objects in the signature.
    if mx_dim is None:
        mx_dim = {"caption": 100, "image": 172, "cross": 212}
    if modalities is None:
        modalities = ["caption", "image", "cross"]

    att_samples = []
    for image_info in df['image_info'].values:
        image_dir = f"{atts_path}/{image_info}"
        # Explicit check (not `assert`) so it still runs under -O and names the missing path.
        if not os.path.isdir(image_dir):
            raise FileNotFoundError(f"Attention directory not found: {image_dir}")
        att_sample = []
        for modality in modalities:
            zsl_file = f"{image_dir}/{modality}_zsl.pt"
            att_sample.append(preprocess_att(zsl_file, modality, mx_dim))
            for shot in range(1, mx_shots):
                shot_file = f"{image_dir}/{modality}_delta_{shot}.pt"
                att_sample.append(preprocess_att(shot_file, modality, mx_dim))
        att_samples.append(torch.cat(att_sample))
    return torch.stack(att_samples)


In [None]:
# Target column for the classifier below: the first entry of gt_cols.
col = gt_cols[0]
print(col)

In [None]:
# Feature tensor: one row of concatenated, padded attention vectors per row of df_scope.
atts = load_attentions(df_scope)
atts.shape

In [None]:
# Convert the feature tensor to a NumPy array for scikit-learn and LightGBM.
atts_arr = atts.numpy()
print(atts_arr.shape)

In [None]:
# Labels taken from the same frame (df_scope) that produced atts_arr, so rows line up.
y = df_scope[col].to_numpy()

In [None]:
# Hold out 30% of the samples for testing, then split the remaining 70% into 80% train and
# 20% validation; both splits use the same seed RS.
X_train_eval, X_test, y_train_eval, y_test = train_test_split(atts_arr, y, random_state=RS, test_size=0.3)
X_train, X_eval, y_train, y_eval = train_test_split(X_train_eval, y_train_eval, random_state=RS, test_size=0.2)
print(y_train.shape, y_eval.shape, y_test.shape)

## classification

In [None]:
# Base LightGBM parameters passed to the tuner: binary classification scored by log loss,
# with the seed and the deterministic flag fixed for reproducibility.
params = {
    'verbose': -1,
    'task': 'train',
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'binary_logloss',
    'learning_rate': 0.1,
    'seed': RS,
    'deterministic':True,
    'force_row_wise':True
}

In [None]:
# Wrap the splits as LightGBM datasets; the validation and test sets reference the training set.
lgb_train = opt_lgb.Dataset(X_train, y_train)
lgb_valid = opt_lgb.Dataset(X_eval, y_eval, reference=lgb_train)
# NOTE(review): lgb_test is not passed to the tuner, and the performance cell predicts on
# X_test directly — it appears to be unused.
lgb_test = opt_lgb.Dataset(X_test, y_test, reference=lgb_train)
# Handed to the tuner as evals_result.
lgb_result = {}
# Optuna's LightGBM tuner: up to 500 boosting rounds with early stopping after 5 rounds,
# evaluated on the 'Train' and 'Valid' sets.
# NOTE(review): early_stopping_rounds / evals_result / verbose_eval depend on the installed
# optuna and lightgbm versions accepting them — lightgbm is not pinned in requirements.txt.
model = opt_lgb.LightGBMTuner(
    params=params,
    train_set=lgb_train,
    valid_sets=[lgb_train, lgb_valid],
    valid_names=['Train', 'Valid'],
    num_boost_round=500,
    early_stopping_rounds=5,
    evals_result=lgb_result,
    verbosity=-1,
    verbose_eval=-1,
    optuna_seed=RS,
)

In [None]:
# Run the hyperparameter search, then print the parameters of the best booster found.
model.run()
print(model.get_best_booster().params)

## performance

In [None]:
# Score the tuned model on the held-out test split and draw its ROC curve.
best_booster = model.get_best_booster()
y_pred_proba = best_booster.predict(X_test)
auc = roc_auc_score(y_test, y_pred_proba)
fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
auc_label = "AUC=" + str(auc)
print(auc_label)
plt.plot(fpr, tpr, label=auc_label)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend(loc=4)
plt.show()

## feature importance

In [None]:
def generate_feature_names(
    modalities = None,
    mx_dim = None,
    mx_shots = 2,
    ):
    """Return feature names for the flattened attention vectors.

    For each modality, in order, the names are ``{modality}_zsl_{i}`` for
    ``i`` in ``range(mx_dim[modality])``, followed by
    ``{modality}_delta_{k}_{i}`` for ``k = 1 .. mx_shots - 1``.

    Args:
        modalities: Modality names, in order. Defaults to
            ``["caption", "image", "cross"]``.
        mx_dim: Mapping modality -> number of entries per block. Defaults to
            ``{"caption": 100, "image": 172, "cross": 212}``.
        mx_shots: Each modality gets one zero-shot block plus
            ``mx_shots - 1`` delta blocks.

    Returns:
        A list of feature-name strings.
    """
    # Defaults are created per call instead of sharing mutable objects in the signature.
    if modalities is None:
        modalities = ["caption", "image", "cross"]
    if mx_dim is None:
        mx_dim = {"caption": 100, "image": 172, "cross": 212}

    out = []
    for modality in modalities:
        dim = mx_dim[modality]
        out.extend(f"{modality}_zsl_{i_dim}" for i_dim in range(dim))
        for shot in range(1, mx_shots):
            out.extend(f"{modality}_delta_{shot}_{i_dim}" for i_dim in range(dim))
    return out

In [None]:
# Pair every feature name with the best booster's importance value and flag each feature
# by the substrings found in its name (zero-shot block and modality).
feature_names = generate_feature_names()
best_booster = model.get_best_booster()
importance = pd.DataFrame({'feature': feature_names, 'importance': best_booster.feature_importance()})
types = ['zsl', 'caption','cross','image']
for t in types:
    importance[f'is_{t}'] = importance['feature'].str.contains(t).astype(int)

# Persist the full table to Drive, then show the highest-ranked features.
importance.to_csv(f"{SAVE_DIR_FIN}/{col}_optuna_importance_{RS}.csv", index=False)
importance.sort_values(by="importance", ascending=False).head()

In [None]:
# importance = pd.read_csv(f"{SAVE_DIR_FIN}/{col}_optuna_importance_{RS}.csv")
# Keep only features with an importance of at least 1.
importance = importance[importance["importance"]>=1].reset_index(drop=True)
categories = ['is_zsl', 'is_caption', 'is_cross', 'is_image']
# Group by the combination of flags (zero-shot vs. delta block, and modality).
imp_grp = importance.groupby(categories)
# Number of distinct features per group.
imp_feat = imp_grp["feature"].nunique().reset_index(drop=False)
# Summed importance per group.
imp_sum = imp_grp["importance"].sum().reset_index(drop=False)
print(imp_feat)
print(imp_sum)
# Save both summaries to Drive, named after the target column and the seed.
imp_feat.to_csv(f"{SAVE_DIR_FIN}/{col}_optuna_nunique_{RS}.csv", index=False)
imp_sum.to_csv(f"{SAVE_DIR_FIN}/{col}_optuna_occurrences_{RS}.csv", index=False)