# Import Libraries

In [None]:
!pip install PySastrawi
!pip install emoji
!pip install nltk

In [None]:
import pandas as pd
import numpy as np
import unicodedata
import re
import emoji
from nltk.tokenize import word_tokenize
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
from Sastrawi.StopWordRemover.StopWordRemoverFactory import StopWordRemoverFactory

# Load Data

In [None]:
# Load the raw ABSA review CSV from the Kaggle input directory into a DataFrame.
df = pd.read_csv("/kaggle/input/absa-dataset/DataABSA - id_data_raw (1).csv")

# Last expression of the cell: the notebook renders the first five rows.
df.head()

In [None]:
import pandas as pd

# Read the wide-format annotation file: every row is one review carrying up to
# six (aspekN, sentimenN) column pairs.
df = pd.read_csv("/kaggle/input/absa-dataset/DataABSA - id_data_raw (1).csv")

# Names of the paired aspect / sentiment slot columns (slots 1..6).
aspek_cols = ['aspek' + str(slot) for slot in range(1, 7)]
sentimen_cols = ['sentimen' + str(slot) for slot in range(1, 7)]

# Flatten to long format: one record per (review, aspect, sentiment) triple.
# A slot is kept only when both its aspect and its sentiment are present.
result = [
    {
        'No.': review_row['No.'],
        'Review': review_row['Review'],
        'aspek': review_row[aspect_name],
        'sentimen': review_row[sentiment_name],
    }
    for _, review_row in df.iterrows()
    for aspect_name, sentiment_name in zip(aspek_cols, sentimen_cols)
    if pd.notna(review_row[aspect_name]) and pd.notna(review_row[sentiment_name])
]

# Turn the list of records into the long-format DataFrame used downstream.
result_df = pd.DataFrame(result)

# Show the flattened table.
print(result_df)


# Data Preprocessing

In [None]:
import pandas as pd
import numpy as np
import re
import emoji
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
from Sastrawi.StopWordRemover.StopWordRemoverFactory import StopWordRemoverFactory
from nltk.tokenize import word_tokenize
import unicodedata

class preprocess_data:
    """Text-cleaning pipeline for Indonesian review text.

    Constructing an instance reads two CSV resources (a slang/typo dictionary
    and an abusive-word list) and builds a Sastrawi stemmer and stop-word list.
    ``preprocess_text`` chains the individual cleaning steps; every step is
    also available as its own method.
    """

    def __init__(self):
        # Slang/typo dictionary: column 0 is looked up, column 1 is the replacement.
        df_kamusalay = pd.read_csv('/kaggle/input/kamusalay/new_kamusalay.csv', header=None, encoding='latin1')
        self.word_map = dict(zip(df_kamusalay[0], df_kamusalay[1]))

        # Abusive words are kept in a set for O(1) membership tests.
        df_abusive = pd.read_csv('/kaggle/input/abusive/abusive.csv')
        self.abusive_words = set(df_abusive['ABUSIVE'])

        factory = StemmerFactory()
        self.stemmer = factory.create_stemmer()
        stopword_factory = StopWordRemoverFactory()

        # Aspect terms to be preserved: they must survive stop-word removal.
        self.aspect_terms = {'aspek', 'pelayanan', 'tempat', 'harga', 'menu', 'rasa', 'makanan'}
        self.stopwords = [
            word for word in stopword_factory.get_stop_words()
            if word not in self.aspect_terms
        ]

        # Words whose stem is fixed by hand instead of being sent to the stemmer,
        # so that aspect vocabulary keeps a single canonical spelling.
        self.custom_stem_dict = {
            'makanannya': 'makanan',
            'makanan': 'makanan',
            'pelayanan': 'pelayanan',
            'pelayanannya': 'pelayanan',
            'pelayan': 'pelayanan',
            'layanan': 'pelayanan'
        }

    def case_folding(self, text):
        """Return ``text`` lower-cased."""
        return text.lower()

    def remove_non_ascii(self, text):
        """Decompose accented characters (NFKD) and drop everything outside ASCII."""
        return unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8', 'ignore')

    def remove_punctuation(self, text):
        """Replace every character that is neither a word character nor whitespace with a space."""
        return re.sub(r'[^\w\s]', ' ', text)

    def remove_repeated_characters(self, text):
        """Collapse the first run of a repeated character in each matched word.

        Characters g, r, n, l, m and o are excluded from collapsing by the pattern.
        """
        return re.sub(r'\b(\w*?)([^grnlmo])\2+(\w*)\b', r'\1\2\3', text)

    def fix_typos(self, text):
        """Replace each whitespace-separated word found in ``word_map`` by its mapped form."""
        words = text.split()
        normalized_words = [self.word_map.get(word, word) for word in words]
        return ' '.join(normalized_words)

    def remove_abusive_words(self, text):
        """Drop words whose lower-cased form is in ``abusive_words``."""
        words = text.split()
        clean_words = [word for word in words if word.lower() not in self.abusive_words]
        return ' '.join(clean_words)

    def remove_whitespace(self, text):
        """Collapse whitespace runs to single spaces and strip both ends."""
        return re.sub(r'\s+', ' ', text).strip()

    def emojize(self, text):
        """Replace emoji by their textual names (delegates to ``emoji.demojize``)."""
        return emoji.demojize(text)

    def tokenize(self, text):
        """Split ``text`` into tokens with NLTK's ``word_tokenize``."""
        return word_tokenize(text)

    def remove_stopwords(self, tokens):
        """Return ``tokens`` without the entries listed in ``self.stopwords``."""
        # Build the set per call so edits to self.stopwords are honoured while
        # each token lookup is O(1) instead of a scan over the whole list.
        stopword_set = set(self.stopwords)
        return [word for word in tokens if word not in stopword_set]

    def stemming(self, text):
        """Stem every word, preferring the hand-written ``custom_stem_dict``.

        The stemmer is only invoked for words that are not in the custom
        dictionary; previously it ran on every word and its result was then
        discarded for dictionary hits.
        """
        stemmed_words = []
        for word in text.split():
            if word in self.custom_stem_dict:
                stemmed_words.append(self.custom_stem_dict[word])
            else:
                stemmed_words.append(self.stemmer.stem(word))
        return ' '.join(stemmed_words)

    def remove_numbers(self, text):
        """Delete every run of digits."""
        return re.sub(r'\d+', '', text)

    def preprocess_text(self, text):
        """Run the full cleaning pipeline on one review and return the cleaned string."""
        # Emoji must be converted to text before the ASCII filter: that filter
        # deletes every emoji, which used to turn the conversion into a no-op.
        text = self.emojize(text)
        text = self.remove_non_ascii(text)
        text = self.case_folding(text)
        text = self.remove_punctuation(text)
        text = self.remove_repeated_characters(text)
        text = self.fix_typos(text)
        text = self.remove_abusive_words(text)
        text = self.remove_whitespace(text)
        text = self.remove_numbers(text)
        tokens = self.tokenize(text)
        tokens = self.remove_stopwords(tokens)
        cleaned_text = ' '.join(tokens)
        text = self.stemming(cleaned_text)
        return text

