In [None]:
# Mount Google Drive: every data, acronym and model path used below lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Setup

In [None]:
# Install a headless JDK 8 for PySpark / BigDL (apt output is silenced).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Point JAVA_HOME and the system `java` alternative at JDK 8 so PySpark in this Colab runtime uses it.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
# Sanity check: the output should report version 1.8.x.
!java -version

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-8u312-b07-0ubuntu1~18.04-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)


In [None]:

!pip install bigdl-dllib



In [None]:
!pip install -Uq emoji \
                 optuna \
                 flashtext \
                 underthesea \
                 scikit-learn \
                 vncorenlp \

# Libraries

In [None]:
# Import necessary libraries and modules

from bigdl.dllib.nn.layer import *
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.utils.common import *
from bigdl.dllib.nnframes.nn_classifier import *
from bigdl.dllib.feature.common import *
import matplotlib.pyplot as plt
from __future__ import print_function
import os
import argparse


from bigdl.dllib.nncontext import *
# Initialise BigDL's Spark context; the printed pyspark_submit_args show the
# BigDL jar being placed on the driver class path.
init_nncontext()

pyspark_submit_args is:  --driver-class-path /usr/local/lib/python3.7/dist-packages/bigdl/share/dllib/lib/bigdl-dllib-spark_2.4.6-2.0.0-jar-with-dependencies.jar pyspark-shell 


In [None]:
import glob
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as f

from pyspark.sql.functions import col,length,trim
import re
from emoji import get_emoji_regexp
import unicodedata
from underthesea import word_tokenize
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, Word2Vec
from pyspark.ml import Pipeline

In [None]:
from pyspark.sql import SparkSession
# Global session used by load_data_spark() (spark.createDataFrame).
# NOTE(review): init_nncontext() above presumably already created a
# SparkContext; if so getOrCreate() reuses it and master("local[1]") may not
# take effect — confirm.
spark = SparkSession.builder.master("local[1]") \
                    .getOrCreate()

# Load dataset
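Annotated comments are loaded from the per-annotator Excel folders under `/content/drive/MyDrive/DoAn/data/gannhan/`. (`/content/drive/MyDrive/DoAn/data/dataset.csv` is not read by the cells below.)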

In [None]:
def load_data_spark(url = '/content/drive/MyDrive/DoAn/data/gannhan/', folders=None):
  """Read every annotator's Excel files and return them as one Spark DataFrame.

  Args:
    url: root directory holding one sub-folder per annotator (trailing slash optional).
    folders: annotator sub-folders to read; defaults to ['VA', 'BLong', 'Phuong', 'TH'].

  Returns:
    A Spark DataFrame built with the global `spark` session. Every column is
    cast to str, so empty cells arrive as the string 'nan'. Column names are
    taken (lower-cased) from the first file and assigned positionally to all files.

  Raises:
    FileNotFoundError: if no .xlsx file is found (previously an IndexError).
  """
  if folders is None:
    folders = ['VA', 'BLong', 'Phuong', 'TH']

  address_files = []
  for folder in folders:
    # sorted(): glob order is filesystem-dependent; a fixed row order keeps the
    # seeded randomSplit downstream reproducible between runs.
    address_files += sorted(glob.glob(os.path.join(url, folder, '*.xlsx')))
  if not address_files:
    raise FileNotFoundError(
        "No .xlsx files found under {} in folders {}".format(url, folders))

  frames = [pd.read_excel(path) for path in address_files]
  # Header of the first file wins; a file with a different column count raises ValueError here.
  columns = frames[0].columns.str.lower()
  for frame in frames:
    frame.columns = columns

  merged = pd.concat(frames)
  print(len(merged))
  return spark.createDataFrame(merged.astype(str))

In [None]:
# Load all annotated Excel files into one Spark DataFrame (every column as a string).
dataDF = load_data_spark()

13018


# Preprocessing

In [None]:
class Preporcessing:
  """Cleaning, labelling, splitting and featurisation for the comment dataset.

  Args:
    basic_preprocessing: if True, `clean_text` only NFC-normalises,
      lower-cases, strips emoji and replaces special characters with spaces.
      If False (default) it also replaces hashtags with "HASTAG" and prices
      with "PRICE", applies the acronym/spelling replacement tables, and
      word-segments the text with underthesea's `word_tokenize`.
    embedding_type: "wordcount", "tfidf" or "word2vec"; selects the pipeline
      returned by `Embedding`.
    path_acronyms: optional Excel file read by `special_case`. When omitted,
      `dict_special` is an empty dict (previously it was left unset, so
      `clean_df` / `clean_sentenceDF` raised AttributeError).
  """

  # Aspect columns, in the order they are packed into the "label" array.
  ASPECTS = ["general", "price", "quality", "service", "stylefood", "location", "background"]
  # Accepted sentiment values; anything else ('nan' from empty cells, typos) maps to MISSING_LABEL.
  VALID_LABELS = (-1.0, 0.0, 1.0)
  MISSING_LABEL = -2.0

  # "#anything" -> "HASTAG" (token spelling kept: fitted pipelines were built with it).
  HASHTAG_PATTERN = re.compile(r'#\S+')
  # Prices such as "100k", "50.000đ", "1,000,000 đồng" -> "PRICE". The number part
  # accepts exactly the strings of the former '(?:\d+[,\.]?)+' prefix, but without
  # that nested quantifier, which backtracked exponentially on long digit runs.
  PRICE_PATTERN = re.compile(r'(\d+(?:[,.]\d+)*[,.]? ?(?:nghìn đồng|đồng|k|vnd|d|đ))')
  # Punctuation / control characters, each replaced by a space.
  SPECIAL_CHAR_PATTERN = re.compile(r"[\"#$%&'()*+,\-\/\\:;<=>@[\]^_`{|}~\n\r\t]")
  # Canonical form -> variants rewritten to it (alternative tone-mark placement and a few misspellings).
  REPLACE_DICT = {"òa ":["oà "], "óa ":["oá "], "ỏa ":["oả "], "õa ":["oã "], "ọa ":["oạ "],
                  "òe":["oè"], "óe":["oé"], "ỏe":["oẻ"], "õe":["oẽ"], "ọe":["oẹ"],
                  "ùy":["uỳ"], "úy":["uý"], "ủy":["uỷ"], "ũy":["uỹ"], "ụy":["uỵ"],
                  "ùa":["uà"], "úa ":["uá "], "ủa":["uả"], "ũa":["uã"], "ụa":["uạ"],
                  "xảy":["xẩy"], "bảy":["bẩy"], "gãy":["gẫy"],"nhân viên ":["nvien"],"quay":['qay']}

  def __init__(self, basic_preprocessing = False, embedding_type = "tfidf", path_acronyms = None):
    self.basic_prepro = basic_preprocessing
    self.embedding_type = embedding_type
    self.dict_special = self.special_case(path_acronyms) if path_acronyms else {}

  def special_case(self, path_acronyms):
    """Build the acronym replacement table from an Excel sheet.

    For each row, the string cells of length >= 3 are collected in column
    order. Rows yielding exactly two such cells map the second cell to a
    one-item list holding the first; `clean_text` then replaces occurrences
    of the first cell's text with the second's.

    Returns:
      dict[str, list[str]]
    """
    table = pd.read_excel(path_acronyms).to_dict('index')
    dict_special = {}
    for row_values in table.values():
      # isinstance guard: empty Excel cells come back as float NaN, on which len() raised TypeError.
      cells = [v for v in row_values.values() if isinstance(v, str) and len(v) >= 3]
      if len(cells) == 2:
        dict_special[cells[1]] = [cells[0]]
    return dict_special

  def clean_text(self, text, special_w=None):
    """Normalise one comment string (see the class docstring for the steps).

    Args:
      text: raw comment; None yields "".
      special_w: extra {canonical: [variants]} table merged before REPLACE_DICT
        (REPLACE_DICT wins on duplicate keys); None means no extra table.
    """
    if text is None:
      return ""
    text = unicodedata.normalize('NFC', text).lower()
    text = re.sub(get_emoji_regexp(), "", text)

    if not self.basic_prepro:
      text = self.HASHTAG_PATTERN.sub("HASTAG", text)
      text = self.PRICE_PATTERN.sub("PRICE", text)
      replacements = {**(special_w or {}), **self.REPLACE_DICT}
      for canonical, variants in replacements.items():
        if isinstance(variants, list):
          for variant in variants:
            text = text.replace(variant, canonical)
      text = text.replace('ìnhh', 'ình')

    text = self.SPECIAL_CHAR_PATTERN.sub(" ", text)

    if not self.basic_prepro:
      # underthesea word segmentation, returned as a single string.
      text = word_tokenize(text, format="text")
    return text

  @classmethod
  def _normalize_label(cls, raw):
    """Map a raw label cell to -1.0 / 0.0 / 1.0, or MISSING_LABEL (-2.0).

    Parsed numerically so integer-typed columns ('1', stringified without a
    decimal) are recognised; the old string match only accepted '1.0' etc.
    """
    try:
      value = float(raw)
    except (TypeError, ValueError):
      return cls.MISSING_LABEL
    return value if value in cls.VALID_LABELS else cls.MISSING_LABEL

  def clean_df(self, sparkDF):
    """Clean "cmt", normalise the seven aspect labels and add a "label" array<float> column."""
    clean_udf = udf(lambda x: self.clean_text(x, self.dict_special), StringType())
    label_udf = udf(lambda x: self._normalize_label(x), FloatType())
    cleaned = sparkDF.select(
        clean_udf('cmt').alias("cmt"),
        *[label_udf(aspect).alias(aspect) for aspect in self.ASPECTS])
    return cleaned.withColumn("label", f.array(*self.ASPECTS).cast(ArrayType(FloatType())))

  def clean_sentenceDF(self, sentenceDF):
    """Clean only the "cmt" column (for unlabelled input)."""
    clean_udf = udf(lambda x: self.clean_text(x, self.dict_special), StringType())
    return sentenceDF.select(clean_udf('cmt').alias("cmt"))

  def split_data(self, sparkDF, train_ratio = 0.8, seed = 50):
    """Seeded random train/test split; returns (train, test)."""
    train_data, test_data = sparkDF.randomSplit([train_ratio, 1-train_ratio], seed)
    return train_data, test_data

  def Embedding(self, num_feature):
    """Return an unfitted Pipeline: Tokenizer("cmt" -> "words") -> featuriser -> "features".

    `num_feature` is the CountVectorizer vocabulary size, the HashingTF bucket
    count, or the Word2Vec vector size, depending on `embedding_type`.

    Raises:
      ValueError: for an unsupported `embedding_type`.
    """
    tokenizer = Tokenizer(inputCol="cmt", outputCol="words")
    # Single-input VectorAssembler: exposes "features_vec" as "features", the column consumers select.
    assembler = VectorAssembler(inputCols=["features_vec"], outputCol="features")

    if self.embedding_type == "wordcount":
      featurisers = [CountVectorizer(inputCol="words", outputCol="features_vec", minDF=5, vocabSize=num_feature)]
    elif self.embedding_type == "tfidf":
      featurisers = [HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=num_feature),
                     IDF(inputCol="rawFeatures", outputCol="features_vec")]
    elif self.embedding_type == "word2vec":
      featurisers = [Word2Vec(vectorSize=num_feature, seed=42, inputCol="words", outputCol="features_vec")]
    else:
      # Message: "Embedding must be 'wordcount', 'tfidf' or 'word2vec'. Other embeddings are not supported yet."
      raise ValueError("Embedding phải là 'wordcount', 'tfidf' hoặc 'word2vec'. Các embedding khác chưa hỗ trợ.")

    return Pipeline(stages=[tokenizer, *featurisers, assembler])
def convertCase(float_num):
  """Snap a raw model output to the nearest label value, returned as a float.

  Candidates, in tie-breaking priority order: -2 (missing), 0 (neutral),
  -1 (negative), 1 (positive). Ties go to the earlier candidate.
  """
  candidates = (-2, 0, -1)
  gaps = [abs(c - float_num) for c in candidates]
  closest = min(gaps + [abs(1 - float_num)])
  for label, gap in zip(candidates, gaps):
    if gap == closest:
      return float(label)
  # Positive wins by elimination (also reached for NaN, where no gap compares equal).
  return 1.

def edit_prediction_label(prediction_data):
  """Return `prediction_data` with its "prediction" array column snapped
  element-wise to label values by `convertCase` (typed array<float>)."""
  def _snap_all(values):
    return list(map(convertCase, values))

  snap_udf = udf(_snap_all, ArrayType(FloatType()))
  return prediction_data.withColumn("prediction", snap_udf('prediction'))

In [None]:
class Model:
  """Train / predict / evaluate / persist wrapper around BigDL's NNEstimator.

  Args:
    model: BigDL module to train.
    input_dim: size of the "features" vector (SeqToTensor([input_dim])).
    batch_size, learning_rate, epoch: estimator settings.
    criterion: BigDL criterion. Defaults to a new MSECriterion() per instance;
      the former default `criterion=MSECriterion()` was evaluated once at
      class-definition time and shared by every Model.
    output_dim: size of the "label" array (ArrayToTensor([output_dim])) and
      the number of aspects scored by `evaluate`; 7 by default.
  """

  def __init__(self, model, input_dim, batch_size = 64,learning_rate = 0.2,epoch = 10,criterion=None, output_dim=7):
    self.criterion = MSECriterion() if criterion is None else criterion
    self.model = model
    self.output_dim = output_dim
    self.est = NNEstimator(model, self.criterion, SeqToTensor([input_dim]), ArrayToTensor([output_dim])) \
                .setBatchSize(batch_size).setLearningRate(learning_rate).setMaxEpoch(epoch)
    # Fitted model; set by train() or load_model().
    self.NNmodel = None

  def _require_trained(self):
    """Raise RuntimeError if neither train() nor load_model() has run."""
    if self.NNmodel is None:
      raise RuntimeError("Model is not trained/loaded yet: call train() or load_model() first.")

  def train(self,train_data):
    """Fit the estimator on a DataFrame with "features" and "label" columns."""
    self.NNmodel = self.est.fit(train_data)

  def predict(self,test_data):
    """Return `test_data` transformed by the fitted model (adds "prediction")."""
    self._require_trained()
    return self.NNmodel.transform(test_data)

  def evaluate(self, predict_data):
    """Snap predictions with edit_prediction_label and count exact matches per aspect.

    Returns:
      list[int] of length output_dim: number of rows whose snapped prediction
      equals the label for each aspect.
    """
    snapped = edit_prediction_label(predict_data)
    rows = snapped.select('label', 'prediction').collect()
    return self._count_matches(rows, self.output_dim)

  @staticmethod
  def _count_matches(rows, num_aspects=7):
    """Count, per position < num_aspects, the (label, prediction) pairs that agree."""
    counts = [0] * num_aspects
    for label, prediction in rows:
      for j in range(num_aspects):
        if label[j] == prediction[j]:
          counts[j] += 1
    return counts

  def save_model(self, path_model):
    self._require_trained()
    self.NNmodel.save(str(path_model))
    print("Save nnmodel successfull!")

  def load_model(self, path):
    from bigdl.dllib.nnframes.nn_classifier import NNModel
    self.NNmodel = NNModel(self.model).load(path)
    print("Load nnmodel successfull!")

creating: createMSECriterion


In [None]:
def _recurrent_classifier(cell, input_size, hidden_size, output_size, dropout=0.2):
  """Shared head: InferReshape -> Recurrent(cell) -> last step -> Dropout -> Linear."""
  model = Sequential()
  recurrent = Recurrent()
  recurrent.add(cell)
  # Reshape each flat feature vector to (-1, input_size). NOTE(review): for an
  # input of exactly input_size values this is a length-1 sequence — confirm intended.
  model.add(InferReshape([-1, input_size], True))
  model.add(recurrent)
  # Keep the last index along dimension 2 (presumably the time axis of the Recurrent output).
  model.add(Select(2, -1))
  model.add(Dropout(dropout))
  model.add(Linear(hidden_size, output_size))
  return model

def LSTM_model(input_size, hidden_size, output_size, dropout=0.2):
  """LSTM classifier mapping input_size features to output_size outputs."""
  return _recurrent_classifier(LSTM(input_size, hidden_size), input_size, hidden_size, output_size, dropout)

def GRU_model(input_size, hidden_size, output_size, dropout=0.2):
  """GRU classifier mapping input_size features to output_size outputs."""
  return _recurrent_classifier(GRU(input_size, hidden_size), input_size, hidden_size, output_size, dropout)

def MLP(input_size, hidden_size, output_size, second_hidden_size=256):
  """Two-hidden-layer ReLU MLP: input -> hidden_size -> second_hidden_size -> output."""
  model = Sequential()
  model.add(Linear(input_size, hidden_size))
  model.add(ReLU())
  model.add(Linear(hidden_size, second_hidden_size))
  model.add(ReLU())
  model.add(Linear(second_hidden_size, output_size))
  return model

def RNN_model(input_size, hidden_size, output_size, dropout=0.2):
  """Vanilla (tanh) RNN classifier mapping input_size features to output_size outputs."""
  return _recurrent_classifier(RnnCell(input_size, hidden_size, Tanh()), input_size, hidden_size, output_size, dropout)


# Train and save models (300-dimensional features)

In [None]:
def get_evalute(model,test_data):
  """Predict on `test_data`, then print and return per-aspect and overall accuracy.

  Args:
    model: trained `Model` (anything with `predict(df)` and `evaluate(pred_df)`
      returning per-aspect correct counts).
    test_data: Spark DataFrame with "features" and "label" columns.

  Returns:
    dict of aspect name -> accuracy plus "overall"; {} when there are no rows.
  """
  aspect_name = ['general', 'price', 'quality', 'service', 'stylefood','location', 'background']
  pred_data = model.predict(test_data)
  acc = model.evaluate(pred_data)
  # count() rather than len(collect()): no need to ship every prediction row to the driver.
  total = pred_data.count()
  if total == 0:
    print("No rows to evaluate")
    return {}
  scores = {}
  for name, correct in zip(aspect_name, acc):
    scores[name] = correct / total
    print('{} : {} '.format(name, scores[name]))
  scores['overall'] = sum(acc) / (total * len(aspect_name))
  print("Sum evaluate : ", scores['overall'])
  return scores
def get_embedding(embedding_name,data,embedding_dim=300):
  """Clean `data`, split it, fit the `embedding_name` pipeline on the train split,
  save the fitted pipeline under model/Embedding/<name>, and return
  (train, test) DataFrames with "features" and "label".
  """
  preprocessing = Preporcessing(basic_preprocessing=False, embedding_type=embedding_name, path_acronyms='/content/drive/MyDrive/BigData/data/original/Acronyms.xlsx')
  # Use the argument; the previous version silently read the global `dataDF`.
  sparkDF_cleaned = preprocessing.clean_df(data)
  train_data, test_data = preprocessing.split_data(sparkDF_cleaned)
  fitted = preprocessing.Embedding(embedding_dim).fit(train_data)
  # overwrite(): a plain save() fails when the path already exists, breaking re-runs.
  fitted.write().overwrite().save('/content/drive/MyDrive/DoAn/model/Embedding/{}'.format(embedding_name))
  train_features = fitted.transform(train_data).select('features','label')
  test_features = fitted.transform(test_data).select('features','label')
  return train_features, test_features
def train_model_and_save(dataDF):
  """Train, save and evaluate every architecture on every 300-dim embedding."""
  embedding_nameS = ['tfidf','wordcount','word2vec']
  # Builders, not instances: each (embedding, architecture) run must start from a
  # freshly initialised model. The previous version built the four models once,
  # outside the loop, so later embeddings reused the same module objects (and
  # presumably their already-trained weights).
  model_builders = [('lstm', LSTM_model), ('gru', GRU_model), ('mlp', MLP), ('rnn', RNN_model)]
  for embedding_name in embedding_nameS:
    print('------------------------------------------Embedding : {}----------------------------------------'.format(embedding_name))
    train_data,test_data = get_embedding(embedding_name,dataDF)
    for name, build in model_builders:
      print(name)
      model = Model(build(300,256,7), 300, epoch=10, batch_size = 64,learning_rate = 0.2, criterion= MSECriterion())
      model.train(train_data)
      model.save_model('/content/drive/MyDrive/DoAn/model/Model_BigDL/{}/{}'.format(name,embedding_name))
      get_evalute(model,test_data)
train_model_and_save(dataDF)

creating: createSequential
creating: createRecurrent
creating: createTanh
creating: createSigmoid
creating: createLSTM
creating: createInferReshape
creating: createSelect
creating: createDropout
creating: createLinear
creating: createSequential
creating: createRecurrent
creating: createTanh
creating: createSigmoid
creating: createGRU
creating: createInferReshape
creating: createSelect
creating: createDropout
creating: createLinear
creating: createSequential
creating: createLinear
creating: createReLU
creating: createLinear
creating: createReLU
creating: createLinear
creating: createSequential
creating: createRecurrent
creating: createTanh
creating: createRnnCell
creating: createInferReshape
creating: createSelect
creating: createDropout
creating: createLinear
------------------------------------------Embedding : tfidf----------------------------------------
lstm
creating: createMSECriterion
creating: createSeqToTensor
creating: createArrayToTensor
creating: createFeatureLabelPreprocess

In [None]:
# Step-by-step version of get_embedding() for tfidf: clean the comments, split
# 80/20 with the default seed, and build the (unfitted) 300-feature pipeline.
preprocessing = Preporcessing(basic_preprocessing=False, embedding_type="tfidf", path_acronyms='/content/drive/MyDrive/BigData/data/original/Acronyms.xlsx')
sparkDF_cleaned = preprocessing.clean_df(dataDF)
train_data, test_data = preprocessing.split_data(sparkDF_cleaned)
embedding = preprocessing.Embedding(300)


In [None]:
# Fit the feature pipeline on the training split only; the test split is only transformed later.
embedding_abc = embedding.fit(train_data)

In [None]:
embedding_abc.save('/content/drive/MyDrive/DoAn/model/Embedding/wordcount')

In [None]:
from pyspark.ml import Pipeline, PipelineModel
embedding_test = PipelineModel.load('/content/drive/MyDrive/DoAn/model/Embedding/wordcount')

In [None]:

# Replace the raw splits with their feature vectors ("features", "label").
# NOTE: this rebinds train_data/test_data, so re-running this cell alone fails
# (the pipeline's Tokenizer needs the "cmt" column, which select() dropped);
# re-run the split cell above first.
train_data = embedding_abc.transform(train_data).select('features','label')
test_data = embedding_abc.transform(test_data).select('features','label')

# Train and save models with per-embedding feature sizes (tfidf/wordcount: 3000, word2vec: 300)


In [None]:
def get_evalute(model,test_data):
  """Predict on `test_data`, then print and return per-aspect and overall accuracy.

  NOTE(review): identical to the definition in the previous section; this one shadows it.

  Args:
    model: trained `Model` (anything with `predict(df)` and `evaluate(pred_df)`
      returning per-aspect correct counts).
    test_data: Spark DataFrame with "features" and "label" columns.

  Returns:
    dict of aspect name -> accuracy plus "overall"; {} when there are no rows.
  """
  aspect_name = ['general', 'price', 'quality', 'service', 'stylefood','location', 'background']
  pred_data = model.predict(test_data)
  acc = model.evaluate(pred_data)
  # count() rather than len(collect()): no need to ship every prediction row to the driver.
  total = pred_data.count()
  if total == 0:
    print("No rows to evaluate")
    return {}
  scores = {}
  for name, correct in zip(aspect_name, acc):
    scores[name] = correct / total
    print('{} : {} '.format(name, scores[name]))
  scores['overall'] = sum(acc) / (total * len(aspect_name))
  print("Sum evaluate : ", scores['overall'])
  return scores
def prepare_model(embedding_dim,embedtype,model_type):
  """Build a fresh, untrained model for `model_type` with 7 outputs.

  Args:
    embedding_dim: length of the input feature vector.
    embedtype: embedding name; only affects the MLP hidden size
      (256 for "word2vec", 1000 otherwise).
    model_type: one of "lstm", "gru", "mlp", "rnn".

  Raises:
    ValueError: for an unknown `model_type` (previously an UnboundLocalError).
  """
  mlp_hidden = 256 if embedtype == "word2vec" else 1000
  # Lambdas so only the requested model is constructed.
  builders = {
      "lstm": lambda: LSTM_model(embedding_dim, 256, 7),
      "gru": lambda: GRU_model(embedding_dim, 256, 7),
      "mlp": lambda: MLP(embedding_dim, mlp_hidden, 7),
      "rnn": lambda: RNN_model(embedding_dim, 256, 7),
  }
  if model_type not in builders:
    raise ValueError("Unknown model_type {!r}; expected one of {}".format(model_type, sorted(builders)))
  return builders[model_type]()
def get_embedding(embedding_name,data,embedding_dim):
  """Clean `data`, split it, fit the `embedding_name` pipeline with `embedding_dim`
  features on the train split, save it under Model_Custom/Embedding/<name>, and
  return (train, test) DataFrames with "features" and "label".

  NOTE(review): this redefines the 2-argument get_embedding from the previous
  section, so train_model_and_save() (which passes two arguments) raises
  TypeError once this cell has run.
  """
  preprocessing = Preporcessing(basic_preprocessing=False, embedding_type=embedding_name, path_acronyms='/content/drive/MyDrive/BigData/data/original/Acronyms.xlsx')
  # Use the argument; the previous version silently read the global `dataDF`.
  sparkDF_cleaned = preprocessing.clean_df(data)
  train_data, test_data = preprocessing.split_data(sparkDF_cleaned)
  fitted = preprocessing.Embedding(embedding_dim).fit(train_data)
  # overwrite(): a plain save() fails when the path already exists, breaking re-runs.
  fitted.write().overwrite().save('/content/drive/MyDrive/DoAn/Model_Custom/Embedding/{}'.format(embedding_name))
  train_features = fitted.transform(train_data).select('features','label')
  test_features = fitted.transform(test_data).select('features','label')
  return train_features, test_features
def train_model_and_save2(dataDF):
  """For each embedding, fit features once, then train, save and evaluate every architecture.

  tfidf and wordcount use 3000 features and word2vec 300; each
  (embedding, architecture) pair gets a fresh model from prepare_model().
  """
  feature_dims = {'tfidf': 3000, 'wordcount': 3000, 'word2vec': 300}
  architectures = ('lstm', 'gru', 'mlp', 'rnn')
  for emb in ('tfidf', 'wordcount', 'word2vec'):
    dim = feature_dims[emb]
    print('------------------------------------------Embedding : {}----------------------------------------'.format(emb))
    train_data, test_data = get_embedding(emb, dataDF, dim)
    for arch in architectures:
      print(arch)
      network = prepare_model(dim, emb, arch)
      model = Model(network, dim, epoch=10, batch_size=64, learning_rate=0.2, criterion=MSECriterion())
      model.train(train_data)
      model.save_model('/content/drive/MyDrive/DoAn/Model_Custom/Model_BigDL/{}/{}'.format(arch, emb))
      get_evalute(model, test_data)
train_model_and_save2(dataDF)

------------------------------------------Embedding : tfidf----------------------------------------
lstm
creating: createSequential
creating: createRecurrent
creating: createTanh
creating: createSigmoid
creating: createLSTM
creating: createInferReshape
creating: createSelect
creating: createDropout
creating: createLinear
creating: createMSECriterion
creating: createSeqToTensor
creating: createArrayToTensor
creating: createFeatureLabelPreprocessing
creating: createNNEstimator
creating: createToTuple
creating: createChainedPreprocessing
Save nnmodel successfull!
general : 0.6317204301075269 
price : 0.6693548387096774 
quality : 0.44086021505376344 
service : 0.6778033794162827 
stylefood : 0.8252688172043011 
location : 0.8137480798771122 
background : 0.7258064516129032 
Sum evaluate :  0.6835088874259381
gru
creating: createSequential
creating: createRecurrent
creating: createTanh
creating: createSigmoid
creating: createGRU
creating: createInferReshape
creating: createSelect
creating: