### Concurrency ###

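#### Concurrency with threads and processes ####

Each script below runs the same batch of 10 tasks at concurrency 1, 2 and 4, and is timed with `time -p`. The results fall into three cases:

- **CPU-bound work on threads** (`twrite.py`, `writedata`) gets no faster. `user` time stays close to `real` time, because CPython's GIL lets only one thread execute Python bytecode at a time.
- **Sleep-bound work on a thread pool** (`tpwrite.py`, `countdown`) speeds up as the pool grows.
- **CPU-bound work on a process pool** (`pwrite.py`, `cputask`) does get faster.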

In [118]:
%%file twrite.py
import threading
import random
import sys, time

def generate_data(n=1000000):
    """Return a lazy generator of ``n`` floats drawn from ``random.random()``.

    Nothing is generated until the result is iterated, so building one
    generator per task up front costs almost no memory.
    """
    indices = range(n)
    return (random.random() for _ in indices)

def cputask(*args, outer=1000, inner=10000):
    """Burn CPU in a pure-Python nested loop, then print and return the sum.

    Parameters
    ----------
    *args
        Ignored. Accepting them lets the function be passed straight to
        ``pool.map`` or ``Thread(target=..., args=...)``, as the sibling
        scripts do.
    outer, inner : int
        Loop bounds. The defaults (1000 x 10000 multiply-adds) reproduce the
        original workload; shrink them for quick experiments.

    Returns
    -------
    float
        ``sum(i * j * 1.0 for i in range(outer) for j in range(inner))``,
        or the int ``0`` when either bound is <= 0.
    """
    s = 0
    for i in range(outer):
        for j in range(inner):
            # "* 1.0" forces float arithmetic, so s becomes a float.
            s += i*j*1.0
    print(s)
    return s

def countdown(*args, ticks=10, delay=0.25):
    """Simulate I/O-bound work: sleep ``ticks`` times for ``delay`` seconds, then print.

    ``*args`` is ignored so the function can be handed directly to a pool's
    ``map``. With the defaults, each call takes about 2.5 s of wall-clock
    time while using almost no CPU.

    Prints "Finished!" when done and returns None.
    """
    for _ in range(ticks):
        time.sleep(delay)
    print("Finished!")
    
def writedata(filename, data):
    """Write every item of ``data`` to ``filename``, truncating the file first.

    Each item is written as ``str(item)`` left-padded with zeros to at least
    8 characters. No separator goes between items. ``data`` may be any
    iterable, including the one-shot generators from ``generate_data``.
    """
    with open(filename, "w") as out:
        for value in data:
            padded = str(value).zfill(8)
            out.write(padded)
            
def threads_(concurrency=None, taskcount=10):
    """Write ``taskcount`` files of random data using batches of writer threads.

    Parameters
    ----------
    concurrency : int, optional
        Threads started per batch. Defaults to ``int(sys.argv[1])``.
    taskcount : int
        Number of files written: ``base0`` .. ``base{taskcount-1}``.

    Each batch is started and then fully joined before the next one begins,
    so at most ``concurrency`` writer threads run at once.

    Raises
    ------
    ValueError
        If ``concurrency`` < 1. Otherwise the batching loop would pop nothing
        and spin forever.
    """
    if concurrency is None:
        concurrency = int(sys.argv[1])
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1, got {}".format(concurrency))
    filenames = ["base{}".format(i) for i in range(taskcount)]
    data = [generate_data() for _ in range(taskcount)]
    argseq = list(zip(filenames, data))
    while argseq:
        tcount = min(concurrency, len(argseq))
        print("Starting {} threads".format(tcount))
        # pop() takes from the end, so the highest-numbered files go first.
        threads = [threading.Thread(target=writedata, args=argseq.pop()) for _ in range(tcount)]

        for t in threads:
            t.start()
        for t in threads:
            t.join()


if __name__ == "__main__":
    threads_()

Overwriting twrite.py


In [119]:
# Baseline: twrite.py writes 10 files with 1 writer thread per batch.
# `time -p` reports real (wall-clock), user and sys seconds.
!time -p python twrite.py 1

Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
Starting 1 threads
real 21.95
user 21.75
sys 0.18


In [120]:
# Same workload with 2 writer threads per batch.
!time -p python twrite.py 2

Starting 2 threads
Starting 2 threads
Starting 2 threads
Starting 2 threads
Starting 2 threads
real 22.13
user 21.99
sys 0.37


In [121]:
# Same workload with up to 4 writer threads per batch.
!time -p python twrite.py 4

Starting 4 threads
Starting 4 threads
Starting 2 threads
real 21.96
user 21.96
sys 0.40


In [8]:
%%file tpwrite.py
from multiprocessing.pool import ThreadPool
import random
import sys, time

def cputask(*args, outer=1000, inner=10000):
    """Burn CPU in a pure-Python nested loop, then print and return the sum.

    Parameters
    ----------
    *args
        Ignored. Accepting them lets the function be passed straight to
        ``pool.map``, which calls it with one filename.
    outer, inner : int
        Loop bounds. The defaults (1000 x 10000 multiply-adds) reproduce the
        original workload; shrink them for quick experiments.

    Returns
    -------
    float
        ``sum(i * j * 1.0 for i in range(outer) for j in range(inner))``,
        or the int ``0`` when either bound is <= 0.
    """
    s = 0
    for i in range(outer):
        for j in range(inner):
            # "* 1.0" forces float arithmetic, so s becomes a float.
            s += i*j*1.0
    print(s)
    return s
    
def writedata(*args, count=1000000):
    """Write ``count`` random floats to the file named by ``args[0]``.

    Each value is ``str(random.random())`` left-padded with zeros to at
    least 8 characters. Values are written back to back with no separator,
    so the file only serves as a write workload, not as re-readable data.

    Parameters
    ----------
    *args
        Only ``args[0]`` (the filename) is used. This signature lets the
        function be passed straight to ``pool.map``.
    count : int
        Number of values to write (default: one million).
    """
    filename = args[0]
    with open(filename, "w") as f:
        for _ in range(count):
            f.write(str(random.random()).zfill(8))

def countdown(*args, ticks=10, delay=0.25):
    """Simulate I/O-bound work: sleep ``ticks`` times for ``delay`` seconds, then print.

    ``*args`` is ignored so the function can be handed directly to
    ``pool.map``. With the defaults, each call takes about 2.5 s of
    wall-clock time while using almost no CPU, which is why a larger thread
    pool finishes the batch sooner.

    Prints "Finished!" when done and returns None.
    """
    for _ in range(ticks):
        time.sleep(delay)
    print("Finished!")
    
def threads(concurrency=None, taskcount=10, task=None):
    """Run ``task`` once per filename on a thread pool and return the results.

    Parameters
    ----------
    concurrency : int, optional
        Number of pool threads. Defaults to ``int(sys.argv[1])``.
    taskcount : int
        Number of filenames generated: ``base0`` .. ``base{taskcount-1}``.
    task : callable, optional
        Called with each filename. Defaults to ``countdown``.

    Returns
    -------
    list
        The per-filename results from ``pool.map``, in filename order.

    Raises
    ------
    ValueError
        Raised by ``ThreadPool`` if ``concurrency`` < 1.
    """
    if concurrency is None:
        concurrency = int(sys.argv[1])
    if task is None:
        task = countdown
    filenames = ["base{}".format(i) for i in range(taskcount)]
    # Leaving the with-block terminates the pool, so its worker threads are
    # not left behind once map() has collected every result.
    with ThreadPool(concurrency) as pool:
        return pool.map(task, filenames)


if __name__ == "__main__":
    threads()

Overwriting tpwrite.py


In [94]:
# tpwrite.py: 10 countdown tasks on a ThreadPool of size 1.
# `time -p` reports real (wall-clock), user and sys seconds.
!time -p python tpwrite.py 1

Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
real 25.16
user 0.11
sys 0.01


In [95]:
# Same 10 countdown tasks on a ThreadPool of size 2.
!time -p python tpwrite.py 2

Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
real 15.14
user 0.10
sys 0.01


In [96]:
# Same 10 countdown tasks on a ThreadPool of size 4.
!time -p python tpwrite.py 4

Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
Finished!
real 7.70
user 0.13
sys 0.02


In [9]:
%%file pwrite.py
from multiprocessing.pool import Pool
import random
import sys, time

def cputask(*args, outer=1000, inner=10000):
    """Burn CPU in a pure-Python nested loop, then print and return the sum.

    Parameters
    ----------
    *args
        Ignored. Accepting them lets the function be passed straight to
        ``pool.map``, which calls it with one filename.
    outer, inner : int
        Loop bounds. The defaults (1000 x 10000 multiply-adds) reproduce the
        original workload; shrink them for quick experiments.

    Returns
    -------
    float
        ``sum(i * j * 1.0 for i in range(outer) for j in range(inner))``,
        or the int ``0`` when either bound is <= 0.
    """
    s = 0
    for i in range(outer):
        for j in range(inner):
            # "* 1.0" forces float arithmetic, so s becomes a float.
            s += i*j*1.0
    print(s)
    return s
    
def writedata(*args, count=1000000):
    """Write ``count`` random floats to the file named by ``args[0]``.

    Each value is ``str(random.random())`` left-padded with zeros to at
    least 8 characters. Values are written back to back with no separator,
    so the file only serves as a write workload, not as re-readable data.

    Parameters
    ----------
    *args
        Only ``args[0]`` (the filename) is used. This signature lets the
        function be passed straight to ``pool.map``.
    count : int
        Number of values to write (default: one million).
    """
    filename = args[0]
    with open(filename, "w") as f:
        for _ in range(count):
            f.write(str(random.random()).zfill(8))

        
def countdown(*args, ticks=10, delay=0.25):
    """Simulate I/O-bound work: sleep ``ticks`` times for ``delay`` seconds, then print.

    ``*args`` is ignored so the function can be handed directly to
    ``pool.map``. With the defaults, each call takes about 2.5 s of
    wall-clock time while using almost no CPU.

    Prints "Finished!" when done and returns None.
    """
    for _ in range(ticks):
        time.sleep(delay)
    print("Finished!")
    
def process(concurrency=None, taskcount=10, task=None):
    """Run ``task`` once per filename on a process pool and return the results.

    Parameters
    ----------
    concurrency : int, optional
        Number of worker processes. Defaults to ``int(sys.argv[1])``.
    taskcount : int
        Number of filenames generated: ``base0`` .. ``base{taskcount-1}``.
    task : callable, optional
        Called with each filename in a worker process. Defaults to
        ``cputask``. It must be picklable (e.g. a module-level function) so
        the workers can receive it.

    Returns
    -------
    list
        The per-filename results from ``pool.map``, in filename order.

    Raises
    ------
    ValueError
        Raised by ``Pool`` if ``concurrency`` < 1.
    """
    if concurrency is None:
        concurrency = int(sys.argv[1])
    if task is None:
        task = cputask
    filenames = ["base{}".format(i) for i in range(taskcount)]
    # Leaving the with-block terminates the worker processes, so none are
    # left running once map() has collected every result.
    with Pool(concurrency) as pool:
        return pool.map(task, filenames)


if __name__ == "__main__":
    process()

Overwriting pwrite.py


In [12]:
# pwrite.py: 10 cputask tasks on a process Pool of size 1.
# `time -p` reports real (wall-clock), user and sys seconds.
!time -p python pwrite.py 1

24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
real 16.85
user 16.76
sys 0.03


In [13]:
# Same 10 cputask tasks on a process Pool of size 2.
!time -p python pwrite.py 2

24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
real 11.87
user 19.71
sys 0.00


In [14]:
# Same 10 cputask tasks on a process Pool of size 4.
!time -p python pwrite.py 4

24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
24972502500000.0
real 10.47
user 34.05
sys 0.01


### Starting and Stopping Threads ###

In [122]:
import time
class CountdownTask:
    """A countdown meant to run on its own thread and be stopped from another.

    Stopping is cooperative: terminate() only clears a flag, which run()
    checks before each tick.
    """

    def __init__(self):
        # Stays True until terminate() is called.
        self._running = True

    def terminate(self):
        """Ask run() to stop before its next tick."""
        self._running = False

    def run(self, n):
        """Print 'T-minus k' for k = n, n-1, ..., 1, pausing 5 s after each.

        Stops early once terminate() has been called. Always finishes by
        printing "Stopping, Bye!".
        """
        remaining = n
        while self._running:
            if not remaining > 0:
                break
            print('T-minus', remaining)
            remaining -= 1
            time.sleep(5)
        print("Stopping, Bye!")


In [123]:
from threading import Thread
# Run the countdown on a background thread so this cell returns at once.
# `c` stays in the kernel namespace so a later cell can call c.terminate().
c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()

T-minus 10
T-minus 9
T-minus 8


In [124]:
# Clear the running flag. run() re-checks it only between 5-second sleeps,
# so "Stopping, Bye!" may take up to 5 s to appear.
c.terminate()

Stopping, Bye!


In [125]:
from threading import Event, Thread

def task(n, event):
    """Count down from ``n``, printing each step, until done or ``event`` is set.

    Between steps, the worker waits on ``event`` for up to 5 seconds instead
    of sleeping. Another thread calling ``event.set()`` therefore wakes it
    immediately, instead of the request going unnoticed until a sleep ends.

    Parameters
    ----------
    n : int
        Starting count. Nothing is printed but the final message if n <= 0.
    event : threading.Event
        Stop signal shared with the controlling thread.
    """
    while (not event.is_set()) and n > 0:
        print('T-minus', n)
        n -= 1
        # Returns early as soon as event.set() is called; the loop condition
        # then sees the event and exits.
        event.wait(5)
    print("Stopping, Bye!")
# `e` is the stop signal shared with the worker thread. Calling e.set()
# (done in the next cell) makes task() stop counting.
e = Event()
t = Thread(target=task, args=(10,e))
t.start()

T-minus 10
T-minus 9
T-minus 8


In [126]:
# Request the stop; task() ends the next time it checks the event.
e.set()

Stopping, Bye!


### Communicating Between Threads ###

In [127]:
%%file producer-consumer.py
from queue import Queue

from threading import Thread

# Unique end-of-stream marker. The producer puts it on the queue when it is
# done, and the consumer stops when it receives it. A fresh object() can
# never be confused with real data.
_SENTINEL = object()


def producer(out_q, items=range(10)):
    """Put every element of ``items`` on ``out_q``, then the end-of-stream sentinel.

    ``items`` stands in for real data production; replace it (or the loop
    body) with the actual work. If you add consumers, put one sentinel per
    consumer so that each of them stops.
    """
    for data in items:
        out_q.put(data)
    out_q.put(_SENTINEL)


def consumer(in_q, handle=print):
    """Take items from ``in_q`` and pass each to ``handle`` until the sentinel arrives.

    ``handle`` stands in for the real processing step.
    """
    while True:
        data = in_q.get()
        if data is _SENTINEL:
            break
        handle(data)


if __name__ == "__main__":
    q = Queue()
    t1 = Thread(target=producer, args=(q,))
    t2 = Thread(target=consumer, args=(q,))
    t1.start()
    t2.start()
    # Both threads finish once the sentinel has been consumed.
    t1.join()
    t2.join()


Overwriting producer-consumer.py
