In [6]:
from collections import Counter, OrderedDict
from operator import itemgetter
import tempfile
import numpy as np
from codingChallenge import utils
from threading import Thread
import threading
from multiprocessing import cpu_count


class UniqueWordsCalculator(object):
    """
    Class container for the functions related to counting the number of unique
    words as Tweets arrive.

    Several interchangeable counting strategies are provided; ``run`` selects
    the one callers get. Every strategy yields (word, count) pairs ordered
    alphabetically by word.
    """
    def __init__(self, tweet_iterable):
        # Source of tweets. count_unique / counter_on_all_words iterate over it
        # directly; the numpy strategies hand it to np.genfromtxt unchanged.
        self.tweet_iterable = tweet_iterable
        # self.input_file_path = input_file_path

    @staticmethod
    def _alphabetize(count_container):
        """Return the word counts as an OrderedDict sorted by word."""
        return OrderedDict(sorted(count_container.items(), key=itemgetter(0)))

    @staticmethod
    def _drop_blank_words(count_dictionary):
        """Remove the '' and ' ' entries in place and return the dictionary."""
        # Splitting on a single space yields '' for leading, trailing or
        # repeated spaces; these are not words.
        count_dictionary.pop(' ', None)
        count_dictionary.pop('', None)
        return count_dictionary

    def _load_word_array(self):
        """
        Read ``self.tweet_iterable`` with np.genfromtxt and split it into words.

        Every non-blank line is kept whole as one tweet (newline delimiter) and
        then split on runs of whitespace.

        Returns:
            1-D numpy array of all words in input order (empty if none).
        """
        # comments=None turns comment stripping off (False is not a valid
        # marker). np.bytes_ is the NumPy-2-safe spelling of np.string_.
        tweet_array = np.genfromtxt(self.tweet_iterable, dtype=np.bytes_,
                                    comments=None, delimiter="\n")
        # genfromtxt squeezes single-line input to a 0-d array, which cannot be
        # iterated; atleast_1d restores a 1-D view.
        word_lists = [tweet.split() for tweet in np.atleast_1d(tweet_array)]
        if not word_lists:
            # np.concatenate refuses an empty list of arrays.
            return np.array([], dtype=np.bytes_)
        return np.concatenate(word_lists)

    def count_unique(self):
        """
        Count the words of every tweet after cleaning it.

        Each tweet goes through ``utils.clean_tweet`` and is split on single
        spaces; blank ('') and single-space (' ') tokens are discarded.

        Returns:
            OrderedDict mapping word -> count, sorted alphabetically by word.
        """
        count_container = Counter()
        for tweet in self.tweet_iterable:
            # update() adds in place; rebuilding the Counter with ``+`` for
            # every tweet made the loop quadratic.
            count_container.update(utils.clean_tweet(tweet).split(" "))
        return self._drop_blank_words(self._alphabetize(count_container))

    def counter_on_all_words(self):
        """
        Count cleaned words by spooling them, one per line, to a temp file.

        Tokenisation matches ``count_unique`` (``utils.clean_tweet`` and then a
        split on " "), and blank tokens are discarded the same way.

        Returns:
            ``items()`` of an OrderedDict of word -> count, sorted by word.
        """
        # Text mode: the default binary mode rejects str writes on Python 3.
        with tempfile.TemporaryFile(mode="w+") as tmpfile:
            for tweet in self.tweet_iterable:
                for word in utils.clean_tweet(tweet).split(" "):
                    tmpfile.write(word + "\n")
            # Rewind before reading the words back in.
            tmpfile.seek(0)
            count_container = Counter(tmpfile.read().splitlines())
        return self._drop_blank_words(self._alphabetize(count_container)).items()

    def numpy_count_unique(self):
        """
        Count whitespace-separated words read through np.genfromtxt.

        Returns:
            ``items()`` of an OrderedDict of word -> count, sorted by word.
        """
        word_count = Counter(self._load_word_array())
        return self._alphabetize(word_count).items()

    def distributed_numpy_count_unique(self, num_workers=None):
        """
        Count words by splitting the word array across worker threads.

        Each thread runs ``np.unique(..., return_counts=True)`` on its slice
        and merges the partial counts into a shared Counter under a lock.

        Args:
            num_workers: number of slices/threads; defaults to ``cpu_count()``.

        Returns:
            ``items()`` of an OrderedDict of word -> count, sorted by word.
        """
        if num_workers is None:
            num_workers = cpu_count()
        word_array = self._load_word_array()
        lock = threading.Lock()
        global_count_tracker = Counter()

        def worker(part):
            words, counts = np.unique(part, return_counts=True)
            partial_counts = dict(zip(words, counts))
            # Serialise merges so concurrent updates cannot interleave.
            with lock:
                global_count_tracker.update(partial_counts)

        # NOTE(review): these are threads, not processes; any speed-up depends
        # on NumPy releasing the GIL -- benchmark before relying on it.
        thread_list = [Thread(target=worker, args=(part,))
                       for part in np.array_split(word_array, num_workers)]
        for thread_instance in thread_list:
            thread_instance.start()
        for thread_instance in thread_list:
            thread_instance.join()

        return self._alphabetize(global_count_tracker).items()

    def run(self):
        """
        Create a generic run method on the object to make reimplementation
        easier. That way code in the Dispatcher doesn't need to be refactored.
        """
        # return self.count_unique()
        return self.distributed_numpy_count_unique()