In [None]:
# Work on a copy so later in-place edits of cleaned_data leave result_df untouched.
cleaned_data = result_df.copy()
# NOTE(review): result_df is built with a 'Review' column and no 'Review (lower)'
# column, so this rename looks like a no-op — confirm whether it is still needed.
cleaned_data = cleaned_data.rename(columns={'Review (lower)': 'Review'})

# Notebook display of the first five rows.
cleaned_data.head()

In [None]:
# Random oversampling on the sentiment label

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

def oversampling_and_plot(df, sentiment_col, random_state=None):
    """Randomly oversample every minority class up to the majority count and plot the result.

    Works for any number of classes (the earlier version assumed exactly three).

    Args:
        df: DataFrame containing the label column.
        sentiment_col: name of the label column to balance on.
        random_state: optional seed forwarded to ``DataFrame.sample`` so the
            oversampling is reproducible; ``None`` keeps it non-deterministic.

    Returns:
        A new DataFrame: the majority-class rows followed by, for every other
        class, ``majority_count`` rows sampled with replacement.

    Raises:
        ValueError: if ``sentiment_col`` has no non-null values.
    """
    # value_counts() sorts descending, so position 0 is the majority class.
    sentiment_counts = df[sentiment_col].value_counts()
    if sentiment_counts.empty:
        raise ValueError(f"Column {sentiment_col!r} has no labelled rows to oversample")
    majority_label = sentiment_counts.index[0]
    majority_count = sentiment_counts.iloc[0]

    # Majority rows are kept as they are; every other class is resampled with
    # replacement until it has as many rows as the majority class.
    parts = [df[df[sentiment_col] == majority_label]]
    for label in sentiment_counts.index[1:]:
        class_rows = df[df[sentiment_col] == label]
        parts.append(class_rows.sample(majority_count, replace=True, random_state=random_state))

    df_test_over_sent = pd.concat(parts, axis=0)

    # Side-by-side count plots: original distribution vs. balanced distribution.
    fig, axes = plt.subplots(1, 2, figsize=(12, 6))

    sns.countplot(ax=axes[0], data=df, x=sentiment_col)
    axes[0].set_title('Original Sentiment Distribution')
    axes[0].set_xlabel('Sentiment')
    axes[0].set_ylabel('Count')

    sns.countplot(ax=axes[1], data=df_test_over_sent, x=sentiment_col)
    axes[1].set_title('Oversampled Sentiment Distribution')
    axes[1].set_xlabel('Sentiment')
    axes[1].set_ylabel('Count')

    plt.tight_layout()
    plt.show()

    return df_test_over_sent

# Example usage: balance the flattened review table on the 'sentimen' label.
# pd.DataFrame(...) simply wraps cleaned_data, which is already a DataFrame.
df_cleaned = pd.DataFrame(cleaned_data)
balanced_df = oversampling_and_plot(df_cleaned, 'sentimen')

In [None]:
!pip install nlpaug
!pip install nltk

In [None]:
# Build the text-cleaning pipeline once; constructing it loads its word lists.
pd_instance = preprocess_data()

# Clean every review in place by mapping preprocess_text over the 'Review' column.
balanced_df['Review'] = balanced_df['Review'].apply(pd_instance.preprocess_text)

print(balanced_df)

# Splitting Data

In [None]:
import torch
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

class Config:
    """Settings for the train/validation split and for batching."""

    def __init__(self):
        # batch_size: rows per DataLoader batch; val_size: fraction held out for validation.
        self.batch_size, self.val_size = 16, 0.1

config = Config()

# Splitting the dataset
# NOTE(review): balanced_df was oversampled with replacement before this split, so copies
# of the same review can end up in both train and validation — confirm this is intended.
train_data, val_data = train_test_split(balanced_df, test_size=config.val_size, random_state=42)

