In [1]:
# Pandas implementation
import itertools
import os
import pickle
import re
import time
import warnings
from collections import Counter

import numpy as np
import pandas as pd

# Dask implementation
from numba import njit
from dask import compute, delayed
import dask.dataframe as dd
import datashader as ds
from dask.distributed import Client

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.manifold import TSNE
from sklearn.feature_extraction.text import TfidfVectorizer

# Keras
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Activation, Dense, Dropout, Embedding, Flatten, Conv1D, MaxPooling1D, LSTM
from keras import utils
from keras.callbacks import ReduceLROnPlateau, EarlyStopping

# nltk
import nltk
from nltk.corpus import stopwords
from  nltk.stem import SnowballStemmer
import gensim

warnings.filterwarnings('ignore')

Using TensorFlow backend.
  return f(*args, **kwds)


In [2]:
# global variables
DATASET_ENCODING = "ISO-8859-1"
DATASET_COLUMNS = ["target", "ids", "date", "flag", "user", "text"]
# Raw string literal: "\S" inside a plain string is an invalid escape sequence
# (DeprecationWarning; SyntaxWarning on Python 3.12+). The pattern is unchanged:
# it matches @mentions, http(s) links and runs of non-alphanumeric characters.
# NOTE(review): the 'http?:\S' alternative only adds matches for "htt:" + one char;
# it may be a typo for 'https?:\S+'.
TEXT_CLEANING_RE = r"@\S+|https?:\S+|http?:\S|[^A-Za-z0-9]+"

# KERAS
SEQUENCE_LENGTH = 300
EPOCHS = 8
BATCH_SIZE = 1024

# SENTIMENT
POSITIVE = "POSITIVE"
NEGATIVE = "NEGATIVE"
NEUTRAL = "NEUTRAL"
SENTIMENT_THRESHOLDS = (0.4, 0.7)

# EXPORT
KERAS_MODEL = "model.h5"
WORD2VEC_MODEL = "model.w2v"
TOKENIZER_MODEL = "tokenizer.pkl"
ENCODER_MODEL = "encoder.pkl"

# Location of the training CSV. Set the SENTIMENT_CSV_PATH environment variable
# to point elsewhere instead of editing this machine-specific default.
path = os.environ.get(
    "SENTIMENT_CSV_PATH",
    '/Users/chrislouie/Documents/Python/random/training.1600000.processed.noemoticon.csv',
)

In [3]:
# Fetch NLTK's stop-word corpus; the log reports when it is already up to date.
nltk.download(info_or_id="stopwords")

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/chrislouie/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# Pandas + Word2Vec

### TODO: %%time every cell in our preprocessing pipeline all the way to training


In [4]:
%%time

df = pd.read_csv(path, encoding = DATASET_ENCODING, 
                     names = DATASET_COLUMNS)

CPU times: user 3.29 s, sys: 291 ms, total: 3.58 s
Wall time: 3.59 s


In [5]:
# Re-read outside %%time so `df` is certainly defined, then preview the first rows.
df = pd.read_csv(path, names=DATASET_COLUMNS, encoding=DATASET_ENCODING)
df.head()

Unnamed: 0,target,ids,date,flag,user,text
0,0,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,0,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
2,0,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
3,0,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,0,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all...."


In [6]:
# Raw polarity code -> label string.
decode_map = {
    0: "NEGATIVE",
    2: "NEUTRAL",
    4: "POSITIVE",
}


def decode_sentiment(label):
    """Translate a raw polarity code (0, 2 or 4) into its label string.

    ``label`` may be anything ``int()`` accepts (e.g. 4, "4", 4.0).
    Raises KeyError for any other code, and ValueError/TypeError when
    ``int()`` cannot convert ``label``.
    """
    code = int(label)
    return decode_map[code]

In [7]:
%%time
df.target = df.target.apply(lambda x: decode_sentiment(x))

CPU times: user 455 ms, sys: 7.9 ms, total: 463 ms
Wall time: 463 ms


In [8]:
# English stop words stored as a set: preprocess() tests membership once per token,
# which is a hash lookup on a set instead of a linear scan over a list.
stop_words = set(stopwords.words('english'))
# Snowball stemmer, used only when preprocess(..., stem=True).
stemmer = SnowballStemmer('english')

In [9]:
def preprocess(text, stem=False):
    """Normalise one tweet into a space-separated string of tokens.

    Lower-cases ``str(text)`` and replaces every TEXT_CLEANING_RE match
    (user mentions, links, special characters) with a space. It then drops
    tokens contained in ``stop_words`` and, when ``stem`` is true, stems each
    remaining token with ``stemmer``.
    """
    cleaned = re.sub(TEXT_CLEANING_RE, ' ', str(text).lower()).strip()
    kept = [word for word in cleaned.split() if word not in stop_words]
    if stem:
        # Stop-word filtering above uses the unstemmed form; stem afterwards.
        kept = [stemmer.stem(word) for word in kept]
    return " ".join(kept)

In [10]:
%%time
df.text = df.text.apply(lambda x: preprocess(x))

CPU times: user 42.8 s, sys: 242 ms, total: 43 s
Wall time: 43.1 s


In [11]:
# 75/25 split with a fixed seed so the split is reproducible across runs.
df_train, df_test = train_test_split(df, random_state=42, test_size=0.25)
print("TRAIN size:", len(df_train))
print("TEST size:", len(df_test))

TRAIN size: 1200000
TEST size: 400000


In [12]:
%%time

# word2vec

documents = [_text.split() for _text in df_train.text] 

CPU times: user 2.17 s, sys: 220 ms, total: 2.39 s
Wall time: 2.41 s


In [13]:
# Re-built outside %%time: token lists for word2vec training.
documents = list(tweet.split() for tweet in df_train.text)

In [14]:
# Word2Vec hyper-parameters
W2V_SIZE = 300       # vector dimension (also the embedding-matrix width)
W2V_WINDOW = 7       # context window size
W2V_EPOCH = 32       # number of training passes, used when calling .train()
W2V_MIN_COUNT = 10   # minimum word frequency to enter the vocabulary

# gensim 3.x keyword `size` (renamed `vector_size` in gensim 4). No `workers`
# argument is given, so gensim's default worker count applies.
w2v_model = gensim.models.word2vec.Word2Vec(
    min_count=W2V_MIN_COUNT,
    size=W2V_SIZE,
    window=W2V_WINDOW,
)

In [15]:
# Build the word2vec vocabulary from the tokenised training tweets.
# `documents` is passed positionally on purpose: the keyword is `sentences` in
# gensim 3 and `corpus_iterable` in gensim 4.
w2v_model.build_vocab(documents)

In [16]:
%%time
w2v_model.train(documents, total_examples=len(documents), epochs=W2V_EPOCH)

CPU times: user 12min 12s, sys: 3.34 s, total: 12min 16s
Wall time: 4min 3s


(246357460, 276847840)

In [17]:
# Similarity queries live on the model's KeyedVectors (`.wv`).
# Word2Vec.most_similar is deprecated in gensim 3.x and removed in gensim 4.0.
w2v_model.wv.most_similar("tesla")

[('logitech', 0.33693981170654297),
 ('scheme', 0.3176518678665161),
 ('ferris', 0.31498610973358154),
 ('steering', 0.2967177927494049),
 ('chevy', 0.2907896637916565),
 ('wineries', 0.2887212336063385),
 ('pci', 0.2836020886898041),
 ('scripting', 0.2831823229789734),
 ('kde', 0.28136351704597473),
 ('vehicles', 0.27894753217697144)]

In [18]:
# Query through `.wv`; calling most_similar on the Word2Vec model itself is
# deprecated in gensim 3.x and removed in gensim 4.0.
w2v_model.wv.most_similar("facebook")

[('fb', 0.5913577675819397),
 ('twitter', 0.5785638093948364),
 ('myspace', 0.5402857065200806),
 ('flickr', 0.483107328414917),
 ('profiles', 0.4721948504447937),
 ('lj', 0.462476909160614),
 ('friendster', 0.45860305428504944),
 ('hotmail', 0.45538684725761414),
 ('livejournal', 0.45441320538520813),
 ('orkut', 0.45092564821243286)]

In [19]:
%%time
tokenizer = Tokenizer()
tokenizer.fit_on_texts(df_train.text)

vocab_size = len(tokenizer.word_index) + 1
print("Total words", vocab_size)

Total words 278491
CPU times: user 16.4 s, sys: 118 ms, total: 16.5 s
Wall time: 16.3 s


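Note: the timed cells above are repeated without `%%time` because, in this session, variables assigned inside a `%%time` cell did not seem to persist. In current IPython releases, assignments made in a `%%time` cell should normally stay in the namespace, so this may be version-specific. Check first (e.g. `'tokenizer' in globals()`) before duplicating expensive cells.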

In [20]:
# Re-run of the timed tokenizer cell, outside %%time.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(texts=df_train["text"])

vocab_size = len(tokenizer.word_index) + 1  # +1 for the reserved index 0
print("Total words", vocab_size)

Total words 278491


In [21]:
%%time
X_train = pad_sequences(tokenizer.texts_to_sequences(df_train.text), maxlen=SEQUENCE_LENGTH)
X_test = pad_sequences(tokenizer.texts_to_sequences(df_test.text), maxlen=SEQUENCE_LENGTH)

CPU times: user 24.1 s, sys: 720 ms, total: 24.8 s
Wall time: 25.4 s


In [22]:
# Re-run of the timed padding cell, outside %%time.
X_train = pad_sequences(
    tokenizer.texts_to_sequences(df_train["text"]),
    maxlen=SEQUENCE_LENGTH,
)
X_test = pad_sequences(
    tokenizer.texts_to_sequences(df_test["text"]),
    maxlen=SEQUENCE_LENGTH,
)

In [23]:
# Labels seen in the training targets, plus NEUTRAL (absent from the training data).
labels = df_train["target"].unique().tolist() + [NEUTRAL]
labels

['NEGATIVE', 'POSITIVE', 'NEUTRAL']

In [24]:
# Integer-encode the string targets; classes are learned from the training split.
encoder = LabelEncoder().fit(df_train["target"].tolist())

# Encode both splits and reshape to column vectors of shape (n_samples, 1).
y_train = encoder.transform(df_train["target"].tolist()).reshape(-1, 1)
y_test = encoder.transform(df_test["target"].tolist()).reshape(-1, 1)

print("y_train",y_train.shape)
print("y_test",y_test.shape)

y_train (1200000, 1)
y_test (400000, 1)


In [25]:
# Row i holds the word2vec vector for tokenizer id i. Rows for words word2vec
# never learned (presumably those under W2V_MIN_COUNT) and row 0 stay all-zero.
embedding_matrix = np.zeros((vocab_size, W2V_SIZE))

for token, idx in tokenizer.word_index.items():
    if token not in w2v_model.wv:
        continue
    embedding_matrix[idx] = w2v_model.wv[token]

print(embedding_matrix.shape)

(278491, 300)


In [26]:
# Frozen (non-trainable) embedding layer initialised from the word2vec matrix.
embedding_layer = Embedding(
    input_dim=vocab_size,
    output_dim=W2V_SIZE,
    input_length=SEQUENCE_LENGTH,
    weights=[embedding_matrix],
    trainable=False,
)

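### At this point the preprocessing is done
### How do we know preprocessing is ready?
- The train and test sets are tokenized, integer-encoded and padded to `SEQUENCE_LENGTH`.
- We have an embedding matrix holding the word2vec vector for every word in the tokenizer vocabulary. Words that word2vec did not learn get all-zero rows.
- An embedding layer is created with the embedding matrix as its (frozen, `trainable=False`) weights.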

# Dask + Keras

### TODO: Dask pipeline below 

In [27]:
%%time
df = dd.read_csv(path,encoding=DATASET_ENCODING)

CPU times: user 13.8 ms, sys: 12.8 ms, total: 26.6 ms
Wall time: 26.9 ms


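Caveat: this is not a like-for-like comparison with pandas. `dd.read_csv` is lazy. It only samples the beginning of the file to infer column types and builds a task graph. The full read happens later, on `.compute()`, `.persist()` or `.head()`.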

In [33]:
# Lazy Dask frame over the headerless CSV, with explicit column names.
df = dd.read_csv(
    path,
    names=DATASET_COLUMNS,
    encoding=DATASET_ENCODING,
)

In [34]:
# Raw polarity code -> label string (same mapping as the pandas section).
decode_map = dict(zip((0, 2, 4), ("NEGATIVE", "NEUTRAL", "POSITIVE")))


def decode_sentiment(label):
    """Return the label string for a polarity code accepted by ``int()``.

    Raises KeyError for codes other than 0, 2 and 4.
    """
    polarity = int(label)
    return decode_map[polarity]

In [38]:
%%time
df['target'] = df['target'].apply(lambda x: decode_sentiment(x), meta='dict')

CPU times: user 5.32 ms, sys: 211 µs, total: 5.53 ms
Wall time: 5.43 ms


In [40]:
# Triggers real computation, but only for the first partition.
df.head(n=5)

Unnamed: 0,target,ids,date,flag,user,text
0,NEGATIVE,1467810369,Mon Apr 06 22:19:45 PDT 2009,NO_QUERY,_TheSpecialOne_,"@switchfoot http://twitpic.com/2y1zl - Awww, t..."
1,NEGATIVE,1467810672,Mon Apr 06 22:19:49 PDT 2009,NO_QUERY,scotthamilton,is upset that he can't update his Facebook by ...
2,NEGATIVE,1467810917,Mon Apr 06 22:19:53 PDT 2009,NO_QUERY,mattycus,@Kenichan I dived many times for the ball. Man...
3,NEGATIVE,1467811184,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,ElleCTF,my whole body feels itchy and like its on fire
4,NEGATIVE,1467811193,Mon Apr 06 22:19:57 PDT 2009,NO_QUERY,Karoli,"@nationwideclass no, it's not behaving at all...."


In [41]:
# Stop words as a set so the per-token membership test in preprocess() is a
# hash lookup rather than a linear scan over a list.
stop_words = set(stopwords.words("english"))
# Snowball stemmer, used only when preprocess(..., stem=True).
stemmer = SnowballStemmer("english")

In [42]:
def preprocess(text, stem=False):
    """Return ``text`` cleaned, stop-word filtered and optionally stemmed.

    ``str(text)`` is lower-cased, each TEXT_CLEANING_RE match (links, users,
    special characters) becomes a space, tokens in ``stop_words`` are dropped,
    and survivors are stemmed with ``stemmer`` when ``stem`` is true.
    The tokens are returned joined by single spaces.
    """
    normalised = re.sub(TEXT_CLEANING_RE, ' ', str(text).lower()).strip()
    words = (word for word in normalised.split() if word not in stop_words)
    return " ".join(stemmer.stem(word) if stem else word for word in words)

In [43]:
%%time
df['text'] = df['text'].apply(lambda x: preprocess(x))

CPU times: user 13.7 ms, sys: 870 µs, total: 14.6 ms
Wall time: 13.9 ms


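Careful: 13.9 ms is not comparable with the 43 seconds pandas took. Dask's `apply` is lazy, so this cell only added a step to the task graph and no tweet has been preprocessed yet. A fair comparison has to time the computation itself, e.g. `%%time` around `df.compute()` or `df.persist()`.

But let's finish the preprocessing in Dask.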

In [52]:
%%time

# not using dask

documents = [_text.split() for _text in df_train.text] 

CPU times: user 2.36 s, sys: 192 ms, total: 2.55 s
Wall time: 2.55 s


In [53]:
# Same hyper-parameters as the pandas run, but with 8 explicit worker threads.
w2v_model = gensim.models.word2vec.Word2Vec(
    workers=8,
    size=W2V_SIZE,
    window=W2V_WINDOW,
    min_count=W2V_MIN_COUNT,
)

In [54]:
# Build the vocabulary for the 8-worker model from the same pandas-derived documents.
# Positional argument on purpose: the keyword is `sentences` in gensim 3 and
# `corpus_iterable` in gensim 4.
w2v_model.build_vocab(documents)

In [55]:
%%time
w2v_model.train(documents, total_examples=len(documents), epochs=W2V_EPOCH)

CPU times: user 15min 5s, sys: 5.34 s, total: 15min 10s
Wall time: 3min 33s


(246355878, 276847840)

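# Conclusion

- Dask looked faster for preprocessing and applying regex changes, but the Dask timings above only measure lazy task-graph construction. The real comparison still requires timing an actual `.compute()`/`.persist()`.
- Labeled data is obviously easier to work with.
- An unsupervised approach to creating labeled data is something we could tackle next.
- Word2Vec training is already parallelized through gensim's `workers` argument. In both runs, CPU time (12–15 min) is several times the wall time (3.5–4 min). Raising `workers` to 8 cut wall time from 4 min 3 s to 3 min 33 s. Further speed-ups would need more cores or a different (e.g. distributed) training setup.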

# Scala for Spark

- Next: reimplement the same preprocessing, up to the Word2Vec step, in Scala/Spark and compare its run time with Dask. Time the actual computation, not lazy graph construction.