# Install packages and download dataset

In [None]:
# https://huggingface.co/google-bert/bert-base-multilingual-cased

In [None]:
!pip install --upgrade pip
!pip install sentencepiece
!pip install datasets
!pip install transformers

In [None]:
!pip install transformers[torch]

In [None]:
!pip install optuna

In [None]:
!pip install accelerate -U

# Prepare dataset

In [None]:
# Download the combined training data
# Please ensure the combined_training_esen.json file is in the data directory
# You can generate this file using the data processing notebook

In [None]:
import pandas as pd

# Load the combined training set (JSON) directly into a DataFrame.
df = pd.read_json('./data/combined_training_esen.json')

# Report the number of rows. A bare `len(df)` in the middle of a cell is
# evaluated and discarded, so it must be printed to be visible.
print(len(df))

# Display the first few rows of the DataFrame to verify the load.
print(df.head())


# Fine-tuning separately

## Clean train and test datasets for training

In [None]:
# len(X_english_test)

In [None]:
# Features: the raw values of the 'text' column.
X = df['text'].values
# Targets: the 'label' column cast to int.
y = df['label'].values.astype(int)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the examples; random_state fixes the shuffle so the split is
# reproducible. Note that the split is not stratified by label.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
len(X)

In [None]:
from transformers import AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback
import numpy as np
from datasets import load_dataset, load_metric
import transformers
from transformers import AutoTokenizer
import optuna

In [None]:
num_labels = len(set(y))

In [None]:
num_labels

In [None]:
# Hugging Face model id used for both the tokenizer and the classifier.
MODEL = "dccuchile/bert-base-spanish-wwm-cased"
# NOTE(review): not referenced anywhere in the visible notebook; presumably -1
# means "use all training examples" - confirm or remove.
MAX_TRAINING_EXAMPLES = -1

In [None]:
tokenizer = AutoTokenizer.from_pretrained(MODEL, use_fast=True)

In [None]:
# Tokenize both splits up front, with truncation and padding enabled.
# The held-out split from train_test_split serves as the validation set.
train_encodings = tokenizer(X_train.tolist(), truncation=True, padding=True)
val_encodings = tokenizer(X_test.tolist(), truncation=True, padding=True)

In [None]:
import torch
from torch.utils.data import Dataset

class MyDataset(Dataset):
    """Map-style dataset pairing pre-computed encodings with labels.

    Args:
        encodings: Mapping from feature name to an indexable collection
            holding one entry per example.
        labels: Indexable collection of labels; its length defines the
            dataset size.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        """Return the number of examples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of tensors with a ``labels`` entry."""
        sample = {}
        for feature_name, column in self.encodings.items():
            sample[feature_name] = torch.tensor(column[idx])
        # The label is always stored as an int64 tensor.
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample


In [None]:
# Wrap the tokenized splits and their labels for use by the Trainer.
train_dataset = MyDataset(train_encodings, y_train)
val_dataset = MyDataset(val_encodings, y_test)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix

def compute_metrics(eval_pred):
    """Compute classification metrics from a ``(logits, labels)`` pair.

    Predictions are the argmax of the logits over the last axis.

    Args:
        eval_pred: Pair unpacked as ``(logits, labels)``.

    Returns:
        Dict with accuracy; precision/recall/F1 for the positive class
        (label 1); macro- and micro-averaged precision/recall/F1; and the
        confusion matrix as a nested list.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    # Accuracy and positive-class (label 1) scores.
    metrics = {
        "accuracy": accuracy_score(labels, predictions),
        "f1_score_positive": f1_score(labels, predictions, average='binary', pos_label=1),
        "precision_positive": precision_score(labels, predictions, pos_label=1),
        "recall_positive": recall_score(labels, predictions, pos_label=1),
    }

    # Averaged scores, inserted in the order: precision, recall, F1.
    for averaging in ('macro', 'micro'):
        metrics[f"precision_{averaging}"] = precision_score(labels, predictions, average=averaging)
        metrics[f"recall_{averaging}"] = recall_score(labels, predictions, average=averaging)
        metrics[f"f1_{averaging}"] = f1_score(labels, predictions, average=averaging)

    # Converted to a plain nested list.
    metrics["confusion_matrix"] = confusion_matrix(labels, predictions).tolist()
    return metrics