In [58]:
    # Three sample tweets repeated to build a large benchmark input.
    # Adjacent string literals are joined implicitly, so the line breaks below
    # add no text; a backslash continuation inside a literal would instead keep
    # the next line's indentation as extra spaces in the tweet.
    TWEET_REPEATS = 100000
    test_tweets = [
        "is #bigdata finally the answer to end poverty? "
        "@lavanyarathnam http://ow.ly/o8gt3 #analytics",
        "interview: xia wang, astrazeneca on #bigdata and the promise of effective "
        "healthcare #kdn http://ow.ly/ot2uj",
        "big data is not just for big business. on how #bigdata is being deployed for "
        "small businesses: http://bddy.me/1bzukb3 @cxotodayalerts #smb"
    ] * TWEET_REPEATS

In [79]:
def numpy_count_unique(input_text):
    """
    Count whitespace-separated words in ``input_text``.

    ``input_text`` is passed straight to np.genfromtxt; each non-blank line is
    kept whole as one tweet (newline delimiter) and split on whitespace.

    Returns:
        ``items()`` of an OrderedDict of word -> count, sorted by word.
    """
    # comments=None turns comment stripping off (False is not a valid marker).
    # np.bytes_ is the NumPy-2-safe spelling of np.string_.
    tweet_array = np.genfromtxt(input_text, comments=None, dtype=np.bytes_,
                                delimiter="\n")
    # A single input line comes back as a 0-d array, which is not iterable.
    word_lists = [tweet.split() for tweet in np.atleast_1d(tweet_array)]
    # np.concatenate refuses an empty list, so fall back to no words.
    word_array = np.concatenate(word_lists) if word_lists else []
    word_count = Counter(word_array)
    alphabetized_count = OrderedDict(sorted(word_count.items(), key=itemgetter(0)))
    return alphabetized_count.items()

In [40]:
# np.unique(..., return_counts=True) returns (sorted distinct values, counts).
z = np.unique(["the", "rabbit", "jumpted", "rabbit"], return_counts=True)
{word: count for word, count in zip(*z)}


{'jumpted': 1, 'rabbit': 2, 'the': 1}

In [88]:
from multiprocessing import cpu_count
def distributed_numpy_count_unique(WAT):
    """
    Count occurrences of each distinct element of ``WAT`` with worker threads.

    ``WAT`` is split into ``cpu_count()`` slices via np.array_split; one
    thread per slice counts it with ``np.unique(..., return_counts=True)`` and
    merges the partial result into a shared Counter while holding a lock.

    Returns:
        ``items()`` of an OrderedDict of element -> count, sorted by element.
    """
    merge_lock = threading.Lock()
    totals = Counter()

    def count_slice(chunk, guard, sink):
        values, tallies = np.unique(chunk, return_counts=True)
        partial = {value: tally for value, tally in zip(values, tallies)}
        with guard:
            sink.update(partial)

    workers = []
    for chunk in np.array_split(WAT, cpu_count()):
        worker_thread = Thread(target=count_slice,
                               args=(chunk, merge_lock, totals))
        workers.append(worker_thread)
        worker_thread.start()
    for worker_thread in workers:
        worker_thread.join()

    ordered_pairs = sorted(totals.items(), key=itemgetter(0))
    return OrderedDict(ordered_pairs).items()

In [89]:
# Smoke test: a single word should come back paired with a count of 1.
distributed_numpy_count_unique(WAT=["this"])

[('this', 1)]

In [84]:
# Benchmark file of tweets, one tweet per line.
# NOTE(review): absolute, machine-specific path -- make this configurable.
TWEETS_PATH = ("/Users/Jacob/Documents/Projects/DataInsight/codingChallenge/"
               "benchmark/data/1k_tweets.txt")

# comments=None: genfromtxt's default comment marker "#" would cut every tweet
# at its first hashtag. An unsized bytes dtype avoids the silent truncation a
# fixed '|S160' width imposes on longer (e.g. multi-byte UTF-8) tweets.
a = np.genfromtxt(TWEETS_PATH,
                  dtype=np.bytes_,
                  comments=None,
                  delimiter="\n",
                  )


def split_string(in_string):
    """Split ``in_string`` on runs of whitespace and return the list of parts."""
    return in_string.split()


# Each call returns a list, so the vectorized output must be an object array;
# a string output type cannot hold a list of words.
vfunc = np.vectorize(split_string, otypes=[object])


In [113]:
# Tally how often each identical line (whole tweet) occurs in `a`.
b = Counter()
b.update(a)

In [231]:
# The inner lists have different lengths, so the array must be built with
# dtype=object (a 1-D array holding two lists); current NumPy raises ValueError
# for ragged nested input without it.
a = np.array([['a', 'b', 'c'], ['d', 'e', 'f', 'g']], dtype=object)

In [235]:
# print(...) with one argument prints the same on Python 2 and is required
# syntax on Python 3.
print(a)
# ravel() leaves a 1-D object array of lists unchanged ...
print(a.ravel())
# ... whereas np.concatenate joins the inner lists into one flat array.
np.concatenate(a)

[['a', 'b', 'c'] ['d', 'e', 'f', 'g']]
[['a', 'b', 'c'] ['d', 'e', 'f', 'g']]


array(['a', 'b', 'c', 'd', 'e', 'f', 'g'], 
      dtype='|S1')

In [233]:
print(a)  # function-call form works on both Python 2 and Python 3

[['a', 'b', 'c'] ['d', 'e', 'f', 'g']]
