In [1]:
#import packages
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import itertools
import re
import os
from optparse import OptionParser
from bigdl.dataset import news20
from bigdl.util.common import Sample
import time
import pandas as pd
import datetime as dt
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
import seaborn as sn
import pandas as pd
import random as rd
import datetime as dt
%matplotlib inline

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col, udf
from pyspark.ml import  Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from bigdl.dataset.transformer import *
from bigdl.dataset.base import *
from bigdl.nn.layer import *
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.util.common import *
from pyspark.sql.functions import col



Prepending /home/kunal/anaconda3/envs/deep_learning/lib/python3.6/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path


In [2]:
#Setting the path for Spark
os.environ['JAVA_HOME']="/usr/lib/jvm/java-8-oracle"
os.environ['SPARK_HOME'] = "/home/kunal/Downloads/spark-2.4.7-bin-hadoop2.7"
os.environ['HADOOP_HOME'] = "/usr/local/hadoop/bin"
os.environ['YARN_CONF_DIR'] = "/usr/local/hadoop/etc/hadoop"
os.environ['BIGDL_HOME'] = "/home/kunal/anaconda3/envs/deep_learning/lib/python3.6/site-packages/bigdl/share/bin"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/kunal/Downloads/jarfiles/bigdl-SPARK_2.2-0.7.0-jar-with-dependencies.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.7,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'

In [3]:
#Create Spark Session
sc=SparkContext.getOrCreate(conf=create_spark_conf().setMaster("local[4]").set("spark.driver.memory","8g"))
sc.setLogLevel("ERROR")
init_engine()

In [4]:
#Method to get train data
def get_train_data(train_file):
    texts = []
    nlines = 0
    with open(train_file) as f:
        for line in f:
            text_label = re.split(r'\t+', line)
            text_label[0] = int(text_label[0])+1
            texts.append((text_label[1],text_label[0]))
            nlines = nlines+1
    return (texts,nlines)

In [5]:
#Method to get test data
def get_test_data(test_file):
    texts = []
    nlines = 0
    with open(test_file) as f:
        for line in f:
            texts.append((line,1))
    return texts

In [6]:
#Method to convert text to words
def text_to_words(review_text):
    letters_only = re.sub("[^a-zA-Z]", " ", review_text)
    words = letters_only.lower().split()
    return words

In [7]:
# calculate the frequency of words in each text corpus, 
# sort by frequency (max to min)
# and assign an id to each word
def analyze_texts(data_rdd):
    def index(w_c_i):
        ((w, c), i) = w_c_i
        return (w, (i + 1, c))
    return data_rdd.flatMap(lambda text_label: text_to_words(text_label[0])) \
        .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda w_c: - w_c[1]).zipWithIndex() \
        .map(lambda w_c_i: index(w_c_i)).collect()  #return a list

In [8]:
#Method for padding
# pad([1, 2, 3, 4, 5], 0, 6)
def pad(l, fill_value, width):
    if len(l) >= width:
        return l[0: width]
    else:
        l.extend([fill_value] * (width - len(l)))
        return l

In [9]:
#Convert to vector
def to_vec(token, b_w2v, embedding_dim):
    if token in b_w2v:
        return b_w2v[token]
    else:
        return pad([], 0, embedding_dim)

In [10]:
#Convert to Sample for BigDL prediction
def to_sample(vectors, label, embedding_dim):
    # flatten nested list
    flatten_features = list(itertools.chain(*vectors))
    # a row for each word vector
    features = np.array(flatten_features, dtype='float').reshape([sequence_len, embedding_dim])
    features = features.transpose(1, 0)
    return Sample.from_ndarray(features, np.array(label))

In [11]:
#Method to build Convolution neural network
def build_cnn(class_num, input_dim, hidden_dim):
    #each row is an input vector for the RNN
    model = Sequential()
    model.add(Reshape([input_dim, 1, sequence_len]))
    model.add(SpatialConvolution(input_dim, hidden_dim, 5, 1))
    model.add(ReLU())
    model.add(SpatialMaxPooling(5, 1, 5, 1))
    model.add(SpatialConvolution(hidden_dim, hidden_dim, 5, 1))
    model.add(ReLU())
    model.add(SpatialMaxPooling(5, 1, 5, 1))
    model.add(Reshape([hidden_dim]))
    model.add(Linear(hidden_dim, 100))
    model.add(Linear(100, class_num))
    model.add(LogSoftMax())
    return model

