# Libraries

In [2]:
import numpy as np 
import pandas as pd
import os 
import random

# Data Preparation
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder
# Data Preprocessing
from transformers import AutoTokenizer
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag, word_tokenize
import nltk
import string

# Feature extraction
from sklearn.feature_extraction.text import TfidfVectorizer

# Data Visualization
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import umap

from tqdm import tqdm

# Data Preparation and Preprocessing

In [70]:
# Download necessary NLTK data files
# nltk.download('stopwords')
# nltk.download('wordnet')
# nltk.download('averaged_perceptron_tagger')

# __init__ method is used when the class is called to initialize the instance
# __call__ method is called when the instance is called

class IMDBDataset(Dataset):
    """Map-style dataset over a balanced random sample of IMDb reviews.

    Reviews are read from ``base_path/{train|test}/{pos|neg}/<id>_<score>.txt``.
    Each item is a ``(feature, label)`` pair where ``feature`` is the review
    text (after ``transform``, if one was given) and ``label`` is the
    integer produced by ``LabelEncoder`` for the review's sentiment string.
    """

    # Column layout of the backing DataFrame; fixed so an empty sample still
    # yields a frame with the columns __init__ reads.
    _COLUMNS = ['id', 'review', 'score', 'sentiment']

    def __init__(self, base_path, size=2000, train=True, transform=None):
        """Sample ``size // 2`` reviews per category and encode their labels.

        Args:
            base_path: Root directory containing the ``train``/``test`` folders.
            size: Total number of reviews to sample (split evenly over
                ``pos`` and ``neg``).
            train: ``False`` selects the ``test`` split, anything else ``train``.
            transform: Optional callable applied to the review text on access.
        """
        # pandas DataFrame with columns id / review / score / sentiment
        self.df = self._load_imdb_data(base_path, size, train)
        self.features = self.df['review']
        self.labels = LabelEncoder().fit_transform(self.df['sentiment'])
        self.transform = transform

    def __len__(self):
        """Return the number of sampled reviews."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return ``(feature, label)`` for the review at position ``idx``."""
        feature = self.features.iloc[idx]
        label = self.labels[idx]
        if self.transform:
            feature = self.transform(feature)
        return feature, label

    @staticmethod
    def _load_imdb_data(base_path, sample_size, train):
        """Load IMDb data into a pandas DataFrame with balanced sampling.

        Args:
            base_path: Root directory containing the ``train``/``test`` folders.
            sample_size: Total number of reviews; ``sample_size // 2`` are
                drawn from each of ``pos`` and ``neg``.
            train: ``False`` reads the ``test`` split, anything else ``train``.

        Returns:
            DataFrame with columns ``id``, ``review``, ``score``, ``sentiment``.

        Raises:
            ValueError: If a category holds fewer ``.txt`` reviews than
                requested, or a file name is not of the form ``<id>_<score>.txt``.
        """
        def score_to_sentiment(score):
            """Convert score to sentiment based on the given convention."""
            if score <= 4:
                return 'negative'
            elif score >= 7:
                return 'positive'
            else:
                return 'neutral'

        # Same selection rule as before: only a value equal to False picks 'test'.
        split = 'test' if train == False else 'train'  # noqa: E712
        sample_per_category = sample_size // 2  # Ensure equal sampling from each category

        data = []
        for category in ('pos', 'neg'):
            # base_path/{train or test}/{pos or neg}/data_in_txt_format
            category_path = os.path.join(base_path, split, category)

            # Keep only review files (e.g. '1821_4.txt') so stray entries are
            # never parsed; sort because os.listdir order is arbitrary, which
            # would make sampling irreproducible even with a seeded RNG.
            review_files = sorted(
                name for name in os.listdir(category_path)
                if name.endswith('.txt')
            )

            if sample_per_category > len(review_files):
                raise ValueError(
                    f"Requested {sample_per_category} '{category}' reviews "
                    f"from {category_path!r} but only {len(review_files)} "
                    f"are available"
                )

            # Randomly sample file names from the current category
            for file_name in random.sample(review_files, sample_per_category):
                stem = os.path.splitext(file_name)[0]
                review_id, score_text = stem.split('_')
                score = int(score_text)

                file_path = os.path.join(category_path, file_name)
                with open(file_path, 'r', encoding='utf-8') as file:
                    review = file.read()

                data.append({
                    'id': review_id,
                    'review': review,
                    'score': score,
                    'sentiment': score_to_sentiment(score),
                })

        # Create DataFrame
        return pd.DataFrame(data, columns=IMDBDataset._COLUMNS)

