<a href="https://colab.research.google.com/github/shahriarivari/Persian_sentiment_analysis/blob/main/Persian_sentiment_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the Hugging Face libraries imported by the cells below.
# NOTE(review): versions are not pinned, so a fresh run picks up the latest releases.
%pip install transformers
%pip install tokenizers
%pip install datasets

# BERT pre-training

## Initial imports

In [None]:
import os
import json
from tokenizers import BertWordPieceTokenizer
from transformers import BertForMaskedLM, BertConfig
from transformers import DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments
from datasets import load_dataset
from transformers import TrainerCallback
import logging

## Importing dataset from huggingface hub

In [None]:
from datasets import load_dataset

# Shards of the SLPL/naab corpus to download. This mapping is the only part
# that needs editing to pick a different subset of the corpus.
indices = {
    "train": [5, 1, 2],
    "test": [0, 2]
}

# Number of shard files that exist for each split.
N_FILES = {
    "train": 126,
    "test": 3
}
_BASE_URL = "https://huggingface.co/datasets/SLPL/naab/resolve/main/data/"
# One URL per shard, named "<split>-<index>-of-<total>.txt".
data_url = {
    split: [
        _BASE_URL + "{}-{:05d}-of-{:05d}.txt".format(split, x, total)
        for x in range(total)
    ]
    for split, total in N_FILES.items()
}
# Reject out-of-range shard numbers before downloading anything. Negative
# values are rejected too: they would otherwise silently index from the end.
for split, selected in indices.items():
    for index in selected:
        assert 0 <= index < N_FILES[split], (
            f"{split} shard index {index} is outside 0..{N_FILES[split] - 1}"
        )
data_files = {
    split: [data_url[split][i] for i in selected]
    for split, selected in indices.items()
}
print(data_files)
# `use_auth_token` is intentionally not passed: the corpus is public, the
# argument is deprecated in `datasets`, and newer releases reject it.
dataset = load_dataset('text', data_files=data_files)

## Setting paths and file names

In [None]:
# Set your paths and file names
# NOTE(review): `data_files` was bound to the shard-URL dict by the dataset
# cell; this rebinds it to a placeholder list that no visible cell reads —
# confirm whether it is still needed.
data_files = ["path/to/your_dataset_file.txt"]
# Directory the trained tokenizer vocabulary is saved to.
tokenizer_output_dir = "path/to/bert_tokenizer"
# Directory passed to the Trainer as output_dir and used for the final model.
pretrained_model_output_dir = "path/to/bert_pretrained_model"

## Making files for training the tokenizer

In [None]:
# Pull every training line out of the dataset's 'text' column.
texts = dataset['train']['text']

# Dump the lines, one per row, into a scratch file the tokenizer trainer can read.
temp_file_path = 'temp_dataset_file.txt'
with open(temp_file_path, 'w', encoding='utf-8') as file:
    file.writelines(text + '\n' for text in texts)

## Training a WordPiece Tokenizer

In [None]:
# Training a WordPiece Tokenizer
files = [temp_file_path]
# Parameters for Tokenizer Training
vocab_size = 30000
min_frequency = 2
max_length = 128
special_tokens = ["[PAD]", "[MASK]", "[CLS]", "[SEP]", "[UNK]"]

# Initialize the WordPiece tokenizer for BERT
tokenizer = BertWordPieceTokenizer()

# Train the tokenizer on the scratch corpus file
tokenizer.train(
    files=files,
    vocab_size=vocab_size,
    min_frequency=min_frequency,
    show_progress=True,
    special_tokens=special_tokens,
)

# Directory that receives the tokenizer's config.json in the next cell.
model_path = "pretrained_bert_tokenizer"

# Create every output directory before writing to it. The vocabulary is saved
# to `tokenizer_output_dir`, which previously was never created (only
# `model_path` was), and `os.makedirs` also handles nested paths such as
# "path/to/bert_tokenizer" that `os.mkdir` cannot.
os.makedirs(model_path, exist_ok=True)
os.makedirs(tokenizer_output_dir, exist_ok=True)

# Save the trained tokenizer (writes vocab.txt)
tokenizer.save_model(tokenizer_output_dir)

# Clean up: Remove the temporary file
os.remove(temp_file_path)

In [None]:
import json

