In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
import fasttext
import keras
from underthesea import word_tokenize
from gensim.utils import simple_preprocess

from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras import layers, Model
from tensorflow.keras.layers import concatenate
from tensorflow.keras.initializers import Constant


from tensorflow.keras.layers import Dense, Dropout, Embedding, LSTM, Bidirectional,Conv1D,GRU,SpatialDropout1D,Concatenate,Input,Flatten,GlobalMaxPooling1D,Reshape
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.layers import MaxPooling1D,AveragePooling1D


In [2]:
from numpy import loadtxt
import pickle

# Model input and aspect targets, stored as comma-separated integer tables.
X = loadtxt('X_absa.txt', delimiter=',', dtype=np.int32)
y_aspect = loadtxt('y_aspect.txt', delimiter=',', dtype=np.int32)

# The polarity targets are stored as a flat 2-D table; regroup every 4
# consecutive columns so the array becomes (rows, columns // 4, 4).
loaded_arr = loadtxt("y_polarity.txt", dtype=np.int32, delimiter=',')
y_pol = loaded_arr.reshape(
    loaded_arr.shape[0], loaded_arr.shape[1] // 4, 4)

embedding_matrix = loadtxt('embedding_matrix_absa.csv', delimiter=',', dtype=None)

# Vocabulary lookup tables. `with` guarantees each file handle is closed as
# soon as its dictionary has been read.
# NOTE: pickle.load can execute arbitrary code -- only load trusted files.
with open('wordtoix_ABSA.pkl', 'rb') as a_file:
    wordtoix = pickle.load(a_file)

with open('ixtoword_ABSA.pkl', 'rb') as a_file:
    ixtoword = pickle.load(a_file)

In [3]:
from tensorflow.keras import layers

# Source: https://keras.io/examples/nlp/text_classification_with_transformer/
from keras.initializers import Constant

class TransformerBlock(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward net.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalisation.

    Args:
        embed_dim: Width of the feed-forward output; also used as the
            attention `key_dim`. The residual additions in `call` require the
            last axis of the input to have this size.
        num_heads: Number of attention heads.
        ff_dim: Units of the hidden (tanh) layer of the feed-forward net.
        rate: Dropout rate applied after each sub-layer.
        **kwargs: Standard `Layer` arguments (e.g. `name`, `dtype`,
            `trainable`). They are forwarded to the base class so that the
            values emitted by `get_config()` are restored when the layer is
            rebuilt, e.g. by `load_model`.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        # Built from tf.keras so it comes from the same package as `layers`.
        self.ffn = tf.keras.Sequential(
            [layers.Dense(ff_dim, activation='tanh'), layers.Dense(embed_dim)]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

        # Kept so get_config() can serialise the constructor arguments.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

    def call(self, inputs, training=None):
        """Apply the block to `inputs`.

        Args:
            inputs: Tensor used as both query and value of the attention layer.
            training: Passed to the dropout layers to select training or
                inference behaviour. Defaults to None so the layer can also
                be called without the argument.

        Returns:
            The layer-normalised output of the second residual connection.
        """
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config

In [4]:
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Number of distinct positions the position embedding can
            represent (its `input_dim`).
        vocab_size: Number of distinct token ids the token embedding can
            represent (its `input_dim`).
        embed_dim: Size of both embedding vectors.
        **kwargs: Standard `Layer` arguments (e.g. `name`, `dtype`,
            `trainable`). They are forwarded to the base class so that the
            values emitted by `get_config()` are restored when the layer is
            rebuilt, e.g. by `load_model`.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

        # Kept so get_config() can serialise the constructor arguments.
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, x):
        """Embed the token ids in `x` and add the embedding of each position.

        Positions are 0 .. (size of the last axis of `x`) - 1.
        """
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(positions)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "maxlen": self.maxlen,
            "vocab_size": self.vocab_size,
            "embed_dim": self.embed_dim,
        })
        return config

    
   

In [7]:
# --- Input geometry --------------------------------------------------------
# One token-embedding row per vocabulary entry.
# NOTE(review): Embedding(input_dim=vocab_size) only accepts ids in
# [0, vocab_size); confirm the largest id in `wordtoix` is below len(wordtoix).
vocab_size = len(wordtoix)
# Length of every model input sequence.
max_length = 382

# --- Transformer sizes -----------------------------------------------------
# Size of each token / position embedding vector.
embed_dim = 32
# Number of attention heads.
num_heads = 2
# Hidden layer size of the feed-forward network inside the transformer block.
ff_dim = 32

In [8]:
# Number of aspect categories (one sigmoid unit and one polarity head each)
# and number of polarity classes per aspect.
NUM_ASPECTS = 12
NUM_POLARITY_CLASSES = 4

# --- Shared encoder: embeddings -> transformer block -> pooled features ----
inputs = layers.Input(shape=(max_length,))
embedding_layer = TokenAndPositionEmbedding(max_length, vocab_size, embed_dim)
x = embedding_layer(inputs)
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
x = transformer_block(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.5)(x)
x = layers.Dense(20, activation="relu")(x)
x = layers.Dropout(0.5)(x)