class Lowercase:
    """Pipeline step that lower-cases its input text."""

    def __call__(self, text):
        """Return ``text`` converted to lower case."""
        lowered = text.lower()
        return lowered

class Tokenize:
    """Pipeline step that delegates to a tokenizer's ``tokenize`` method."""

    def __init__(self, tokenizer):
        # Any object exposing ``tokenize(text)``
        self.tokenizer = tokenizer

    def __call__(self, text):
        """Return whatever ``self.tokenizer.tokenize`` produces for ``text``."""
        tokens = self.tokenizer.tokenize(text)
        return tokens
    
class StopwordsRemoval:
    """Pipeline step that drops stopwords and punctuation-only tokens."""

    # Individual punctuation characters. A set makes each check a
    # per-character lookup; testing a token against the string.punctuation
    # *string* is a substring search, which keeps tokens such as '...' and
    # drops accidental runs such as '#$'.
    _PUNCTUATION = frozenset(string.punctuation)

    def __init__(self, stopwords):
        # Stored as a set for O(1) membership tests
        self.stopwords = set(stopwords)

    # callable object like function
    def __call__ (self, tokens):
        """Return the tokens that are neither stopwords nor punctuation.

        Args:
            tokens: Iterable of string tokens.

        Returns:
            List of the remaining tokens in their original order. Tokens
            made up entirely of punctuation characters (including the empty
            string) are removed.
        """
        return [
            token for token in tokens
            if token not in self.stopwords and not self._is_punctuation(token)
        ]

    @classmethod
    def _is_punctuation(cls, token):
        """Return True when every character of ``token`` is punctuation."""
        return all(char in cls._PUNCTUATION for char in token)

class POStagging:
    """Pipeline step that attaches a part-of-speech tag to each token."""

    def __call__(self, tokens):
        """Return the result of ``nltk.pos_tag`` for ``tokens``.

        Per the original note, each entry pairs a token with its tag,
        e.g. ('running', 'VBP').
        """
        return nltk.pos_tag(tokens)
    
class Lemmatization:
    """Pipeline step that lemmatizes (word, Treebank tag) pairs."""

    def __init__(self, lemmatizer):
        # Object exposing ``lemmatize(word, pos)``
        self.lemmatizer = lemmatizer

    def __call__(self, pos_tagged_tokens):
        """Return one lemma per (word, tag) pair, in input order."""
        lemmas = []
        for word, tag in pos_tagged_tokens:
            wordnet_pos = self._get_wordnet_pos(tag)
            lemmas.append(self.lemmatizer.lemmatize(word, wordnet_pos))
        return lemmas

    @staticmethod
    def _get_wordnet_pos(treebank_tag):  # independent of instance
        """Convert Treebank POS tags to WordNet POS tags."""
        prefix_to_pos = (
            ('J', wordnet.ADJ),
            ('V', wordnet.VERB),
            ('N', wordnet.NOUN),
            ('R', wordnet.ADV),
        )
        for prefix, wordnet_pos in prefix_to_pos:
            if treebank_tag.startswith(prefix):
                return wordnet_pos
        # Default to noun if unknown
        return wordnet.NOUN
    
class TFIDF:
    """Pipeline step that converts preprocessed text to a dense TF-IDF array.

    NOTE(review): ``fit_transform`` runs on every call, so the vectorizer is
    re-fitted each time instead of reusing one fitted vocabulary. When the
    input is a single review's token list, each token is presumably treated
    as its own document - confirm this is intended.
    """

    def __init__(self, vectorizer):
        # a TF-IDF vectorizer (any object exposing ``fit_transform``)
        self.vectorizer = vectorizer

    def __call__(self, preprocess_text):
        """Fit the vectorizer on ``preprocess_text`` and return ``toarray()``.

        ``preprocess_text`` is passed straight to ``fit_transform``, which
        expects an iterable of documents.
        """
        return self.vectorizer.fit_transform(preprocess_text).toarray()
    
