# Parallel Processing

$ ipcluster start --n=4 --profile='movie-view'

In [1]:
from ipyparallel import Client
rc = Client(profile='movie-view')
dview = rc[:] # use all engines

In [2]:
# Load the punkt tokenizer
import nltk

tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
    
import pandas as pd

header = ["sentiment","id","time","query","handler","tweet"]

# Read data from files 
manual = pd.read_csv("./data/twitter_manual.csv", header=None,encoding = "ISO-8859-1")
full = pd.read_csv("./data/twitter_full.csv", header=None,encoding = "ISO-8859-1")

manual.columns = header
full.columns = header

In [3]:
# Split list into training and testing
import numpy as np

msk = np.random.rand(len(full)) < 0.15
train = full[msk]
rest = full[~msk]

msk2 = np.random.rand(len(rest)) < 0.01
test = rest[msk2]
rest = rest[~msk2]

print ("Train - %d; Test - %d; Rest - %d" % (len(train), len(test), len(rest)))

Train - 239466; Test - 13687; Rest - 1346847


In [4]:
from ipyparallel import require

@require('re')
def process_tweet( tweet , punctuation=False):
    
    print(tweet)
    #
    # 1. Remove @username 
    tweet = re.sub('@[^\s]+','',tweet)
    #
    # 2. Remove www.* or https?://*
    tweet = re.sub('((www\.[\s]+)|(https?:/?/?[^\s]+))','',tweet)
    #
    # 3. Remove 'RT'
    tweet = tweet.replace('RT','')
    #
    # 4. Hashtag remove
    tweet = tweet.replace('#','')
    #
    # 5. Remove commas, ?s, !s, and periods.   
    if punctuation:
        tweet = tweet.replace('.','')
        tweet = tweet.replace(',','')
        tweet = tweet.replace('?','')
        tweet = tweet.replace('!','')
    #
    # 6. Convert to lower case, split into individual words
    words = tweet.lower().split()    
    #
    # 7. Join the words back into one string separated by space, 
    # and return the result.
    return( words)   

In [5]:
@require('numpy')
def create_bag_of_centroids( wordlist ):
    #
    # The number of clusters is equal to the highest cluster index
    # in the word / centroid map
    num_centroids = max( word_centroid_map.values() ) + 1
    #
    # Pre-allocate the bag of centroids vector (for speed)
    bag_of_centroids = numpy.zeros( num_centroids, dtype="float32" )
    #
    # Loop over the words in the review. If the word is in the vocabulary,
    # find which cluster it belongs to, and increment that cluster count 
    # by one
    for word in wordlist:
        if word in word_centroid_map:
            index = word_centroid_map[word]
            bag_of_centroids[index] += 1
    #
    # Return the "bag of centroids"
    return bag_of_centroids

In [6]:
import _pickle as pickle

def save_obj(obj, name ):
    with open('./output/'+ name + '.pkl', 'wb') as f:
        pickle.dump(obj, f)

def load_obj(name ):
    with open('./output/'+ name + '.pkl', 'rb') as f:
        return pickle.load(f)
word_centroid_map = load_obj("twitter_word_centroid_map") 
dview.push(dict(word_centroid_map=word_centroid_map)) # send bar

<AsyncResult: _push>

In [7]:
# Pre-allocate an array for the training set bags of centroids (for speed)
print("Starting")
train_centroids = np.zeros( (train["tweet"].size, 1837), \
    dtype="float32" )
print("half done")
list_of_true = [True] * train["tweet"].size

Starting
half done


In [8]:
%%time
print("starting")
half  = dview.map_sync(process_tweet, train["tweet"].tolist(), list_of_true )

print("half done")
train_centroids  = dview.map_sync(create_bag_of_centroids, half )

starting
half done
CPU times: user 5.78 s, sys: 2.47 s, total: 8.25 s
Wall time: 2h 14min 50s


In [None]:
%%time
from sklearn.ensemble import RandomForestClassifier

# Fit a random forest and extract predictions 
forest = RandomForestClassifier(n_estimators = 200, n_jobs=-1)

# Fitting the forest may take a few minutes
print ("Fitting a random forest to labeled training data...")
forest = forest.fit(train_centroids,train["sentiment"])

print ("Saveing Forest as file...")
save_obj(forest,"twitter_forest_parallel")

Fitting a random forest to labeled training data...
