In [8]:
# Prepare the Google Colab environment and build the handmade library
# !git clone https://github.com/kaenova/Headline_Detection.git
# %cd "/content/Headline_Detection"

# !make lib

# !wget "https://is3.cloudhost.id/s3.kaenova.my.id/IMDB%20Dataset.zip"
# !unzip "/content/Headline_Detection/IMDB Dataset.zip"

--2023-02-04 08:47:27--  https://is3.cloudhost.id/s3.kaenova.my.id/IMDB%20Dataset.zip
Resolving is3.cloudhost.id (is3.cloudhost.id)... 103.63.24.210, 103.63.24.211
Connecting to is3.cloudhost.id (is3.cloudhost.id)|103.63.24.210|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 26962657 (26M) [application/octet-stream]
Saving to: ‘IMDB Dataset.zip’


2023-02-04 08:52:15 (71.0 KB/s) - Connection closed at byte 20920424. Retrying.

--2023-02-04 08:52:16--  (try: 2)  https://is3.cloudhost.id/s3.kaenova.my.id/IMDB%20Dataset.zip
Connecting to is3.cloudhost.id (is3.cloudhost.id)|103.63.24.210|:443... connected.
HTTP request sent, awaiting response... 206 Partial Content
Length: 26962657 (26M), 6042233 (5.8M) remaining [application/octet-stream]
Saving to: ‘IMDB Dataset.zip’

IMDB Dataset.zip    100%[+++++++++++++++====>]  25.71M  1.25MB/s    in 9.0s    

2023-02-04 08:52:26 (655 KB/s) - ‘IMDB Dataset.zip’ saved [26962657/26962657]

Archive:  /content/Headline_Detection

In [9]:
# # Reset Google Colab Environment
# %cd ..
# !rm -fr Headline_Detection

In [10]:
import pickle
import bz2
import pandas as pd

## Compressing RAW IMDB Dataset File (only used once)

In [11]:
def compressed_pickle(title, data):
    """Pickle ``data`` into a bz2-compressed file.

    Args:
        title: Destination path without extension; ``'.pbz2'`` is appended.
        data: Any picklable Python object.
    """
    archive_path = title + '.pbz2'
    with bz2.BZ2File(archive_path, 'w') as archive:
        # Pickler(file).dump(obj) is the long-hand form of pickle.dump(obj, file).
        pickle.Pickler(archive).dump(data)
        
def decompress_pickle(file):
    """Load and return the object stored in a bz2-compressed pickle file.

    Args:
        file: Path of the ``.pbz2`` archive to read (for example one written
            by ``compressed_pickle``).

    Returns:
        The unpickled Python object.

    Warning:
        ``pickle.load`` can execute arbitrary code while loading; only open
        archives that come from a trusted source.
    """
    # The context manager guarantees the handle is closed, even if unpickling
    # raises; previously the BZ2File was left open until garbage collection.
    with bz2.BZ2File(file, 'rb') as archive:
        return pickle.load(archive)

In [13]:
df_orig = pd.read_csv("/content/Headline_Detection/IMDB Dataset.csv") # Colab path: keep this line active when running on Colab
# df_orig = decompress_pickle("../../data/0. External Data (not used in research)/IMDB Dataset Pandas DataFrame.pbz2") # Local alternative: uncomment this (and comment out the line above) when not running on Colab

# Testing model training

In [14]:
# Custom handmade library
import kaelib.processor.preprocessing_func as prep_func
from kaelib.processor import TextProcessingPipeline

In [44]:
# Work on a copy so df_orig keeps the data exactly as loaded and this cell can be re-run.
df = df_orig.copy()
df.head(5)  # preview the first five rows

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive


In [45]:
# Train / test split.
# The split is sequential (no shuffling): the first `train_size` rows are used
# for training and the remaining rows for testing.

test_ratio = 0.001
# Size the test set first and derive the train size by integer subtraction.
# The previous `len(df) // (1 / (1 - test_ratio))` did float floor-division,
# which can come out one row short because of floating-point rounding.
test_size = round(len(df) * test_ratio)
train_size = len(df) - test_size

# Convert the review column once, then slice it.
all_reviews = df["review"].to_list()
text_train = all_reviews[:train_size]
text_test = all_reviews[train_size:]

label2id = {"positive": 1, "negative": 0}
id2label = {v: k for k, v in label2id.items()}

