In [None]:
# Fetch the Yelp review polarity dataset (fast.ai S3 mirror), unpack it, and keep only the
# training split as ./train.csv, which the data-loading cell below reads.
!wget https://s3.amazonaws.com/fast-ai-nlp/yelp_review_polarity_csv.tgz
!tar zxvf yelp_review_polarity_csv.tgz
!mv yelp_review_polarity_csv/train.csv train.csv

In [1]:
#!/usr/bin/env python3
import random
import copy
import re
import numpy as np
import argparse

np.warnings.filterwarnings('ignore', category=np.VisibleDeprecationWarning)

from numpy import dot
from numpy.linalg import norm
from gensim.models import Word2Vec

import redis
import pickle
import logging

from nltk.corpus import stopwords

from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream import CheckpointingMode
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.common.serialization import Encoder

from utils import process_text_and_generate_tokens, split

from time import time
import pandas as pd



In [2]:
class unsupervised_OSA():
    """Unsupervised online sentiment-analysis map operator (PLStream style).

    Every ``collector_size`` reviews the operator (1) incrementally trains a gensim
    Word2Vec model on the buffered token lists, (2) labels each buffered review by
    comparing its mean word vector with negative/positive reference words, and
    (3) every ``time_to_reset`` seconds emits its (pruned) model instead of the
    labels so that a reducer can average models with :meth:`model_merge`.
    Models are shared through a Redis server on localhost:6379 under key 'osamodel'.
    """

    def __init__(self, collector_size, with_accuracy=True):
        """
        :param collector_size: number of reviews buffered before the model is updated
            and the buffered reviews are classified
        :param with_accuracy: True if labels are provided to the datastream. default value is True
        """
        self.initial_model = None
        self.redis_param = None
        self.start_timer = time()
        # collection (per-batch buffers)
        self.vocabulary = []
        self.true_label = []
        self.collector = []
        self.cleaned_text = []
        self.stop_words = stopwords.words('english')
        self.collector_size = collector_size

        # model pruning
        self.LRU_index = ['good', 'bad']  # front = most recently refreshed words
        self.LRU_cache_size = 300000  # prune once the vocabulary grows beyond this size
        # number of leading LRU_index entries a prune keeps (previously a literal 30000
        # inside model_prune). NOTE(review): possibly meant to equal LRU_cache_size.
        self.LRU_prune_keep = 30000

        # model merging
        self.flag = True  # True -> reload the model from Redis before the next update
        self.model_to_train = None
        self.timer = time()
        self.time_to_reset = 30  # seconds between model emissions in map()

        # similarity-based classification preparation: candidate reference words move
        # into true_ref_* once the model vocabulary contains them
        self.true_ref_neg = []
        self.true_ref_pos = []
        self.ref_pos = ['love', 'best', 'beautiful', 'great', 'cool', 'awesome', 'wonderful', 'brilliant', 'excellent',
                        'fantastic']
        self.ref_neg = ['bad', 'worst', 'stupid', 'disappointing', 'terrible', 'rubbish', 'boring', 'awful',
                        'unwatchable', 'awkward']

        # temporal trend detection: label priors computed from the previous batch
        self.pos_coefficient = 0.5
        self.neg_coefficient = 0.5

        # results
        self.confidence = 0.5  # similarity margin above which the priors are ignored
        self.predictions = []
        self.labelled_dataset = []
        self.confidence_list = []
        self.with_accuracy = with_accuracy

    def open(self):
        """Load the initial Word2Vec model from 'PLS_c10.model' and publish it to Redis."""
        # redis-server parameters
        self.redis_param = redis.StrictRedis(host='localhost', port=6379, db=0)

        # load initial model
        self.initial_model = Word2Vec.load('PLS_c10.model')
        self.vocabulary = list(self.initial_model.wv.index_to_key)

        # save model to redis
        self.save_model(self.initial_model)

    def save_model(self, model):
        """Pickle ``model`` into Redis under 'osamodel'; failures are logged, not raised."""
        self.redis_param = redis.StrictRedis(host='localhost', port=6379, db=0)
        try:
            self.redis_param.set('osamodel', pickle.dumps(model, protocol=pickle.HIGHEST_PROTOCOL))
        except Exception:  # RedisError, pickling errors, ...: saving is best effort
            logging.warning('Unable to save model to Redis server, please check your model')

    def load_model(self):
        """Fetch and unpickle the model stored in Redis under 'osamodel'.

        :raises KeyError: if nothing is stored under that key (e.g. open() was never called)
        """
        self.redis_param = redis.StrictRedis(host='localhost', port=6379, db=0)
        payload = self.redis_param.get('osamodel')
        if payload is None:
            raise KeyError("'osamodel' not found in Redis; call open() first")
        # NOTE: pickle.loads can execute arbitrary code -- only use a trusted Redis instance.
        return pickle.loads(payload)

    # tweet preprocessing

    def text_to_word_list(self, text):
        """Tokenize ``text`` and buffer the non-empty tokens.

        :return: 'update_model' once ``collector_size`` token lists are buffered, else None
        """
        clean_word_list = [token for token in process_text_and_generate_tokens(text) if token != '']
        self.cleaned_text.append(clean_word_list)
        if len(self.cleaned_text) >= self.collector_size:
            return 'update_model'

    def model_prune(self, model):
        """Drop vocabulary words that sit past ``LRU_prune_keep`` in ``LRU_index``.

        Nothing happens while the vocabulary has at most ``LRU_cache_size`` words.
        Otherwise the affected rows are removed from every per-word structure with one
        shared row mask, so the index, vectors and expandos stay aligned.
        NOTE(review): update_LRU_index puts the *last* words of index_to_key first, so
        the pruned tail is the start of index_to_key -- confirm this eviction order.

        :return: ``model`` (modified in place)
        """
        if len(model.wv.index_to_key) <= self.LRU_cache_size:
            return model
        word_to_prune = set(self.LRU_index[self.LRU_prune_keep:]) & set(model.wv.key_to_index)
        if word_to_prune:
            keep = [idx for idx, word in enumerate(model.wv.index_to_key) if word not in word_to_prune]
            self._filter_vocab_rows(model, keep)
        # entries past the kept prefix are either pruned now or no longer in the model
        del self.LRU_index[self.LRU_prune_keep:]
        self.vocabulary = list(model.wv.index_to_key)
        return model

    @staticmethod
    def _filter_vocab_rows(model, keep):
        """Keep only vocabulary rows ``keep`` (ascending indices) in ``model``'s per-word arrays."""
        wv = model.wv
        vocab_size = len(wv.index_to_key)
        wv.index_to_key = [wv.index_to_key[idx] for idx in keep]
        wv.key_to_index = {word: idx for idx, word in enumerate(wv.index_to_key)}
        wv.vectors = wv.vectors[keep]
        wv.norms = None  # cached norms (if any) no longer match the filtered vectors
        for attr, values in list(wv.expandos.items()):
            if len(values) == vocab_size:
                wv.expandos[attr] = np.asarray(values)[keep]
        lockf = getattr(wv, 'vectors_lockf', None)
        if lockf is not None and len(lockf) == vocab_size:  # only when stored per word
            wv.vectors_lockf = lockf[keep]
        for attr in ('syn1', 'syn1neg'):
            weights = getattr(model, attr, None)
            if weights is not None and len(weights) == vocab_size:
                setattr(model, attr, weights[keep])

    def get_model_new(self, final_words, final_vectors, final_syn1, final_syn1neg, final_cum_table, corpus_count,
                      final_count, final_sample_int, final_code, final_point, model):
        """Return a deep copy of ``model`` whose vocabulary and per-word state are replaced.

        All ``final_*`` sequences are aligned with ``final_words``.
        """
        model_new = copy.deepcopy(model)
        wv = model_new.wv
        wv.index_to_key = final_words
        wv.key_to_index = {word: idx for idx, word in enumerate(final_words)}
        wv.vectors = final_vectors
        wv.norms = None  # norms cached on the copied model would describe the old vectors
        model_new.syn1 = final_syn1
        model_new.syn1neg = final_syn1neg
        model_new.cum_table = final_cum_table
        model_new.corpus_count = corpus_count
        # NOTE(review): this stores the vocabulary size, not a corpus word count.
        model_new.corpus_total_words = len(final_words)
        wv.expandos['count'] = final_count
        wv.expandos['sample_int'] = final_sample_int
        wv.expandos['code'] = final_code
        wv.expandos['point'] = final_point
        return model_new

    def model_merge(self, model1, model2):
        """Reduce two tagged outputs of map() that carry the same tag.

        - ('labelled', records): returns ('labelled', records1 + records2)
        - ('acc', value): returns the mean of the two values as a float
        - ('model', Word2Vec): returns the merged model (see _merge_models), publishes it
          to Redis and sets ``flag`` so this instance reloads it before its next update
        Any other tag returns None.
        """
        tag = model1[0]
        if tag == 'labelled':
            return tag, model1[1] + model2[1]
        elif tag == 'acc':
            return (float(model1[1]) + float(model2[1])) / 2
        elif tag == 'model':
            model_new = self._merge_models(model1[1], model2[1])
            self.save_model(model_new)
            self.flag = True
            return model_new

    def _merge_models(self, model1, model2):
        """Merge two Word2Vec models over the union of their vocabularies.

        Words in both models get the element-wise mean of their vector, syn1, syn1neg
        and cum_table entries; count/sample_int/code/point come from model1. Words in
        only one model keep that model's values. Vocabulary order is model1's words
        followed by model2-only words; the result is built on a deep copy of model1.
        """
        wv1, wv2 = model1.wv, model2.wv
        words1 = list(wv1.index_to_key)
        words2 = list(wv2.index_to_key)
        index2 = {word: idx for idx, word in enumerate(words2)}  # O(1) instead of list.index
        syn1s1, syn1s2 = model1.syn1, model2.syn1
        syn1negs1, syn1negs2 = model1.syn1neg, model2.syn1neg
        cum_tables1, cum_tables2 = model1.cum_table, model2.cum_table
        corpus_count = model1.corpus_count + model2.corpus_count
        counts1, counts2 = wv1.expandos['count'], wv2.expandos['count']
        sample_ints1, sample_ints2 = wv1.expandos['sample_int'], wv2.expandos['sample_int']
        # code/point entries are per-word arrays; copy so the merged model owns its own
        codes1, codes2 = copy.deepcopy(wv1.expandos['code']), copy.deepcopy(wv2.expandos['code'])
        points1, points2 = copy.deepcopy(wv1.expandos['point']), copy.deepcopy(wv2.expandos['point'])

        final_words, final_vectors, final_syn1, final_syn1neg = [], [], [], []
        final_cum_table, final_count, final_sample_int, final_code, final_point = [], [], [], [], []

        def take(word, vector, syn1, syn1neg, cum_table, count, sample_int, code, point):
            # append one word's merged state to every aligned output list
            final_words.append(word)
            final_vectors.append(vector)
            final_syn1.append(syn1)
            final_syn1neg.append(syn1neg)
            final_cum_table.append(cum_table)
            final_count.append(count)
            final_sample_int.append(sample_int)
            final_code.append(code)
            final_point.append(point)

        for idx1, word in enumerate(words1):
            idx2 = index2.get(word)
            if idx2 is None:
                vector, syn1, syn1neg, cum_table = wv1[word], syn1s1[idx1], syn1negs1[idx1], cum_tables1[idx1]
            else:
                vector = self._mean_pair(wv1[word], wv2[word])
                syn1 = self._mean_pair(syn1s1[idx1], syn1s2[idx2])
                syn1neg = self._mean_pair(syn1negs1[idx1], syn1negs2[idx2])
                cum_table = self._mean_pair(cum_tables1[idx1], cum_tables2[idx2])
            take(word, vector, syn1, syn1neg, cum_table,
                 counts1[idx1], sample_ints1[idx1], codes1[idx1], points1[idx1])

        known1 = set(words1)
        for idx2, word in enumerate(words2):
            if word in known1:  # shared words were already merged above
                continue
            take(word, wv2[word], syn1s2[idx2], syn1negs2[idx2], cum_tables2[idx2],
                 counts2[idx2], sample_ints2[idx2], codes2[idx2], points2[idx2])

        # NOTE(review): cum_table is left as a plain list, as before; averaging a
        # cumulative table per word is only a placeholder.
        return self.get_model_new(final_words, np.array(final_vectors), np.array(final_syn1),
                                  np.array(final_syn1neg), final_cum_table, corpus_count,
                                  np.array(final_count), np.array(final_sample_int),
                                  self._object_array(final_code), self._object_array(final_point),
                                  model1)

    @staticmethod
    def _mean_pair(first, second):
        """Element-wise mean of two equally shaped values."""
        return np.mean(np.array([first, second]), axis=0)

    @staticmethod
    def _object_array(items):
        """1-D object array holding ``items`` as-is (ragged per-word arrays allowed)."""
        out = np.empty(len(items), dtype=object)
        for idx, item in enumerate(items):
            out[idx] = item
        return out

    def map(self, tweet):
        """Consume one stream element.

        :param tweet: expects tweet in the format [index,label,string] or [index,string]
        :return: ('collecting', '1') while the batch fills. When ``collector_size``
            reviews are buffered, the model is updated and the batch classified. The
            method then returns ('model', pruned_model) if ``time_to_reset`` seconds
            have passed since the last emission, else ('labelled', records).
        """
        if self.with_accuracy:
            content = tweet[2]
            self.true_label.append(int(tweet[1]))
        else:
            content = tweet[1]
        self.collector.append((tweet[0], content))

        if self.text_to_word_list(content) != 'update_model':
            return 'collecting', '1'

        self.update_model(self.cleaned_text)
        classify_result = self.classify_result(self.cleaned_text)
        self.cleaned_text = []
        self.true_label = []

        if time() - self.timer >= self.time_to_reset:  # prune and return model
            # labelled_dataset is not flushed here, so this batch's records are carried
            # into the next 'labelled' output
            self.model_to_train = self.model_prune(self.model_to_train)
            self.timer = time()
            return 'model', self.model_to_train
        not_yet = ('labelled', classify_result)
        self.labelled_dataset = []  # flush; `not_yet` still references the old list
        return not_yet

    def incremental_training(self, new_sentences):
        """Extend the vocabulary with ``new_sentences`` and continue training on them."""
        self.model_to_train.build_vocab(new_sentences, update=True)  # 1) update vocabulary
        self.model_to_train.train(new_sentences,  # 2) incremental training
                                  total_examples=self.model_to_train.corpus_count,
                                  epochs=self.model_to_train.epochs)

    def update_LRU_index(self):
        """Move every current vocabulary word to the front of ``LRU_index``.

        Produces the same order as inserting each word of index_to_key at position 0
        in turn, i.e. index_to_key reversed, followed by older LRU entries that are no
        longer in the model. Runs in O(n), and words missing from LRU_index are handled.
        """
        keys = list(self.model_to_train.wv.index_to_key)
        current = set(keys)
        self.LRU_index = keys[::-1] + [word for word in self.LRU_index if word not in current]
        self.vocabulary = keys

    def update_true_ref(self):
        """Move reference words that the model now knows from ref_* into true_ref_*."""
        pairs = ((self.ref_neg, self.true_ref_neg), (self.ref_pos, self.true_ref_pos))
        for candidates, confirmed in pairs:
            for word in list(candidates):  # iterate a copy: `candidates` shrinks below
                if word in self.model_to_train.wv:
                    candidates.remove(word)
                    if word not in confirmed:
                        confirmed.append(word)

    def update_model(self, new_sentences):
        """Train on ``new_sentences`` (reloading from Redis first if ``flag`` is set)."""
        if self.flag:
            self.model_to_train = self.load_model()
            self.flag = False

        # incremental learning
        self.incremental_training(new_sentences)
        self.update_LRU_index()
        self.update_true_ref()

    def classify_result(self, tweets):
        """Label every token list in ``tweets`` and refresh the label priors.

        Each record appended to ``labelled_dataset`` is [review index, confidence,
        predicted label (0 = closer to negative refs, 1 = positive), raw text,
        {'neg_coefficient', 'pos_coefficient'[, 'true_label']}].

        :return: ``labelled_dataset`` (the live list, not a copy)
        """
        for t, tokens in enumerate(tweets):
            predict_result = self.predict(tokens, self.model_to_train)
            self.confidence_list.append(predict_result[0])

            d = {'neg_coefficient': self.neg_coefficient, 'pos_coefficient': self.pos_coefficient}
            if self.with_accuracy:
                d['true_label'] = self.true_label[t]
            self.labelled_dataset.append([
                self.collector[t][0], predict_result[0], predict_result[1], self.collector[t][1], d])
            self.predictions.append(predict_result[1])

        n_neg = self.predictions.count(0)
        n_pos = self.predictions.count(1)
        if n_neg + n_pos:  # keep the previous priors when nothing was classified
            self.neg_coefficient = n_neg / (n_pos + n_neg)
            self.pos_coefficient = 1 - self.neg_coefficient
        self.collector = []
        ans = self.labelled_dataset
        self.predictions = []
        return ans

    @staticmethod
    def _cosine(first, second):
        """Cosine similarity; 0.0 when either vector has zero norm (instead of NaN)."""
        denominator = norm(first) * norm(second)
        if denominator == 0:
            return 0.0
        return dot(first, second) / denominator

    def predict(self, tweet, model):
        """Label one token list by cosine similarity to the reference words.

        The review vector is the mean of the vectors of its in-vocabulary tokens. It is
        compared with the first k confirmed negative and positive reference words, where
        k = min(len(true_ref_neg), len(true_ref_pos)). If the summed-similarity gap
        exceeds ``confidence`` it decides directly; otherwise the gap is weighted by the
        batch priors.

        :return: (confidence, label) with label 0 (negative) or 1 (positive);
            (0, 0) when no token is in the vocabulary
        """
        sentence = np.zeros(model.wv.vector_size)  # was hard-coded to 20 dimensions
        counter = 0
        for word in tweet:
            if word in model.wv:
                sentence += model.wv[word]
                counter += 1
        if counter == 0:
            return 0, 0
        sentence_vec = sentence / counter

        k_cur = min(len(self.true_ref_neg), len(self.true_ref_pos))
        cos_sim_bad = sum(self._cosine(sentence_vec, model.wv[word])
                          for word in self.true_ref_neg[:k_cur] if word in model.wv)
        cos_sim_good = sum(self._cosine(sentence_vec, model.wv[word])
                           for word in self.true_ref_pos[:k_cur] if word in model.wv)

        if cos_sim_bad - cos_sim_good > self.confidence:
            return cos_sim_bad - cos_sim_good, 0
        elif cos_sim_bad - cos_sim_good < -self.confidence:
            return cos_sim_good - cos_sim_bad, 1
        elif cos_sim_bad * self.neg_coefficient >= cos_sim_good * self.pos_coefficient:
            return cos_sim_bad - cos_sim_good, 0
        else:
            return cos_sim_good - cos_sim_bad, 1