def objective(trial):
    """Optuna objective: fine-tune ``MODEL`` once with sampled hyperparameters.

    Samples learning rate, epoch count, batch size, warmup steps, weight
    decay and classifier dropout from ``trial``, trains on
    ``train_dataset`` and evaluates on ``val_dataset``.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        The ``eval_accuracy`` value reported by ``trainer.evaluate()``.
    """
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 5e-5, log=True)
    num_train_epochs = trial.suggest_int('num_train_epochs', 3, 5)
    per_device_train_batch_size = trial.suggest_categorical('per_device_train_batch_size', [8, 16])
    warmup_steps = trial.suggest_int('warmup_steps', 0, 500)
    weight_decay = trial.suggest_float('weight_decay', 0.0, 0.3)
    # Sampled once here (not inside model_init) so the value does not depend
    # on how many times the Trainer calls model_init.
    dropout_rate = trial.suggest_float('dropout_rate', 0, 0.5)

    def model_init():
        # Route the sampled rate through the config's `classifier_dropout`.
        # In the BERT sequence-classification head, `model.classifier` is the
        # final linear layer, so assigning `model.classifier.dropout` only
        # attaches an unused attribute and the sampled rate has no effect.
        return AutoModelForSequenceClassification.from_pretrained(
            MODEL,
            num_labels=num_labels,
            classifier_dropout=dropout_rate,
        )

    args = TrainingArguments(
        output_dir='./results',
        learning_rate=learning_rate,
        num_train_epochs=num_train_epochs,
        per_device_train_batch_size=per_device_train_batch_size,
        per_device_eval_batch_size=per_device_train_batch_size,
        warmup_steps=warmup_steps,
        weight_decay=weight_decay,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model='accuracy',
        greater_is_better=True,
        logging_dir='./logs',
        logging_steps=10,
    )

    trainer = Trainer(
        model_init=model_init,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],
    )

    # Train model
    trainer.train()
    # Evaluate model
    eval_results = trainer.evaluate()
    print(f"Accuracy: {eval_results['eval_accuracy']}")
    print(f"F1 Score (Positive): {eval_results['eval_f1_score_positive']}")
    print(f"Macro F1 Score: {eval_results['eval_f1_macro']}")
    print(f"Micro F1 Score: {eval_results['eval_f1_micro']}")

    # The study is created with direction='maximize' on this value.
    return eval_results['eval_accuracy']


In [None]:
# Maximize the value returned by `objective` (validation accuracy) over 15 trials.
# No sampler seed is set, so the sampled hyperparameters vary between runs.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=15)

In [None]:
study.best_trial

In [None]:
study.best_params

In [None]:
# Training configuration for the final run, rebuilt from the best
# hyperparameters found by the study. The eval batch size reuses the tuned
# train batch size.
# NOTE(review): unlike the arguments used inside `objective`, this
# configuration does not set load_best_model_at_end or greater_is_better,
# although metric_for_best_model is still given - confirm this is intended.
args = TrainingArguments(
        output_dir='./results',
        learning_rate=study.best_params["learning_rate"],
        num_train_epochs=study.best_params["num_train_epochs"],
        per_device_train_batch_size=study.best_params["per_device_train_batch_size"],
        per_device_eval_batch_size=study.best_params["per_device_train_batch_size"],
        warmup_steps=study.best_params["warmup_steps"],
        weight_decay=study.best_params["weight_decay"],
        evaluation_strategy="epoch",
        save_strategy="epoch",
        metric_for_best_model='accuracy',
        logging_dir='./logs',
        logging_steps=10,

)

In [None]:
def model_init(trail=None):
    """Build a fresh classifier using the best dropout rate found by the study.

    Args:
        trail: Unused. Accepted so the function can be called with or
            without one argument; the parameter name is kept as-is for
            backward compatibility.

    Returns:
        A sequence-classification model loaded from ``MODEL`` with
        ``num_labels`` outputs.
    """
    # Route the rate through the config's `classifier_dropout`. In the BERT
    # sequence-classification head, `model.classifier` is the final linear
    # layer, so assigning `model.classifier.dropout` only attaches an unused
    # attribute and the tuned rate has no effect.
    return AutoModelForSequenceClassification.from_pretrained(
        MODEL,
        num_labels=num_labels,
        classifier_dropout=study.best_params["dropout_rate"],
    )

