In [1]:
import pandas as pd

# Loading the dataset
df_original = pd.read_csv('Reviews.csv')

In [2]:
# Make a backup of the original dataframe
df_backup = df_original.copy()

# List of columns to keep
columns_to_keep = ['Text', 'Score']

# Drop all other columns
df_selected = df_original.drop(columns=[col for col in df_original.columns if col not in columns_to_keep])
df_selected = df_selected.rename(columns={'Score': 'label'})

In [3]:
from sklearn.base import TransformerMixin, BaseEstimator
class modify_score(TransformerMixin, BaseEstimator):
  def __init__(self, labels, column):
    self.labels = labels
    self.column = column

  def fit(self, X, y=None):
    return self

  def transform(self, X, y=None):
    X_copy = X.copy()
    X_copy[self.column] = X_copy[self.column].map(self.labels)
    return X_copy

In [4]:
class equalize_sentiments(TransformerMixin, BaseEstimator):
  def __init__(self, random_state, column, target_count):
    self.random_state = random_state
    self.column = column
    self.target_count = target_count

  def fit(self, X, y=None):
    return self

  def transform(self, X, y=None):
    X_copy = X.copy()
    df_balanced = X_copy.groupby(self.column).apply(
        lambda x: x.sample(n=self.target_count, random_state=self.random_state)
        ).reset_index(drop=True)
    df_balanced = df_balanced.sample(frac=1, random_state=self.random_state).reset_index(drop=True)
    return df_balanced

In [5]:
import re
# Define a function to clean text
class clean_text(TransformerMixin, BaseEstimator):

  def clean_text_fn(self, text):
    # Remove HTML tags
    text = re.sub(r'<.*?>', '', text)
    # Remove leading and trailing whitespace
    text = text.strip()
    # Replace multiple spaces with a single space
    text = re.sub(r'\s+', ' ', text)
    return text

  def __init__(self, column):
    self.column = column

  def fit(self, X, y=None):
    return self

  def transform(self, X, y=None):
    X_copy = X.copy()
    X_copy[self.column] = X_copy[self.column].apply(self.clean_text_fn)
    return X_copy

In [6]:
class shrink_dataset(TransformerMixin, BaseEstimator):
  def __init__(self, max_length, random_state):
    self.max_length = max_length
    self.random_state = random_state

  def fit(self, X, y=None):
    return self

  def transform(self, X, y=None):
    X_copy = X.copy()
    X_copy = X_copy.sample(n=self.max_length, random_state=self.random_state)
    return X_copy

In [7]:
class filter_large_texts(TransformerMixin, BaseEstimator):

  def count_tokens(self, text):
    tokens = tokenizer.encode(text, truncation=False)
    return len(tokens)

  def __init__(self, max_length, column):
    self.max_length = max_length
    self.column = column

  def fit(self, X, y=None):
    return self

  def transform(self, X, y=None):
    X_copy = X.copy()
    X_copy['num_tokens'] = X_copy[self.column].apply(self.count_tokens)
    X_copy = X_copy[X_copy['num_tokens'] <= self.max_length].reset_index(drop=True)
    return X_copy

In [36]:
import tensorflow as tf
from sklearn.model_selection import train_test_split

class personal_tokenizer(TransformerMixin, BaseEstimator):

  def tokenize_function(self, texts):
    return self.tokenizer(
        texts.tolist(),
        padding='max_length',
        truncation=True,
        max_length=self.max_length,
        return_tensors='tf'
    )

  def __init__(self, tokenizer, column, max_length):
    self.tokenizer = tokenizer
    self.column = column
    self.max_length = max_length

  def fit(self, X, y=None):
    return self

  def transform(self, X, y=None):
    X_copy = X.copy()
    tokenized_texts = self.tokenize_function(X_copy[self.column])
    tokenized_texts_and_labels = {
        'input_ids': tokenized_texts['input_ids'],
        'attention_mask': tokenized_texts['attention_mask'],
        'labels': X_copy['label'].values
    }
    return tokenized_texts_and_labels

In [9]:
# Setting up the model and tokenizer
from transformers import TFRobertaForSequenceClassification, RobertaTokenizer

# Load the pre-trained tokenizer and sentiment fine-tuned model
tokenizer = RobertaTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment')
model = TFRobertaForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment')

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/747 [00:00<?, ?B/s]



tf_model.h5:   0%|          | 0.00/501M [00:00<?, ?B/s]

All model checkpoint layers were used when initializing TFRobertaForSequenceClassification.

All the layers of TFRobertaForSequenceClassification were initialized from the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaForSequenceClassification for predictions without further training.


In [37]:
from sklearn.pipeline import Pipeline
score_to_label = {
    1: 0,  # Negative sentiment
    2: 0,  # Negative sentiment
    3: 1,  # Neutral sentiment
    4: 2,  # Positive sentiment
    5: 2   # Positive sentiment
    }
preprocessing_pipeline = Pipeline([
    ('score_modification', modify_score(score_to_label, 'label')),
    ('equalize_sentiments', equalize_sentiments(42, 'label', 42640)),
    ('clean_text', clean_text('Text')),
    ('shrink_dataset', shrink_dataset(max_length=10000, random_state=42)),
    ('filter_large_texts', filter_large_texts(max_length=512, column='Text')),
    ('personal_tokenizer', personal_tokenizer(tokenizer, 'Text', 512))
])
preprocessing_pipeline

In [38]:
df_balanced = preprocessing_pipeline.fit_transform(df_selected)
df_balanced

{'input_ids': <tf.Tensor: shape=(9893, 512), dtype=int32, numpy=
 array([[    0,   133,  1628, ...,     1,     1,     1],
        [    0,   713,  4076, ...,     1,     1,     1],
        [    0, 33295, 10928, ...,     1,     1,     1],
        ...,
        [    0,   100,  1017, ...,     1,     1,     1],
        [    0, 11475,  2115, ...,     1,     1,     1],
        [    0,   133, 19757, ...,     1,     1,     1]], dtype=int32)>,
 'attention_mask': <tf.Tensor: shape=(9893, 512), dtype=int32, numpy=
 array([[1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0],
        ...,
        [1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0],
        [1, 1, 1, ..., 0, 0, 0]], dtype=int32)>,
 'labels': array([2, 1, 0, ..., 2, 2, 2])}

In [39]:
df_balanced['input_ids']

<tf.Tensor: shape=(9893, 512), dtype=int32, numpy=
array([[    0,   133,  1628, ...,     1,     1,     1],
       [    0,   713,  4076, ...,     1,     1,     1],
       [    0, 33295, 10928, ...,     1,     1,     1],
       ...,
       [    0,   100,  1017, ...,     1,     1,     1],
       [    0, 11475,  2115, ...,     1,     1,     1],
       [    0,   133, 19757, ...,     1,     1,     1]], dtype=int32)>

In [40]:
df_balanced['attention_mask']

<tf.Tensor: shape=(9893, 512), dtype=int32, numpy=
array([[1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       ...,
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0],
       [1, 1, 1, ..., 0, 0, 0]], dtype=int32)>

In [41]:
df_balanced['labels']

array([2, 1, 0, ..., 2, 2, 2])