In [12]:
#Method to process text
def preprocess_texts(data_rdd, sequence_len, max_words, embedding_dim, streaming):
    #get list of (word, (index,freq)) representing the training set
    word_to_ic = analyze_texts(data_rdd)

    #at most max_words words among the most frequent
    if streaming:
        word_to_ic = dict(word_to_ic)
    else:
        word_to_ic = dict(word_to_ic[10: max_words])
    
    bword_to_ic = sc.broadcast(word_to_ic)

    #prepare and broadcast word embeddings filtered through word_to_ic 
    w2v = news20.get_glove_w2v(dim=embedding_dim)
    filtered_w2v = {w: v for w, v in w2v.items() if w in word_to_ic}
    bfiltered_w2v = sc.broadcast(filtered_w2v)

    #get a list of words for each line + label in data_rdd
    tokens_rdd = data_rdd.map(lambda text_label:
                              ([w for w in text_to_words(text_label[0]) if
                                w in bword_to_ic.value], text_label[1]))

    #pad lists of words to sequence_len size + label
    padded_tokens_rdd = tokens_rdd.map( lambda tokens_label: 
                                        (pad(tokens_label[0], "##", sequence_len), tokens_label[1]))

    #get vectors from words + label
    vector_rdd = padded_tokens_rdd.map(lambda tokens_label:
                                       ([to_vec(w, bfiltered_w2v.value,
                                                embedding_dim) for w in
                                         tokens_label[0]], tokens_label[1]))

    #get matrix sample composed by word vectors for each text
    sample_rdd = vector_rdd.map(
        lambda vectors_label: to_sample(vectors_label[0], vectors_label[1], embedding_dim))

    return sample_rdd

In [13]:
#Method to Map, classify and show
def map_predict_label(l):
    return np.array(l).argmax()

def classify_stream(rdd_test, train_model):
    if not(rdd_test.isEmpty()):
        #probability vectors, one for each input
        predictions = train_model.predict(rdd_test).collect()
        #get max probability indices
        y_pred = np.array([ map_predict_label(s) for s in predictions])
        for y in y_pred:
            if y==0: 
                print('NEGATIVE\n')
            else:
                print('POSITIVE\n')

def show(rdd_test):
    for elem in rdd_test.collect():
        print(elem[0].encode('utf-8'),end='')

In [14]:
#Parameters
embedding_dim = 50
model_path = "model_path"
app_name = "stream_classifier"
sequence_len = 50   #number of words for each text
max_words = 1000  

In [15]:
#train
batch_size = 128
max_epoch = 100
#p = float(options.p)
checkpoint_path = "checkpoint"
logdir = "log"
train_file = "./data/train.txt"
training_split = 0.8
class_num = 2 

print('loading training data...')
#get training data
(train_data,nlines) = get_train_data(train_file)
#train_data = get_data('./acllmdb','train')
print('train data loaded!')
print('number of lines for training set: ' + str(nlines))

loading training data...
train data loaded!
number of lines for training set: 7086


In [16]:
#Train and Optimize data
print('Processing text dataset')
#generate RDD and sample RDD for the optimizer
train_data_rdd = sc.parallelize(train_data, 2)

Processing text dataset


In [17]:
#take top 10
train_data_rdd.take(10)

[('The Da Vinci Code book is just awesome.\n', 2),
 ("this was the first clive cussler i've ever read, but even books like Relic, and Da Vinci code were more plausible than this.\n",
  2),
 ('i liked the Da Vinci Code a lot.\n', 2),
 ('i liked the Da Vinci Code a lot.\n', 2),
 ("I liked the Da Vinci Code but it ultimatly didn't seem to hold it's own.\n",
  2),
 ("that's not even an exaggeration ) and at midnight we went to Wal-Mart to buy the Da Vinci Code, which is amazing of course.\n",
  2),
 ('I loved the Da Vinci Code, but now I want something better and different!..\n',
  2),
 ('i thought da vinci code was great, same with kite runner.\n', 2),
 ('The Da Vinci Code is actually a good movie...\n', 2),
 ('I thought the Da Vinci Code was a pretty good book.\n', 2)]

In [18]:
sample_rdd = preprocess_texts(train_data_rdd, sequence_len, max_words, embedding_dim, False)

In [19]:
#split into training and validation set
train_rdd, val_rdd = sample_rdd.randomSplit([training_split, 1-training_split])

optimizer = Optimizer(
    model=build_cnn(class_num, input_dim=embedding_dim, hidden_dim=128),
    training_rdd=train_rdd,
    criterion=ClassNLLCriterion(),
    end_trigger=MaxEpoch(max_epoch),
    batch_size=batch_size,
    optim_method=Adagrad(learningrate=0.01, learningrate_decay=0.0002))

optimizer.set_validation(
    batch_size=batch_size,
    val_rdd=val_rdd,
    trigger=EveryEpoch(),
    val_method=[Top1Accuracy()]
)