# --- Output 1: multi-label aspect detection --------------------------------
aspect_output = layers.Dense(NUM_ASPECTS, activation="sigmoid", name='output_aspect')(x)

# --- Output 2: one softmax polarity head per aspect ------------------------
# Each head yields (batch, 4); it is reshaped to (batch, 1, 4) so the heads can
# be stacked along axis 1 into a single (batch, 12, 4) tensor.
polarity_heads = []
for _ in range(NUM_ASPECTS):
    head = Dense(NUM_POLARITY_CLASSES, activation='softmax')(x)
    polarity_heads.append(Reshape((-1, NUM_POLARITY_CLASSES))(head))

output_pol = concatenate(polarity_heads, name='output_polarity', axis=1)

model2 = Model(inputs=inputs, outputs=[aspect_output, output_pol])


In [9]:
# Stop training after 5 consecutive epochs without improvement of the
# monitored metric (Keras default: `val_loss`) and restore the best weights.
callback = EarlyStopping(
    patience=5,
    restore_best_weights=True,
)

In [12]:
from sklearn.model_selection import train_test_split

# Hold out 22% of the samples; the fixed seed keeps the split reproducible.
(X_train, X_test,
 y_aspect_train, y_aspect_test,
 y_pol_train, y_pol_test) = train_test_split(
    X, y_aspect, y_pol, test_size=0.22, random_state=42)

# One loss per named output; the weights are given in output order
# (aspect first, polarity second).
loss = {
    'output_aspect': "binary_crossentropy",
    'output_polarity': 'categorical_crossentropy',
}
model2.compile(optimizer='adam', loss=loss, loss_weights=[0.1, 1])

In [None]:
# Train on the training split and validate on the held-out split after every
# epoch. `validation_data` is passed as a tuple (x_val, y_val), which is the
# form documented by Keras; a list is not accepted by every release.
history = model2.fit(
    X_train,
    [y_aspect_train, y_pol_train],
    batch_size=64,
    epochs=60,
    validation_data=(X_test, [y_aspect_test, y_pol_test]),
    callbacks=[callback],
)
# Persist the trained model so it can be reloaded without retraining.
model2.save('best_model_transformer_absa2.hdf5')

In [None]:
import matplotlib.pyplot as plt

# Training and validation loss per epoch, drawn on the same axes.
for curve in ('loss', 'val_loss'):
    plt.plot(history.history[curve], label=curve)
plt.xlabel('epoch')
plt.legend()
plt.show()

In [17]:
# The saved model contains custom layers, so the loader needs a mapping from
# their class names to the classes defined in this notebook.
custom_layers = {
    'TokenAndPositionEmbedding': TokenAndPositionEmbedding,
    'TransformerBlock': TransformerBlock,
}
model2 = load_model('best_model_transformer_absa2.hdf5', custom_objects=custom_layers)

In [None]:
def predict_aspect_pol(text, aspect_threshold=0.0002):
    """Predict which aspects `text` mentions and the polarity of each one.

    The text is tokenised with `word_processing`, mapped to ids through
    `wordtoix`, padded to `max_length` and fed to `model2`.

    Args:
        text: Raw input text.
        aspect_threshold: Minimum score of the aspect output for an aspect to
            be reported. Defaults to the value previously hard-coded here.

    Returns:
        dict mapping aspect label -> polarity label. An aspect is included
        only when its aspect score exceeds `aspect_threshold` AND the arg-max
        of its polarity head is not class 0.
    """
    token_sample = word_processing(text)
    # Tokens missing from the vocabulary are skipped; indexing `wordtoix`
    # directly would raise KeyError on any unseen word.
    token_ids = [wordtoix[word] for word in token_sample if word in wordtoix]
    X_predict = sequence.pad_sequences([token_ids], max_length)

    # `model2` is the network built and reloaded in this notebook. It returns
    # [aspect scores, polarity scores]; [0] selects the single sample.
    predict = model2.predict(X_predict)
    aspect_scores = predict[0][0]
    polarity_scores = predict[1][0]

    # Aspects whose detection score passes the threshold.
    label_aspect_pred_lst = [
        label_aspect_rev[i]
        for i, pred in enumerate(aspect_scores)
        if pred > aspect_threshold
    ]

    # Polarity of every aspect whose most likely polarity class is not 0.
    label_pol_aspect_dict = {}
    for j, pre_pol in enumerate(polarity_scores):
        polarity_idx = pre_pol.argmax()
        if polarity_idx != 0:
            label_pol_aspect_dict[label_aspect_rev[j]] = label_polarity_rev[polarity_idx]

    # Keep only the aspects confirmed by both heads.
    label_aspect_final = {
        label: polarity
        for label, polarity in label_pol_aspect_dict.items()
        if label in label_aspect_pred_lst
    }
    return label_aspect_final

In [None]:
# Sample Vietnamese input (roughly "attentive service") used to try out the predictor.
text = 'phục vụ tận tình'

In [None]:
# Display the {aspect label: polarity label} dict predicted for `text`.
predict_aspect_pol(text)