In [None]:
# Install the third-party packages imported further down (transformers, evaluate, imblearn).
# NOTE(review): no versions are pinned, so a fresh run may pull releases whose APIs differ
# from the ones this notebook was written against — consider pinning.
!pip install transformers
!pip install evaluate
!pip install imblearn

In [None]:
# Colab-only: prompt for file(s) to upload into the runtime.
# Presumably used to provide 'CLEAR_Corpus_6.01.xlsx', which get_all_data() reads — confirm.
from google.colab import files
uploaded = files.upload()

In [3]:
import numpy as np
import matplotlib.pyplot as plt
from transformers import AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
import evaluate
import torch
import pandas as pd 
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler

In [4]:
def get_all_data():
  """Return all data available (test and train) as a NumPy 2D array.

  Reads the Excel workbook 'CLEAR_Corpus_6.01.xlsx' (path relative to the
  current working directory) into a DataFrame and converts it to an array.
  Every call re-reads the file and returns a fresh array.
  """
  corpus_frame = pd.read_excel('CLEAR_Corpus_6.01.xlsx')
  return np.asarray(corpus_frame)

def train_data():
  """Return train data: the first 70% of the rows after a seeded shuffle.

  The global NumPy RNG is re-seeded with 123 before the rows are shuffled
  in place, so ``test_data`` (which repeats the same seed and shuffle on a
  fresh copy of the data) sees the same row order. The split index is
  printed as a side effect.
  """
  corpus = get_all_data()

  # Fixed seed -> identical permutation on every call.
  np.random.seed(123)
  np.random.shuffle(corpus)

  n_train = int(0.7 * corpus.shape[0])
  print(n_train)

  return corpus[:n_train, :]

def test_data():
  """Return test data: the last 30% of the rows after a seeded shuffle.

  Uses the same seed (123) and in-place shuffle as ``train_data``, then
  keeps every row from the 70% split index onwards.
  """
  corpus = get_all_data()

  # Fixed seed -> identical permutation on every call.
  np.random.seed(123)
  np.random.shuffle(corpus)

  first_test_row = int(0.7 * corpus.shape[0])
  return corpus[first_test_row:, :]

def text_train_data():
  """Return the array of text excerpts for training.

  The excerpts are taken from column 14 of the rows returned by
  ``train_data``.
  """
  training_rows = train_data()
  excerpts = training_rows[:, 14]
  return excerpts

def text_test_data():
  """Return the array of text excerpts for testing.

  The excerpts are taken from column 14 of the rows returned by
  ``test_data``.
  """
  held_out_rows = test_data()
  excerpts = held_out_rows[:, 14]
  return excerpts

def mpaa_train_data():
  """Return MPAA ratings for training (numbers).

  The ratings are taken from column 12 of the rows returned by
  ``train_data``.
  """
  training_rows = train_data()
  ratings = training_rows[:, 12]
  return ratings

def mpaa_test_data():
  """Return MPAA ratings for testing (numbers).

  The ratings are taken from column 12 of the rows returned by
  ``test_data``.
  """
  held_out_rows = test_data()
  ratings = held_out_rows[:, 12]
  return ratings

In [5]:
def RUS(X, y):
    """Random Under Sampling.

    Runs imblearn's ``RandomUnderSampler`` (``sampling_strategy='auto'``,
    no ``random_state`` passed) on ``X`` with ``y`` cast to int.

    Args:
        X: feature matrix handed to ``fit_resample``.
        y: label array; must support ``.astype``.

    Returns:
        Tuple ``(X_resampled, y_resampled)`` with the labels as ints.
    """
    int_labels = y.astype('int')
    sampler = RandomUnderSampler(sampling_strategy='auto')
    X_resampled, y_resampled = sampler.fit_resample(X, int_labels)
    return X_resampled, y_resampled.astype('int')

def ROS(X, y):
    """Random Over Sampling.

    Runs imblearn's ``RandomOverSampler`` (``sampling_strategy='auto'``,
    no ``random_state`` passed) on ``X`` with ``y`` cast to int.

    Args:
        X: feature matrix handed to ``fit_resample``.
        y: label array; must support ``.astype``.

    Returns:
        Tuple ``(X_resampled, y_resampled)`` with the labels as ints.
    """
    int_labels = y.astype('int')
    sampler = RandomOverSampler(sampling_strategy='auto')
    X_resampled, y_resampled = sampler.fit_resample(X, int_labels)
    return X_resampled, y_resampled.astype('int')

def resample(X, y, sample_type="Imbalanced"):
    """Apply the requested class-rebalancing strategy to ``(X, y)``.

    Args:
        X: feature matrix.
        y: label array.
        sample_type: one of
            'Imbalanced' - return ``X`` and ``y`` untouched,
            'RUS'        - random under-sampling via ``RUS``,
            'ROS'        - random over-sampling via ``ROS``.

    Returns:
        Tuple ``(X, y)``, resampled unless ``sample_type`` is 'Imbalanced'.

    Raises:
        ValueError: if ``sample_type`` is not one of the values above.
            Previously this case printed a message and returned ``None``,
            which made callers that unpack the result fail with an
            unrelated "cannot unpack NoneType" error.
    """
    if sample_type == 'Imbalanced':
        return X, y
    if sample_type == 'RUS':
        return RUS(X, y)
    if sample_type == 'ROS':
        return ROS(X, y)
    raise ValueError(f'{sample_type} is not recognized')