creating: createSequential
creating: createReshape
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialMaxPooling
creating: createSpatialConvolution
creating: createReLU
creating: createSpatialMaxPooling
creating: createReshape
creating: createLinear
creating: createLinear
creating: createLogSoftMax
creating: createClassNLLCriterion
creating: createMaxEpoch
creating: createAdagrad
creating: createDistriOptimizer
creating: createEveryEpoch
creating: createTop1Accuracy




In [20]:
#set checkpoint path for model
optimizer.set_checkpoint(EveryEpoch(), checkpoint_path)
#set train and val log for tensorboard
train_summary = TrainSummary(log_dir=logdir, app_name=app_name)
val_summary = ValidationSummary(log_dir=logdir, app_name=app_name)
optimizer.set_train_summary(train_summary)
optimizer.set_val_summary(val_summary)

train_model=optimizer.optimize()
#sc.stop()

creating: createEveryEpoch
creating: createTrainSummary
creating: createValidationSummary


In [21]:
test_data = get_test_data("test.txt")

In [22]:
test_data_rdd = sc.parallelize(test_data, 2)

In [23]:
sample_rdd = preprocess_texts(test_data_rdd, sequence_len, max_words, embedding_dim, False)

In [24]:
#get the trained model from the model path
#train_model = Model.load(model_path)
predictions = train_model.predict(sample_rdd).take(50)

In [25]:
#get max probability indices
y_pred = np.array([ map_predict_label(s) for s in predictions])
i = 0
for y in y_pred:
    print(test_data[i][0], end=' ')
    if y==0: 
        print('(NEGATIVE)')
    else:
        print('(POSITIVE)')
    print('\n')
    i = i+1

" I don't care what anyone says, I like Hillary Clinton.
 (POSITIVE)


how i destroy the earth...
 (POSITIVE)


have an awesome time at purdue!..
 (POSITIVE)


Yep, I'm still in London, which is pretty awesome: P Remind me to post the million and one pictures that I took when I get back to Markham!...
 (POSITIVE)


Have to say, I hate Paris Hilton's behavior but I do think she's kinda cute..
 (NEGATIVE)


i will love the lakers.
 (POSITIVE)


deepak jha is good boy.
 (POSITIVE)


Kunal Bhashkar is Happy.
 (NEGATIVE)


Kunal Bhashkar is good boy.
 (POSITIVE)


deepak jha is bad boy.
 (POSITIVE)


I'm so glad I love Paris Hilton, too, or this would be excruciating.
 (NEGATIVE)


considering most Geico commericals are stupid...
 (NEGATIVE)


i liked MIT though, esp their little info book(
 (POSITIVE)


Before I left Missouri, I thought London was going to be so good and cool and fun and a really great experience and I was really excited.
 (POSITIVE)


I still like Tom Cruise.
 (POSITIVE)


### Prediction from Kafka Spark Structured Streaming

In [26]:
#import packages
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

In [27]:
#creating Streaming context
ssc = StreamingContext(sc, 5)
topic = "sentiment"  #kafka topic
zkQuorum = 'localhost:2181' #zk server

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/kunal/Downloads/jarfiles/spark-streaming-kafka-0-8-assembly_2.11-2.4.7.jar --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.7 pyspark-shell'
#get the trained model from the model path
#train_model = Model.load(model_path)

#generate and handle stream
kafkastream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

In [28]:
#Read and predict data from Kafka streaming
ks = kafkastream.map(lambda ks: (ks[1],0)) #get lines with fake classes
ks.foreachRDD(show) #show lines
featstream = ks.transform(lambda rdd: 
                            preprocess_texts(rdd, sequence_len, max_words, embedding_dim, True)) #get w2v
featstream.foreachRDD(lambda fs: classify_stream(fs,train_model)) #classify stream

ssc.start()
ssc.awaitTermination()

b'" I don\'t care what anyone says, I like Hillary Clinton.\n'POSITIVE

b'how i destroy the earth...\n'POSITIVE

b'have an awesome time at purdue!..\n'POSITIVE

b"Yep, I'm still in London, which is pretty awesome: P Remind me to post the million and one pictures that I took when I get back to Markham!...\n"POSITIVE

b"Have to say, I hate Paris Hilton's behavior but I do think she's kinda cute..\n"NEGATIVE

b'i will love the lakers.\n'POSITIVE

b'deepak jha is good boy.\n'POSITIVE

b'Kunal Bhashkar is Happy.\n'POSITIVE

b'Kunal Bhashkar is good boy.\n'POSITIVE

b'deepak jha is bad boy.\n'NEGATIVE

b"I'm so glad I love Paris Hilton, too, or this would be excruciating.\n"POSITIVE

b'considering most Geico commericals are stupid...\n'

KeyboardInterrupt: 