# Text Match

In [None]:
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
)

from datasets import load_dataset
import os

# Configure the process environment in one step: expose only CUDA device "2"
# and set the NCCL switches that disable peer-to-peer and InfiniBand transport.
os.environ.update(
    {
        "CUDA_VISIBLE_DEVICES": "2",
        "NCCL_P2P_DISABLE": "1",
        "NCCL_IB_DISABLE": "1",
    }
)

In [None]:
# Load the sentence-pair data from the local JSON file via the "json" loader,
# selecting its "train" split.
dataset = load_dataset("json", data_files="./train_pair_1w.json", split="train")

In [None]:
# Peek at the first raw record to check its fields.
dataset[0]

In [None]:
# Hold out 20% of the examples as the "test" split used for evaluation.
# NOTE(review): no seed is passed, so the split presumably differs between
# runs — confirm whether reproducibility matters here.
datasets = dataset.train_test_split(test_size=0.2)

## Preprocess Data

In [None]:
import torch

# Load the tokenizer from the local "../chinese-macbert-base" checkpoint directory.
tokenizer = AutoTokenizer.from_pretrained("../chinese-macbert-base")


def process_function(examples):
    """Tokenize a batch of sentence pairs for the dual-encoder model.

    Args:
        examples: A batch mapping with parallel ``"sentence1"``,
            ``"sentence2"`` and ``"label"`` columns.

    Returns:
        A dict in which every tokenizer field (input_ids, attention_mask,
        token_type_ids) holds one two-element list per example — the first
        and second sentence — plus a ``"labels"`` list containing ``1`` when
        the source label equals 1 and ``-1`` otherwise.
    """
    triples = list(
        zip(examples["sentence1"], examples["sentence2"], examples["label"])
    )

    # Interleave as [a0, b0, a1, b1, ...] so each pair occupies two adjacent rows.
    sentences = [text for first, second, _ in triples for text in (first, second)]
    labels = [1 if int(raw) == 1 else -1 for _, _, raw in triples]

    encoded = tokenizer(
        sentences, max_length=128, truncation=True, padding="max_length"
    )

    # Regroup the flat rows back into consecutive [sentence1, sentence2] pairs.
    paired = {}
    for field, rows in encoded.items():
        paired[field] = [rows[start : start + 2] for start in range(0, len(rows), 2)]

    paired["labels"] = labels
    return paired


# Apply process_function in batched mode; the original columns are dropped so
# only the fields returned by process_function remain.
tokenized_datasets = datasets.map(
    process_function, batched=True, remove_columns=datasets["train"].column_names
)

In [None]:
# Show the first tokenized training example in full.
print(tokenized_datasets["train"][0])

In [None]:
# Summarize the layout of the first tokenized training example: labels are
# printed verbatim, and the first non-label field reports its pair layout.
print("Train dataset sample:")
sample = tokenized_datasets["train"][0]
for key, value in sample.items():
    if key == "labels":
        print(f"{key}: {value}")
        continue

    first_length = len(value[0])
    print(f"{key}: shape={len(value)} x {first_length}")
    print(f"  First sentence length: {first_length}")
    print(f"  Second sentence length: {len(value[1])}")
    # One paired field is enough to illustrate the layout; stop here.
    break

## Model Training

In [None]:
from transformers import BertForSequenceClassification, BertPreTrainedModel, BertModel
from typing import Optional
from transformers.configuration_utils import PretrainedConfig
from torch.nn import CosineSimilarity, CosineEmbeddingLoss
import torch


