In [1]:
import numpy as np
import functools
import time

### simple example

In [2]:
def find_longest_str(list_of_str):
    longest = None
    longest_len = 0
    for i in list_of_str:
        if len(i)>longest_len:
            longest_len=len(i)
            longest = i
    return longest

list_of_str = ['python','stone','map','reduce','peter','cold']
%time max_length = print(find_longest_str(list_of_str))

python
Wall time: 1 ms


### scale up to 100000 elements

In [3]:
%time max_length = print(find_longest_str(list_of_str*100000))

python
Wall time: 49 ms


### Optimize using map

In [8]:
length_list = [len(s) for s in list_of_str]
maps = zip(list_of_str,length_list)
max_len = max(maps,key=lambda x: x[1])
max_len

('python', 6)

### split into map and reduce

###### step 1: map your list into a list of tuples entailing str and its length (or other numbers) using the mapper function
###### step 2: use hte reducer function, goes over the tuples from strp one and applies it one by one, to aggregate the results from mapper step

In [13]:
import functools
mapper = len
#reducer orperating on each element of mapped result
def reducer(p,c):
    if p[1]>c[1]:
        return p
    return c

%time
mapped = map(mapper,list_of_str)
mapped = zip(list_of_str,mapped)
reduced = functools.reduce(reducer, mapped)
print(reduced)

Wall time: 0 ns
('reduce', 6)


### Break into chunks before parallelization

In [20]:
def chunkify(seq, num):
    avg = len(seq) / float(num)
    out = []
    last = 0.0

    while last < len(seq):
        out.append(seq[int(last):int(last + avg)])
        last += avg

    return out


In [26]:
data_chunks = chunkify(list_of_str, num=2)
#step 1:
from functools import reduce
reduced_all = []
for chunk in data_chunks:
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    
    reduced_chunk = functools.reduce(reducer, mapped_chunk)
    reduced_all.append(reduced_chunk)
    
#step 2:
reduced = reduce(reducer, reduced_all)
print(reduced)


('reduce', 6)


#### parallel

In [27]:
def chunks_mapper(chunk):
    mapped_chunk = map(mapper,chunk)
    mapped_chunk = zip(chunk,mapped_chunk)
    return reduce(reducer,mapped_chunk)
data_chunks = chunkify(list_of_str*1000,num=30)
mapped = map(chunks_mapper,data_chunks)
reduced = reduce(reducer,mapped)
print(reduced)

('reduce', 6)


#### parallel --> multiprocessing

In [None]:
#from multiprocessing import Pool
#mapper = len
#pool = Pool(8)
#data_chunks = chunkify(list_of_str*1000, num = 8)
#mapped = pool.map(chunks_mapper,data_chunks)
#reduced = reduce(reducer, mapped)
#print(reduced)
#print(mapped)

## word count

In [7]:
from sklearn.datasets import fetch_20newsgroups
data = 'pythoh is a nice program, I like it '

For each text in the dataset, we want to tokenize it, clean it, remove stop words and finally count the words:

In [14]:
from collections import Counter
import re
def clean_word(word):
    return re.sub(r'[^\w\s]','',word).lower()

def word_not_in_stopwords(word):
    return word not in ['a'] and word and word.isalpha()

def find_top_words(data):
    cnt=Counter()
    for text in data:
        tokens_in_text = text.split()
        tokens_in_text = map(clean_word, tokens_in_text)
        tokens_in_text = filter(word_not_in_stopwords,tokens_in_text)
        cnt.update(tokens_in_text)
    return cnt.most_common(10)

In [15]:
%time find_top_words(data)

Wall time: 0 ns


[('i', 5),
 ('p', 2),
 ('t', 2),
 ('h', 2),
 ('o', 2),
 ('e', 2),
 ('r', 2),
 ('y', 1),
 ('s', 1),
 ('n', 1)]

In [16]:
def mapper(text):
    tokens_in_text = text.split()
    tokens_in_text = map(clean_word, tokens_in_text)
    tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
    return Counter(tokens_in_text)
def reducer(cnt1, cnt2):
    cnt1.update(cnt2)
    return cnt1
def chunk_mapper(chunk):
    mapped = map(mapper, chunk)
    reduced = reduce(reducer, mapped)
    return reduced