# def unsupervised_stream(ds, map_parallelism=1, reduce_parallelism=2):
#     # ds.print()
#     ds = ds.map(unsupervised_OSA()).set_parallelism(map_parallelism)
#     ds = ds.filter(lambda x: x[0] != 'collecting')
#     ds = ds.key_by(lambda x: x[0], key_type=Types.STRING())
#     ds = ds.reduce(lambda x, y: (x[0], unsupervised_OSA().model_merge(x, y))).set_parallelism(reduce_parallelism)
#     ds = ds.filter(lambda x: x[0] != 'model').map(lambda x: x[1])
#     # ds = ds.map(for_output()).set_parallelism(1))
#     ds = ds.flat_map(split)  # always put output_type before passing it to file sink
#     # ds = ds.add_sink(StreamingFileSink  # .set_parallelism(2)
#     #                  .for_row_format('./output', Encoder.simple_string_encoder())
#     #                  .build())
#     return ds




TODO (unfinished note): specify what the model has to return.

In [3]:
class UO:
    """Simulated operator slot: one operator instance plus the outputs it emitted.

    ``object`` holds the operator (an ``unsupervised_OSA`` in this notebook) and
    ``stream`` is an initially empty list the driver fills with its map() results.
    """

    def __init__(self, obj):
        self.stream = list()
        self.object = obj