# Encode the sentiment column once; an unknown label still raises KeyError.
all_label_ids = [label2id[sentiment] for sentiment in df["sentiment"]]
labels_train = all_label_ids[:train_size]
labels_test = all_label_ids[train_size:]



In [46]:
# Clean the raw reviews. Going by the helper names, the steps handed to the
# pipeline are: lowercasing, HTML-tag removal, URL removal, punctuation removal.
pipeline = TextProcessingPipeline(
    [
        prep_func.lowercasing,
        prep_func.remove_html_tags,
        prep_func.remove_url,
        prep_func.remove_punctuation,
    ]
)

# Train corpus is processed first, then the test corpus.
X_train, X_test = (
    pipeline.process_corpus(corpus) for corpus in (text_train, text_test)
)

# Shallow copies, so y_* are independent lists from labels_*.
y_train, y_test = list(labels_train), list(labels_test)

In [47]:
import typing
import torch

from tokenizers import Tokenizer, models, pre_tokenizers, trainers

class WordTokenizer:
    """Whitespace-split, word-level tokenizer built on the ``tokenizers`` library.

    The vocabulary always contains the two special tokens ``[PAD]`` (id 0)
    and ``[UNK]`` (id 1); further words are learned with :meth:`train`.

    Attributes:
        vocab_size: Vocabulary size budget passed to the trainer. For an
            instance restored from disk it is the size of the loaded vocabulary.
        tokenizer: The underlying ``tokenizers.Tokenizer``.
    """

    # Reserved ids for the special tokens, plus the derived reverse mapping
    # and token list.
    special_vocab2id = {
        "[PAD]": 0,
        "[UNK]": 1,
    }
    special_id2vocab = {v: k for k, v in special_vocab2id.items()}
    special_token = list(special_vocab2id.keys())

    def __init__(self, vocab_size=10000) -> None:
        """Create an untrained tokenizer that only knows the special tokens.

        Args:
            vocab_size: Vocabulary size given to the trainer in :meth:`train`.
        """
        self.vocab_size = vocab_size

        self.tokenizer = Tokenizer(
            models.WordLevel(vocab=self.special_vocab2id, unk_token="[UNK]")
        )

        self.tokenizer.pre_tokenizer = pre_tokenizers.Sequence(
            [pre_tokenizers.WhitespaceSplit()]
        )  # type: ignore

    def train(self, corpus: "typing.List[str]", min_frequency=2):
        """Learn the vocabulary from ``corpus``.

        Args:
            corpus: Texts to learn words from.
            min_frequency: Minimum frequency passed to the word-level trainer.
        """
        trainer = trainers.WordLevelTrainer(
            vocab_size=self.vocab_size,
            min_frequency=min_frequency,
            special_tokens=self.special_token,
        )  # type: ignore

        self.tokenizer.train_from_iterator(corpus, trainer=trainer)

    def encode(self, text: "typing.List[str]", max_length=256) -> "torch.Tensor":
        """Encode a batch of texts into a ``(len(text), max_length)`` id tensor.

        Sequences are right-padded with the ``[PAD]`` id and truncated to
        ``max_length``.

        Args:
            text: Batch of texts to encode.
            max_length: Fixed length of every encoded sequence.

        Returns:
            An integer (``torch.long``) tensor of token ids; shape
            ``(0, max_length)`` when ``text`` is empty.
        """
        self.tokenizer.enable_padding(
            direction="right", pad_id=self.special_vocab2id["[PAD]"], length=max_length
        )
        self.tokenizer.enable_truncation(max_length)
        try:
            encodings = self.tokenizer.encode_batch(text)
        finally:
            # Always restore the tokenizer, so a failed batch does not leave
            # padding/truncation switched on for later calls (or for save()).
            self.tokenizer.no_padding()
            self.tokenizer.no_truncation()

        if not encodings:
            # torch.tensor([]) would be a float tensor of shape (0,).
            return torch.empty((0, max_length), dtype=torch.long)

        return torch.tensor([encoding.ids for encoding in encodings], dtype=torch.long)

    def save(self, path: "str"):
        """Serialize the underlying tokenizer to ``path``."""
        self.tokenizer.save(path)

    def __init_from_path__(self, path):
        """Replace this instance's tokenizer with the one serialized at ``path``.

        ``Tokenizer.from_file`` is a static constructor that returns a new
        tokenizer; it does not modify the object it is called on, so its
        result has to be assigned. ``vocab_size`` is set from the loaded
        vocabulary so that the instance is fully initialised.
        """
        self.tokenizer = Tokenizer.from_file(path)
        self.vocab_size = self.tokenizer.get_vocab_size()

    @classmethod
    def load(cls, path):
        """Build a ``WordTokenizer`` from a file written by :meth:`save`.

        ``__init__`` is bypassed so no throw-away tokenizer is constructed.
        """
        obj = cls.__new__(cls)
        obj.__init_from_path__(path)
        return obj

