<hr style="border-width:4px; border-color:coral"/>

# How many processors can I access? 

<hr style="border-width:4px; border-color:coral"/>

Before we start talking about "multiprocessing", we need to know how many processors (CPU cores) are available on the machine we are working on.  

The command line gives us a few different ways to count the number of CPUs we have. 

In OSX : 

    $ sysctl -a | grep machdep.cpu.core_count
    machdep.cpu.core_count: 10
    
In Linux : 

    $ more /proc/cpuinfo | grep processor
    processor	: 0
    processor	: 1
    processor	: 2
    processor	: 3

Let's do this from within the Jupyter Notebook : 

In [None]:
'''
%%bash

sysctl -a

# sysctl -a | grep machdep.cpu.core_count

'''

Alternatively, we can run these commands from Python using the `subprocess` module : 

In [2]:
import subprocess
import shlex

In [3]:
cmd = 'sysctl -a'
cmd_args = shlex.split(cmd)
sysctl_out = subprocess.run(cmd_args,capture_output=True,text=True)
print(sysctl_out.stdout,end='')

user.cs_path: /usr/bin:/bin:/usr/sbin:/sbin
user.bc_base_max: 99
user.bc_dim_max: 2048
user.bc_scale_max: 99
user.bc_string_max: 1000
user.coll_weights_max: 2
user.expr_nest_max: 32
user.line_max: 2048
user.re_dup_max: 255
user.posix2_version: 200112
user.posix2_c_bind: 0
user.posix2_c_dev: 0
user.posix2_char_term: 0
user.posix2_fort_dev: 0
user.posix2_fort_run: 0
user.posix2_localedef: 0
user.posix2_sw_dev: 0
user.posix2_upe: 0
user.stream_max: 20
user.tzname_max: 255
kern.ostype: Darwin
kern.osrelease: 21.6.0
kern.osrevision: 199506
kern.version: Darwin Kernel Version 21.6.0: Mon Aug 22 20:17:10 PDT 2022; root:xnu-8020.140.49~2/RELEASE_X86_64
kern.maxvnodes: 263168
kern.maxproc: 4176
kern.maxfiles: 122880
kern.argmax: 1048576
kern.securelevel: 0
kern.hostname: GRAD-PHDCOMP-02
kern.hostid: 0
kern.clockrate: { hz = 100, tick = 10000, tickadj = 0, profhz = 100, stathz = 100 }
kern.posix1version: 200112
kern.ngroups: 16
kern.job_control: 1
kern.saved_ids: 1
kern.boottime: { sec = 1671768

Let's pipe the output through `grep` so that we get something more readable.

In [4]:
cmd = 'grep machdep.cpu.core_count'
cmd_args = shlex.split(cmd)
count_out = subprocess.run(args=cmd_args,capture_output=True,text=True,
                            input=sysctl_out.stdout)
print(count_out.stdout,end='')

machdep.cpu.core_count: 6


For quick checks like this, however, we can pass the whole pipeline as a single string and set `shell=True`. Note that you will still want to set `capture_output` and `text` to `True`.

In [5]:
cmd = 'sysctl -a | grep machdep.cpu.core_count'
count_out = subprocess.run(cmd,capture_output=True,text=True,shell=True)
print(count_out.stdout,end='')              

machdep.cpu.core_count: 6
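
The same two-step pipeline can also be built without `shell=True` by connecting two `Popen` objects directly. The cell below is a minimal sketch of that idea, not a definitive recipe. The helper name `pipe_two` is hypothetical, and a Python one-liner stands in for `sysctl -a` so that the sketch also runs on machines without `sysctl`. It assumes `grep` is on the `PATH`.

```python
import subprocess
import sys

def pipe_two(cmd1, cmd2):
    """Run the equivalent of `cmd1 | cmd2` without a shell; return cmd2's stdout."""
    p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
    p2 = subprocess.Popen(cmd2, stdin=p1.stdout, stdout=subprocess.PIPE, text=True)
    p1.stdout.close()          # Close the parent's copy so only p2 reads from p1
    out, _ = p2.communicate()  # Read everything p2 writes, then wait for it
    p1.wait()
    return out

# Stand-in for `sysctl -a` (assumption: two lines in the style of the output above)
fake_sysctl = [sys.executable, "-c",
               "print('kern.ostype: Darwin'); print('machdep.cpu.core_count: 6')"]

count_out = pipe_two(fake_sysctl, ["grep", "machdep.cpu.core_count"])
print(count_out, end='')
```

On a Mac, replacing `fake_sysctl` with `['sysctl', '-a']` should give the same result as the `shell=True` version.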


<hr style="border-width:4px; border-color:coral"/>

## Using the Python multiprocessing module

<hr style="border-width:4px; border-color:coral"/>


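Another way to count the number of processors available, one that does not depend on knowing arcane commands for the underlying shell, is to use the Python `multiprocessing` module.  We will talk more about this module in a subsequent notebook, but for now, let's use it just to count processors. Note that `cpu_count()` reports logical CPUs, so on a machine with hyper-threading it can be larger than the physical core count that `machdep.cpu.core_count` reports.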

An excellent tutorial on details of the multiprocessing module can be found [here](https://superfastpython.com/multiprocessing-in-python/). 

In [4]:
import multiprocessing as mp
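# Use 'fork' so that child processes can see functions defined in this notebook.
# force=True allows this cell to be re-run without
# "RuntimeError: context has already been set".
mp.set_start_method('fork', force=True)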

In [5]:
print("Number of processors available : {:d}".format(mp.cpu_count()))

Number of processors available : 12
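
The count above is the number of logical CPUs in the machine, which is not always the number this particular process may use. The sketch below compares the two. It assumes nothing beyond the standard library; `os.sched_getaffinity` exists only on some platforms (Linux, for example), so the code falls back to the logical count elsewhere.

```python
import multiprocessing as mp
import os

logical = os.cpu_count()     # Logical CPUs in the machine (None if it cannot be determined)
mp_count = mp.cpu_count()    # Same number, but raises NotImplementedError instead of returning None

# On Linux, a process can be restricted to a subset of the CPUs (e.g. with `taskset`)
if hasattr(os, "sched_getaffinity"):
    usable = len(os.sched_getaffinity(0))   # CPUs the current process may run on
else:
    usable = logical                        # Fallback where affinity is not exposed (e.g. macOS)

print(f"Logical CPUs : {logical}")
print(f"Usable CPUs  : {usable}")
```

When choosing how many worker processes to launch, the "usable" number is usually the more relevant one.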


### Example

In [6]:
def sayhello():
    print("Hello!")
    
job = mp.Process(target=sayhello)
job.start()

print("All done ")

All done 


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'sayhello' on <module '__main__' (built-in)>


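Notice that, when the child process runs successfully, 'All done' appears before the 'Hello!' greeting.   Why? (The traceback shown above comes from a run that used the `spawn` start method: the child process could not find `sayhello`, because the function is defined only in the notebook.)

Let's force the parent process to wait for the child, using `join()`, before printing 'All done'. 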

In [7]:
def sayhello():
    print("Hello!")
    
job = mp.Process(target=sayhello)
job.start()

# Wait for job to finish
job.join()

print("All done ")

All done 


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/anaconda3/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'sayhello' on <module '__main__' (built-in)>
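
To see what `join()` changes, it can help to inspect the child before and after the call. The following is a small sketch under the assumption that the `fork` start method is available (Linux, macOS); the function names `work` and `run_and_report` are made up for illustration.

```python
import multiprocessing as mp
import time

def work():
    time.sleep(0.2)    # Pretend to do something slow

def run_and_report():
    ctx = mp.get_context("fork")    # Assumption: a Unix-like system where 'fork' is available
    job = ctx.Process(target=work)
    job.start()                     # Returns immediately; the child runs on its own
    alive_before = job.is_alive()   # The child is still sleeping at this point
    job.join()                      # Block until the child has exited
    alive_after = job.is_alive()    # The child is gone once join() has returned
    return alive_before, alive_after, job.exitcode

print(run_and_report())
```

An exit code of 0 means the child finished without raising an exception.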


Now let's see if we can get some information from the current process. 

In [10]:
def sayhello():
    pname = mp.current_process().name   # Avoid shadowing the built-in id()
    print("Hello from {:s}".format(pname))
    
job = mp.Process(target=sayhello)
job.start()

# Wait for job to finish
job.join()

print("All done ")

Hello from Process-3
All done 


The process name is just a label that the multiprocessing module assigns; the number is a counter kept by the module, not an operating-system process ID.  You will notice that if you restart the notebook (use the "kernel" menu item), the process numbers start again at 1. 
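
The difference between the module's label and the operating system's process ID can be checked directly. This is a sketch, assuming the `fork` start method is available; `noop` and the name `"Greeter"` are hypothetical. Because the processes here are created from a context object, the generated name may carry a prefix such as `ForkProcess-` rather than `Process-`.

```python
import multiprocessing as mp
import os

def noop():
    pass

ctx = mp.get_context("fork")   # Assumption: 'fork' is available (Linux, macOS)

auto_job = ctx.Process(target=noop)                   # Name generated by the module
named_job = ctx.Process(target=noop, name="Greeter")  # Hypothetical name chosen by us

named_job.start()
child_pid = named_job.pid      # Assigned by the operating system when the process starts
named_job.join()

print(auto_job.name)           # A generated label ending in a counter
print(named_job.name)
print(child_pid != os.getpid())  # The child has its own process ID
```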

We can launch multiple processes using a loop. 

In [11]:
def sayhello():
    pname = mp.current_process().name
    print(f"Hello from {pname}")
    
 
nprocs = 10
jobs = []
for i in range(nprocs):
    p = mp.Process(target=sayhello)
    p.start()
    jobs.append(p)   # List of jobs

for j in jobs:
    j.join()     # Wait for each job to join 
        
print("All done ")

Hello from Process-4
Hello from Process-5
Hello from Process-6
Hello from Process-7
Hello from Process-8
Hello from Process-9
Hello from Process-10
Hello from Process-11
Hello from Process-12Hello from Process-13

All done 


In [12]:
import time
def sayhello():
    pname = mp.current_process().name
    print("Hello from {:s}\n".format(pname))
    time.sleep(1)    # Simulate one second of work
    print("Done with job {:s}\n".format(pname))

procs = 5
# Create all of the jobs first, then start them together below
jobs = [mp.Process(target=sayhello) for _ in range(procs)]
    
for job in jobs:
    job.start()
    
for job in jobs:
    job.join()     # Wait for each job to join 
        
print("All done ")

Hello from Process-14

Hello from Process-15

Hello from Process-16

Hello from Process-17

Hello from Process-18

Done with job Process-14

Done with job Process-15

Done with job Process-16

Done with job Process-17

Done with job Process-18

All done 
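
The output above suggests that the five one-second jobs overlap rather than run one after another. One way to check is to time the whole batch, as in this sketch. It assumes the `fork` start method is available; `nap` and `timed_run` are hypothetical names, and `time.sleep` stands in for real work.

```python
import multiprocessing as mp
import time

def nap(delay):
    time.sleep(delay)    # Stand-in for real work

def timed_run(nprocs=4, delay=0.5):
    ctx = mp.get_context("fork")    # Assumption: 'fork' is available
    jobs = [ctx.Process(target=nap, args=(delay,)) for _ in range(nprocs)]
    t0 = time.perf_counter()
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    return time.perf_counter() - t0

elapsed = timed_run()
print(f"4 jobs of 0.5 s each took {elapsed:.2f} s")
```

If the jobs ran one after another, the total would be about 2 seconds; running concurrently, it should be close to 0.5 seconds plus start-up overhead.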


<hr style="border-width:4px; border-color:coral"/>

## Using Pipes

<hr style="border-width:4px; border-color:coral"/>

We can communicate between processes using "pipes".  A pipe is exactly what it sounds like: a two-way "communication channel" whose two ends can each send and receive messages. 

     conn1, conn2 = mp.Pipe()
          

In [13]:
conn1, conn2 = mp.Pipe()

conn1.send("Hello!")
msg = conn2.recv()

print(msg)

Hello!
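
Since the pipe is two-way by default, a message can also travel in the opposite direction. The sketch below shows that, and then the one-way variant created with `duplex=False`, where the first end can only receive and the second can only send. The flag `send_failed` is a made-up name used to record what happened.

```python
import multiprocessing as mp

# Default: duplex pipe, either end can send and receive
conn1, conn2 = mp.Pipe()
conn1.send("ping")
reply_in = conn2.recv()
conn2.send("pong")             # Same pipe, opposite direction
reply_out = conn1.recv()
print(reply_in, reply_out)

# One-way pipe: the first end only receives, the second only sends
recv_end, send_end = mp.Pipe(duplex=False)
send_end.send("one way")
one_way_msg = recv_end.recv()
print(one_way_msg)

send_failed = False
try:
    recv_end.send("wrong way")   # Not allowed on the receive-only end
except OSError as err:
    send_failed = True
    print("Cannot send on the receiving end:", err)
```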


In [14]:
conn1, conn2 = mp.Pipe() # We can send any picklable object through the pipe
x=5
conn1.send(x)
msg = conn2.recv()

print(msg) # The received value equals x

5
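
Objects sent through a pipe are pickled on one side and rebuilt on the other, so the receiver gets a copy rather than the original object. The sketch below illustrates this with a made-up dictionary, and also uses `poll()` to check for data before calling `recv()`, which would otherwise block on an empty pipe.

```python
import multiprocessing as mp

conn1, conn2 = mp.Pipe()

record = {"step": 3, "values": [1.5, 2.5], "label": "demo"}   # Made-up sample data
conn1.send(record)             # The object is pickled and written to the pipe

has_data = conn2.poll(1.0)     # Wait up to 1 s for something to arrive
copy = conn2.recv()            # Rebuilt (unpickled) on the receiving side

print(copy == record)          # Same contents ...
print(copy is record)          # ... but a different object

empty_now = not conn2.poll()   # With no timeout, poll() returns immediately
print(empty_now)
```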


In [15]:
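# Relay a message through two separate pipes
conn1, conn2 = mp.Pipe()
msg = "Hello"
conn1.send(msg)
msg = conn2.recv()

conn3, conn4 = mp.Pipe()   # Second pipe; distinct names avoid overwriting conn2
conn3.send(msg)
msg = conn4.recv()

print(msg)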

Hello


In [16]:
# two pipes with a link

In [17]:
def link(conn_recv, conn_send):  # Link between two pipes
    pname = mp.current_process().name
    msg = conn_recv.recv()       # Blocks until a message arrives on the first pipe
    print(f"Message received by {pname}\n")

    print(f"Message sent {msg}\n")
    conn_send.send(msg)          # Forward the message into the second pipe

pipe_01 = mp.Pipe()  # Connection ends are pipe_01[0] and pipe_01[1]
pipe_02 = mp.Pipe()  # Connection ends are pipe_02[0] and pipe_02[1]

msg = "Hello"
pipe_01[0].send(msg)

job = mp.Process(target=link, name="Link", args=(pipe_01[1], pipe_02[0]))
job.start()

job.join()                   # Fine for a short message, which fits in the pipe's buffer
msg = pipe_02[1].recv()
print("All done")
print(msg)

Message received by Link

Message sent Hello

All done
Hello


In [18]:
pipe_01

(<multiprocessing.connection.Connection at 0x7fb5263f80a0>,
 <multiprocessing.connection.Connection at 0x7fb5263f8130>)

### Exercise : Create a chain of pipes

Chain together several pipes and send a message down the chain!

Create "processes" to chain the pipes together. 

In [19]:
def link(con1,con2):  
    con2[0].send(con1)
    print(con2[1].recv())
    print(f"link received, {con1}")

def connect(Plist):
    base=Plist[0][1].recv()
    print(base)
    n=len(Plist)
    for i in range(1,n):
        cont=link(base,Plist[i])
        print(cont)
        base=Plist[i][1].recv()+' stanley'
        print(base)
    #print(Plist[-1][1].recv())
  
    #return Plist[-1]
    

In [20]:
pipe1=mp.Pipe() #pipe_01[0],#pipe_01[1]
pipe2=mp.Pipe() #pipe_02[0],#pipe_02[1]
pipe3=mp.Pipe()
P=[pipe1,pipe2,pipe3]
msg="Hello"
(P[0])[0].send(msg)

job=mp.Process(target=connect,name="Link",args=(P,))

job.start()
job.join()

Hello
Hello
link received, Hello
None


Process Link:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/7g/rflphmn571x5sx7zf1_vvyfmxz1tft/T/ipykernel_62780/1058404779.py", line 13, in connect
    base=Plist[i][1].recv()+' stanley'
  File "/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 255, in recv
    buf = self._recv_bytes()
  File "/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
    buf = self._recv(4)
  File "/opt/anaconda3/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt


KeyboardInterrupt: 
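
The attempt above stalls because `link` both sends and receives on the same pipe, so the message is already gone when `connect` calls `recv()` on that pipe again, and the call waits until it is interrupted. The following is one possible sketch of a chain, not the only solution: each link is its own process that reads from one pipe and writes to the next. It assumes the `fork` start method is available; the helper `send_down_chain` and the `Link-k` names are hypothetical.

```python
import multiprocessing as mp

def link(conn_recv, conn_send):
    """Receive one message, tag it with this process's name, and pass it on."""
    msg = conn_recv.recv()
    pname = mp.current_process().name
    conn_send.send(f"{msg} -> {pname}")

def send_down_chain(msg, nlinks=3):
    ctx = mp.get_context("fork")     # Assumption: 'fork' is available (Linux, macOS)
    # nlinks processes need nlinks + 1 pipes: link k reads pipe k and writes pipe k + 1
    pipes = [ctx.Pipe() for _ in range(nlinks + 1)]
    jobs = []
    for k in range(nlinks):
        recv_end = pipes[k][1]       # Read what was sent into pipe k
        send_end = pipes[k + 1][0]   # Write into the next pipe
        job = ctx.Process(target=link, name=f"Link-{k}", args=(recv_end, send_end))
        job.start()
        jobs.append(job)

    pipes[0][0].send(msg)            # Drop the message in at the head of the chain
    result = pipes[-1][1].recv()     # Collect it at the tail
    for job in jobs:
        job.join()
    return result

print(send_down_chain("Hello"))
```

Each link appends its own name, so the final message records the route it took through the chain.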

In [None]:
#pipe3[1].recv()

In [None]:
import multiprocessing, time

def consumer(pipe, cid):
    output_p, input_p = pipe
    input_p.close()                    # The consumer only reads, so close its copy of the sending end
    while True:
        try:
            item = output_p.recv()
        except EOFError:               # Nothing left to read and the sending end is closed
            break
        print("%s consume: %s" % (cid, item))
        #time.sleep(3)      # Without a sleep, this code can fail on Linux,
                            # although it works on Windows
    print('Consumer done')

def producer(sequence, input_p):
    for item in sequence:
        print('produce:',item)
        input_p.send(item) 
        time.sleep(1)

if __name__ == '__main__':
    (output_p, input_p) = multiprocessing.Pipe()

    # Create two consumer processes
    cons_p1 = multiprocessing.Process(target=consumer,args=((output_p,input_p),1)) 
    cons_p1.start() 
    cons_p2 = multiprocessing.Process(target=consumer,args=((output_p,input_p),2))
    cons_p2.start() 

    output_p.close()

    sequence = [i for i in range(10)]
    producer(sequence, input_p)
    input_p.close()

    cons_p1.join()
    cons_p2.join()


In [None]:
from multiprocessing import Process, JoinableQueue
from queue import Empty
import time


def consumer(que, pid):
    while True:
        try:
            item = que.get(timeout=10)
            print("%s consume:%s" % (pid, item))
            que.task_done()
        except Empty:
            break
    print('Consumer done')


def producer(sequence, que):
    for item in sequence:
        print('produce:', item)
        que.put(item) 
        time.sleep(1)

if __name__ == '__main__':
    que = JoinableQueue()

    # Create two consumer processes
    cons_p1 = Process(target=consumer, args=(que, 1)) 
    cons_p1.start() 
    cons_p2 = Process(target=consumer, args=(que, 2))
    cons_p2.start() 

    sequence = [i for i in range(10)]
    producer(sequence, que)
    que.join()
    cons_p1.join()
    cons_p2.join()

In [24]:
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Pipe
 
# generate work
def sender(connection):
    print('Sender: Running', flush=True)
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # send data
        connection.send(value)
    # all done
    connection.send(None)
    print('Sender: Done', flush=True)
 
# consume work
def receiver(connection):
    print('Receiver: Running', flush=True)
    # consume work
    while True:
        # get a unit of work
        item = connection.recv()
        # report
        print(f'>receiver got {item}', flush=True)
        # check for stop
        if item is None:
            break
    # all done
    print('Receiver: Done', flush=True)
 
# entry point
if __name__ == '__main__':
    # create the pipe
    conn1, conn2 = Pipe()
    # start the sender
    sender_process = Process(target=sender, args=(conn2,))
    sender_process.start()
   
    # start the receiver
    receiver_process = Process(target=receiver, args=(conn1,))
    receiver_process.start()
    # wait for all processes to finish
    
    receiver_process.join()
    sender_process.join()

Sender: Running
Receiver: Running
>receiver got 0.007228846782026843
>receiver got 0.3979744477229661
>receiver got 0.2673475799559428
>receiver got 0.7531867245206879
>receiver got 0.7481411820521626
>receiver got 0.3201620048954821
>receiver got 0.053091232782789155
>receiver got 0.6808073319750755
>receiver got 0.11299787390586946
>receiver got 0.14505643922307376Sender: Done

>receiver got None
Receiver: Done


In [30]:
con1,con2=mp.Pipe()

In [31]:
con2.send('hee')

In [32]:
con1.recv()

'hee'