In [4]:
# The labels are only used for accuracy computation, since PLStream is unsupervised.
# The Yelp polarity train.csv ships without a header row: read it with explicit column
# names, otherwise pandas consumes the first review as the header and silently drops it.
# Labels are 1/2; dataset_trunc maps 1 -> 0 and every other value -> 1.
f = pd.read_csv('./train.csv', header=None, names=["label", "review"])

# Take the first n rows (a small n gives a quick test run).
def dataset_trunc(n, f):
    """Return the first ``n`` labels and reviews of ``f`` as two parallel lists.

    A label equal to 1 becomes 0 and any other label becomes 1.

    :param n: number of leading rows to keep (ordinary slice semantics)
    :param f: DataFrame with ``label`` and ``review`` columns
    :return: (true_label, yelp_review)
    """
    labels = [0 if value == 1 else 1 for value in list(f.label)[:n]]
    reviews = list(f.review)[:n]
    return labels, reviews


In [None]:
import warnings
warnings.filterwarnings(action='ignore', category=UserWarning, module='gensim')
warnings.filterwarnings(action='ignore', category=FutureWarning, module='gensim')
from tqdm import tqdm

# Single-process simulation of the PLStream dataflow:
#   map    -> each bucket's unsupervised_OSA consumes a round-robin share of the reviews
#   filter -> drop the ('collecting', '1') placeholders
#   reduce -> concatenate ('labelled', records) outputs and average ('model', w2v) outputs
data_stream = []
buckets = []
parallelism = 4
collector_size = 2000
n = 560000
model = None
pseudo_dataset = []  # all labelled records produced so far
start_time = time()
# every bucket must receive at least one full batch, otherwise it never emits anything
assert n >= parallelism * collector_size

