In [62]:
import numpy as np
import json
import os
from tqdm.notebook  import tqdm

from sklearn.decomposition import PCA
from logging import raiseExceptions
from sklearn.metrics import precision_score, recall_score, f1_score
from scipy.optimize import linear_sum_assignment
from sklearn.metrics import confusion_matrix
import numpy as np

In [40]:
def load_json(file_path: str):
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

In [41]:
# Loading dataset

train = load_json('UIT-VSFC-train.json')
dev = load_json('UIT-VSFC-dev.json')
test = load_json('UIT-VSFC-test.json')

## Bai 1

In [4]:
!pip install py_vncorenlp



In [6]:
os.makedirs('vncorenlp', exist_ok=True)

In [7]:
import py_vncorenlp

# Automatically download VnCoreNLP components from the original repository
# and save them in some local working folder
py_vncorenlp.download_model(save_dir='/content/vncorenlp')

# Load VnCoreNLP from the local working folder that contains both `VnCoreNLP-1.2.jar` and `models`
#model = py_vncorenlp.VnCoreNLP(save_dir='/content/vncorenlp')

rdrsegmenter = py_vncorenlp.VnCoreNLP(annotators=["wseg"], save_dir='/content/vncorenlp')
text = "Ông Nguyễn Khắc Chúc  đang làm việc tại Đại học Quốc gia Hà Nội. Bà Lan, vợ ông Chúc, cũng làm việc tại đây."
output = rdrsegmenter.word_segment(text)
print(output)


VnCoreNLP model folder /content/vncorenlp already exists! Please load VnCoreNLP from this folder!
['Ông Nguyễn_Khắc_Chúc đang làm_việc tại Đại_học Quốc_gia Hà_Nội .', 'Bà Lan , vợ ông Chúc , cũng làm_việc tại đây .']


In [30]:
segmented_train = [
    rdrsegmenter.word_segment(each['sentence'])[0]
    for each in tqdm(train, desc="Segment train set")
]

Segment train set:   0%|          | 0/11426 [00:00<?, ?it/s]

In [31]:
segmented_test = [
    rdrsegmenter.word_segment(each['sentence'])[0]
    for each in tqdm(test, desc="Segment test set")
]

Segment test set:   0%|          | 0/3166 [00:00<?, ?it/s]

## Bai 2

In [3]:
import torch
from transformers import AutoModel, AutoTokenizer

In [19]:
phobert = AutoModel.from_pretrained("vinai/phobert-base-v2")
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of RobertaModel were not initialized from the model checkpoint at vinai/phobert-base-v2 and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [32]:
encoded_train = [
    torch.tensor([tokenizer.encode(each)])
    for each in tqdm(segmented_train, desc= 'Encoding train set')
]

Encoding train set:   0%|          | 0/11426 [00:00<?, ?it/s]

In [33]:
encoded_test = [
    torch.tensor([tokenizer.encode(each)])
    for each in tqdm(segmented_test, desc= 'Encoding test set')
]

Encoding test set:   0%|          | 0/3166 [00:00<?, ?it/s]

In [25]:
# Extract feature from train set using PhoBERT.

with torch.no_grad():
  train_feature = [
      phobert(each)
      for each in tqdm(encoded_train, desc= 'PhoBERT train')
  ]

PhoBERT train:   0%|          | 0/11426 [00:00<?, ?it/s]

In [34]:
torch.save(train_feature, "train_feature.pt")

In [35]:
# Extract feature from test set using PhoBERT.

with torch.no_grad():
  test_feature = [
      phobert(each)
      for each in tqdm(encoded_test, desc= 'PhoBERT test')
  ]

PhoBERT test:   0%|          | 0/3166 [00:00<?, ?it/s]

In [36]:
torch.save(test_feature, "test_feature.pt")


In [8]:
# Load feature

train_feature = torch.load("train_feature.pt", weights_only=False)
test_feature = torch.load("test_feature.pt", weights_only=False)

In [9]:
# Get last_hidden_state from train_feature

train_last_hidden_state = [
    each['last_hidden_state']
    for each in tqdm(train_feature)
]

  0%|          | 0/11426 [00:00<?, ?it/s]

In [10]:
# Get last_hidden_state from test_feature

test_last_hidden_state = [
    each['last_hidden_state']
    for each in tqdm(test_feature)
]

  0%|          | 0/3166 [00:00<?, ?it/s]

In [11]:
train_last_hidden_state[0].shape #(batch_size, seq_len, hidden_dim)

torch.Size([1, 6, 768])

In [12]:
# Get mean pooling from hidden state

train_mean_pooled_vector = [
    each.mean(dim=1).squeeze(0)
    for each in tqdm(train_last_hidden_state)
]

  0%|          | 0/11426 [00:00<?, ?it/s]

In [13]:
# Get mean pooling from hidden state

test_mean_pooled_vector = [
    each.mean(dim=1).squeeze(0)
    for each in tqdm(test_last_hidden_state)
]

  0%|          | 0/3166 [00:00<?, ?it/s]