# Final training run with the best hyperparameters. The model is created
# through `model_init`; early stopping is disabled here (callback commented
# out), so training runs for the configured number of epochs.
trainer = Trainer(
    model_init=model_init,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    #callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

trainer.train()

In [None]:
trainer.evaluate()['eval_accuracy']

In [None]:
trainer.evaluate()

In [None]:
# Download test data
# Please ensure the test_data.json file is in the data directory

In [None]:
import pandas as pd
import json

file_path = './data/test_data.json'
# The file is parsed as JSON Lines (one JSON object per line).
# Decode explicitly as UTF-8 so non-ASCII text does not depend on the
# platform's default encoding, and skip blank lines, which json.loads rejects.
with open(file_path, 'r', encoding='utf-8') as file:
    data = [json.loads(line) for line in file if line.strip()]

df_test = pd.DataFrame(data)

print(len(df_test))
print(df_test.head())


In [None]:
X_test_final = df_test['combined_text'].values

In [None]:
import pandas as pd
import json

file_path = './data/test_data.json'
# The file is parsed as JSON Lines (one JSON object per line).
# Decode explicitly as UTF-8 so non-ASCII text does not depend on the
# platform's default encoding, and skip blank lines, which json.loads rejects.
with open(file_path, 'r', encoding='utf-8') as file:
    data = [json.loads(line) for line in file if line.strip()]

texts = [entry['combined_text'] for entry in data]
labels = [entry['gold'] for entry in data]

# 'predictions' starts as an all-zero placeholder column.
df_test = pd.DataFrame({
    'combined_text': texts,
    'gold': labels,
    'predictions': [0] * len(texts),
})

print(len(df_test))
print(df_test.head())


In [None]:
import json
from datasets import Dataset

file_path = './data/test_data.json'
# The file is parsed as JSON Lines (one JSON object per line).
# Decode explicitly as UTF-8 so non-ASCII text does not depend on the
# platform's default encoding, and skip blank lines, which json.loads rejects.
with open(file_path, 'r', encoding='utf-8') as file:
    data = [json.loads(line) for line in file if line.strip()]

texts = [entry['combined_text'] for entry in data]

# MyDataset requires labels; the test set is wrapped with all-zero
# placeholders that are not used as ground truth.
dummy_labels = [0] * len(texts)
test_dict = dict({'text': texts, 'predictions': dummy_labels})
# Tokenize the texts loaded in this cell so that the encodings and the
# placeholder labels are guaranteed to have the same length and order.
test_encodings = tokenizer(texts, truncation=True, padding=True)
test_dataset = MyDataset(test_encodings, test_dict['predictions'])

In [None]:
test_predictions = trainer.predict(test_dataset)

In [None]:
# Run inference on the test set and unpack the three-element result.
# test_labels here are the all-zero placeholders the dataset was built with,
# not gold labels.
test_preds_raw, test_labels , _ = trainer.predict(test_dataset)
# Predicted class = index of the largest score along the last axis.
test_preds = np.argmax(test_preds_raw, axis=-1)
print(test_preds)

In [None]:
import numpy as np
import pandas as pd


# One row per test example: input text, predicted label, gold label.
df_predictions = pd.DataFrame({
    'text': texts,
    'predicted_label': test_preds,
    'gold_label': labels,
})

output_path = './results/google-bert-based-uncased-predictions-ablation.csv'
df_predictions.to_csv(output_path, index=False)

# Message (Chinese): "The predictions have been saved to file '<path>'."
# It reports the path actually written instead of a hard-coded
# 'predictions.csv' that does not match the output file.
print(f"预测结果已经保存到 '{output_path}' 文件.")


In [None]:
# Read the test metadata to recover the official example ids. Decode
# explicitly as UTF-8 so the result does not depend on the platform default.
with open('./data/spanish_memes_test.json', 'r', encoding='utf-8') as file:
    data_ids = json.load(file)

# NOTE(review): assumes the entries of this file are in the same order as the
# rows of test_data.json that produced test_preds - confirm.
ids = [entry['id_EXIST'] for entry in data_ids.values()]
# Map class indices to submission labels; any other value becomes None.
values = ["NO" if pred == 0 else "YES" if pred == 1 else None for pred in test_preds]

df_predictions = pd.DataFrame({
    'id': ids,
    'value': values,
    'test_case': 'EXIST2024'
})

# Write a single, pretty-printed JSON array of records. Combining
# lines=True with indent=2 spreads each record over several lines with no
# enclosing array, which is neither valid JSON Lines nor a valid JSON document.
# NOTE(review): confirm the submission format expects a JSON array.
df_predictions.to_json('./results/bert-based-uncased-es_test_submission.json', orient='records', indent=2)


In [None]:
len(test_preds)

In [None]:
len(df_predictions)