true_label, yelp_review = dataset_trunc(n, f)

# one map operator per bucket; open() loads the initial model and publishes it to Redis
for i in range(parallelism):
    buckets.append(UO(unsupervised_OSA(collector_size)))
    buckets[i].object.open()

k = 0
iteration = 0
while k < len(yelp_review):
    # map: deal the next parallelism * collector_size reviews round-robin over the buckets
    batch_size = min(collector_size * parallelism, len(yelp_review) - k)
    for j in range(batch_size):
        bucket = buckets[j % parallelism]
        bucket.stream.append(bucket.object.map([k, int(true_label[k]), yelp_review[k]]))
        k += 1

    # filter + reduce: drain every bucket (bucket 0 included) so no output is merged twice;
    # a bucket that emitted nothing this round (short final round) is simply skipped
    model_array = []
    for bucket in buckets:
        for tag, payload in bucket.stream:
            if tag == 'model':
                model_array.append((tag, payload))
            elif tag == 'labelled':
                pseudo_dataset.extend(payload)
        bucket.stream = []

    # model merge: fold every model emitted this round into one, then hand it to every
    # bucket (model_merge also republishes the merged model to Redis)
    if model_array:
        model = model_array[0][1]
        for snapshot in model_array[1:]:
            model = buckets[0].object.model_merge(('model', model), snapshot)
        # NOTE(review): all buckets now share, and keep training, the same model object.
        for bucket in buckets:
            bucket.object.model_to_train = model
    iteration += 1

