In [1]:
import numpy as np
import pandas as pd
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from keras.layers import *
from keras.models import Model
from keras.callbacks import ModelCheckpoint

2023-05-14 19:25:51.868499: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
data_pd = pd.read_csv("IMDB Dataset.csv")

data_pd["label"] = data_pd["sentiment"].map({'positive': 1, 'negative': 0})
data_pd.head()

Unnamed: 0,review,sentiment,label
0,One of the other reviewers has mentioned that ...,positive,1
1,A wonderful little production. <br /><br />The...,positive,1
2,I thought this was a wonderful way to spend ti...,positive,1
3,Basically there's a family where a little boy ...,negative,0
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive,1


In [3]:
NUM_WORDS = 500
t = Tokenizer(num_words=NUM_WORDS)
# fit the tokenizer on the documents
t.fit_on_texts(data_pd["review"].values)

In [4]:
t.word_index

{'the': 1,
 'and': 2,
 'a': 3,
 'of': 4,
 'to': 5,
 'is': 6,
 'br': 7,
 'in': 8,
 'it': 9,
 'i': 10,
 'this': 11,
 'that': 12,
 'was': 13,
 'as': 14,
 'for': 15,
 'with': 16,
 'movie': 17,
 'but': 18,
 'film': 19,
 'on': 20,
 'not': 21,
 'you': 22,
 'are': 23,
 'his': 24,
 'have': 25,
 'be': 26,
 'one': 27,
 'he': 28,
 'all': 29,
 'at': 30,
 'by': 31,
 'an': 32,
 'they': 33,
 'so': 34,
 'who': 35,
 'from': 36,
 'like': 37,
 'or': 38,
 'just': 39,
 'her': 40,
 'out': 41,
 'about': 42,
 'if': 43,
 "it's": 44,
 'has': 45,
 'there': 46,
 'some': 47,
 'what': 48,
 'good': 49,
 'when': 50,
 'more': 51,
 'very': 52,
 'up': 53,
 'no': 54,
 'time': 55,
 'my': 56,
 'even': 57,
 'would': 58,
 'she': 59,
 'which': 60,
 'only': 61,
 'really': 62,
 'see': 63,
 'story': 64,
 'their': 65,
 'had': 66,
 'can': 67,
 'me': 68,
 'well': 69,
 'were': 70,
 'than': 71,
 'much': 72,
 'we': 73,
 'bad': 74,
 'been': 75,
 'get': 76,
 'do': 77,
 'great': 78,
 'other': 79,
 'will': 80,
 'also': 81,
 'into': 82,
 'p

In [5]:
# convert text to list of sequence according to the word indexes
X = t.texts_to_sequences(data_pd["review"].values)
y = data_pd["label"].values

In [6]:
# transform list of sequence to numpy array, with padding pre/post and maxlen
# for each row options
X = pad_sequences(X, maxlen=300)

In [7]:
# split train set and test set with proportion of 50%
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.5,
    random_state=42,
    stratify=y,
)
# split train set into train and validation sets, 80/20 proportion respectively
X_train, X_val, y_train, y_val = train_test_split(
    X_train,
    y_train,
    test_size=0.2,
    random_state=42,
    stratify=y_train,
)

X_train.shape, X_test.shape, X_val.shape

((20000, 300), (25000, 300), (5000, 300))

In [8]:
class ConcatModel:
    """Provide model architecture concatenating from a Convolution layer and a LSTM layer.

    This model includes:
        - 1 Embedding layer
        - 2 Conv1D layers
        - 2 MaxPooling1D layers
        - 1 Concatenate layer
        - 1 Bidirectional LSTM layer
        - 1 hidden Dense layer
        - 1 Dense layer

    """

    def __init__(self, input_kernel: int) -> None:
        """Initialize required layers for the model."""
        self.input_layer = Input(shape=(input_kernel,))
        self.embed_layer = Embedding(NUM_WORDS, 100)(self.input_layer)
        self.conv_layer1 = Conv1D(
            filters=50,
            kernel_size=10,
            activation="relu",
            padding="same",
        )(self.embed_layer)
        self.conv_layer2 = Conv1D(
            filters=50,
            kernel_size=5,
            activation="relu",
            padding="same",
        )(self.embed_layer)
        self.pooling_layer1 = MaxPooling1D(pool_size=2)(self.conv_layer1)
        self.pooling_layer2 = MaxPooling1D(pool_size=2)(self.conv_layer2)
        self.concat_layer = Concatenate(axis=1)(
            [self.pooling_layer1, self.pooling_layer2],
        )
        self.lstm_layer = Bidirectional(
            LSTM(50, activation="sigmoid", return_sequences=False)
        )(self.concat_layer)
        self.hidden_dense_layer = Dense(\
            50,
            activation="sigmoid",
        )(self.lstm_layer)
        self.output_layer = Dense(
            1,
            activation="sigmoid",
        )(self.hidden_dense_layer)
        self.model = Model(inputs=self.input_layer, outputs=self.output_layer)

    def compile(self, loss: str, optimizer: str, metrics: list[str]) -> None:
        """Compile the with given loss, optimizer and metrics."""
        self.model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
        self.model.summary()

    def fit(
        self,
        X_train: np.array,
        y_train: np.array,
        X_val: np.array,
        y_val: np.array,
        epochs: int,
        batch_size: int,
        filename: str,
    ) -> None:
        """Fit the model with train set and evaluate with validation set."""
        self.mc = ModelCheckpoint(
            filename,
            monitor="val_accuracy",
            save_best_only=True,
            save_weights_only=True,
        )
        self.model.fit(
            X_train,
            y_train,
            validation_data=(X_val, y_val),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[self.mc],
        )

    def accuracy(self, X_test: np.array, y_test: np.array) -> float:
        """Return the model's accuracy."""
        _, accurary = self.model.evaluate(X_test, y_test)
        return accurary

    def load_model_accuracy(
        self,
        X_test: np.array,
        y_test: np.array,
        filename: str,
    ) -> float:
        """Return the model's accuracy from a weighted file."""
        self.model.load_weights(filename)
        y_test_pred = self.model.predict(X_test)
        y_test_pred = [1 if pred > 0.5 else 0 for pred in y_test_pred]
        return 1 - np.sum(np.abs(y_test_pred - y_test)) / len(y_test_pred)

In [9]:
model = ConcatModel(input_kernel=300)
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(X_train, y_train, X_val, y_val, epochs=10, batch_size=250, filename="model.h5")

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 300)]        0           []                               
                                                                                                  
 embedding (Embedding)          (None, 300, 100)     50000       ['input_1[0][0]']                
                                                                                                  
 conv1d (Conv1D)                (None, 300, 50)      50050       ['embedding[0][0]']              
                                                                                                  
 conv1d_1 (Conv1D)              (None, 300, 50)      25050       ['embedding[0][0]']              
                                                                                              

In [16]:
print(f"Accuracy: {model.accuracy(X_test, y_test)}")

Accuracy: 0.8528000116348267


In [17]:
print(f"Accuracy: {model.load_model_accuracy(X_test, y_test, filename='model.h5')}")

Accuracy: 0.8528