# Creating DataLoader instances
# Each DataFrame is converted to a list of per-row dicts before batching; only the
# training loader is shuffled.
train_loader = DataLoader(train_data.to_dict('records'), batch_size=config.batch_size, shuffle=True)
val_loader = DataLoader(val_data.to_dict('records'), batch_size=config.batch_size, shuffle=False)

# Print to verify the splits
print(f"Training set size: {len(train_data)}")
print(f"Validation set size: {len(val_data)}")

# Notebook display of the first five training rows.
train_data.head()

In [None]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Operates on balanced_df (the oversampled, preprocessed table).
# Map the categorical aspect labels to integer codes, keeping the fitted encoder
# so the codes can be translated back to names later.
aspect_label_encoder = LabelEncoder().fit(balanced_df['aspek'])
balanced_df['aspek'] = aspect_label_encoder.transform(balanced_df['aspek'])

# Show which aspect name each integer code stands for.
print("Aspect Classes:", aspect_label_encoder.classes_)

# Same treatment for the sentiment labels, with their own encoder.
sentiment_label_encoder = LabelEncoder().fit(balanced_df['sentimen'])
balanced_df['sentimen'] = sentiment_label_encoder.transform(balanced_df['sentimen'])

# Show which sentiment name each integer code stands for.
print("Sentiment Classes:", sentiment_label_encoder.classes_)

balanced_df.head()


# ML Model

In [None]:
import pandas as pd
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, BertConfig
import torch
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm

# Tokenizer
# Loaded from the same IndoBERT-large checkpoint name as the model initialised further down.
tokenizer = BertTokenizer.from_pretrained('indobenchmark/indobert-large-p1')

# Prepare the dataset class
class ReviewDataset(Dataset):
    """Torch dataset that tokenizes one review per item for aspect classification.

    Args:
        data: DataFrame with a 'Review' text column and an 'aspek' column that
            already holds integer class codes.
        tokenizer: object exposing a Hugging Face style ``encode_plus``.
        max_len: fixed sequence length requested from the tokenizer.
    """

    def __init__(self, data, tokenizer, max_len):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and the ``aspect`` label for row ``idx``."""
        # Positional lookup, fetched once: the frame's index labels need not be 0..n-1.
        row = self.data.iloc[idx]
        review = str(row['Review'])
        aspect = row['aspek']

        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            # Padding alone leaves long reviews over-length; truncation is what
            # guarantees equal-sized tensors that the DataLoader can stack.
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            # encode_plus returns a leading batch axis of size 1; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'aspect': torch.tensor(aspect, dtype=torch.long)
        }

# Load and preprocess the data
# balanced_df must already exist as a DataFrame with 'Review' and 'aspek' columns.

# Encode aspects
# NOTE(review): if the earlier label-encoding cell has already run, 'aspek' holds integer
# codes here, and refitting would make classes_ those integers rather than the original
# aspect names — confirm which state this cell expects.
aspect_label_encoder = LabelEncoder()
balanced_df['aspek'] = aspect_label_encoder.fit_transform(balanced_df['aspek'])

# Prepare Data
# 90/10 split with a fixed seed, then wrap each part in the tokenizing dataset.
train_data, val_data = train_test_split(balanced_df, test_size=0.1, random_state=42)
train_dataset = ReviewDataset(train_data, tokenizer, max_len=128)
val_dataset = ReviewDataset(val_data, tokenizer, max_len=128)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Initialize the model
# One output label per aspect class. Note that `config` is rebound here from any earlier
# settings object to a BertConfig.
config = BertConfig.from_pretrained('indobenchmark/indobert-large-p1', num_labels=len(aspect_label_encoder.classes_))
model = BertForSequenceClassification.from_pretrained('indobenchmark/indobert-large-p1', config=config)

# Training
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=5e-5)


# Fine-tune for two epochs, reporting training-set metrics after each one.
for epoch in range(2):
    model.train()
    total_loss = 0
    predictions = []
    true_labels = []

    for batch in tqdm(train_loader):
        # Clear gradients left over from the previous step before the new backward pass.
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        aspect = batch['aspect'].to(device)

        # Passing labels makes the model return the loss alongside the logits.
        outputs = model(input_ids, attention_mask=attention_mask, labels=aspect)
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Collect predictions and true labels for metrics
        # The predicted class is the index of the largest logit.
        _, preds = torch.max(outputs.logits, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(aspect.cpu().numpy())

    # Calculate metrics for the current epoch
    # Map integer codes back through the encoder before scoring.
    predicted_aspects = aspect_label_encoder.inverse_transform(predictions)
    true_aspects = aspect_label_encoder.inverse_transform(true_labels)

    accuracy = accuracy_score(true_aspects, predicted_aspects)
    precision = precision_score(true_aspects, predicted_aspects, average='weighted')
    recall = recall_score(true_aspects, predicted_aspects, average='weighted')
    f1 = f1_score(true_aspects, predicted_aspects, average='weighted')

    # The reported loss is the mean over batches.
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

# Prediction on validation set
# eval() switches the model to inference mode; no_grad() skips gradient tracking.
model.eval()
predictions = []
true_labels = []
with torch.no_grad():
    for batch in tqdm(val_loader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        true_aspects = batch['aspect'].to(device)

        # No labels are passed here, so only the logits are used.
        outputs = model(input_ids, attention_mask=attention_mask)
        _, preds = torch.max(outputs.logits, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(true_aspects.cpu().numpy())

# Decode predictions to original aspect labels
predicted_aspects = aspect_label_encoder.inverse_transform(predictions)
true_aspects = aspect_label_encoder.inverse_transform(true_labels)

# Calculate metrics
# Precision, recall and F1 are averaged across classes weighted by class support.
accuracy = accuracy_score(true_aspects, predicted_aspects)
precision = precision_score(true_aspects, predicted_aspects, average='weighted')
recall = recall_score(true_aspects, predicted_aspects, average='weighted')
f1 = f1_score(true_aspects, predicted_aspects, average='weighted')

print(f"Validation Accuracy: {accuracy:.4f}")
print(f"Validation Precision: {precision:.4f}")
print(f"Validation Recall: {recall:.4f}")
print(f"Validation F1 Score: {f1:.4f}")


In [None]:
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, BertConfig
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm

# Configuration
class Config:
    """Hyper-parameters for the IndoBERT-base aspect-classification run.

    The class was previously spelled ``config``, so the ``Config()`` call below
    resolved to an older, unrelated ``Config`` (or failed outright) and the
    instance lacked ``learning_rate``, ``epochs`` and ``max_len``.
    """

    def __init__(self):
        self.batch_size = 16       # rows per DataLoader batch
        self.val_size = 0.1        # fraction of rows held out for validation
        self.learning_rate = 5e-5  # optimizer learning rate
        self.epochs = 2            # passes over the training set
        self.max_len = 128         # sequence length requested from the tokenizer

config = Config()

# Encode labels
# Both label columns of cleaned_data are replaced in place by integer codes; the fitted
# encoders are kept for decoding.
# NOTE(review): this cell uses cleaned_data, while the text preprocessing and oversampling
# earlier were applied to balanced_df — confirm that training on cleaned_data is intended.
aspect_label_encoder = LabelEncoder()
sentiment_label_encoder = LabelEncoder()
cleaned_data['aspek'] = aspect_label_encoder.fit_transform(cleaned_data['aspek'])
cleaned_data['sentimen'] = sentiment_label_encoder.fit_transform(cleaned_data['sentimen'])

# Tokenizer
# Loaded from the IndoBERT-base checkpoint name.
tokenizer = BertTokenizer.from_pretrained('indobenchmark/indobert-base-p1')

# Dataset class
class ReviewDataset(Dataset):
    """Torch dataset that tokenizes one review per item for aspect classification.

    Args:
        data: DataFrame with a 'Review' text column and an 'aspek' column that
            already holds integer class codes.
        tokenizer: object exposing a Hugging Face style ``encode_plus``.
        max_len: fixed sequence length requested from the tokenizer.
    """

    def __init__(self, data, tokenizer, max_len):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and the ``aspect`` label for row ``idx``."""
        # Positional lookup, fetched once: the frame's index labels need not be 0..n-1.
        row = self.data.iloc[idx]
        review = str(row['Review'])
        aspect = row['aspek']

        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            # Padding alone leaves long reviews over-length; truncation is what
            # guarantees equal-sized tensors that the DataLoader can stack.
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            # encode_plus returns a leading batch axis of size 1; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'aspect': torch.tensor(aspect, dtype=torch.long)
        }

