In [4]:
import multiprocessing
import os

In [5]:
# Number of CPUs reported for this machine (4 in the run below).
multiprocessing.cpu_count()

4

In [6]:
def print_square(n):
    """Print the PID of the running process, then ``n`` squared."""
    print(os.getpid())
    squared = n * n
    print("Square:", squared)

In [7]:
def print_cube(n):
    """Print the PID of the running process, then ``n`` cubed."""
    print(os.getpid())
    cubed = n * n * n
    print("Cube:", cubed)

In [8]:
# A Process object can be started only once; to run the same work again,
# create a new Process object.
# NOTE(review): passing notebook-defined functions as `target` relies on the
# "fork" start method; under "spawn" (default on Windows and macOS) the child
# may be unable to find them — confirm for your platform.
p1 = multiprocessing.Process(target=print_cube,args=(3,))
p2 = multiprocessing.Process(target=print_square,args=(4,))

In [9]:
# Start both children; they run concurrently with this (parent) process.
p1.start()
p2.start()
p1.join()  # block until p1 has finished
p2.join()  # block until p2 has finished
print("Done")  # reached only after both p1 and p2 have completed


# Without the join() calls, "Done" could be printed while the children are
# still running, because start() returns immediately.


4964
Cube: 27
4967
Square: 16
Done


In [10]:
# False: p1 was joined above, so it has already exited.
p1.is_alive()

False

In [11]:
# Module-level list that squareList appends to.
result=[]

In [12]:
def squareList(mylist):
    """Append the square of each number in ``mylist`` to the module-level ``result`` and print it.

    ``result`` is only mutated (never rebound), so no ``global`` declaration
    is needed. When run in a child process, the child's copy of ``result``
    is modified, not the parent's.
    """
    for value in mylist:
        result.append(value * value)
    print("Result {}".format(result))  # non-empty inside this process


In [13]:
# Input for the squaring demo.
mylist = [1,2,3,4]

In [14]:
# Run squareList in a child process (a new Process object, since the earlier p1 was already started).
p1 = multiprocessing.Process(target=squareList,args=(mylist,))

In [15]:
p1.start()
p1.join()  # wait for the child to finish

Result [1, 4, 9, 16]


In [16]:
# Still empty here: parent and child processes have separate memory spaces,
# so squareList appended to the child's copy of `result`, not the parent's.
result

[]

How to solve it

Shared Memory

The `multiprocessing` module provides `Array` and `Value` objects for sharing data between processes:

i) `Array` - a ctypes array allocated from shared memory
ii) `Value` - a ctypes object allocated from shared memory

In [17]:
def squareList(mylist,result,squaresum):
    """Write the square of each number in ``mylist`` into ``result`` and their total into ``squaresum``.

    Args:
        mylist: iterable of numbers to square.
        result: indexable, writable sequence (in this notebook a
            ``multiprocessing.Array``) with at least ``len(mylist)`` slots;
            ``result[i]`` receives the square of the i-th number.
        squaresum: object with a writable ``.value`` attribute (in this
            notebook a ``multiprocessing.Value``) that receives the sum of the
            squares written in this call.

    Only the squares computed here are summed, so unused or stale slots of a
    larger ``result`` buffer do not leak into the total.
    """
    total = 0
    for idx, num in enumerate(mylist):
        squared = num*num
        result[idx] = squared
        total += squared
    squaresum.value = total

In [18]:
mylist=[1,2,3,4]
# Shared-memory containers the child writes and the parent reads afterwards.
result=multiprocessing.Array('i',4)  # typecode 'i' = C int, 4 = number of elements
square_sum = multiprocessing.Value('i')  # a single shared C int

p1=multiprocessing.Process(target=squareList,args=(mylist,result,square_sum))

In [19]:
p1.start()
p1.join()  # wait for the child to finish writing into shared memory

In [20]:
# The child's writes are visible in the parent because `result` and
# `square_sum` live in shared memory.
print(square_sum.value)
for i in result:
    print(i)

30
1
4
9
16


Method 2

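Server Process: a `multiprocessing.Manager` runs a server process that holds Python objects; other processes access them through proxies, so changes made by one process are visible to the others.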

In [21]:
def printRecords(records):
    """Print every record as a "Name"/"Score" pair followed by a blank line.

    Only the first two fields of each record are used.
    """
    template = "Name: {0}\nScore: {1}\n"
    for entry in records:
        name, score = entry[0], entry[1]
        print(template.format(name, score))

def insertRecord(record,records):
    """Append ``record`` to ``records`` and report the insertion."""
    records.append(record)
    print("New record inserted")

In [22]:
# The Manager's server process owns `records`; the children receive proxies to
# it, so the record appended by p1 is visible to p2.
with multiprocessing.Manager() as manager:
    records = manager.list([('Sam',10),('Adam',9),('Kevin',9)])  # shared list held by the manager
    new_record = ('Jeff',8)

    p1=multiprocessing.Process(target=insertRecord,args=(new_record,records))
    p2=multiprocessing.Process(target=printRecords,args=(records,))
    # Run p1 to completion before starting p2 so p2 prints the new record too.
    p1.start()
    p1.join()

    p2.start()
    p2.join()

New record inserted
Name: Sam
Score: 10

Name: Adam
Score: 9

Name: Kevin
Score: 9

Name: Jeff
Score: 8



Communication Between Processes

Effective multiprocessing usually requires some communication between processes. This can be done with:

1) Queue (any picklable Python object can be passed through a queue)


In [23]:
def square_list(mylist,q):
    """Put the square of each number in ``mylist`` on queue ``q``, then a ``None`` sentinel.

    The trailing ``None`` tells the consumer (``print_list``) that no more
    items will arrive. ``None`` is used because it survives pickling across
    processes; a fresh ``object()`` sentinel would lose its identity.
    """
    for num in mylist:
        q.put(num*num)  # one message per square
    q.put(None)  # end-of-stream marker

def print_list(q):
    """Print items taken from queue ``q`` until the ``None`` sentinel arrives.

    A blocking ``q.get()`` is used instead of polling ``q.empty()``: the
    consumer may start before the producer has put anything, in which case an
    ``empty()`` check would end the loop immediately. ``Queue.empty()`` is
    also documented as unreliable across processes.
    """
    while True:
        item = q.get()  # blocks until the producer sends something
        if item is None:
            break
        print(item)

In [24]:
# Queue shared by the producer and consumer processes created below.
q = multiprocessing.Queue()

In [25]:
# p1 puts squares on q; p2 takes them off and prints them.
p1 = multiprocessing.Process(target=square_list,args=([1,2,3,4],q))
p2 = multiprocessing.Process(target=print_list,args=(q,))

In [26]:
# Both processes run concurrently; p2 may start before p1 has put anything on q.
p1.start()
p2.start()

p1.join()
p2.join()

1
4
9
16


2) Pipes

- A pipe has exactly two endpoints.
- A queue can have any number of producers and consumers.
- If more than two processes need to communicate, use a Queue.
- If only two endpoints are needed and performance matters, use a Pipe; it is generally faster than a Queue.

In [27]:
# "end" is the terminator the receiver stops on; it is not printed.
msgs=["hey","hello","hru?","end"]



In [28]:
def send_msg(conn,msgs):
    """Send each message in ``msgs`` over ``conn``, then close ``conn``.

    The connection is closed in ``finally`` so it is released even if a
    ``send`` fails (e.g. on an unpicklable message).
    """
    try:
        for msg in msgs:
            conn.send(msg)
    finally:
        conn.close()

def rcv_msg(conn):
    """Print messages received on ``conn`` until ``"end"`` arrives or the sender hangs up.

    ``"end"`` is a terminator and is not printed. If the sending side is
    closed without sending ``"end"``, ``conn.recv()`` raises ``EOFError``;
    that is treated as end of stream instead of crashing the receiver.
    NOTE: EOF is only seen once every copy of the sending end is closed.
    """
    while True:
        try:
            msg = conn.recv()
        except EOFError:
            break  # sender closed the pipe without sending "end"
        if msg == "end":
            break
        print(msg)

In [29]:
# Pipe() returns the two ends of a connection: p1 sends on one end, p2 receives on the other.
parent_conn,child_conn=multiprocessing.Pipe()

p1 = multiprocessing.Process(target=send_msg,args=(parent_conn,msgs))
p2 = multiprocessing.Process(target=rcv_msg,args=(child_conn,))

In [30]:
# Sender and receiver run concurrently; wait for both to finish.
p1.start()
p2.start()
p1.join()
p2.join()

hey
hello
hru?


**Process pools**

In [33]:
def square(n):
    """Return ``n`` squared, first printing ``n`` and the PID of the process doing the work."""
    worker_pid = os.getpid()
    print(n, ":", worker_pid)
    return n * n

# Baseline: call square() serially in this process, so every line shows the same PID.
mylist=[1,2,3,4,5]
result=[]

for num in mylist:
    result.append(square(num))

1 : 4719
2 : 4719
3 : 4719
4 : 4719
5 : 4719


In [34]:
# How to use a pool: map() distributes mylist across worker processes and
# returns the results in input order.
# Using the Pool as a context manager releases its worker processes when the
# block exits; a Pool that is never closed/terminated leaks them.
with multiprocessing.Pool() as pool:
    result = pool.map(square,mylist)

2 : 5063
4 : 5064
5 : 5063
3 : 5065
1 : 5062


In [35]:
# Results come back in input order even though the workers printed out of order above.
result

[1, 4, 9, 16, 25]

**Process synchronization**

In [94]:
def withdraw(balance,lock,times=1000):
    """Decrement ``balance.value`` by 1, ``times`` times, holding ``lock`` for each update.

    ``balance`` is any object with a read/write ``.value`` (in this notebook a
    ``multiprocessing.Value``). ``balance.value - 1`` is a read-modify-write,
    so the lock is what keeps a concurrent ``deposit`` from losing updates.
    ``with lock`` guarantees the lock is released even if the update raises.
    """
    for _ in range(times):
        with lock:
            balance.value = balance.value - 1

def deposit(balance,lock,times=1000):
    """Increment ``balance.value`` by 1, ``times`` times, holding ``lock`` for each update.

    Counterpart of ``withdraw``; see it for the locking rationale.
    """
    for _ in range(times):
        with lock:
            balance.value = balance.value + 1

In [95]:
def perform_transaction():
    """Run withdraw and deposit in two concurrent processes on one shared balance, then print it.

    The shared balance starts at 100, and both children receive the same Lock
    to guard their updates.
    """
    shared_balance = multiprocessing.Value('i', 100)
    guard = multiprocessing.Lock()

    workers = [
        multiprocessing.Process(target=job, args=(shared_balance, guard))
        for job in (withdraw, deposit)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print("Balance: {}".format(shared_balance.value))

In [101]:
# Repeat the experiment: 1000 withdrawals and 1000 deposits from 100 should
# always end at 100 when every update is done under the lock.
for _ in range(10):
    perform_transaction()

Balance: 100
Balance: 100
Balance: 100
Balance: 100
Balance: 100
Balance: 100
Balance: 100
Balance: 100
Balance: 100
Balance: 100


**Multithreading**

In [116]:
import threading  # imported here so this cell does not depend on a later cell's import

def print_cube(n):
    """Print the current thread's name and the process PID, then ``n`` cubed.

    Redefines the earlier process-based ``print_cube`` for the threading demo.
    """
    print(threading.current_thread().name,os.getpid())
    print("Cube:{}".format(n*n*n))

def print_square(n):
    """Print the current thread's name and the process PID, then ``n`` squared.

    Redefines the earlier process-based ``print_square`` for the threading demo.
    """
    print(threading.current_thread().name,os.getpid())
    print("Square:{}".format(n*n))

In [117]:
import threading

In [118]:
# Both threads run inside this one process, so they print the same PID (see output).
t1 = threading.Thread(target=print_cube,args=(3,),name="t1")
t2 = threading.Thread(target=print_square,args=(4,),name="t2")

t1.start()
t2.start()
t1.join()
t2.join()

t1 4719
Cube:27
t2 4719
Square:16


**Thread Synchronization**

In [129]:
x=0
def increment(lock):
    """Add 1 to the module-level counter ``x`` while holding ``lock``.

    ``x += 1`` is a read-modify-write, so the lock keeps concurrent threads
    from losing updates. ``with lock`` guarantees the lock is released even if
    the increment raises.
    """
    global x
    with lock:
        x+=1

In [130]:
def thread_task(lock):
    """Call ``increment(lock)`` 100000 times."""
    remaining = 100000
    while remaining:
        increment(lock)
        remaining -= 1

def main_task():
    """Reset the global counter ``x`` to 0, run ``thread_task`` in two threads sharing one lock, and wait for both."""
    global x
    x = 0

    shared_lock = threading.Lock()
    workers = [threading.Thread(target=thread_task, args=(shared_lock,)) for _ in range(2)]

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    

In [131]:
# With the lock, every run should reach exactly 2 * 100000 = 200000.
for i in range(10):
    main_task()
    print("Iteration {0}: x:{1}".format(i,x))

Iteration 0: x:200000
Iteration 1: x:200000
Iteration 2: x:200000
Iteration 3: x:200000
Iteration 4: x:200000
Iteration 5: x:200000
Iteration 6: x:200000
Iteration 7: x:200000
Iteration 8: x:200000
Iteration 9: x:200000