# NOTE(review): reviews still buffered in a bucket (fewer than collector_size) are never
# labelled, and records held back by a bucket whose last emission was a model are not
# included, so len(pseudo_dataset) can be smaller than n.
print(time() - start_time)


In [None]:
# Number of labelled records produced by the simulation run above.
print(len(pseudo_dataset))

In [None]:
# Inspect one raw output of bucket 3.
# NOTE(review): raises IndexError unless that stream holds at least 5 items; the driver
# cell resets bucket streams after each round.
print((buckets[3].stream[4]))

In [None]:
# Inspect bucket 0's unsupervised_OSA instance.
buckets[0].object

In [279]:
# Current run configuration: (n, parallelism, collector_size).
n,parallelism,collector_size

(7, 4, 2)

In [282]:
# Same precondition as the driver cell: every bucket needs at least one full batch of
# collector_size reviews (the old check hard-coded 7 and used + instead of *).
assert n >= parallelism * collector_size, "n must cover one full batch per bucket"

In [95]:
# Scratch: re-apply the 'collecting' filter but keep each bucket's stream as a DataFrame
# (the .shape / .iloc cells below rely on that).
for i in range(parallelism):
    stream=buckets[i].stream
    stream=pd.DataFrame(stream)
    stream=stream[stream[0]!='collecting']
    buckets[i].stream=stream