# Tokenizer settings worth persisting next to the model: the special tokens,
# the lower-casing flag and the maximum sequence length.
tokenizer_cfg = {
    "do_lower_case": True,
    "unk_token": "[UNK]",
    "sep_token": "[SEP]",
    "pad_token": "[PAD]",
    "cls_token": "[CLS]",
    "mask_token": "[MASK]",
    "model_max_length": max_length,
    "max_len": max_length,
}

# Serialise the settings as config.json inside `model_path`.
with open(os.path.join(model_path, "config.json"), "w") as f:
    json.dump(tokenizer_cfg, f)

## Tokenizing the dataset

In [None]:
# Load tokenizer after training
# Imported here so this cell runs on its own.
from transformers import BertTokenizerFast

# A WordPiece `save_model` writes vocab.txt (vocab.json / merges.txt are BPE
# artefacts), and the second positional argument of BertWordPieceTokenizer is
# `unk_token`, not a merges file. The downstream cells also need a callable
# `transformers` tokenizer (for `tokenizer(...)` and for the data collator),
# so the vocabulary is loaded through BertTokenizerFast instead.
tokenizer = BertTokenizerFast.from_pretrained(
    tokenizer_output_dir,
    do_lower_case=True,            # matches the lower-casing used when training
    model_max_length=max_length,   # lets truncation cap sequences at max_length
)

In [None]:
# Tokenizing the data using dataset.map()
def tokenize_function(examples):
    """Tokenize one batch of rows from the 'text' column.

    Args:
        examples: batch dict from `dataset.map(..., batched=True)`; only its
            'text' entry is read.

    Returns:
        The tokenizer output for the batch, padded to the longest sequence in
        the batch and truncated to at most `max_length` tokens.
    """
    return tokenizer(
        examples['text'],
        padding=True,
        truncation=True,
        # Explicit cap: without it truncation depends on the tokenizer carrying
        # a model_max_length, and longer sequences would exceed the position
        # embeddings configured for the model.
        max_length=max_length,
    )

tokenized_dataset = dataset.map(tokenize_function, batched=True)

## Model Configuration

In [None]:
# Architecture hyper-parameters for the BERT model; the vocabulary size and
# the maximum sequence length reuse the values chosen for the tokenizer.
model_config = BertConfig(
    vocab_size=vocab_size,
    max_position_embeddings=max_length,
    num_hidden_layers=12,  # Adjust as needed
    num_attention_heads=12,  # Adjust as needed
    hidden_size=768,  # Adjust as needed
)
# Build the masked-LM model from the configuration above.
model = BertForMaskedLM(config=model_config)

## Data Collator
Define the collator that turns tokenized examples into batches for masked-language-model training.

In [None]:
# Data Collator for Language Modeling
class CustomDataCollator(DataCollatorForLanguageModeling):
  """Masked-language-modeling collator handed to the Trainer.

  Padding, tensor conversion and random token masking all come from
  ``DataCollatorForLanguageModeling``; the Trainer invokes the collator by
  calling it, so that inherited behaviour is what runs during training.
  """

  def collate_batch(self, batch):
    """Collate ``batch`` through the inherited MLM pipeline.

    Kept for callers that still use the ``collate_batch`` name. The previous
    body copied ``input_ids`` into ``labels`` without masking anything,
    stacked examples without padding them, and needed a ``torch`` import that
    this section of the notebook does not have.

    Args:
      batch: list of tokenized examples (dicts holding at least
        ``input_ids``).

    Returns:
      A plain dict of the tensors produced by calling the collator
      (``input_ids``, ``labels`` and any other tokenized fields such as
      ``attention_mask``).
    """
    return dict(self(batch))


## Callback

In [None]:
# Define a custom callback for monitoring
class CustomCallback(TrainerCallback):
  """Reports the latest recorded training loss every ``logging_steps`` steps."""

  def __init__(self):
    super().__init__()

  def on_step_end(self, args, state, control, model=None, optimizer=None, scheduler=None, **kwargs):
    """Log the most recent loss found in ``state.log_history``.

    ``model``, ``optimizer`` and ``scheduler`` are optional: the Trainer hands
    event arguments over as keywords (the scheduler arrives as
    ``lr_scheduler`` inside ``kwargs``), so requiring them positionally made
    every call fail with a TypeError.

    Args:
      args: the TrainingArguments in use; ``logging_steps`` sets the cadence.
      state: the TrainerState; ``global_step`` and ``log_history`` are read.
      control: unused.
      **kwargs: remaining event arguments, ignored.
    """
    # Guard against logging_steps == 0, which would make the modulo raise.
    if not args.logging_steps or state.global_step % args.logging_steps != 0:
      return
    # TrainerState has no `log_metrics`; logged values live in `log_history`,
    # a list of dicts. Walk it backwards to find the newest entry with a loss.
    latest_loss = next(
      (entry["loss"] for entry in reversed(state.log_history) if "loss" in entry),
      None,
    )
    # Nothing has been logged yet early in training; stay silent until it has.
    if latest_loss is not None:
      logging.info(f"Step {state.global_step}: Loss = {latest_loss}")

## Training Arguments

In [None]:
# Hyper-parameters and bookkeeping options for the Trainer.
# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
training_args = TrainingArguments(
    # Where checkpoints are written; existing content may be overwritten.
    output_dir=pretrained_model_output_dir,
    overwrite_output_dir=True,
    # Optimisation schedule.
    per_device_train_batch_size=4,
    num_train_epochs=3,
    # Evaluate every 500 steps.
    eval_steps=500,
    evaluation_strategy="steps",
    # Checkpoint every 1000 steps, keeping at most 3 checkpoints.
    save_total_limit=3,
    save_steps=1000,
)

In [None]:
# Tokenized data for training
# `tokenized_dataset` holds one dataset per split ("train" / "test"), so it
# cannot be indexed by column name. The Trainer also needs a dataset it can
# index row by row, not a dict of whole columns — hand it the train split.
train_inputs = tokenized_dataset["train"]

## Trainer Initialization

In [None]:
# Trainer Initialization
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_inputs,
    # `training_args` requests an evaluation every `eval_steps`; without an
    # evaluation set the Trainer has nothing to evaluate on and fails.
    eval_dataset=tokenized_dataset["test"],
    # Masks 15% of the tokens in each batch for the MLM objective.
    data_collator=CustomDataCollator(tokenizer=tokenizer, mlm=True, mlm_probability=0.15),
    callbacks=[CustomCallback()],
)

In [None]:
# Train the model
# Runs the training loop configured on `trainer`.
trainer.train()

# Save the final pre-trained model
# Written to the "final_model" sub-directory of `pretrained_model_output_dir`.
trainer.save_model(os.path.join(pretrained_model_output_dir, "final_model"))

# XLNet pre-training

## Initial imports

In [None]:
import os
# `tokenizers` has no XLNet-specific WordPiece class (importing one raises
# ImportError); BertWordPieceTokenizer is the WordPiece implementation it
# provides, and its special tokens are configurable.
from tokenizers import BertWordPieceTokenizer
from transformers import XLNetLMHeadModel, XLNetConfig
from transformers import DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments
from datasets import load_dataset
from transformers import TrainerCallback
from torch.utils.data import DataLoader
import torch
import logging

## Importing dataset from huggingface hub

In [None]:
from datasets import load_dataset

# Shards of the SLPL/naab corpus to download. This mapping is the only part
# that needs editing to pick a different subset of the corpus.
indices = {
    "train": [5, 1, 2],
    "test": [0, 2]
}

# Number of shard files that exist for each split.
N_FILES = {
    "train": 126,
    "test": 3
}
_BASE_URL = "https://huggingface.co/datasets/SLPL/naab/resolve/main/data/"
# One URL per shard, named "<split>-<index>-of-<total>.txt".
data_url = {
    split: [
        _BASE_URL + "{}-{:05d}-of-{:05d}.txt".format(split, x, total)
        for x in range(total)
    ]
    for split, total in N_FILES.items()
}
# Reject out-of-range shard numbers before downloading anything. Negative
# values are rejected too: they would otherwise silently index from the end.
for split, selected in indices.items():
    for index in selected:
        assert 0 <= index < N_FILES[split], (
            f"{split} shard index {index} is outside 0..{N_FILES[split] - 1}"
        )
data_files = {
    split: [data_url[split][i] for i in selected]
    for split, selected in indices.items()
}
print(data_files)
# `use_auth_token` is intentionally not passed: the corpus is public, the
# argument is deprecated in `datasets`, and newer releases reject it.
dataset = load_dataset('text', data_files=data_files)

## Setting paths and file names

In [None]:
# Set your paths and file names
# NOTE(review): `data_files` was bound to the shard-URL dict by the dataset
# cell; this rebinds it to a placeholder list that no visible cell reads —
# confirm whether it is still needed.
data_files = ["path/to/your_dataset_file.txt"]
# Directory the trained tokenizer vocabulary is saved to.
tokenizer_output_dir = "path/to/xlnet_tokenizer"
# Directory passed to the Trainer as output_dir and used for the final model.
pretrained_model_output_dir = "path/to/xlnet_pretrained_model"