## Bai 3

In [59]:
class GMM:
    def __init__(self, n_components, max_iter=100, comp_names=None, reg_covar=1e-6, verbose=True):
        self.n_components = n_components
        self.max_iter = max_iter
        self.reg_covar = reg_covar
        self.verbose = verbose

        if comp_names is None:
            self.comp_names = [f"comp{index}" for index in range(self.n_components)]
        else:
            self.comp_names = comp_names

        self.pi = [1/self.n_components for _ in range(self.n_components)]

    def multivariate_normal(self, x, mean, cov):
        cov = cov + np.eye(len(mean)) * self.reg_covar
        size = len(x)
        det = np.linalg.det(cov)
        if det <= 0: det = self.reg_covar
        norm_const = 1.0 / (np.power((2 * np.pi), size/2) * np.sqrt(det))
        x_mu = x - mean
        inv = np.linalg.inv(cov)
        result = np.dot(x_mu, inv)
        result = np.dot(result, x_mu)
        val = norm_const * np.exp(-0.5 * result)
        if np.isnan(val) or np.isinf(val):
            val = self.reg_covar
        return val

    def fit(self, X):
        X = np.array(X)
        n_samples, n_features = X.shape

        # Initialization
        split_X = np.array_split(X, self.n_components)
        self.means = [np.mean(x, axis=0) for x in split_X]
        self.covariances = [np.cov(x.T) + np.eye(n_features) * self.reg_covar for x in split_X]
        self.pi = [1 / self.n_components] * self.n_components

        for it in tqdm(range(self.max_iter), desc="GMM EM", disable=not self.verbose):
            # E-step
            r = np.zeros((n_samples, self.n_components))
            for n in range(n_samples):
                for k in range(self.n_components):
                    r[n, k] = self.pi[k] * self.multivariate_normal(X[n], self.means[k], self.covariances[k])
                total = np.sum(r[n])
                if total == 0:
                    r[n] = 1.0 / self.n_components
                else:
                    r[n] /= total

            N_k = np.sum(r, axis=0)

            # M-step
            for k in range(self.n_components):
                self.means[k] = np.sum(r[:, k].reshape(-1, 1) * X, axis=0) / (N_k[k] + self.reg_covar)
                cov_k = np.zeros((n_features, n_features))
                for n in range(n_samples):
                    diff = (X[n] - self.means[k]).reshape(-1, 1)
                    cov_k += r[n, k] * np.dot(diff, diff.T)
                self.covariances[k] = cov_k / (N_k[k] + self.reg_covar) + np.eye(n_features) * self.reg_covar
                self.pi[k] = N_k[k] / n_samples

    def predict(self, X):
        X = np.array(X)
        n_samples = X.shape[0]
        cluster_indices = []
        for n in range(n_samples):
            probs = [self.multivariate_normal(X[n], self.means[k], self.covariances[k]) for k in range(self.n_components)]
            idx = int(np.argmax(probs))
            cluster_indices.append(idx)
        return cluster_indices


In [None]:
X_train = torch.stack([v.to(torch.float16) for v in train_mean_pooled_vector]).cpu().numpy()
X_test = torch.stack([v.to(torch.float16) for v in test_mean_pooled_vector]).cpu().numpy()

In [56]:
# Saving and loading

#np.save("X_train.npy", X_train)
#np.save("X_test.npy", X_test)
#X_train = np.load("X_train.npy")
#X_test = np.load("X_test.npy")

In [55]:
# Due to large size from PhoBER (768), computing might take long time and large resource to process.
# To fix it, i use PCA to reduce dim to 50

pca = PCA(n_components=50)
X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)

In [60]:
GMM = GMM(n_components=3, max_iter=50, verbose=True)
GMM.fit(X_train_pca)
y_pred = GMM.predict(X_test_pca)

GMM EM: 100%|██████████| 50/50 [04:44<00:00,  5.69s/it]


In [65]:
# Label mapping from test set
y_true = []

for each in test:
  if each['sentiment'] == 'positive':
    y_true.append(0)
  elif each['sentiment'] == 'negative':
    y_true.append(1)
  elif each['sentiment'] == 'neutral':
    y_true.append(2)
  else:
    raise Exception("Unknown sentiment label")

In [67]:
# Map predict to best label

def best_map(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    indexes = linear_sum_assignment(-cm)
    mapping = dict(zip(indexes[1], indexes[0]))
    y_pred_mapped = np.array([mapping[yi] for yi in y_pred])
    return y_pred_mapped

y_pred_mapped = best_map(y_true, y_pred)

In [68]:
precision = precision_score(y_true, y_pred_mapped, average='macro')
recall    = recall_score(y_true, y_pred_mapped, average='macro')
f1        = f1_score(y_true, y_pred_mapped, average='macro')

print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1-score:  {f1:.4f}")


Precision: 0.5734
Recall:    0.5533
F1-score:  0.4911