# Split data and create DataLoaders
# Fixed-seed split; only the training loader is shuffled.
train_data, val_data = train_test_split(cleaned_data, test_size=config.val_size, random_state=42)
train_dataset = ReviewDataset(train_data, tokenizer, config.max_len)
val_dataset = ReviewDataset(val_data, tokenizer, config.max_len)
train_loader = DataLoader(train_dataset, batch_size=config.batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=config.batch_size, shuffle=False)

# Initialize model
# One output label per aspect class; use the GPU when one is available.
model_config = BertConfig.from_pretrained('indobenchmark/indobert-base-p1', num_labels=len(aspect_label_encoder.classes_))
model = BertForSequenceClassification.from_pretrained('indobenchmark/indobert-base-p1', config=model_config)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimizer
optimizer = AdamW(model.parameters(), lr=config.learning_rate)

# Training function
def train_epoch(model, data_loader, optimizer, device):
    """Run one optimisation pass over ``data_loader``.

    Each batch must provide 'input_ids', 'attention_mask' and 'aspect'.

    Returns:
        (summed batch loss, list of predicted class ids, list of true class ids)
    """
    model.train()
    running_loss = 0
    predictions = []
    true_labels = []

    for batch in tqdm(data_loader):
        # Start every step from clean gradients.
        optimizer.zero_grad()
        ids = batch['input_ids'].to(device)
        mask = batch['attention_mask'].to(device)
        targets = batch['aspect'].to(device)

        # Supplying labels makes the model return its loss with the logits.
        outputs = model(ids, attention_mask=mask, labels=targets)
        batch_loss = outputs.loss
        running_loss += batch_loss.item()
        batch_loss.backward()
        optimizer.step()

        # Predicted class = index of the largest logit in each row.
        preds = torch.max(outputs.logits, dim=1)[1]
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(targets.cpu().numpy())

    return running_loss, predictions, true_labels

# Evaluation function
def evaluate_model(model, data_loader, device):
    """Predict a class for every item of ``data_loader`` without updating the model.

    Each batch must provide 'input_ids', 'attention_mask' and 'aspect'.

    Returns:
        (list of predicted class ids, list of true class ids)
    """
    model.eval()
    predictions = []
    true_labels = []

    # Gradient tracking is not needed for inference.
    with torch.no_grad():
        for batch in tqdm(data_loader):
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['aspect'].to(device)

            outputs = model(ids, attention_mask=mask)
            # Predicted class = index of the largest logit in each row.
            preds = torch.max(outputs.logits, dim=1)[1]
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(targets.cpu().numpy())

    return predictions, true_labels