In [6]:
from transformers import AutoTokenizer
from datasets import Dataset
import pandas as pd
from torch.utils.data import Dataset, DataLoader

def mpaa_pre_processing(data_type="train"):
  """Load the MPAA ratings for one split and collapse rating 4 into rating 3.

  Args:
    data_type: "train" to use ``mpaa_train_data``, "test" to use
      ``mpaa_test_data``.

  Returns:
    A new NumPy array of ratings in which every value equal to 4 has been
    replaced by 3; all other values are kept as they are.

  Raises:
    ValueError: if ``data_type`` is neither "train" nor "test" (previously
      this fell through and failed with an UnboundLocalError).
  """
  if data_type == "train":
    ratings = mpaa_train_data()
  elif data_type == "test":
    ratings = mpaa_test_data()
  else:
    raise ValueError(f"data_type must be 'train' or 'test', got {data_type!r}")

  # merge R and PG-13 (presumably codes 4 and 3 — confirm against the corpus)
  return np.array([3 if rating == 4 else rating for rating in ratings])

class MPAADataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with MPAA labels.

    Args:
        encodings: mapping from field name to a per-sample indexable
            sequence (e.g. the output of a tokenizer call).
        labels: array-like supporting ``labels - 1``; stored shifted down
            by one.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        # Labels are stored one lower than they were passed in.
        self.labels = labels - 1

    def __len__(self):
        """Number of samples, taken from the label array."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors, including a 'labels' entry."""
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

def bert_pre_processing(data_type="train", sample_type="Imbalanced"):
  """Build a tokenized ``MPAADataset`` for one split, optionally rebalanced.

  Args:
    data_type: "train" or "test" - which split's excerpts and ratings to use.
    sample_type: "Imbalanced" to keep the split as is, or a strategy
      understood by ``resample`` ("RUS" / "ROS").

  Returns:
    An ``MPAADataset`` holding the "bert-base-cased" encodings (truncated
    and padded) and the integer labels.

  Raises:
    ValueError: if ``data_type`` is not "train"/"test", or if ``resample``
      does not recognise ``sample_type``.
  """
  if data_type == "train":
    texts = text_train_data()
  elif data_type == "test":
    texts = text_test_data()
  else:
    raise ValueError(f"data_type must be 'train' or 'test', got {data_type!r}")

  labels = mpaa_pre_processing(data_type=data_type).astype('int')

  if sample_type != "Imbalanced":
    # The resamplers are fed the excerpts as a single-column (n, 1) matrix.
    resampled = resample(texts.reshape(-1, 1), labels, sample_type=sample_type)
    if resampled is None:
      raise ValueError(f'{sample_type} is not recognized')
    texts, labels = resampled
    # Flatten the column back to 1-D so both paths tokenize plain strings;
    # this replaces the old workaround of passing one-element lists with
    # is_split_into_words=True.
    texts = np.asarray(texts).reshape(-1)

  tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
  X_encodings = tokenizer(texts.tolist(), truncation=True, padding=True)
  return MPAADataset(X_encodings, labels)




In [None]:
import inspect

# Training split is rebalanced with random over-sampling; the test split is left as is.
dataset_train = bert_pre_processing(data_type="train", sample_type="ROS")
dataset_test = bert_pre_processing(data_type="test")

# Three output classes: rating 4 is merged into 3 and labels are shifted to 0..2.
model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", num_labels=3)

# Newer transformers releases renamed the `evaluation_strategy` argument to
# `eval_strategy`. The install is unpinned, so use whichever name the
# installed TrainingArguments accepts.
_eval_strategy_arg = (
    "eval_strategy"
    if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
    else "evaluation_strategy"
)

training_args = TrainingArguments(
    output_dir="test_trainer",
    num_train_epochs=3,
    gradient_accumulation_steps=1,
    per_device_train_batch_size=8,
    learning_rate=5e-5,
    logging_steps=400,
    **{_eval_strategy_arg: "epoch"},  # evaluate at the end of every epoch
)

def compute_metrics(eval_pred):
    """Compute the weighted F1 score for one evaluation pass.

    Args:
        eval_pred: pair ``(logits, labels)``; the predicted class is the
            argmax of ``logits`` over the last axis.

    Returns:
        Whatever the ``evaluate`` "f1" metric's ``compute`` returns for the
        predictions against ``labels`` with ``average="weighted"``.
    """
    logits, labels = eval_pred
    predicted_classes = np.argmax(logits, axis=-1)

    # The metric is loaded on every call.
    f1_metric = evaluate.load("f1")
    return f1_metric.compute(
        predictions=predicted_classes,
        references=labels,
        average="weighted",
    )

# Wire the model, training arguments, train/eval datasets and the metric
# function into a Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_train,
    eval_dataset=dataset_test,
    compute_metrics=compute_metrics,
)

# Run fine-tuning.
trainer.train()