In [1]:
from kafka import KafkaConsumer, KafkaProducer
import json
import pandas
import numpy as np
import re 
from pyvi.ViTokenizer import ViTokenizer
import tensorflow as tf
import pickle
from keras.preprocessing import sequence
import requests

In [2]:
#preprocess
STOPWORDS = 'stop_word.txt'

with open(STOPWORDS, "r", encoding="utf-8") as ins:
    stopwords = []
    for line in ins:
        dd = line.strip("\n")
        stopwords.append(dd)
    stopwords = set(stopwords)
#Filter stop words
def filter_stop_words(train_sentences, stopwords):
    new_sent = [word for word in train_sentences.split() if word not in stopwords]
    train_sentences = ' '.join(new_sent)
    return train_sentences

#Delete Emoji
def deEmojify(text):
    emoji_pattern = re.compile("["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'',text)

#Preprocess
def preprocess(text, tokenized=True, lowercase=True):
    text = ViTokenizer.tokenize(text) if tokenized else text
    text = filter_stop_words(text, stopwords)
    text = deEmojify(text)
    text = text.lower() if lowercase else text
    return text

#-------Extract Features------------
def pre_process_features(X, tokenized=True, lowercase=True):
    X = [preprocess(str(X), tokenized=tokenized, lowercase=lowercase)]
    for idx, ele in enumerate(X):
        if not ele:
            np.delete(X, idx)
    return X
#-----Make Features---------
def make_features(X, tokenizer, is_one_hot_label=True,sequence_length=100):
    X = tokenizer.texts_to_sequences(X)
    X = sequence.pad_sequences(X, maxlen=sequence_length)
    return X

In [3]:
model = tf.keras.models.load_model('Text_CNN_model_v13.h5')
file = open("tokenizer.pickle","rb")
tokenizer = pickle.load(file)
headers = {"content-type": "application/json"}

In [4]:
consumer = KafkaConsumer(bootstrap_servers="localhost:9092",value_deserializer=lambda x: json.loads(x.decode("ascii")))
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'))
topic = "web"

In [5]:
consumer.subscribe(topics="streaming")
for message in consumer:
    data = message.value["text"]
    id = message.value["id"]
    preprocess_data = pre_process_features(data,tokenized=True,lowercase=True)
    process_data = make_features(preprocess_data, tokenizer, is_one_hot_label=False)
    json_data = json.dumps({"instances":process_data.tolist()})
    json_response = requests.post("http://localhost:8605/v1/models/twitter_model:predict",data=json_data,headers=headers)
    predictions = json.loads(json_response.text)['predictions']
    result = np.argmax(predictions,axis=-1)
    if(result == 0):
        result_type = "CLEAN"
    elif result ==1:
        result_type = "OFFENSIVE"
    else:
        result_type = "HATE"
    web_data = {"id":id,"text":data,"type":result_type}
    producer.send(topic,web_data)
    print(id,": ",data," ",np.argmax(predictions,axis=-1))

1476903924451856387 :  Xin chào   [0]
1476904100788797449 :  Mày ngu   [0]
1476904165926313985 :  mày ngu   [2]
1476904229335797766 :  mày xấu   [0]
1476904597910212610 :  tệ quá   [0]
1476904675043540993 :  tệ   [0]
1476904776239439875 :  mày ngu mày xấu mày đần   [2]
1476905104099852289 :  @MinhQua12576147 Đồ khùng   [1]
1476905554228379652 :  @MinhQua12576147 Dm   [1]
1476905693886115844 :  @MinhQua12576147 Chào   [0]
1476905869430325251 :  @MinhQua12576147 Kiểm tra   [0]
1476905953492541440 :  @MinhQua12576147 Đồ khùng điên   [2]
1476906026209214465 :  @MinhQua12576147 Kết thúc kiểm tra   [0]


KeyboardInterrupt: 