# Training loop
# One train_epoch call per configured epoch; metrics are computed on the integer class codes.
for epoch in range(config.epochs):
    total_loss, train_predictions, train_labels = train_epoch(model, train_loader, optimizer, device)
    train_accuracy = accuracy_score(train_labels, train_predictions)
    train_precision = precision_score(train_labels, train_predictions, average='weighted')
    train_recall = recall_score(train_labels, train_predictions, average='weighted')
    train_f1 = f1_score(train_labels, train_predictions, average='weighted')

    # total_loss is a sum over batches, so divide by the batch count to report the mean.
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}, "
          f"Accuracy: {train_accuracy:.4f}, Precision: {train_precision:.4f}, "
          f"Recall: {train_recall:.4f}, F1 Score: {train_f1:.4f}")

# Validation
# Runs once, after the last epoch.
val_predictions, val_labels = evaluate_model(model, val_loader, device)
val_accuracy = accuracy_score(val_labels, val_predictions)
val_precision = precision_score(val_labels, val_predictions, average='weighted')
val_recall = recall_score(val_labels, val_predictions, average='weighted')
val_f1 = f1_score(val_labels, val_predictions, average='weighted')

print(f"Validation Accuracy: {val_accuracy:.4f}")
print(f"Validation Precision: {val_precision:.4f}")
print(f"Validation Recall: {val_recall:.4f}")
print(f"Validation F1 Score: {val_f1:.4f}")


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Encode the aspect label
# NOTE(review): the encoder is fitted on the 'aspek' column but its output is written to
# the 'sentimen' column, overwriting the sentiment labels with aspect codes. Confirm
# whether this should encode 'sentimen' instead, or store the result in 'aspek'.
label_encoder = LabelEncoder()
cleaned_data['sentimen'] = label_encoder.fit_transform(cleaned_data['aspek'])

# Split the data into training and validation sets (80/10... see test_size: 20% validation).
train_df, val_df = train_test_split(cleaned_data, test_size=0.2, random_state=42)


In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer

class ReviewDataset(Dataset):
    """Torch dataset that tokenizes one review per item and reads its label from 'sentimen'.

    Args:
        data: DataFrame with a 'Review' text column and a 'sentimen' column that
            already holds integer class codes.
        tokenizer: object exposing a Hugging Face style ``encode_plus``.
        max_len: fixed sequence length; reviews are padded or truncated to it.
    """

    def __init__(self, data, tokenizer, max_len):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return the review text, its encoded tensors and its label for row ``index``."""
        # Positional lookup, fetched once: the frame's index labels need not be 0..n-1.
        row = self.data.iloc[index]
        # Coerce to str like the other ReviewDataset definitions in this notebook,
        # so a numeric or missing cell cannot crash the tokenizer.
        review = str(row['Review'])
        labels = row['sentimen']
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )
        return {
            'review_text': review,
            # encode_plus returns a leading batch axis of size 1; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(labels, dtype=torch.long)
        }

# Tokenizer loaded from the IndoBERT-base checkpoint name, plus batching settings.
tokenizer = BertTokenizer.from_pretrained('indobenchmark/indobert-base-p1')
max_len = 128
batch_size = 32

# Wrap the train/validation frames in the tokenizing dataset.
train_dataset = ReviewDataset(train_df, tokenizer, max_len)
val_dataset = ReviewDataset(val_df, tokenizer, max_len)

# Only the training loader is shuffled; the validation loader keeps the default order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)


In [None]:
import numpy as np
import torch
import torch.nn as nn
from transformers import BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup

# Load model
# One output label per class known to label_encoder; use the GPU when one is available.
model = BertForSequenceClassification.from_pretrained(
    'indobenchmark/indobert-base-p1',
    num_labels=len(label_encoder.classes_)
)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = model.to(device)

# Number of passes over the training data. It is defined here so the scheduler horizon
# matches the run length: the horizon used to be hard-coded to 3 epochs while the
# training loop in this cell runs 2, so the learning rate never finished its decay.
epochs = 2

# Define optimizer, loss function, and scheduler
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
total_steps = len(train_loader) * epochs
# Linear decay from the initial learning rate to zero over total_steps, with no warm-up.
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)
loss_fn = nn.CrossEntropyLoss().to(device)

# Training function
def train_epoch(model, data_loader, loss_fn, optimizer, device, scheduler, n_examples):
    """Train ``model`` for one pass over ``data_loader``.

    Each batch must provide 'input_ids', 'attention_mask' and 'labels'. The loss
    comes from the model itself; ``loss_fn`` is accepted but not used here.

    Returns:
        (accuracy as a float64 tensor = correct predictions / n_examples,
         mean of the per-batch losses)
    """
    model.train()
    batch_losses = []
    n_correct = 0
    for batch in data_loader:
        ids = batch['input_ids'].to(device)
        mask = batch['attention_mask'].to(device)
        targets = batch['labels'].to(device)

        outputs = model(input_ids=ids, attention_mask=mask, labels=targets)

        # Predicted class = index of the largest logit in each row.
        predicted = outputs.logits.max(dim=1).indices
        n_correct += (predicted == targets).sum()
        batch_losses.append(outputs.loss.item())

        outputs.loss.backward()
        # Cap the gradient norm at 1.0 before the parameter update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        # Gradients are cleared at the end of the step, ready for the next batch.
        optimizer.zero_grad()

    accuracy = n_correct.double() / n_examples
    return accuracy, np.mean(batch_losses)

# Evaluation function
def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Score ``model`` on ``data_loader`` without updating its parameters.

    Each batch must provide 'input_ids', 'attention_mask' and 'labels'. The loss
    comes from the model itself; ``loss_fn`` is accepted but not used here.

    Returns:
        (accuracy as a float64 tensor = correct predictions / n_examples,
         mean of the per-batch losses)
    """
    model.eval()
    batch_losses = []
    n_correct = 0
    # Gradient tracking is not needed for evaluation.
    with torch.no_grad():
        for batch in data_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            outputs = model(input_ids=ids, attention_mask=mask, labels=targets)

            # Predicted class = index of the largest logit in each row.
            predicted = outputs.logits.max(dim=1).indices
            n_correct += (predicted == targets).sum()
            batch_losses.append(outputs.loss.item())

    accuracy = n_correct.double() / n_examples
    return accuracy, np.mean(batch_losses)

# Training loop
# Every epoch trains on the full training loader and then evaluates on the validation loader.
epochs = 2

for epoch in range(epochs):
    print(f'Epoch {epoch + 1}/{epochs}')
    print('-' * 10)
    
    # n_examples is the row count of the frame behind the loader; it is the accuracy denominator.
    train_acc, train_loss = train_epoch(
        model,
        train_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        len(train_df)
    )
    
    print(f'Train loss {train_loss} accuracy {train_acc}')
    
    val_acc, val_loss = eval_model(
        model,
        val_loader,
        loss_fn,
        device,
        len(val_df)
    )
    
    print(f'Validation loss {val_loss} accuracy {val_acc}')
    print()


In [None]:
!pip install setfit


In [None]:
from setfit import SetFitModel
from sklearn.preprocessing import LabelEncoder

# Install SetFit library
# pip install setfit

# Load the SetFit model from the Hugging Face Hub
model = SetFitModel.from_pretrained("firqaaa/indo-setfit-bert-base-p1")

# Example input: list of review texts
reviews = [
    "Saya sangat suka dengan pelayanan di restoran ini.",
    "Makanannya enak tetapi harganya terlalu mahal.",
    "Lokasi restoran ini sangat strategis dan nyaman."
]

# Run inference
preds = model(reviews)

# Assuming you have the label encoder used during training
# NOTE(review): this encoder is fitted on a hand-written example list, not on the labels the
# loaded model was trained with, so the decoded names below are only meaningful if the
# model's class ids happen to match this encoder's ordering — verify against the model card.
aspect_label_encoder = LabelEncoder()
aspect_labels = ["lokasi", "harga", "pelayanan", "makanan"]  # Example aspect labels
aspect_label_encoder.fit(aspect_labels)

# Decode predictions
decoded_preds = aspect_label_encoder.inverse_transform(preds)

# Print decoded predictions
for review, aspect in zip(reviews, decoded_preds):
    print(f"Review: {review}")
    print(f"Predicted Aspect: {aspect}\n")


# Elfira Nyoba (Elfira's Experiments)

## DeBERTa

In [None]:
!pip install transformers[sentencepiece]

In [None]:
class Config:
    """Plain container for DeBERTa-style architecture hyper-parameters.

    Every keyword argument is stored as an attribute of the same name; any
    additional keyword arguments are collected in ``extra_args``.
    """

    def __init__(self, vocab_size=50265, hidden_size=768, num_hidden_layers=12, num_attention_heads=12,
                 intermediate_size=3072, hidden_act='gelu', hidden_dropout_prob=0.1,
                 attention_probs_dropout_prob=0.1, max_position_embeddings=512, type_vocab_size=0,
                 initializer_range=0.02, layer_norm_eps=1e-07, relative_attention=False,
                 max_relative_positions=-1, pad_token_id=0, position_biased_input=True,
                 pos_att_type=None, pooler_dropout=0, pooler_hidden_act='gelu', **kwargs):
        settings = {
            'vocab_size': vocab_size,
            'hidden_size': hidden_size,
            'num_hidden_layers': num_hidden_layers,
            'num_attention_heads': num_attention_heads,
            'intermediate_size': intermediate_size,
            'hidden_act': hidden_act,
            'hidden_dropout_prob': hidden_dropout_prob,
            'attention_probs_dropout_prob': attention_probs_dropout_prob,
            'max_position_embeddings': max_position_embeddings,
            'type_vocab_size': type_vocab_size,
            'initializer_range': initializer_range,
            'layer_norm_eps': layer_norm_eps,
            'relative_attention': relative_attention,
            'max_relative_positions': max_relative_positions,
            'pad_token_id': pad_token_id,
            'position_biased_input': position_biased_input,
            'pos_att_type': pos_att_type,
            'pooler_dropout': pooler_dropout,
            'pooler_hidden_act': pooler_hidden_act,
        }
        for attr_name, attr_value in settings.items():
            setattr(self, attr_name, attr_value)
        # Unrecognised keyword arguments are kept rather than rejected.
        self.extra_args = kwargs

    def __repr__(self):
        # Fixed field order; the two activation names are wrapped in single quotes.
        field_names = (
            'vocab_size', 'hidden_size', 'num_hidden_layers', 'num_attention_heads',
            'intermediate_size', 'hidden_act', 'hidden_dropout_prob',
            'attention_probs_dropout_prob', 'max_position_embeddings', 'type_vocab_size',
            'initializer_range', 'layer_norm_eps', 'relative_attention',
            'max_relative_positions', 'pad_token_id', 'position_biased_input',
            'pos_att_type', 'pooler_dropout', 'pooler_hidden_act', 'extra_args',
        )
        quoted = ('hidden_act', 'pooler_hidden_act')
        rendered = []
        for field_name in field_names:
            field_value = getattr(self, field_name)
            if field_name in quoted:
                rendered.append(f"{field_name}='{field_value}'")
            else:
                rendered.append(f"{field_name}={field_value}")
        return "ModelConfig(" + ", ".join(rendered) + ")"