In [None]:
from torch.utils.data import Dataset
import torch
import typing

class TextClassificationDataset(Dataset):
    """Map-style dataset pairing raw texts with integer labels.

    Indexing returns ``(texts, label)`` where ``texts`` is always a list: a
    one-element list for a single index, or the sliced list for a slice. When
    a preprocessor is supplied, its ``process_corpus`` is applied to that list
    on every access.
    """

    def __init__(
        self,
        x: "list[str]",
        y: "list[int]",
        preprocessor: "typing.Optional[TextProcessingPipeline]" = None,
    ):
        """Store the texts, convert the labels to a tensor, keep the preprocessor.

        Args:
            x: Input texts.
            y: One label per text; must have the same length as ``x``.
            preprocessor: Optional object exposing ``process_corpus``.
        """
        assert len(x) == len(y)
        self.x, self.y = x, torch.tensor(y)
        self.preprocessor = preprocessor

    def __len__(self):
        """Number of stored texts."""
        return len(self.x)

    def _process_idx_text(self, idx):
        """Fetch the text(s) at ``idx`` as a list and run the optional preprocessor."""
        # A slice already yields a list; a single index is wrapped into one.
        texts = self.x[idx] if isinstance(idx, slice) else [self.x[idx]]
        if self.preprocessor is None:
            return texts
        return self.preprocessor.process_corpus(texts)

    def __getitem__(self, idx):
        """Return ``(list_of_texts, label_tensor)`` for ``idx``."""
        return self._process_idx_text(idx), self.y[idx]

In [330]:
# https://ieeexplore.ieee.org/document/8577620
# https://ieeexplore.ieee.org/abstract/document/8589431
# https://ieeexplore.ieee.org/document/8691381

import torch
import torch.nn as nn
import torch.nn.functional as F
import lightning.pytorch as pl


