Example of multiple processing and saving files
=========================

In [7]:
import random, os
import multiprocessing

def list_append(count, out_list):
    print os.getpid(), 'is working'
    for i in range(count):
        out_list.append(random.random())
    return out_list

size = 1000000
procs = 3
process_list = []
result = []

for i in range(0, procs):
    out_list = list()
    process = multiprocessing.Process(target=list_append, args=(size, out_list))
    process_list.append(process)
    
for p in process_list:
    p.start()

for p in process_list:
    p.join()
    
print "finished"
                                

4288 is working
4290 is working
4289 is working
finished


In [9]:
import time
from multiprocessing import Pool

def f():
    start = time.time()
    time.sleep(2)
    end = time.time()
    return end - start
p = Pool(processes=1)

result = p.apply(f)
print "first",result


result = p.apply_async(f)
while not result.ready():
    time.sleep(0.5)
    print 'working on else'

print result.get() 

first 2.00100517273
working on else
working on else
working on else
working on else
2.00096201897


The following is the way to create multiple process (resulted a 5x speedup). 
----------------------------
### 1. using map and collect results later ###
### 2. using imap and collect results during the process###
### 3. using async and collect results during the process###
1 and 3 have somehow similar performance



In [19]:
import time
from multiprocessing import Pool,cpu_count

def f(x):
    time.sleep(1)
    return (x**3)**3
fn = '/Users/raybao/Documents/workspace/data_anaysis/temp1.txt'
y = range(int(1e2))

p = Pool(processes=cpu_count()+2)

start = time.time()
results = p.map(f, y)

print "map blocks"

with open(fn, 'w') as ff:
    for item in results:
        ff.write("%f¥n"%item)
end = time.time()
print "time", end-start

map blocks
time 20.0190420151


The following is an example of multiprocessing using imap
--------------------



In [6]:
def f(x):
    time.sleep(10)
    return (x**3)**3
y = range(int(1e1))
fn = '/Users/raybao/Documents/workspace/data_anaysis/temp.txt'
start = time.time()
with open(fn, 'w') as ff:
    for result in p.imap(f, y, chunksize=2):
        ff.write("%f/n"%result)
end = time.time()
print "time", end-start
print "map_async"


time 100.003145933
map_async


An alternative way to do the job
-----------------------

In [15]:
import multiprocessing as mp
import time



fn = '/Users/raybao/Documents/workspace/data_anaysis/temp3.txt'


def worker(arg, q):
    '''stupidly simulates long running process'''
    res = (arg**3)**3
    q.put(res)
    time.sleep(1)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    f = open(fn, 'wb') 
    while 1:
        m = q.get()
        f.write(str(m) + '\n')
        f.flush()
    f.close()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    y = range(int(1e2))
    for i in y:
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()

start = time.time()
main()
print "finished"
print "time is {0}".format(time.time()-start)

finished
time is 20.1237740517
