A code to train sentiment analysis for NusaX dataset.

Simply `runtime > run all` to train and test.
Modify the language on the bottom part of this code.

# Training code

In [5]:
# grab the data first
!git clone https://github.com/IndoNLP/nusax.git

Cloning into 'nusax'...
remote: Enumerating objects: 301, done.[K
remote: Counting objects: 100% (5/5), done.[K
remote: Compressing objects: 100% (3/3), done.[K
remote: Total 301 (delta 4), reused 2 (delta 2), pack-reused 296 (from 1)[K
Receiving objects: 100% (301/301), 3.74 MiB | 14.85 MiB/s, done.
Resolving deltas: 100% (136/136), done.


In [2]:
import pandas as pd
from nltk import word_tokenize
import nltk
nltk.download('punkt')

# read csv data
# return a pair of (list of data, list of label)
# also tokenize the input first
def load_data(filedir):
    df = pd.read_csv(filedir)
    data = list(df['text'])
    data = [" ".join(word_tokenize(sent)) for sent in data]
    print(list(df['label']))
    return (data, list(df['label']))

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [3]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import f1_score,accuracy_score
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import PredefinedSplit
from scipy.sparse import vstack
nltk.download('punkt_tab')

import numpy as np

def hyperparam_tuning(xtrain, ytrain, xvalid, yvalid, classifier, param_grid):
    # combine train and valid
    x = vstack([xtrain, xvalid])
    y = ytrain + yvalid

    # create predefined split
    # -1 for all training and 0 for all validation
    ps = PredefinedSplit([-1] * len(ytrain) + [0] * len(yvalid))
    clf = GridSearchCV(classifier, param_grid, cv = ps)
    clf = clf.fit(x, y)

    return clf


def train_and_test(lang, directory="/content/nusax/datasets/sentiment/", feature="BoW", classifier="nb"):
    xtrain, ytrain = load_data(directory + lang +"/train.csv")
    xvalid, yvalid = load_data(directory + lang + "/valid.csv")
    xtest, ytest = load_data(directory + lang + "/test.csv")

    # train feature on train data
    if feature == "bow":
        vectorizer = CountVectorizer()
    elif feature == "tfidf":
        vectorizer = TfidfVectorizer()
    else:
        raise Exception('Vectorizer unknown. Use "BoW" or "tfidf"')
    vectorizer.fit(xtrain)

    # transform
    xtrain = vectorizer.transform(xtrain)
    xvalid = vectorizer.transform(xvalid)
    xtest = vectorizer.transform(xtest)

    # all classifiers
    classifier_model = {"nb" : MultinomialNB(),
                        "svm": SVC(),
                        "lr" : LogisticRegression(),
                       }
    # all params for grid-search
    param_grids = {"nb" : {"alpha": np.linspace(0.001,1,50)},
                   "svm": {'C': [0.01, 0.1, 1, 10, 100], 'kernel': ['rbf', 'linear']},
                   "lr" : {'C': np.linspace(0.001,10,100)},
                  }

    clf = hyperparam_tuning(xtrain, ytrain, xvalid, yvalid,
                            classifier=classifier_model[classifier],
                            param_grid=param_grids[classifier])

    pred = clf.predict(xtest.toarray())
    f1score = f1_score(ytest,pred, average='macro')

    return f1score, clf, vectorizer

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


# Testing

In [6]:
#@title Sentiment analysis demo
language = "indonesian" #@param ["indonesian", "english", "javanese", "sundanese", "balinese", "madurese", "minangkabau", "toba_batak", "acehnese", "buginese", "ngaju", "banjarese"]
input_sentiment = "abang saya keterima kerja di kamboja" #@param {type:"string"}


print(f"Training for sentiment analysis classifier {language}")
f1, clf, vectorizer = train_and_test(language, feature="bow")
print(f"Training done. F1 on test set is {f1}")

input_sentiment = " ".join(word_tokenize(input_sentiment))
sent = clf.predict(vectorizer.transform([input_sentiment]).toarray())
print(f"\nSentiment on the input text is {sent}")





Training for sentiment analysis classifier indonesian
['neutral', 'positive', 'neutral', 'positive', 'positive', 'neutral', 'neutral', 'negative', 'negative', 'positive', 'positive', 'negative', 'negative', 'positive', 'positive', 'positive', 'neutral', 'neutral', 'neutral', 'negative', 'neutral', 'positive', 'positive', 'negative', 'neutral', 'negative', 'negative', 'negative', 'negative', 'positive', 'positive', 'neutral', 'positive', 'positive', 'positive', 'neutral', 'positive', 'positive', 'negative', 'neutral', 'negative', 'negative', 'positive', 'positive', 'negative', 'negative', 'positive', 'positive', 'positive', 'positive', 'neutral', 'neutral', 'negative', 'positive', 'neutral', 'positive', 'negative', 'negative', 'negative', 'positive', 'positive', 'negative', 'neutral', 'positive', 'neutral', 'positive', 'neutral', 'neutral', 'negative', 'neutral', 'negative', 'positive', 'negative', 'neutral', 'negative', 'positive', 'positive', 'negative', 'positive', 'positive', 'posit

In [None]:
# 1. Data Preparation and Preprocessing
import numpy as np
import pandas as pd
from sklearn.metrics import f1_score
import tensorflow as tf
from tensorflow.keras.layers import TextVectorization, Embedding, SimpleRNN, Dense, Dropout, Bidirectional
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam

def load_nusax_as_dataframe(base_dir, lang="indonesian"):
    train = pd.read_csv(f"{base_dir}/{lang}/train.csv")
    valid = pd.read_csv(f"{base_dir}/{lang}/valid.csv")
    test = pd.read_csv(f"{base_dir}/{lang}/test.csv")
    return train, valid, test

# 2. Tokenization and Embedding Preparation
def preprocess_text(train, valid, test, max_tokens=10000, seq_len=100):
    vectorizer = TextVectorization(
        max_tokens=max_tokens,
        output_sequence_length=seq_len,
        standardize="lower_and_strip_punctuation"
    )
    text_ds = tf.data.Dataset.from_tensor_slices(train["text"]).batch(128)
    vectorizer.adapt(text_ds)
    x_train = vectorizer(np.array(train["text"]))
    x_valid = vectorizer(np.array(valid["text"]))
    x_test = vectorizer(np.array(test["text"]))
    return x_train, x_valid, x_test, vectorizer

# 3. Build RNN Model (Keras)
def build_rnn_model(vocab_size, seq_len, embed_dim, rnn_units, num_layers=1, dropout_rate=0.3, 
                    bidirectional=False, num_classes=3):
    model = Sequential()
    model.add(Embedding(input_dim=vocab_size, output_dim=embed_dim, input_length=seq_len, name="embedding"))
    for _ in range(num_layers):
        rnn_layer = SimpleRNN(rnn_units, return_sequences=True if _ < num_layers-1 else False)
        if bidirectional:
            rnn_layer = Bidirectional(rnn_layer)
        model.add(rnn_layer)
        model.add(Dropout(dropout_rate))
    model.add(Dense(num_classes, activation="softmax", name="classifier"))
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        optimizer=Adam(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
    )
    return model

# 4. Training and Experimentation
def train_and_evaluate_rnn(x_train, y_train, x_valid, y_valid, x_test, y_test, 
                           vocab_size, seq_len, embed_dim, rnn_units, num_layers, dropout_rate, bidirectional, num_classes):
    model = build_rnn_model(vocab_size, seq_len, embed_dim, rnn_units, num_layers, dropout_rate, bidirectional, num_classes)
    model.fit(x_train, y_train, epochs=10, validation_data=(x_valid, y_valid), batch_size=32)

    model.save_weights("nusax_rnn.weights.h5")
    y_pred = np.argmax(model.predict(x_test), axis=1)
    f1 = f1_score(y_test, y_pred, average="macro")
    print(f"Macro F1-score on test: {f1:.4f}")
    return model, f1, y_pred

# 5. Keras Usage:
train, valid, test = load_nusax_as_dataframe("nusax/datasets/sentiment")
label_map = {"negative": 0, "neutral": 1, "positive": 2}
y_train = train["label"].map(label_map).values
y_valid = valid["label"].map(label_map).values
y_test = test["label"].map(label_map).values
x_train, x_valid, x_test, vectorizer = preprocess_text(train, valid, test, max_tokens=10000, seq_len=100)

model, f1, y_pred = train_and_evaluate_rnn(
    x_train, y_train, x_valid, y_valid, x_test, y_test,
    vocab_size=10000, seq_len=100, embed_dim=64, rnn_units=64, num_layers=2, dropout_rate=0.3, bidirectional=False, num_classes=3
)

# 6. FROM SCRATCH IMPLEMENTATION
# ---- For Each Layer ----
class MyEmbedding:
    def __init__(self, weights):
        self.weights = weights
    def forward(self, x):
        x = np.asarray(x, dtype=np.int32)
        return self.weights[x]

class MySimpleRNN:
    def __init__(self, Wx, Wh, b, return_sequences=False):
        self.Wx = Wx
        self.Wh = Wh
        self.b = b
        self.return_sequences = return_sequences
    def forward(self, x):
        # x: (batch, seq, embed_dim)
        h = np.zeros((x.shape[0], self.Wh.shape[0]), dtype=np.float32)
        outputs = []
        for t in range(x.shape[1]):
            h = np.tanh(x[:, t] @ self.Wx + h @ self.Wh + self.b)
            if self.return_sequences:
                outputs.append(h.copy())
        if self.return_sequences:
            return np.stack(outputs, axis=1)
        else:
            return h

class MyDropout:
    def __init__(self, rate):
        self.rate = rate
    def forward(self, x):
        return x

class MyDense:
    def __init__(self, W, b):
        self.W = W
        self.b = b
    def forward(self, x):
        return x @ self.W + self.b

class MySoftmax:
    def forward(self, x):
        e_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
        return e_x / np.sum(e_x, axis=-1, keepdims=True)

class MyRNNModel:
    def __init__(self, keras_model):
        self.embedding = MyEmbedding(keras_model.get_layer("embedding").get_weights()[0])
        self.rnn_layers = []
        self.dropout_layers = []
        last_is_rnn = False
        for layer in keras_model.layers:
            if "SimpleRNN" in layer.__class__.__name__:
                Wx, Wh, b = layer.get_weights()
                self.rnn_layers.append(MySimpleRNN(Wx, Wh, b, return_sequences=layer.return_sequences))
                last_is_rnn = True
            elif "Dropout" in layer.__class__.__name__:
                self.dropout_layers.append(MyDropout(layer.rate))
                last_is_rnn = False

        self.ordered_layers = []
        rnn_idx = 0
        dropout_idx = 0
        for layer in keras_model.layers:
            if "Embedding" in layer.__class__.__name__:
                continue  # handled above
            elif "SimpleRNN" in layer.__class__.__name__:
                self.ordered_layers.append(self.rnn_layers[rnn_idx])
                rnn_idx += 1
            elif "Dropout" in layer.__class__.__name__:
                self.ordered_layers.append(self.dropout_layers[dropout_idx])
                dropout_idx += 1
        # Dense and softmax
        dense_W, dense_b = keras_model.get_layer("classifier").get_weights()
        self.dense = MyDense(dense_W, dense_b)
        self.softmax = MySoftmax()

    def forward(self, x):
        x = self.embedding.forward(x)
        for layer in self.ordered_layers:
            x = layer.forward(x)
        x = self.dense.forward(x)
        x = self.softmax.forward(x)
        return x

# 7. Compare Forward Propagation Results
def compare_keras_and_scratch(keras_model, scratch_model, x_test, y_test):
    y_keras = np.argmax(keras_model.predict(x_test), axis=1)

    x_test_np = np.array(x_test)
    y_scratch = []
    for xi in x_test_np:
        pred = scratch_model.forward(xi[np.newaxis, :])
        y_scratch.append(np.argmax(pred, axis=1)[0])
    y_scratch = np.array(y_scratch)

    if y_test.ndim > 1:
        y_test = np.argmax(y_test, axis=1)

    from sklearn.metrics import f1_score
    keras_f1 = f1_score(y_test, y_keras, average="macro")
    scratch_f1 = f1_score(y_test, y_scratch, average="macro")
    print(f"Keras F1: {keras_f1:.4f}, Scratch F1: {scratch_f1:.4f}")

# 8. Usage:
keras_model = build_rnn_model(
    vocab_size=10000, seq_len=100, embed_dim=64, rnn_units=64,
    num_layers=2, dropout_rate=0.3, bidirectional=False, num_classes=3
)
keras_model(np.zeros((1, 100), dtype=np.int32))  
keras_model.load_weights("nusax_rnn.weights.h5")  

scratch_model = MyRNNModel(keras_model)
compare_keras_and_scratch(keras_model, scratch_model, x_test, y_test)

Epoch 1/10




[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 97ms/step - loss: 1.1497 - sparse_categorical_accuracy: 0.3724 - val_loss: 1.0860 - val_sparse_categorical_accuracy: 0.4100
Epoch 2/10
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 68ms/step - loss: 1.0739 - sparse_categorical_accuracy: 0.4865 - val_loss: 1.1597 - val_sparse_categorical_accuracy: 0.3700
Epoch 3/10
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 64ms/step - loss: 1.1372 - sparse_categorical_accuracy: 0.4221 - val_loss: 1.1151 - val_sparse_categorical_accuracy: 0.4300
Epoch 4/10
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 95ms/step - loss: 0.9445 - sparse_categorical_accuracy: 0.5819 - val_loss: 1.2252 - val_sparse_categorical_accuracy: 0.3200
Epoch 5/10
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 63ms/step - loss: 0.8317 - sparse_categorical_accuracy: 0.6551 - val_loss: 1.2125 - val_sparse_categorical_accuracy: 0.4200
Epoch 6/10
[1m1