# Usage
# Build a configuration with all defaults and print its repr.
# Note: this rebinds the module-level name `config`.
config = Config()
print(config)

In [None]:
from transformers import AutoTokenizer, DebertaModel
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm

# Tokenizer
# Loaded from the microsoft/deberta-base checkpoint name, the same one the model below uses.
tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-base")

# Prepare the dataset class
class ReviewDataset(Dataset):
    """Torch dataset that tokenizes one review per item for aspect classification.

    Args:
        data: DataFrame with a 'Review' text column and an 'aspek' column that
            already holds integer class codes.
        tokenizer: object exposing a Hugging Face style ``encode_plus``.
        max_len: fixed sequence length requested from the tokenizer.
    """

    def __init__(self, data, tokenizer, max_len):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and the ``aspect`` label for row ``idx``."""
        # Positional lookup, fetched once: the frame's index labels need not be 0..n-1.
        row = self.data.iloc[idx]
        review = str(row['Review'])
        aspect = row['aspek']

        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            # Padding alone leaves long reviews over-length; truncation is what
            # guarantees equal-sized tensors that the DataLoader can stack.
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            # encode_plus returns a leading batch axis of size 1; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'aspect': torch.tensor(aspect, dtype=torch.long)
        }

# Load and preprocess the data
# cleaned_data must already exist as a DataFrame with 'Review' and 'aspek' columns.

# Local import: the import line of this cell only brings in the headless DebertaModel,
# which accepts no `labels` argument and returns neither `loss` nor `logits`.
from transformers import DebertaForSequenceClassification

# Encode aspects
# NOTE(review): if an earlier cell already encoded cleaned_data['aspek'], the classes seen
# here are integer codes rather than aspect names — confirm which state is expected.
aspect_label_encoder = LabelEncoder()
cleaned_data['aspek'] = aspect_label_encoder.fit_transform(cleaned_data['aspek'])

# Prepare Data
# 90/10 split with a fixed seed, then wrap each part in the tokenizing dataset.
train_data, val_data = train_test_split(cleaned_data, test_size=0.1, random_state=42)
train_dataset = ReviewDataset(train_data, tokenizer, max_len=128)
val_dataset = ReviewDataset(val_data, tokenizer, max_len=128)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Initialize the model
# Pretrained DeBERTa encoder plus a classification head with one output per aspect
# class, so the training loop can request a loss and read logits.
model = DebertaForSequenceClassification.from_pretrained(
    "microsoft/deberta-base",
    num_labels=len(aspect_label_encoder.classes_),
)

# Accessing the model configuration
configuration = model.config

# Training
# Modules are moved between devices with .to(); there is no .torch() method.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
optimizer = AdamW(model.parameters(), lr=5e-5)


# Fine-tune for two epochs, reporting training-set metrics after each one.
for epoch in range(2):
    model.train()
    total_loss = 0
    predictions = []
    true_labels = []

    for batch in tqdm(train_loader):
        # Clear gradients left over from the previous step before the new backward pass.
        optimizer.zero_grad()
        # Tensors are moved with .to(device); the previous .torch(device) calls raised
        # AttributeError on the first batch.
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        aspect = batch['aspect'].to(device)

        # Passing labels asks the model to return the loss alongside the logits.
        outputs = model(input_ids, attention_mask=attention_mask, labels=aspect)
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Collect predictions and true labels for metrics
        # The predicted class is the index of the largest logit.
        _, preds = torch.max(outputs.logits, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(aspect.cpu().numpy())

    # Calculate metrics for the current epoch
    # Map integer codes back through the encoder before scoring.
    predicted_aspects = aspect_label_encoder.inverse_transform(predictions)
    true_aspects = aspect_label_encoder.inverse_transform(true_labels)

    accuracy = accuracy_score(true_aspects, predicted_aspects)
    precision = precision_score(true_aspects, predicted_aspects, average='weighted')
    recall = recall_score(true_aspects, predicted_aspects, average='weighted')
    f1 = f1_score(true_aspects, predicted_aspects, average='weighted')

    # The reported loss is the mean over batches.
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

# Prediction on validation set
# eval() switches the model to inference mode; no_grad() skips gradient tracking.
model.eval()
predictions = []
true_labels = []
with torch.no_grad():
    for batch in tqdm(val_loader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        true_aspects = batch['aspect'].to(device)

        # No labels are passed here, so only the logits are used.
        outputs = model(input_ids, attention_mask=attention_mask)
        _, preds = torch.max(outputs.logits, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(true_aspects.cpu().numpy())

# Decode predictions to original aspect labels
predicted_aspects = aspect_label_encoder.inverse_transform(predictions)
true_aspects = aspect_label_encoder.inverse_transform(true_labels)

# Calculate metrics
# Precision, recall and F1 are averaged across classes weighted by class support.
accuracy = accuracy_score(true_aspects, predicted_aspects)
precision = precision_score(true_aspects, predicted_aspects, average='weighted')
recall = recall_score(true_aspects, predicted_aspects, average='weighted')
f1 = f1_score(true_aspects, predicted_aspects, average='weighted')

print(f"Validation Accuracy: {accuracy:.4f}")
print(f"Validation Precision: {precision:.4f}")
print(f"Validation Recall: {recall:.4f}")
print(f"Validation F1 Score: {f1:.4f}")

In [None]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

from transformers import AutoTokenizer, DebertaForSequenceClassification
import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm

# Tokenizer: load the pretrained tokenizer for the "microsoft/deberta-base"
# checkpoint; it is passed to ReviewDataset below to encode each review.
tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-base")

# Prepare the dataset class
class ReviewDataset(Dataset):
    """Dataset that tokenizes review texts and pairs them with aspect labels.

    Every item is a dict holding 1-D ``input_ids`` and ``attention_mask``
    tensors of exactly ``max_len`` entries and a scalar ``aspect`` label
    tensor, so items can be stacked into batches by a DataLoader.
    """

    def __init__(self, data, tokenizer, max_len):
        """Store the data source and tokenization settings.

        Args:
            data: table supporting ``len()`` and ``.iloc[idx]`` row access
                with a 'Review' column and an integer 'aspek' column
                (a pandas DataFrame in this notebook).
            tokenizer: object exposing ``encode_plus`` (a Hugging Face
                tokenizer in this notebook).
            max_len: fixed token length every review is padded/truncated to.
        """
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of rows in the underlying data."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize the review at position ``idx`` and return it with its label.

        Returns:
            dict with 'input_ids' and 'attention_mask' (flattened tensors of
            length ``max_len``) and 'aspect' (``torch.long`` scalar tensor).
        """
        row = self.data.iloc[idx]
        review = str(row['Review'])
        aspect = row['aspek']

        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            # Without explicit truncation, reviews longer than max_len keep
            # their full length and batches can no longer be stacked.
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            # return_tensors='pt' adds a leading batch dimension; drop it
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'aspect': torch.tensor(aspect, dtype=torch.long)
        }

