In [1]:
import pandas as pd
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
file_path =   '/DATA/arabic_training/pg_new_train.xlsx'
data = pd.read_excel(file_path, encoding='utf-8').astype(str)


In [3]:
# Modify these variables according to your needs.
application_name = "Play with Sentiments"

master = "local[*]"
num_processes = 3
num_executors = 1

# This variable is derived from the number of cores and executors,
# and will be used to assign the number of model trainers.
num_workers = num_executors * num_processes

print("Number of desired executors: " + str(num_executors))
print("Number of desired processes / executor: " + str(num_processes))
print("Total number of workers: " + str(num_workers))

Number of desired executors: 1
Number of desired processes / executor: 3
Total number of workers: 3


In [4]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", num_processes)
conf.set("spark.executor.instances", num_executors)
conf.set("spark.executor.memory", "5g")
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.max", "2000")
conf.set("spark.executor.heartbeatInterval", "6000s")
conf.set("spark.network.timeout", "10000000s")
conf.set("spark.shuffle.spill", "true")
conf.set("spark.driver.memory", "10g")
conf.set("spark.driver.maxResultSize", "10g")

reader = SparkSession.builder.config(conf=conf) \
                     .appName(application_name) \
                     .getOrCreate()


In [5]:
raw = reader.createDataFrame(data)
raw.show(10)
print(raw.count())

+--------------------+---------+
|             Message|Sentiment|
+--------------------+---------+
|-----------------...|  Neutral|
|....tend to use M...| Negative|
|"1 Billion CFU fr...|  Neutral|
|"Combining vitami...| Positive|
|"Fasting glucose ...|  Neutral|
|"I brought Fiber ...| Negative|
|"no shoes, no bra...| Positive|
|"One time I was h...|  Neutral|
|"Prayer will neve...| Negative|
|"Researchers may ...|  Neutral|
+--------------------+---------+
only showing top 10 rows

148544


In [6]:
import re 
def regex_or(*items):
    return '(?:' + '|'.join(items) + ')'

contractions = re.compile("(?i)(\w+)(n[''′]t|[''′]ve|[''′]ll|[''′]d|[''′]re|[''′]s|[''′]m)$", re.UNICODE)
whitespace = re.compile("[\s\u0020\u00a0\u1680\u180e\u202f\u205f\u3000\u2000-\u200a]+", re.UNICODE)
punct_chars = r"['\"""''.?!…,:;]"
# punct_seq = punct_chars + "+"    # 'anthem'. => ' anthem '.
punct_seq = r"['\"""'']+|[.?!,…]+|[:;]+"  # 'anthem'. => ' anthem ' .
entity = r"[&<>\"]"
url_start_1 = r"(?:https?://|\bwww\.)"
commonTLDs = r"(?:com|org|edu|gov|net|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|pro|tel|travel|xxx)"
cc_tlds = \
    r"(?:ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|" \
    r"bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|" \
    r"er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|" \
    r"hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|" \
    r"lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|" \
    r"nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|sk|" \
    r"sl|sm|sn|so|sr|ss|st|su|sv|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|" \
    r"va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|za|zm|zw)"
url_start_2 = \
    r"\b(?:[A-Za-z\d-])+(?:\.[A-Za-z0-9]+){0,3}\." + regex_or(commonTLDs, cc_tlds) + r"(?:\." + cc_tlds + r")?(?=\W|$)"
url_body = r"(?:[^\.\s<>][^\s<>]*?)?"
url_extra_crap_before_end = regex_or(punct_chars, entity) + "+?"
url_end = r"(?:\.\.+|[<>]|\s|$)"
URL_REGEX = regex_or(url_start_1, url_start_2) + url_body + "(?=(?:" + url_extra_crap_before_end + ")?" + url_end + ")"
USER_ID_REGEX = "@\S+"
PUNCTUATION_REGEX="[@\:;`~=\+&^%,\'\"#\{\}\(\)\[\]\-\*/|]"
HTML_REGEX= "<.*?>"
SPACE_REGEX=" +"

