In [9]:
from multiprocessing import Pool
import numpy as np
import time


def f(x, n=100000):
    res = 0.0
    for i in range(n):
         res += sum(x)
    return res        

def g(x, n=1000):
    res = 0.0
    for i in range(n):
         res += np.sin(x)
    return res        


if __name__ == '__main__':
    tic = time.perf_counter()
    pool = Pool(processes=4)              # start 8 worker processes
    result = pool.apply_async(f, [range(10)])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))           # prints "100" unless your computer is *very* slow
    n = 100
    print(pool.map(f, [np.linspace(0.0, 10.0, n)]*8))          # prints "[0, 1, 4,..., 81]"
    toc = time.perf_counter()
    print('processing time: {}'.format(toc-tic))

4500000.0
[49999999.999999993, 49999999.999999993, 49999999.999999993, 49999999.999999993, 49999999.999999993, 49999999.999999993, 49999999.999999993, 49999999.999999993]
processing time: 6.565761048999775


In [6]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

    
    


[42, None, 'hello']


In [7]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())   # prints "[42, None, 'hello']"
p.join()


[42, None, 'hello']


### multiprocessing subclassing

In [1]:
from multiprocessing import Process, Pipe, Queue
import numpy as np


class NPComm(Process):
    def __init__(self, arr, conn, commands):
        super(NPComm, self).__init__()
        self.arr = arr
        self.conn = conn
        self.commands = commands
        
    def run(self):
        print('running')
        while True:
            comm = self.commands.recv()
            if comm[0] == 'print':
                print(comm[1])
                self.commands.send(['received print command: ', comm[1]])
            if comm[0] == 'printlen':
                print(len(comm[1]))
            if comm[0] == 'send':
                conn.send(comm[1])
                print('sent!')
            if comm[0] == 'recv':
                recv = conn.recv()
                print('received!  len:', len(recv))
            if comm[0] == 'modifyArr':
                print('before: ', comm[1])
                comm[1][-1] = 0
                print('after: ', comm[1])
                self.commands.send(['modified: ', comm[1]])
            if comm[0] == 'quit':
                print('quitting process!')
                return

commPipe0, commPipeResp0 = Pipe()
commPipe1, commPipeResp1 = Pipe()
conn0, conn1 = Pipe()

p1_arr = np.arange(11)

p1 =  NPComm(p1_arr, conn0, commands=commPipeResp0)
p2 =  NPComm(None, conn1, commands=commPipeResp1)

p1.start()
p2.start()
arr_1 = np.ones(10)
arr_2 = np.ones(5)

commPipe0.send(['printlen', arr_1])
commPipe1.send(['printlen', arr_2])

commPipe0.send(['modifyArr', arr_1])
arr_1_new = commPipe0.recv()
print('arr_1_new:', arr_1_new)
print('arr_1    :', arr_1)


print('p1.arr = ', p1.arr)

commPipe0.send(['quit'])
commPipe1.send(['quit'])
                  
p1.join()
p2.join()
                  
            

running
running
5
10
before:  [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
after:  [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  0.]
quitting process!
quitting process!
arr_1_new: ['modified: ', array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  0.])]
arr_1    : [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
p1.arr =  [ 0  1  2  3  4  5  6  7  8  9 10]


### threading subclassing

In [51]:
## queue.Queue passes objects by reference
## multiprocessing.Queue passes a copy of the object

from threading import Thread
from queue import Queue
#from multiprocessing import Queue
import numpy as np



class NPComm(Thread):
    def __init__(self, arr, conn, commands):
        super(NPComm, self).__init__()
        ##or: Thread.__init__(self)
        self.arr = arr
        self.conn = conn
        self.commands = commands
        
    def run(self):
        print('running')
        while True:
            comm = self.commands.get()
            if comm[0] == 'print':
                print(comm[1])
            if comm[0] == 'printlen':
                print(len(comm[1]))
            if comm[0] == 'send':
                conn.send(comm[1])
                print('sent!')
            if comm[0] == 'recv':
                recv = conn.recv()
                print('received!  len:', len(recv))
            if comm[0] == 'modifyArr':
                print('before: ', comm[1])
                comm[1][-1] = 0
                print('after: ', comm[1])
            if comm[0] == 'quit':
                print('quitting process!')
                return

commPipe0 = Queue()
commPipe1 = Queue()
conn0, conn1 = Pipe()
                  
p1 =  NPComm(None, conn0, commands=commPipe0)
p2 =  NPComm(None, conn1, commands=commPipe1)

p1.start()
p2.start()
arr_1 = np.ones(10)
arr_2 = np.ones(5)

commPipe0.put(['printlen', arr_1])
commPipe1.put(['printlen', arr_2])

commPipe0.put(['modifyArr', arr_1])

commPipe0.put(['quit'])
commPipe1.put(['quit'])
                  
p1.join()
p2.join()
                  
print('arr_1    :', arr_1)

running
running
10
before:  5
quitting process!
[ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]
after:  [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  0.]
quitting process!
arr_1    : [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  0.]


In [49]:
print('arr_1    :', arr_1)

arr_1    : [ 1.  1.  1.  1.  1.  1.  1.  1.  1.  1.]


In [18]:
from multiprocessing import Process, Pipe
import numpy as np
import time
import os

#print(np.__config__.show())

#os.system("taskset -p 0xff %d" % os.getpid())

tic = time.perf_counter()


class NPTest(Process):
    def __init__(self, rand_size, n):
        super(NPTest, self).__init__()
        self.rand_size = rand_size
        self.daemon = True
        self.n = n
        
    def run(self):
        import numpy as np
        self.arr = np.random.random(self.rand_size)
        for i in range(self.n):
            #a = self.arr.dot(self.arr)
            a = self.arr*self.arr


n_proc = 8
rand_size = (int(2**12), 2**14)


Ps = [None]*n_proc

for i in range(n_proc):
    Ps[i] = NPTest(rand_size, 2)
    Ps[i].start()
    

for i in range(n_proc):
    Ps[i].join()
    
toc = time.perf_counter()
print('processing time: {}'.format(toc-tic))




processing time: 3.767641413000092


In [5]:
import numpy as np

a = np.random.random(10**9)

print(a.dot(a))

333343333.601


In [20]:
import numpy as np
import math

n = (10000,10000)
def g(x):
    y = np.random.random(n)
    y**2
    

from multiprocessing import Pool

n_p = 1
p = Pool(n_p)

tic = time.perf_counter()
p.map(g, range(int(16)))
toc = time.perf_counter()
print('processing time: {}'.format(toc-tic))




processing time: 20.874418754000544


In [13]:
from multiprocessing import Process, Pipe
import numpy as np
import time

a = np.random.random(25000)
b = None

n = 10

p1, p2 = Pipe()


tic = time.perf_counter()
for i in range(n):
    p1.send(a)
    #time.sleep(0.1)
    b = p2.recv()
toc = time.perf_counter()
print('processing time: {}'.format(toc-tic))


tic = time.perf_counter()
for i in range(n):
    p1.send_bytes(a)
    #time.sleep(0.1)
    b = p2.recv_bytes()
    b = np.frombuffer(b)
    b = b.reshape(a.shape)
toc = time.perf_counter()
print('processing time: {}'.format(toc-tic))

#print('a:', a)
#print('b:', b)



processing time: 0.0019614360001014575
processing time: 0.0010588840000309574