In [169]:
# Remaining outputs held by bucket 3.
buckets[3].stream

[]

In [148]:
# NOTE(review): `dataset` is not defined anywhere in this notebook (NameError on a fresh
# kernel); presumably this meant the reduce result `pseudo_dataset` -- confirm.
dataset[0]

'labelled'

In [98]:
# Number of columns in bucket 0's stream; requires the DataFrame conversion cell above.
buckets[0].stream.shape[1]

2

In [99]:
# Number of simulated map operators (one per unit of parallelism).
len(buckets)

4

In [100]:
# Scratch: merge row i of each even-numbered bucket with row i of the next bucket; the
# result `nm` is discarded. Requires the DataFrame conversion cell above.
# NOTE(review): assumes an even number of buckets and streams of equal length.
for i in range(buckets[0].stream.shape[0]): # rows
    for j in range(0,len(buckets),2):# parallelism
        nm=buckets[j].object.model_merge(buckets[j].stream.iloc[i],buckets[j+1].stream.iloc[i])

In [101]:
# Number of rows left in bucket 0's stream (DataFrame form).
buckets[0].stream.shape[0]

0

In [None]:
# Attribute names of bucket 0's current model (sliced into `interested` below).
ats=buckets[0].object.model_to_train.__dir__()

In [None]:
# Shortcut to bucket 0's current model for the inspection cells below.
m=buckets[0].object.model_to_train

In [None]:
# Inspect the model's `negative` setting.
m.negative

In [None]:
# Names of the per-word attributes in wv.expandos (model_merge reads count/sample_int/code/point).
m.wv.expandos.keys()

In [None]:
# gensim 4 has no `KeyedVectors.vocab` (accessing it raises); this notebook uses the
# gensim-4 API (index_to_key / key_to_index), so inspect the word -> row-index map instead.
m.wv.key_to_index

In [None]:
# Inspect the model's train_count.
m.train_count

In [None]:
# Inspect corpus_count (model_merge sums it across merged models).
m.corpus_count

In [None]:
# Full per-word attribute arrays.
m.wv.expandos

In [None]:
# Skip the first 18 names returned by __dir__() above.
interested=ats[18:]

In [None]:
# Keep the first 6 of the remaining attribute names.
interested=interested[:6]

In [None]:
# Display the selected attribute names.
interested