## Making files for training the tokenizer

In [None]:
# Pull every training line out of the dataset's 'text' column.
texts = dataset['train']['text']

# Dump the lines, one per row, into a scratch file the tokenizer trainer can read.
temp_file_path = 'temp_dataset_file.txt'
with open(temp_file_path, 'w', encoding='utf-8') as file:
    file.writelines(text + '\n' for text in texts)

# The tokenizer trainer takes a list of corpus files.
files = [temp_file_path]

## Training a WordPiece Tokenizer

In [None]:
# Training a WordPiece Tokenizer
# Imported here so this cell runs on its own. `tokenizers` has no
# XLNet-specific WordPiece class, so the generic WordPiece implementation is
# configured with XLNet-style special tokens instead.
from tokenizers import BertWordPieceTokenizer

files = [temp_file_path]

# Parameters for Tokenizer Training
vocab_size = 30000
min_frequency = 2
max_length = 128
special_tokens = ["<pad>", "<mask>", "<cls>", "<sep>", "<unk>"]

# Initialize the WordPiece tokenizer with the XLNet special-token strings so
# the unknown token used by the model matches the one put in the vocabulary.
tokenizer = BertWordPieceTokenizer(
    unk_token="<unk>",
    sep_token="<sep>",
    cls_token="<cls>",
    pad_token="<pad>",
    mask_token="<mask>",
)

# Train the tokenizer on the scratch corpus file
tokenizer.train(
    files=files,
    vocab_size=vocab_size,
    min_frequency=min_frequency,
    show_progress=True,
    special_tokens=special_tokens,
)

model_path = "pretrained_xlnet_tokenizer"

# Create every output directory before writing to it. The vocabulary is saved
# to `tokenizer_output_dir`, which previously was never created (only
# `model_path` was, and only after the first save attempt). `os.makedirs`
# also handles nested paths that `os.mkdir` cannot.
os.makedirs(model_path, exist_ok=True)
os.makedirs(tokenizer_output_dir, exist_ok=True)

# Save the trained tokenizer once (writes vocab.txt)
tokenizer.save_model(tokenizer_output_dir)

# Clean up: Remove the temporary file
os.remove(temp_file_path)

## Tokenizing the dataset

In [None]:
# Load tokenizer after training
# Imported here so this cell runs on its own.
from transformers import BertTokenizerFast

# The training cell's comments describe a WordPiece tokenizer, whose
# `save_model` writes vocab.txt (vocab.json / merges.txt are BPE artefacts).
# The downstream cells need a callable `transformers` tokenizer (for
# `tokenizer(...)` and for the data collator), so the WordPiece vocabulary is
# loaded through BertTokenizerFast with the XLNet-style special tokens.
tokenizer = BertTokenizerFast.from_pretrained(
    tokenizer_output_dir,
    do_lower_case=True,
    unk_token="<unk>",
    sep_token="<sep>",
    pad_token="<pad>",
    cls_token="<cls>",
    mask_token="<mask>",
    model_max_length=max_length,   # lets truncation cap sequences at max_length
)

In [None]:
# Tokenizing the data using dataset.map()
def tokenize_function(examples):
    """Tokenize one batch of rows from the 'text' column.

    Args:
        examples: batch dict from `dataset.map(..., batched=True)`; only its
            'text' entry is read.

    Returns:
        The tokenizer output for the batch, padded to the longest sequence in
        the batch and truncated to at most `max_length` tokens.
    """
    return tokenizer(
        examples['text'],
        padding=True,
        truncation=True,
        # Explicit cap: without it truncation depends on the tokenizer carrying
        # a model_max_length of its own.
        max_length=max_length,
    )

tokenized_dataset = dataset.map(tokenize_function, batched=True)

## Model Configuration

In [None]:
# Model Configuration
model_config = XLNetConfig(
    vocab_size=vocab_size,
    d_model=768,  # Adjust as needed
    n_head=12,  # Adjust as needed
    # XLNetConfig names the depth `n_layer`; `num_layers` is not a field it
    # knows, so the requested 12 layers were ignored in favour of the default.
    n_layer=12,  # Adjust as needed
    # `max_position_embeddings` is deliberately not passed: XLNetConfig has no
    # sequence-length limit to configure and does not allow setting it.
)

# Model Initialization
model = XLNetLMHeadModel(config=model_config)

## Data Collator

In [None]:
# Data Collator for Language Modeling
class CustomDataCollator(DataCollatorForLanguageModeling):
  """Masked-language-modeling collator handed to the Trainer.

  Padding, tensor conversion and random token masking all come from
  ``DataCollatorForLanguageModeling``; the Trainer invokes the collator by
  calling it, so that inherited behaviour is what runs during training.
  """

  def collate_batch(self, batch):
    """Collate ``batch`` through the inherited MLM pipeline.

    Kept for callers that still use the ``collate_batch`` name. The previous
    body copied ``input_ids`` into ``labels`` without masking anything and
    stacked examples without padding them.

    Args:
      batch: list of tokenized examples (dicts holding at least
        ``input_ids``).

    Returns:
      A plain dict of the tensors produced by calling the collator
      (``input_ids``, ``labels`` and any other tokenized fields such as
      ``attention_mask``).
    """
    return dict(self(batch))


## Callback

In [None]:
# Define a custom callback for monitoring
class CustomCallback(TrainerCallback):
  """Reports the latest recorded training loss every ``logging_steps`` steps."""

  def __init__(self):
    super().__init__()

  def on_step_end(self, args, state, control, model=None, optimizer=None, scheduler=None, **kwargs):
    """Log the most recent loss found in ``state.log_history``.

    ``model``, ``optimizer`` and ``scheduler`` are optional: the Trainer hands
    event arguments over as keywords (the scheduler arrives as
    ``lr_scheduler`` inside ``kwargs``), so requiring them positionally made
    every call fail with a TypeError.

    Args:
      args: the TrainingArguments in use; ``logging_steps`` sets the cadence.
      state: the TrainerState; ``global_step`` and ``log_history`` are read.
      control: unused.
      **kwargs: remaining event arguments, ignored.
    """
    # Guard against logging_steps == 0, which would make the modulo raise.
    if not args.logging_steps or state.global_step % args.logging_steps != 0:
      return
    # TrainerState has no `log_metrics`; logged values live in `log_history`,
    # a list of dicts. Walk it backwards to find the newest entry with a loss.
    latest_loss = next(
      (entry["loss"] for entry in reversed(state.log_history) if "loss" in entry),
      None,
    )
    # Nothing has been logged yet early in training; stay silent until it has.
    if latest_loss is not None:
      logging.info(f"Step {state.global_step}: Loss = {latest_loss}")

## Training Arguments

In [None]:
# Hyper-parameters and bookkeeping options for the Trainer.
# NOTE(review): recent transformers releases rename `evaluation_strategy` to
# `eval_strategy` — confirm against the installed version.
training_args = TrainingArguments(
    # Where checkpoints are written; existing content may be overwritten.
    output_dir=pretrained_model_output_dir,
    overwrite_output_dir=True,
    # Optimisation schedule.
    per_device_train_batch_size=4,
    num_train_epochs=3,
    # Evaluate every 500 steps.
    eval_steps=500,
    evaluation_strategy="steps",
    # Checkpoint every 1000 steps, keeping at most 3 checkpoints.
    save_total_limit=3,
    save_steps=1000,
)

In [None]:
# Tokenized data for training
# `tokenized_dataset` holds one dataset per split ("train" / "test"), so it
# cannot be indexed by column name. The Trainer also needs a dataset it can
# index row by row, not a dict of whole columns — hand it the train split.
train_inputs = tokenized_dataset["train"]

## Trainer Initialization

In [None]:
# Trainer Initialization
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_inputs,
    # `training_args` requests an evaluation every `eval_steps`; without an
    # evaluation set the Trainer has nothing to evaluate on and fails.
    eval_dataset=tokenized_dataset["test"],
    # NOTE(review): XLNet is normally pre-trained with permutation language
    # modeling; `mlm=True` trains it with BERT-style masking — confirm intent.
    data_collator=CustomDataCollator(tokenizer=tokenizer, mlm=True, mlm_probability=0.15),
    callbacks=[CustomCallback()],
)

In [None]:
# Train the model
# Runs the training loop configured on `trainer`.
trainer.train()

# Save the final pre-trained model
# Written to the "final_model" sub-directory of `pretrained_model_output_dir`.
trainer.save_model(os.path.join(pretrained_model_output_dir, "final_model"))