In [7]:
replace_with_space = regex_or(URL_REGEX,USER_ID_REGEX,PUNCTUATION_REGEX)


In [8]:
from pyspark.sql.functions import regexp_replace,col,lower

raw = raw.withColumn("Message", regexp_replace(col("Message"), replace_with_space, " "))
raw.show(10)
raw = raw.withColumn("Message", regexp_replace(col("Message"), SPACE_REGEX, " "))

raw = raw.withColumn("Sentiment", lower(col("Sentiment")));
raw.show(10)

processed = raw


+--------------------+---------+
|             Message|Sentiment|
+--------------------+---------+
|                 ...|  Neutral|
|....tend to use M...| Negative|
| 1 Billion CFU fr...|  Neutral|
| Combining vitami...| Positive|
| Fasting glucose ...|  Neutral|
| I brought Fiber ...| Negative|
| no shoes  no bra...| Positive|
| One time I was h...|  Neutral|
| Prayer will neve...| Negative|
| Researchers may ...|  Neutral|
+--------------------+---------+
only showing top 10 rows

+--------------------+---------+
|             Message|Sentiment|
+--------------------+---------+
| Amino acid and p...|  neutral|
|....tend to use M...| negative|
| 1 Billion CFU fr...|  neutral|
| Combining vitami...| positive|
| Fasting glucose ...|  neutral|
| I brought Fiber ...| negative|
| no shoes no bra ...| positive|
| One time I was h...|  neutral|
| Prayer will neve...| negative|
| Researchers may ...|  neutral|
+--------------------+---------+
only showing top 10 rows



In [9]:
from pyspark.ml.feature import StringIndexer,IndexToString,OneHotEncoderEstimator,VectorIndexer,Word2Vec,Tokenizer

string_indexer = StringIndexer().setInputCol("Sentiment").setOutputCol("Index")
indexed = string_indexer.fit(processed).transform(processed)
encoder = OneHotEncoderEstimator(inputCols=["Index"],
                                 outputCols=["Label"])
encoded = encoder.fit(indexed).transform(indexed).drop("Index")
encoded.show()

tokenizer = Tokenizer(inputCol="Message", outputCol="Words")
wordsData = tokenizer.transform(encoded)
word2Vec = Word2Vec(vectorSize=300, minCount=0, inputCol="Words", outputCol="result")
model = word2Vec.fit(wordsData)

result = model.transform(wordsData).drop("Words")
result.show()
# indexer_string = IndexToString().setInputCol("Label").setOutputCol("originalCategory")
# unindexed = indexer_string.transform(indexed_df)

# unindexed.show()

+--------------------+---------+-------------+
|             Message|Sentiment|        Label|
+--------------------+---------+-------------+
| Amino acid and p...|  neutral|(2,[1],[1.0])|
|....tend to use M...| negative|    (2,[],[])|
| 1 Billion CFU fr...|  neutral|(2,[1],[1.0])|
| Combining vitami...| positive|(2,[0],[1.0])|
| Fasting glucose ...|  neutral|(2,[1],[1.0])|
| I brought Fiber ...| negative|    (2,[],[])|
| no shoes no bra ...| positive|(2,[0],[1.0])|
| One time I was h...|  neutral|(2,[1],[1.0])|
| Prayer will neve...| negative|    (2,[],[])|
| Researchers may ...|  neutral|(2,[1],[1.0])|
| The other thing ...|  neutral|(2,[1],[1.0])|
| The other thing ...|  neutral|(2,[1],[1.0])|
| There are other ...|  neutral|(2,[1],[1.0])|
| Vitamin C antibi...|  neutral|(2,[1],[1.0])|
| Vitamin C and an...| positive|(2,[0],[1.0])|
| Wayne Boatwright...| negative|    (2,[],[])|
|“I’m not lazy and...| negative|    (2,[],[])|
|» Cucumber Garlic...|  neutral|(2,[1],[1.0])|
| Medical New

