## Use subprocess to manage child processes
Child processes started by Python are able to run in parallel, enabling you to use Python to consume all of the CPU cores of your machine.

In [1]:
import subprocess

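# shell=True is needed on Windows, where echo is a cmd.exe built-in. On POSIX systems drop it:
# with a list and shell=True, only the first item ("echo") is treated as the command.
proc = subprocess.Popen(["echo", "Hello from the child!"], stdout=subprocess.PIPE, shell=True)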
out, err = proc.communicate()
print(out.decode("utf-8"))

"Hello from the child!"

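On Python 3.7 or later, subprocess.run wraps Popen and communicate in a single call. The sketch below is a portable alternative to the Windows-specific echo call above. It assumes the child is another Python interpreter (sys.executable) rather than a shell built-in, so shell=True is not needed.

```python
import subprocess
import sys

# Run a child Python interpreter that prints a greeting. capture_output collects
# stdout and stderr; text=True decodes them to str (Python 3.7+).
result = subprocess.run(
    [sys.executable, "-c", "print('Hello from the child!')"],
    capture_output=True,
    text=True,
)
greeting = result.stdout.strip()
print(greeting)           # Hello from the child!
print(result.returncode)  # 0
```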


Child processes can run independently of their parent process, the Python interpreter. Their status can be polled periodically:

In [2]:
import time

proc = subprocess.Popen(["sleep", "0.3"], shell=True)
while proc.poll() is None:
    print("Working...")
    time.sleep(0.05)
    
print("Exit status", proc.poll())

Working...
Working...
Working...
Working...
Working...
('Exit status', 1)
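A portable sketch of the same polling loop (Python 3). It assumes the child is a Python interpreter running time.sleep(0.3) as a stand-in for the sleep command, which is not available on every platform:

```python
import subprocess
import sys
import time

# Child process: a Python interpreter that sleeps for 0.3 seconds.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(0.3)"])

polls = 0
while proc.poll() is None:  # None means the child is still running
    polls += 1
    time.sleep(0.05)        # the parent is free to do other work here

status = proc.poll()
print("Polled", polls, "times; exit status", status)
```

A successful child reports exit status 0.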


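Note that the exit status above is 1. By convention a child process exits with 0 on success and a nonzero status on failure. This notebook was run on Windows, where sleep is not a built-in command, so the child most likely failed to run; the same caveat applies to the statuses below.

Starting several child processes at once lets them run in parallel; the parent then waits for each one with communicate():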

In [8]:
# https://docs.python.org/2/library/subprocess.html
import subprocess
import time

def run_sleep(period):
    proc = subprocess.Popen(["sleep", str(period)], shell=True)
    return proc
    
start = time.time()
procs = []

# start all of the processes
for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)

for proc in procs:
    proc.communicate()

statusList = [proc.poll() for proc in procs]
print(statusList)

end = time.time()
print("Finished in %.3f seconds" % (end - start))

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
Finished in 0.669 seconds
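A sketch of the same fan-out with a child that runs on any platform (Python 3). It assumes each child is a Python interpreter sleeping for 0.5 seconds. Because the children run in parallel, the total time should be close to the duration of one child rather than the sum of all four:

```python
import subprocess
import sys
import time

SLEEP = "import time; time.sleep(0.5)"  # each child sleeps for half a second

start = time.time()
# Start all children first so they run at the same time.
procs = [subprocess.Popen([sys.executable, "-c", SLEEP]) for _ in range(4)]
# Then wait for each one; wait() returns the child's exit status.
statuses = [proc.wait() for proc in procs]
elapsed = time.time() - start

print(statuses)  # [0, 0, 0, 0]
print("Finished in %.3f seconds" % elapsed)
```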


In [17]:
# from https://pymotw.com/2/subprocess/
import subprocess

# Simple command
subprocess.call(["dir"], shell=True) # running on Windows

0

Setting the shell argument to a true value causes subprocess to spawn an intermediate shell process and tell it to run the command. The default is to run the command directly. Using an intermediate shell means that variables, glob patterns, and other special shell features in the command string are processed before the command is run.

In [18]:
import subprocess

# Command with shell expansion ($HOME is POSIX shell syntax; cmd.exe on Windows uses %VAR%, e.g. %USERPROFILE%)
subprocess.call("echo $HOME", shell=True)

0
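For contrast, a sketch of the default (no shell) behavior in Python 3: arguments passed as a list reach the child exactly as written, so "$HOME" is not expanded. The child here is assumed to be a Python interpreter that prints its first argument. Skipping the shell is also the safer choice when arguments come from untrusted input, since nothing interprets them.

```python
import subprocess
import sys

# No shell is involved, so "$HOME" is passed through literally as sys.argv[1].
result = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.argv[1])", "$HOME"],
    capture_output=True,
    text=True,
)
print(result.stdout.strip())  # $HOME
```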

In [19]:
# capturing output
import subprocess

output = subprocess.check_output(["dir"], shell=True) # running on Windows
print('Have %d bytes in output' % len(output))
print(output)

Have 805 bytes in output
 Volume in drive U is New Volume
 Volume Serial Number is AAD0-7DCA

 Directory of U:\git\Effective-Python

09/05/2018  06:07 PM    <DIR>          .
09/05/2018  06:07 PM    <DIR>          ..
23/04/2018  05:29 PM             1,258 .gitignore
07/05/2018  09:22 AM    <DIR>          .ipynb_checkpoints
23/04/2018  05:16 PM            19,867 1 Pythonic Thinking.ipynb
02/05/2018  07:01 PM            21,706 2 Functions.ipynb
03/05/2018  06:27 PM            26,085 3 Classes and Inheritance.ipynb
07/05/2018  09:20 AM            35,868 4 Metaclasses and Attributes.ipynb
09/05/2018  06:07 PM             6,225 5 Concurrency and Parallelism.ipynb
23/04/2018  05:29 PM               241 README.md
               7 File(s)        115,346 bytes
               3 Dir(s)  315,015,225,344 bytes free
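check_output returns the child's output only when the child succeeds. A sketch (Python 3) of what happens otherwise, assuming a child Python interpreter that exits with status 3: check_output raises CalledProcessError, whose returncode attribute holds the failing status.

```python
import subprocess
import sys

try:
    subprocess.check_output([sys.executable, "-c", "import sys; sys.exit(3)"])
except subprocess.CalledProcessError as exc:
    failed_status = exc.returncode
    print("Child failed with status", failed_status)  # Child failed with status 3
```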



In [21]:
# or running a batch file
import subprocess

filepath = "myBatch.bat"
p = subprocess.Popen(filepath, shell=True, stdout=subprocess.PIPE)

stdout, stderr = p.communicate()
print(p.returncode) # 0 on success
print(stdout)

0

[r] U:\git\Effective-Python>echo Message in a batch file! 
Message in a batch file!



## Use Threads for Blocking I/O, Avoid for Parallelism

In [22]:
import time

def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i
            
numbers = [2139079, 1214759, 1516637, 1852285]
start = time.time()
for number in numbers:
    list(factorize(number))
end = time.time()
print("Took %0.3f seconds" % (end - start))

Took 0.575 seconds


Using threading for the same task

In [23]:
import threading

class FactorizeThread(threading.Thread):
    def __init__(self, number):
        super(FactorizeThread, self).__init__()
        self._number = number
        
    def run(self):
        self.factors = list(factorize(self._number))


In [24]:
start = time.time()
threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)
    
for thread in threads:
    thread.join()
    
end = time.time()
print("Took %.3f seconds" % (end - start))

Took 0.820 seconds


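This is even slower than the serial version because of CPython's global interpreter lock (GIL): only one thread can execute Python bytecode at a time, so these CPU-bound threads take turns rather than running in parallel, and switching between them adds overhead.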
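The other half of the heading, using threads for blocking I/O, works because CPython releases the GIL while a thread waits on a system call. A minimal sketch, assuming time.sleep as a stand-in for a blocking call such as a network read; slow_systemcall is a hypothetical helper:

```python
import threading
import time

def slow_systemcall():
    # Hypothetical stand-in for blocking I/O; CPython releases the GIL while sleeping.
    time.sleep(0.1)

start = time.time()
threads = [threading.Thread(target=slow_systemcall) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
elapsed = time.time() - start

# The five 0.1-second waits overlap, so this takes about 0.1 s rather than 0.5 s.
print("Took %.3f seconds" % elapsed)
```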

## Use Lock to prevent data races in threads
Here we have two threads both adding to a counter. If each counts to 1000, we might expect the total to be 2000, but this is not always the case:

In [25]:
import threading
import time

class SomeThread(threading.Thread):
    def __init__(self):
        super(SomeThread, self).__init__()
        self.daemon = True
        self.start()  # each thread starts itself when created

    def run(self):
        global count
        for i in range(1000):
            count = count + 1  # read, add, write: not atomic

for i in range(5):
    count = 0
    threads = [SomeThread(), SomeThread()]
    for thread in threads:
        thread.join()
    print(count)

2000
2000
2000
1592
2000


Suppose thread 1 reads the current value of count as 1500 and computes 1501. Before thread 1 stores its result, thread 2 also reads count as 1500 and computes 1501. Both threads then store 1501, so one of the two increments is lost and the final count comes out too low.

This is where locks come in. Locks are the most fundamental synchronization mechanism provided by the threading module and are typically used to synchronize access to a shared resource. For each shared resource, create a Lock object. When you need to access the resource, call acquire to hold the lock (waiting for it to be released, if necessary), and call release to release it. If a thread attempts to acquire a lock that is already held by another thread, that thread is blocked until the lock is released. The acquire method takes an optional blocking flag, which can be used to avoid blocking if the lock is held by someone else:

In [26]:
import threading
import time

lock = threading.Lock() # create one lock per shared resource

class SomeThread(threading.Thread):
    def __init__(self):
        super(SomeThread, self).__init__()
        self.daemon = True
        self.start()

    def run(self):
        global count
        for i in range(1000):
            lock.acquire(True)  # block until no other thread holds the lock
            try:
                count = count + 1
            finally:
                lock.release()  # always release so other threads can proceed

for i in range(5):
    count = 0
    threads = [SomeThread(), SomeThread()]
    for thread in threads:
        thread.join()
    print(count)

2000
2000
2000
2000
2000
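Two variations on the acquire/release pattern above, as a sketch in Python 3. The with statement acquires the lock on entry and releases it on exit, even if the block raises. Passing False to acquire makes it return immediately: True if it got the lock, False if another thread holds it. The holder thread and the two events are illustrative assumptions used to keep the lock held by someone else.

```python
import threading

lock = threading.Lock()

# 1) with: acquired on entry, released on exit.
with lock:
    held_inside = lock.locked()   # True while inside the block
held_after = lock.locked()        # False after the block exits

# 2) Non-blocking acquire while another thread holds the lock.
holder_ready = threading.Event()
release_now = threading.Event()

def holder():
    with lock:
        holder_ready.set()        # signal that the lock is now held
        release_now.wait()        # keep holding it until told to stop

t = threading.Thread(target=holder)
t.start()
holder_ready.wait()
got_while_held = lock.acquire(False)  # held by the other thread -> False
release_now.set()
t.join()
got_when_free = lock.acquire(False)   # nobody holds it now -> True
lock.release()

print(held_inside, held_after, got_while_held, got_when_free)  # True False False True
```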


A plain Lock is not owned by the thread that acquired it, so a thread that already holds the lock blocks itself if it calls lock.acquire again:

In [None]:
import threading
import time

lock = threading.Lock()

class SomeThread(threading.Thread):
    def __init__(self):
        super(SomeThread, self).__init__()
        self.daemon = True
        self.start()
        
    def run(self):
        for i in range(1000):
            global count
            lock.acquire(True)
            lock.acquire(True)  # blocks forever: this thread already holds the lock
            count = count + 1
            lock.release()

count = 0
thread = SomeThread()
thread.join()
print(count)

Notice that the thread gets stuck, so this cell never finishes. The second call to lock.acquire(True) waits for the lock to be released, but the only thread that could release it is the one that is waiting. Reentrant locks (RLock objects) get around this: the thread that holds an RLock can acquire it again without blocking. Each acquire must be matched by a release, and other threads can take the lock only after the last release:

In [1]:
import threading
import time

lock = threading.RLock()

class SomeThread(threading.Thread):
    def __init__(self):
        super(SomeThread, self).__init__()
        self.daemon = True
        self.start()
        
    def run(self):
        for i in range(1000):
            global count
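            lock.acquire(True)
            lock.acquire(True)  # does not block: this thread already owns the RLock
            count = count + 1
            lock.release()
            lock.release()      # one release per acquire, or the RLock stays held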

count = 0
thread = SomeThread()
thread.join()
print(count)

1000
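A sketch (Python 3) of why each acquire needs a matching release. The main thread takes an RLock twice; after only one release, another thread still cannot get it. The helper try_acquire_from_other_thread is hypothetical and exists only to probe the lock from a second thread.

```python
import threading

rlock = threading.RLock()

def try_acquire_from_other_thread():
    # Hypothetical helper: report whether a *different* thread can take the lock now.
    result = []
    def attempt():
        acquired = rlock.acquire(False)  # non-blocking attempt
        if acquired:
            rlock.release()
        result.append(acquired)
    t = threading.Thread(target=attempt)
    t.start()
    t.join()
    return result[0]

rlock.acquire()
rlock.acquire()                                       # same thread: does not block
rlock.release()
after_one_release = try_acquire_from_other_thread()   # still held -> False
rlock.release()
after_two_releases = try_acquire_from_other_thread()  # fully released -> True

print(after_one_release, after_two_releases)  # False True
```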


## Use Queue to coordinate work between threads

A quick intro to collections.deque, a double-ended queue: you can append and remove items at either end of it.

In [1]:
from collections import deque

d = deque()
for i in range(1, 6):
    d.append(str(i))
d

deque(['1', '2', '3', '4', '5'])

In [2]:
item = d.pop() # comes off the right
print(item)
d

5


deque(['1', '2', '3', '4'])

In [3]:
item = d.popleft() # comes off the left
print(item)
d

1


deque(['2', '3', '4'])

In [4]:
d.extend(["7", "8", "9"]) # extends on the right
d

deque(['2', '3', '4', '7', '8', '9'])

In [5]:
d.extendleft(["-3", "-2", "-1"]) # each item is added on the left in turn, so the order ends up reversed
d

deque(['-1', '-2', '-3', '2', '3', '4', '7', '8', '9'])
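Appending on the right and popping from the left gives first-in, first-out (FIFO) order, which is how the MyQueue class below uses its deque. A deque can also be bounded with maxlen; once it is full, appending on one end discards an item from the other. A short sketch (Python 3):

```python
from collections import deque

fifo = deque()
for job in ["a", "b", "c"]:
    fifo.append(job)                                 # enqueue on the right
order = [fifo.popleft() for _ in range(len(fifo))]   # dequeue from the left
print(order)   # ['a', 'b', 'c']

recent = deque(maxlen=3)
for i in range(5):
    recent.append(i)   # once full, the leftmost item is dropped
print(recent)  # deque([2, 3, 4], maxlen=3)
```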

On to the original example from the book:

In [4]:
import collections
import threading
import time

class MyQueue(object):
    def __init__(self):
        self.items = collections.deque()
        self.lock = threading.Lock()
        
    def put(self, item):
        with self.lock:
            self.items.append(item)     # append item on the right
            
    def get(self):
        with self.lock:
            return self.items.popleft() # pop item from the left

queue = MyQueue()
queue.items

deque([])

In [5]:
queue.put("x")
queue.items

deque(['x'])

In [6]:
queue.get()

'x'

In [7]:
queue.items

deque([])

In [11]:
class Worker(threading.Thread):
    def __init__(self, func, in_queue, out_queue):
        super(Worker, self).__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
        
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01) # no work to do
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

In [12]:
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()

# our actions
def download(x):
    print("downloading", x)
    return x

def resize(x):
    print("resizing", x)
    return x

def upload(x):
    print("uploading", x)
    return x

threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue)
]

for thread in threads:
    thread.start()


In [13]:
download_queue.put("x")

('downloading', 'x')
('resizing', 'x')
('uploading', 'x')


In [14]:
download_queue.put("y")

('downloading', 'y')
('resizing', 'y')
('uploading', 'y')


There are a few problems with this:
* When some worker functions take longer than others, the slow stage becomes a bottleneck. Jobs may be passed to a worker faster than it can process them, so its input queue keeps growing; eventually the program can run out of memory and crash.
* CPU time is wasted constantly checking for new input and then sleeping.

The lesson is that it's hard to build a pipeline yourself. The Queue class is designed to solve these problems: it eliminates busy waiting by making the get method block until new data is available.
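By default get waits indefinitely. A sketch in Python 3 syntax (in Python 2 the module is named Queue): passing a timeout bounds the wait, and get raises queue.Empty if nothing arrives in time.

```python
import queue
import time

q = queue.Queue()
start = time.time()
try:
    q.get(timeout=0.1)   # wait at most 0.1 s for an item
except queue.Empty:
    waited = time.time() - start
    print("No item after %.2f seconds" % waited)

q.put("job")
item = q.get(timeout=0.1)  # an item is ready, so this returns at once
print(item)  # job
```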

In [17]:
import six
six.PY2, six.PY3

(True, False)

In [1]:
import six
import threading

if six.PY2: # the module named 'Queue' in Python 2 is renamed 'queue' in Python 3
    from Queue import Queue
else: 
    from queue import Queue
    
queue = Queue()

def consumer():
    print('Consumer waiting')
    value = queue.get() # Runs after put() below
    print("Got: ", value)
    print('Consumer done')

thread = threading.Thread(target=consumer)
thread.start()

Consumer waiting


In [2]:
queue.put("x")
thread.join()

('Got: ', 'x')
Consumer done


Queue also lets you limit the amount of pending work by passing a maximum size to its constructor. Calls to put block while the queue is full.

In [1]:
import six
import threading
import time
if six.PY2:
    from Queue import Queue
else: 
    from queue import Queue

queue = Queue(1) # Buffer size of 1

def consumer():
    time.sleep(0.1) # Wait
    queue.get()     # Runs second
    print('Consumer got 1')
    queue.get()     # Runs fourth
    print('Consumer got 2')
    
thread = threading.Thread(target=consumer)
thread.start()

In [2]:
queue.put(object()) # Runs first
print('Producer put 1')
queue.put(object()) # Runs third
print('Producer put 2')
thread.join()
print('Producer done')

Producer put 1Consumer got 1

Producer put 2
 Consumer got 2
Producer done
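If the producer should not wait, put can be told not to block. A sketch in Python 3 syntax: with block=False, put on a full queue raises queue.Full instead of waiting.

```python
import queue

q = queue.Queue(maxsize=1)
q.put("first")                    # fills the single slot
try:
    q.put("second", block=False)  # would block, so raises queue.Full instead
    overflowed = False
except queue.Full:
    overflowed = True
print(overflowed, q.qsize())      # True 1
```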


Queue can also track the progress of your work with the task_done method. Here, in_queue.join() does not return until every item put on the queue has been marked complete with a call to task_done.

In [4]:
in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get() # Done second
    print('Consumer working')
    # Doing work
    # …
    print('Consumer done')
    in_queue.task_done()  # Done third
    
threading.Thread(target=consumer).start()

Consumer waiting


In [5]:
in_queue.put(object()) # Done first
print('Producer waiting')
in_queue.join()        # Done fourth
print('Producer done')

Producer waitingConsumer working
Consumer done

Producer done


Here, I define a ClosableQueue subclass whose close method adds a special sentinel item to the queue, indicating that no more input items will follow. Iterating over the queue stops when the sentinel arrives:

In [6]:
# https://docs.python.org/2/library/queue.html
# task_done(): Indicate that a formerly enqueued task is complete.

class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return # Cause the thread to exit
                yield item
            finally:
                self.task_done()
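ClosableQueue is defined above but not yet put to work. A minimal two-stage pipeline sketch in Python 3: StoppableWorker is a hypothetical worker whose run method simply iterates over its input queue, and the lambdas stand in for real download and resize functions. Closing each stage's input queue and then joining it shuts the pipeline down in order, with no busy waiting.

```python
import threading
from queue import Queue

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # stop iterating, which lets the worker thread exit
                yield item
            finally:
                self.task_done()

class StoppableWorker(threading.Thread):
    # Hypothetical worker: applies func to each item until its queue is closed.
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(lambda x: x + "-downloaded", download_queue, resize_queue),
    StoppableWorker(lambda x: x + "-resized", resize_queue, done_queue),
]
for thread in threads:
    thread.start()

for name in ["a", "b", "c"]:
    download_queue.put(name)

download_queue.close()  # no more input for stage 1
download_queue.join()   # wait until stage 1 has handled every item
resize_queue.close()    # then tell stage 2 there is no more input
resize_queue.join()
for thread in threads:
    thread.join()       # both workers have left their for loops

results = [done_queue.get() for _ in range(done_queue.qsize())]
print(results)  # ['a-downloaded-resized', 'b-downloaded-resized', 'c-downloaded-resized']
```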

The Queue class has all of the facilities you need to build robust pipelines: blocking operations, buffer sizes, and joining.

## Consider coroutines to run many functions concurrently
Coroutines let you have many seemingly simultaneous functions in your Python programs.

In [1]:
def my_coroutine():
    while True:
        received = yield
        print("Received: ", received)

In [3]:
it = my_coroutine()
next(it) # prime the coroutine (required)
it.send("First")

('Received: ', 'First')


In [4]:
it.send("Second")

('Received: ', 'Second')


In [5]:
it.send("Third")

('Received: ', 'Third')


Say you want to implement a generator coroutine that yields the minimum value it's been sent so far. 

In [6]:
def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)

In [8]:
it = minimize()
next(it)
it.send(10)

10

In [9]:
it.send(4)

4

In [10]:
it.send(22)

4

In [11]:
it.send(-1)

-1
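Feeding the same values to minimize in a loop shows the running minimum at a glance (a sketch in Python 3, repeating the definition so it runs on its own):

```python
def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)

it = minimize()
next(it)  # prime: run to the first bare yield
running_min = [it.send(v) for v in [10, 4, 22, -1]]
print(running_min)  # [10, 4, 4, -1]
```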

Like threads, coroutines are independent functions that can consume inputs from their environment and produce resulting outputs. The difference is that coroutines pause at each yield expression in the generator function and resume after each call to send from the outside. In short, a first call to next() is required to advance the coroutine to its first yield, where it waits for a value sent in with it.send().
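Forgetting the priming call is an error: sending a value other than None to a generator that has not yet reached a yield raises TypeError. A common workaround is a decorator that primes the coroutine for you. In this Python 3 sketch, primed and collector are hypothetical names used for illustration.

```python
import functools

def primed(func):
    # Hypothetical helper: create the generator and advance it to its first yield.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return wrapper

@primed
def collector():
    items = []
    while True:
        item = yield len(items)  # report how many items have been received so far
        items.append(item)

it = collector()     # already primed, so send() works immediately
print(it.send("a"))  # 1
print(it.send("b"))  # 2

def unprimed():
    while True:
        yield

gen = unprimed()
try:
    gen.send("too early")  # the generator has not reached a yield yet
    error = None
except TypeError as exc:
    error = exc
print(error)
```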

In [24]:
def func():
    while True:
        got = yield
        print("got", got)

In [25]:
it = func()

In [26]:
next(it) # starts the coroutine

In [27]:
next(it) # sends in None

('got', None)


In [28]:
next(it) # sends in None

('got', None)


In [29]:
it.send("x")

('got', 'x')


In [30]:
it.send("y")

('got', 'y')


In [58]:
def func():
    myList = []
    while True:
        got = yield        # receive a value
        myList.append(got) # append it to our list
        print(myList)

In [59]:
it = func()
next(it)

In [60]:
it.send("x")

['x']


In [61]:
it.send("y")

['x', 'y']


In [62]:
it.send("z")

['x', 'y', 'z']


In [78]:
def func():
    myList = []
    while True:
        got = yield myList # get the next value and give back myList
        myList.append(got)

In [79]:
it = func()
next(it)

[]

In [80]:
it.send("x")

['x']

In [81]:
it.send("y")

['x', 'y']

In [82]:
it.send("y")

['x', 'y', 'y']
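Note that this func yields the same list object every time, so each value returned by send is a reference to one growing list, and earlier results change as more items arrive. A Python 3 sketch of the effect, plus a hypothetical variant, func_snapshots, that yields a copy so each result is a snapshot:

```python
def func():
    myList = []
    while True:
        got = yield myList
        myList.append(got)

it = func()
first = next(it)        # the list object itself, not a copy
second = it.send("x")
print(first is second)  # True: every yield hands back the same list
print(first)            # ['x']: the earlier result has changed too

def func_snapshots():
    myList = []
    while True:
        got = yield list(myList)  # yield a copy so earlier results stay unchanged
        myList.append(got)

it2 = func_snapshots()
a = next(it2)
b = it2.send("x")
print(a, b)  # [] ['x']
```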

## Consider concurrent.futures for True Parallelism