class LSTMCNNClassifier(pl.LightningModule):
    """Sequence classifier: embedding -> LSTM -> Conv1d + pooling -> linear head.

    Args:
        seq_length: Fixed number of token ids per input sequence; ``forward``
            rejects inputs whose last dimension differs.
        out_feature: Number of output logits (classes).
        vocab_size: Number of rows in the embedding table.
        text_embed_size: Embedding dimension.
        lstm_hidden_size: Hidden size of the LSTM.
        lstm_layers: Number of stacked LSTM layers.
        lstm_dropout: Dropout between LSTM layers (only used when
            ``lstm_layers > 1``).
        conv_channels_out: Output channels of the 1-D convolution.
        learning_rate: Learning rate of the Adam optimizer.
    """

    def __init__(
        self,
        seq_length: "int",
        out_feature: "int",
        vocab_size: "int" = 10000,
        text_embed_size: "int" = 128,
        lstm_hidden_size: "int" = 256,
        lstm_layers: "int" = 2,
        lstm_dropout: "float" = 0.5,
        conv_channels_out: "int" = 256,
        learning_rate: "float" = 0.00001,
    ) -> None:
        super().__init__()

        # Config
        self.seq_length = seq_length
        self.learning_rate = learning_rate

        # Layer
        # No explicit device: like the other layers, the embedding is created
        # on the default device and moved together with the module.
        self.text_embeddings = nn.Embedding(vocab_size, text_embed_size)

        self.lstm_layer = nn.LSTM(
            input_size=text_embed_size,
            hidden_size=lstm_hidden_size,
            num_layers=lstm_layers,
            # Inter-layer dropout has no effect on a single-layer LSTM and
            # only triggers a warning there.
            dropout=lstm_dropout if lstm_layers > 1 else 0.0,
            batch_first=True,
        )

        # NOTE(review): MaxPool2d receives a 3-D tensor here, so it halves both
        # the channel axis and the length axis (the Linear input size below is
        # computed accordingly). MaxPool1d may have been intended; switching
        # would change the head's input size, so it is left as is.
        self.conv_layer = nn.Sequential(
            nn.Conv1d(lstm_hidden_size, conv_channels_out, 3),
            nn.MaxPool2d(2),
            nn.Flatten(1),
        )

        # NOTE(review): there is no activation between these Linear layers, so
        # the head is mathematically a single affine map. Left unchanged to
        # keep the parameter layout.
        self.classification_head = nn.Sequential(
            nn.Linear(((seq_length - 2) // 2) * (conv_channels_out // 2), 1024),
            nn.Linear(1024, 512),
            nn.Linear(512, 256),
            nn.Linear(256, 128),
            nn.Linear(128, out_feature),
        )

    def forward(self, x: "torch.Tensor"):
        """Compute class logits for token ids.

        Args:
            x: Integer tensor of token ids, shape ``(seq_length,)`` for a
                single sequence or ``(N, seq_length)`` for a batch.

        Returns:
            Logits of shape ``(out_feature,)`` for a single sequence, or
            ``(N, out_feature)`` for a batch.
        """
        sequence_length = x.shape[-1]
        assert sequence_length == self.seq_length, (
            f"expected sequences of length {self.seq_length}, got {sequence_length}"
        )

        batch_input = x.dim() != 1
        if not batch_input:
            # Add a batch axis so the layers always see a batch.
            x = x.unsqueeze(0)

        x = self.text_embeddings(x)
        x, _ = self.lstm_layer(x)  # (N, seq_length, lstm_hidden_size)
        # Conv1d wants (N, channels, length). The axes must be swapped with
        # transpose; reshape would keep the memory order and mix values from
        # different time steps and features.
        x = x.transpose(1, 2)  # (N, lstm_hidden_size, seq_length)
        x = self.conv_layer(x)
        x = self.classification_head(x)

        if not batch_input:
            x = x[0]

        return x

    def _shared_step(self, batch, log_name):
        """Run one forward pass, log the cross-entropy loss under ``log_name`` and return it."""
        x, y = batch
        # NOTE(review): only x[0] is fed to the model. The dataloader/collate
        # that produces `batch` is not visible here; confirm that x[0] is the
        # token-id tensor matching `y`.
        pred = self(x[0])
        loss = F.cross_entropy(pred, y)
        self.log(log_name, loss)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss for one batch (logged as ``training_loss``)."""
        return self._shared_step(batch, "training_loss")

    def validation_step(self, batch, batch_idx):
        """Log the validation loss for one batch (``validation_loss``)."""
        self._shared_step(batch, "validation_loss")

    def test_step(self, batch, batch_idx):
        """Log the test loss for one batch (``test_loss``)."""
        self._shared_step(batch, "test_loss")

    def configure_optimizers(self):
        """Adam over all parameters with the configured learning rate."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer


In [331]:
import torch
# sample = torch.randint(9999, (0,128))

# Smoke test: build a classifier for 128-token sequences with 2 output logits
# and run a forward pass on a random (2, 128) batch of token ids in [0, 999).
model = LSTMCNNClassifier(128, 2)
sample = torch.randint(999, (2, 128))
model(sample)


tensor([[-0.0372,  0.0808],
        [-0.0353,  0.0816]], grad_fn=<AddmmBackward0>)

In [51]:
test_input = ["This movie suck", "I love this movie so much", "Meh", "This movie kinda suck, but there's some part that i love"]

# The classifier's forward pass reads `x.shape` and feeds an embedding layer,
# so it needs a tensor of token ids; passing the raw strings fails on a fresh
# kernel. Clean the texts with the same pipeline as the training corpus and
# tokenize them to the model's fixed sequence length.
# NOTE(review): assumes X_train is a list of cleaned strings; a tokenizer is
# trained here because no trained tokenizer is visible earlier in the notebook.
demo_tokenizer = WordTokenizer()
demo_tokenizer.train(X_train)
encoded_input = demo_tokenizer.encode(
    pipeline.process_corpus(test_input), max_length=model.seq_length
)

# no_grad() does not switch off dropout; eval() does, making predictions deterministic.
model.eval()
with torch.no_grad():
    pred = model(encoded_input)
    pred = F.softmax(pred, dim=1)
    pred_np = pred.argmax(dim=1).cpu().detach().numpy()

for text, pred_id in zip(test_input, pred_np):
    print(f"'{text}' : {id2label[int(pred_id)]}")

'This movie suck' : negative
'I love this movie so much' : negative
'Meh' : negative
'This movie kinda suck, but there's some part that i love' : negative