In [48]:
from keras.layers import Input, Reshape, LSTM, Activation, SpatialDropout1D, initializers
from keras.layers.embeddings import Embedding
from keras.layers.recurrent import GRU
from keras.metrics import categorical_accuracy
from keras.models import Model

# import tensorflow as tf
# from keras.engine.topology import merge
from keras.layers.wrappers import Bidirectional, TimeDistributed
from keras.layers.core import Dropout, Dense, Lambda, Masking
from keras.layers import merge
from keras.engine.topology import Layer, InputSpec
from keras import constraints, regularizers
import os
from keras import backend as K
from keras.regularizers import l1_l2
from keras.optimizers import Adam, Nadam, SGD, RMSprop

# class AttentionLayer(Layer):
#     '''
#     Attention layer.
#     '''

#     def __init__(self, init='glorot_uniform', **kwargs):
#         super(AttentionLayer, self).__init__(**kwargs)
#         self.supports_masking = True
#         self.init = initializers.get(init)

#     def build(self, input_shape):
#         input_dim = input_shape[-1]
#         self.Uw = self.init((input_dim,))
#         self.trainable_weights = [self.Uw]
#         super(AttentionLayer, self).build(input_shape)

#     def compute_mask(self, input, mask):
#         return mask

#     def call(self, x, mask=None):
#         print(self.Uw.shape)
#         print(list(range(K.ndim(self.Uw))))
#         multData = K.exp((K.dot(x, self.Uw)))
#         if mask is not None:
#             multData = mask * multData
#         output = multData / (K.sum(multData, axis=1) + K.epsilon())[:, None]
#         return K.reshape(output, (output.shape[0], output.shape[1], 1))

#     def get_output_shape_for(self, input_shape):
#         newShape = list(input_shape)
#         newShape[-1] = 1
#         return tuple(newShape)

#     def compute_output_shape(self, input_shape):
#         return get_output_shape_for(self,input_shape)

class AttentionLayer(Layer):
    '''
    Attention layer. 
    Usage:
        lstm_layer = LSTM(dim, return_sequences=True)
        attention = AttentionLayer()(lstm_layer)
        sentenceEmb = merge([lstm_layer, attention], mode=lambda x:x[1]*x[0], output_shape=lambda x:x[0])
        sentenceEmb = Lambda(lambda x:K.sum(x, axis=1), output_shape=lambda x:(x[0],x[2]))(sentenceEmb)
    '''
    def __init__(self, init='glorot_uniform', kernel_regularizer=None, bias_regularizer=None, kernel_constraint=None, bias_constraint=None,  **kwargs):
        self.supports_masking = True
        self.init = initializers.get(init)
        self.kernel_initializer = initializers.get('glorot_uniform')

        self.kernel_regularizer = regularizers.get(kernel_regularizer)
        self.bias_regularizer = regularizers.get(bias_regularizer)

        self.kernel_constraint = constraints.get(kernel_constraint)
        self.bias_constraint = constraints.get(bias_constraint)

        super(AttentionLayer, self).__init__(**kwargs)
        
    def build(self, input_shape):
        self.kernel = self.add_weight((input_shape[-1], 1),
                                 initializer=self.kernel_initializer,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.kernel_regularizer,
                                 constraint=self.kernel_constraint)
        self.built = True
    
    def compute_mask(self, input, mask):
        return mask
    
    def call(self, x, mask=None):
        multData =  K.exp(K.dot(x, self.kernel))
        if mask is not None:
            mask = K.cast(mask, K.floatx())
            mask = K.expand_dims(mask)
            multData = mask*multData

        output = multData/(K.sum(multData, axis=1)+K.epsilon())[:,None]
        return output

    def compute_output_shape(self, input_shape):
        newShape = list(input_shape)
        newShape[-1] = 1
        return tuple(newShape)