class DualModel(BertPreTrainedModel):
    """Dual-encoder sentence-pair model built on one shared BERT encoder.

    Every example carries two sentences stacked along dimension 1 of each
    input tensor. Both sentences are encoded by the same ``BertModel``; the
    cosine similarity of the two pooled outputs is the prediction, and
    ``CosineEmbeddingLoss`` (margin 0.3) is the training objective.
    """

    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        """Create the shared BERT encoder and run the standard post-init step."""
        super().__init__(config, *inputs, **kwargs)
        self.bert = BertModel(config)
        self.post_init()

    @staticmethod
    def _pick_sentence(
        paired: Optional[torch.Tensor], index: int
    ) -> Optional[torch.Tensor]:
        """Slice sentence ``index`` out of a paired tensor; ``None`` passes through."""
        return None if paired is None else paired[:, index]

    def _encode(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor],
        token_type_ids: Optional[torch.Tensor],
        **encoder_kwargs,
    ) -> torch.Tensor:
        """Run the shared encoder on one sentence batch and return its pooled output."""
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            **encoder_kwargs,
        )
        return outputs[1]  # [batch, hidden]

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ):
        """Score each sentence pair by the cosine similarity of its embeddings.

        Args:
            input_ids: Token ids with the two sentences of each pair at
                indices 0 and 1 of dimension 1. Required.
            attention_mask: Optional mask laid out like ``input_ids``; when
                ``None`` the encoder's own default is used.
            token_type_ids: Optional segment ids laid out like ``input_ids``;
                when ``None`` the encoder's own default is used.
            position_ids, head_mask, inputs_embeds, output_attentions,
                output_hidden_states: Forwarded unchanged to both encoder calls.
            labels: Optional per-pair targets handed to
                ``CosineEmbeddingLoss``; the preprocessing in this file
                produces ``1`` for matching pairs and ``-1`` otherwise.
            return_dict: Forwarded to the encoder; falls back to
                ``config.use_return_dict`` when ``None``. This method itself
                always returns a tuple.

        Returns:
            ``(loss, cos)`` when ``labels`` is given, otherwise ``(cos,)``,
            where ``cos`` holds one similarity per pair.

        Raises:
            ValueError: If ``input_ids`` is ``None``.
        """
        if input_ids is None:
            raise ValueError(
                "DualModel requires `input_ids` with both sentences stacked along dim 1."
            )

        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        # Arguments that are identical for both sentences of a pair.
        encoder_kwargs = dict(
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Encode sentence A (index 0) and then sentence B (index 1) with the
        # same weights. The optional masks are sliced only when provided, so
        # the Optional defaults in the signature are actually usable.
        senA_pooled_output, senB_pooled_output = (
            self._encode(
                self._pick_sentence(input_ids, index),
                self._pick_sentence(attention_mask, index),
                self._pick_sentence(token_type_ids, index),
                **encoder_kwargs,
            )
            for index in (0, 1)
        )

        cos = CosineSimilarity(dim=1)(senA_pooled_output, senB_pooled_output)  # [batch, ]

        loss = None
        if labels is not None:
            loss_fct = CosineEmbeddingLoss(margin=0.3)
            loss = loss_fct(senA_pooled_output, senB_pooled_output, labels)

        output = (cos,)
        return ((loss,) + output) if loss is not None else output


# Instantiate DualModel from the local "../chinese-macbert-base" checkpoint.
model = DualModel.from_pretrained("../chinese-macbert-base")

In [None]:
import evaluate

# Load the accuracy and F1 metrics from metric scripts at local paths.
metric_accuracy = evaluate.load("../evaluate/metrics/accuracy/accuracy.py")
metric_f1 = evaluate.load("../evaluate/metrics/f1/f1.py")

In [None]:
import numpy as np

def compute_metrics(pred):
    """Turn similarity scores into binary predictions and score them.

    Args:
        pred: A ``(predictions, labels)`` pair. Predictions are similarity
            scores; labels are positive for matching pairs.

    Returns:
        The accuracy result dict updated with the F1 result dict. A score
        strictly above 0.7 counts as a predicted match, and a label strictly
        above 0 counts as a true match.
    """
    raw_scores, raw_labels = pred

    scores = np.asarray(raw_scores).flatten().astype(np.float64)
    predictions = (scores > 0.7).astype(int).tolist()
    references = (np.ravel(raw_labels) > 0).astype(int).tolist()

    results = metric_accuracy.compute(predictions=predictions, references=references)
    results.update(metric_f1.compute(predictions=predictions, references=references))
    return results

In [None]:
# Training configuration: evaluate and checkpoint once per epoch, keep at most
# three checkpoints, and reload the best one (by "f1") when training ends.
args = TrainingArguments(
    output_dir="./models_dual",
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    logging_steps=10,
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=3,
    learning_rate=2e-5,
    weight_decay=0.01,
    metric_for_best_model="f1",  # expected to match the F1 key returned by compute_metrics
    load_best_model_at_end=True
    )

In [None]:
from transformers import DataCollatorWithPadding

# Wire the model, the tokenized train/test splits and the metric function
# into a Trainer.
# NOTE(review): examples are already padded to max_length in process_function,
# so the padding collator presumably only batches them into tensors — confirm.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=compute_metrics,
)

In [None]:
# Start training with the configuration above.
trainer.train()