# Data preparation
# NOTE: `cleaned_data` is expected to already exist as a DataFrame

# Replace the aspect column with integer class ids; the fitted encoder is
# kept so predictions can be mapped back with inverse_transform later
aspect_label_encoder = LabelEncoder()
cleaned_data['aspek'] = aspect_label_encoder.fit_transform(cleaned_data['aspek'])

# Sanity check: every encoded label must lie in [0, num_labels)
num_labels = len(aspect_label_encoder.classes_)
assert 0 <= cleaned_data['aspek'].min() and cleaned_data['aspek'].max() < num_labels, "Invalid labels detected"

# 90/10 train/validation split with a fixed seed, wrapped into datasets
train_data, val_data = train_test_split(cleaned_data, test_size=0.1, random_state=42)
train_dataset, val_dataset = (
    ReviewDataset(split, tokenizer, max_len=128)
    for split in (train_data, val_data)
)

# Only the training loader is shuffled
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Sequence classifier with one output per encoded aspect class
model = DebertaForSequenceClassification.from_pretrained("microsoft/deberta-base", num_labels=num_labels)
model.to(device)

# Optimizer over all model parameters
optimizer = AdamW(model.parameters(), lr=5e-5)

# Fine-tuning loop
epochs = 2  # Adjust the number of epochs as needed

for epoch in range(epochs):
    model.train()
    total_loss = 0
    predictions, true_labels = [], []

    for batch in tqdm(train_loader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        aspects = batch['aspect'].to(device)

        # Passing labels makes the model return the loss alongside the logits
        optimizer.zero_grad()
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=aspects)
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Keep this batch's predicted and true class ids for the epoch metrics
        predictions.extend(outputs.logits.argmax(dim=1).tolist())
        true_labels.extend(aspects.tolist())

    # Epoch metrics, computed on the predictions gathered during training
    predicted_aspects = aspect_label_encoder.inverse_transform(predictions)
    true_aspects = aspect_label_encoder.inverse_transform(true_labels)

    accuracy = accuracy_score(true_aspects, predicted_aspects)
    precision, recall, f1 = (
        score_fn(true_aspects, predicted_aspects, average='weighted')
        for score_fn in (precision_score, recall_score, f1_score)
    )

    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}, Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

# Evaluate the trained model on the validation loader
model.eval()
predictions, true_labels = [], []
with torch.no_grad():  # inference only, so no gradients are tracked
    for batch in tqdm(val_loader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        aspects = batch['aspect']

        logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
        # The predicted class is the index of the largest logit in each row
        predictions.extend(logits.argmax(dim=1).tolist())
        true_labels.extend(aspects.tolist())

# Map the integer class ids back through the label encoder
predicted_aspects = aspect_label_encoder.inverse_transform(predictions)
true_aspects = aspect_label_encoder.inverse_transform(true_labels)

# Accuracy plus precision / recall / F1, the latter three with average='weighted'
accuracy = accuracy_score(true_aspects, predicted_aspects)
precision, recall, f1 = (
    score_fn(true_aspects, predicted_aspects, average='weighted')
    for score_fn in (precision_score, recall_score, f1_score)
)

print(f"Validation Accuracy: {accuracy:.4f}")
print(f"Validation Precision: {precision:.4f}")
print(f"Validation Recall: {recall:.4f}")
print(f"Validation F1 Score: {f1:.4f}")