In [49]:
def createHierarchicalAttentionModel(maxSeq,
                                     embWeights=None, embeddingSize=None, vocabSize=1000,  # embedding
                                     recursiveClass=GRU, wordRnnSize=100, sentenceRnnSize=100,  # rnn
                                     # wordDenseSize = 100, sentenceHiddenSize = 128, #dense
                                     dropWordEmb=0.5, dropWordRnnOut=0.5, dropSentenceRnnOut=0.5, nb_classes=3):
    wordsInputs = Input(shape=(maxSeq,), name='words_input')
    if embWeights is None:
        emb = Embedding(vocabSize, 300,mask_zero=True,embeddings_initializer='glorot_uniform')(wordsInputs)
    else:
        emb = Embedding(embWeights.shape[0], embWeights.shape[1],mask_zero=True,  weights=[embWeights])(wordsInputs)
    if dropWordEmb != 0.0:
        emb = Dropout(dropWordEmb)(emb)
    reg=l1_l2(l2=0.001)
    wordRnn = Bidirectional(recursiveClass(wordRnnSize, return_sequences=True), merge_mode='concat')(emb)

    if dropWordRnnOut > 0.0:
        wordRnn = Dropout(dropWordRnnOut, )(wordRnn)
    attention = AttentionLayer()(wordRnn)
    sentenceEmb = merge([wordRnn, attention], mode=lambda x: x[1] * x[0], output_shape=lambda x: x[0])
    sentenceEmb = Lambda(lambda x: K.sum(x, axis=1), output_shape=lambda x: (x[0], x[2]))(sentenceEmb)
    modelSentence = Model(wordsInputs, sentenceEmb)
    modelSentAttention = Model(wordsInputs, attention)

    documentInputs = Input(shape=(None, maxSeq), name='document_input')
    sentenceMasking = Masking(mask_value=0)(documentInputs)
    sentenceEmbbeding = TimeDistributed(modelSentence)(sentenceMasking)
    sentenceAttention = TimeDistributed(modelSentAttention)(sentenceMasking)
    sentenceRnn = Bidirectional(recursiveClass(wordRnnSize, return_sequences=True), merge_mode='concat')(
        sentenceEmbbeding)
    if dropSentenceRnnOut > 0.0:
        sentenceRnn = Dropout(dropSentenceRnnOut)(sentenceRnn)
    attentionSent = AttentionLayer()(sentenceRnn)
    documentEmb = merge([sentenceRnn, attentionSent], mode=lambda x: x[1] * x[0], output_shape=lambda x: x[0])
    documentEmb = Lambda(lambda x: K.sum(x, axis=1), output_shape=lambda x: (x[0], x[2]), name="att2")(documentEmb)
    documentOut = Dense(nb_classes, activation="softmax", name="documentOut")(documentEmb)

    
    model = Model(input=[documentInputs], output=[documentOut])
    adam=SGD(momentum=0.9)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    return model

In [50]:
keras_model =createHierarchicalAttentionModel(300, embWeights=None, nb_classes=3,
                                         vocabSize=model.getVectors().count())

  name=name)


In [63]:
import sys
sys.path.append('/DATA/experiments/scatter/src')


In [67]:
from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *
optimizer_mlp = 'adam'
loss_mlp = 'categorical_crossentropy'
trainer = DOWNPOUR(keras_model=keras_model, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=num_workers,
                   batch_size=4, communication_window=5, num_epoch=1,
                   features_col="result", label_col="Label")
trained_model = trainer.train(result)

  str(node.arguments) + '. They will not be included '
  str(node.arguments) + '. They will not be included '


ValueError: Unknown layer: AttentionLayer

In [112]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = reader.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [-0.059452523291111,-0.04019885137677193,0.03383271899074316]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.07344705558248928,0.027889671070235114,-0.007856659591197968]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.012440059613436461,0.012617905344814063,0.03669551573693752]