class Compose:
    """Chain several transformation callables into a single callable."""

    def __init__(self, transforms):
        # Ordered list of transformation objects
        self.transforms = transforms

    def __call__(self, x):
        """Feed ``x`` through every transform in order and return the result."""
        result = x
        for step in self.transforms:
            result = step(result)
        return result

In [58]:
# Pre-trained tokenizer used by the Tokenize step
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

# Shared resources for the remaining preprocessing steps
stop_words_list = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
vectorizer = TfidfVectorizer(max_features=250)

# Preprocessing pipeline: steps run top to bottom on each review
pipeline_steps = [
    Lowercase(),
    Tokenize(tokenizer),
    StopwordsRemoval(stop_words_list),
    POStagging(),
    Lemmatization(lemmatizer),
    TFIDF(vectorizer),
]
transform_pipeline = Compose(pipeline_steps)

In [71]:
# Build the train and test datasets with identical settings apart from the split
dataset_kwargs = dict(base_path='aclImdb', size=2000, transform=transform_pipeline)
train_dataset = IMDBDataset(train=True, **dataset_kwargs)
test_dataset = IMDBDataset(train=False, **dataset_kwargs)

In [74]:
# Walk the training set and show each transformed feature with its label
for sample in train_dataset:
    feature, label = sample
    print("Transformed feature:", feature)
    print("Label:", label)

Transformed feature: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Label: 1
Transformed feature: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 1.]
 [0. 0. 0. ... 0. 0. 0.]]
Label: 1
Transformed feature: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Label: 1
Transformed feature: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Label: 1
Transformed feature: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
Label: 1
Transformed feature: [[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]

In [None]:
# Build the inputs this cell plots. `tfidf_matrix` and `df` were referenced
# here without being defined anywhere at notebook scope (NameError), so they
# are created from the training set: one TF-IDF row per review, fitted once
# over the whole sample.
df = train_dataset.df
text_steps = Compose([
    Lowercase(),
    Tokenize(tokenizer),
    StopwordsRemoval(stop_words_list),
    POStagging(),
    Lemmatization(lemmatizer),
])
# Re-join each review's lemmas so the vectorizer sees one document per review
documents = [' '.join(text_steps(review)) for review in tqdm(df['review'])]
tfidf_matrix = TfidfVectorizer(max_features=250).fit_transform(documents)

# Apply t-SNE
tsne = TSNE(n_components=2, random_state=42)
tsne_results = tsne.fit_transform(tfidf_matrix.toarray())

# Plot t-SNE results, coloured by sentiment (positive=1, negative=0)
plt.figure(figsize=(10, 5))

sentiment_codes = df['sentiment'].map({'positive': 1, 'negative': 0})
plt.scatter(tsne_results[:, 0], tsne_results[:, 1], c=sentiment_codes, cmap='coolwarm')
plt.title('t-SNE Visualization')
plt.xlabel('t-SNE Component 1')
plt.ylabel('t-SNE Component 2')

plt.tight_layout()
plt.show()

In [None]:
# Apply UMAP
# NOTE(review): `tfidf_matrix` and `df` are not created in this cell; they
# must already exist in the session - confirm an earlier cell provides them.
umap_reducer = umap.UMAP(n_components=2, random_state=42)
umap_results = umap_reducer.fit_transform(tfidf_matrix.toarray())


# Plot UMAP results, coloured by sentiment (positive=1, negative=0)
plt.figure(figsize=(10, 5))
sentiment_codes = df['sentiment'].map({'positive': 1, 'negative': 0})
scatter = plt.scatter(umap_results[:, 0], umap_results[:, 1], c=sentiment_codes, cmap='coolwarm')
plt.title('UMAP Visualization')
plt.xlabel('UMAP Component 1')
plt.ylabel('UMAP Component 2')

plt.tight_layout()
# A bare plt.legend() finds no labelled artists and draws an empty legend.
# Build the entries from the scatter's colours instead; handles are expected
# in ascending code order, i.e. 0 (negative) then 1 (positive).
handles, _ = scatter.legend_elements()
plt.legend(handles, ['negative', 'positive'], title='Sentiment')
plt.show()