___
<h1> Machine Learning </h1>
<h2> M. Sc. in Electrical and Computer Engineering </h2>
<h3> Instituto Superior de Engenharia / Universidade do Algarve </h3>

[LESTI](https://ise.ualg.pt/curso/1941) / [ISE](https://ise.ualg.pt) / [UAlg](https://www.ualg.pt)

Pedro J. S. Cardoso (pcardoso@ualg.pt)

___

# Threads (Optional)

- The main idea is to do more than one thing at a time (open to discussion!)
- Of interest to programmers writing code to run on big iron, but also to users of multicore PCs, e.g.:  
    - A network server that communicates with several hundred clients all connected at once
    - A big number crunching job that spreads its work across multiple CPUs

In [None]:
import time
import threading

class CountdownThread(threading.Thread):
    """Thread that prints a countdown from ``n`` down to 1.

    The thread pauses for half a second after each printed value.
    """

    def __init__(self, n):
        super().__init__()
        self.n = n          # remaining count; run() decrements it in place

    def run(self):
        """Thread body: print and decrement ``self.n`` while it is positive."""
        while self.n > 0:
            print(self.n)
            self.n -= 1
            time.sleep(.5)

# Launch two independent countdown threads. start() returns immediately, so
# both countdowns run side by side and their output is interleaved.
CountdownThread(50).start()       # executes until run() stops
CountdownThread(50).start()

In [None]:
import time
import threading

class CountDownThread(threading.Thread):
    """Thread that counts down from ``n`` to 1, tagging each line with a label.

    ``who_am_i`` is the label printed before every value; a falsy label is
    shown as ".".
    """

    def __init__(self, n, who_am_i=None):
        super().__init__()
        self._n = n
        self._who_am_i = who_am_i

    def run(self):
        """Thread body: print "<label>: <value>" every half second."""
        while self._n > 0:
            label = self._who_am_i or "."
            print("{}: {}".format(label, self._n))
            self._n -= 1
            time.sleep(.5)
            
class CountUpThread(threading.Thread):
    """Thread that counts up from 0 to ``n - 1``, tagging each line with a label.

    ``who_am_i`` is the label printed before every value; a falsy label is
    shown as ".".
    """

    def __init__(self, n, who_am_i=None):
        super().__init__()
        self._n = n
        self._who_am_i = who_am_i

    def run(self):
        """Thread body: print "<label>: <value>" every half second."""
        step = 0
        while step < self._n:
            label = self._who_am_i or "."
            print("{}: {}".format(label, step))
            step += 1
            time.sleep(.5)


# Create three labelled counter threads; nothing runs until start() is called.
t1 = CountDownThread(5, 'A')       # executes until run() stops
t2 = CountDownThread(8, 'B')       # executes until run() stops
t3 = CountUpThread(5, 'C')

t1.start()
t2.start()
t3.start()

# There is no join() here, so this line is printed without waiting for the
# three threads to finish counting.
print('over and out! well... maybe not!')

An alternative way of calling the threads

In [None]:
import time
import threading

def countdown(n, name):
    """Print "<name>:<value>" for every value from ``n`` down to 1.

    The function pauses for half a second after each printed line.
    """
    remaining = n
    while remaining > 0:
        print('{}:{}'.format(name, remaining))
        remaining -= 1
        time.sleep(.5)


# Creates a Thread object, but its run() method just calls the given function
# with the positional arguments supplied in `args`.
threading.Thread(target=countdown, args=(5, 'A')).start()
threading.Thread(target=countdown, args=(5, 'B')).start()

# The main thread does not wait for the two countdowns before printing this.
print('over and out!')

In [None]:
import time
import threading

def countdown(n):
    """Print every value from ``n`` down to 1, pausing half a second after each."""
    remaining = n
    while remaining > 0:
        print(remaining)
        remaining -= 1
        time.sleep(.5)

# Two countdown threads of different lengths; args must be a tuple, hence
# the trailing comma.
t1 = threading.Thread(target=countdown, args=(10,))
t2 = threading.Thread(target=countdown, args=(5,))

t1.start()
t2.start()

t1.join()       # Use t.join() to wait for a thread to exit
t2.join()

# Reached only after both threads have finished, because of the joins above.
print('over and out!')

* Threads share all of the data in your program
* Thread scheduling is non-deterministic
* Operations often take several steps and might be interrupted mid-stream (non-atomic)
* Thus, access to any kind of shared data is also non-deterministic

In [None]:
import time
import threading

def my_print(s):
    """Wait half a second, then print ``s`` plus a trailing space, no newline."""
    time.sleep(.5)
    padded = s + ' '
    # A single print() call with an empty terminator, so the word and its
    # trailing space are passed to print() as one string.
    print(padded, end='')

print('Why did the multithreaded chicken cross the road?')

# One thread per word. Every thread sleeps for the same time before printing,
# so the order in which the words appear is not guaranteed.
for s in 'To get to the other side.'.split():
    threading.Thread(target=my_print, args=(s,)).start()

Accessing Shared Data

In [None]:
import threading

M = 1000000     # number of updates performed by each thread
k = 0           # counter shared by both threads

def up() -> None:
    """Increment the global counter ``k`` ``M`` times, without any locking.

    The update is deliberately left unprotected: this cell demonstrates what
    happens when two threads modify shared data at the same time.
    """
    global k
    for i in range(M):
        k += 1      # read-modify-write on shared data, not protected by a lock

def down() -> None:
    """Decrement the global counter ``k`` ``M`` times, without any locking.

    The update is deliberately left unprotected: this cell demonstrates what
    happens when two threads modify shared data at the same time.
    """
    global k
    for i in range(M):
        k -= 1      # read-modify-write on shared data, not protected by a lock

# One thread performs M increments and the other M decrements on the same k.
t1 = threading.Thread(target=up)
t2 = threading.Thread(target=down)

t1.start()
t2.start()

# Wait for both threads so that k is read only after all updates are done.
t1.join()
t2.join()

print(k)   # Oh! this is (almost) never equal to zero!?

## Thread Synchronization Primitives: Mutex Locks

* Acquired locks must always be released
* However, this gets tricky with exceptions and other non-linear forms of control flow, which can skip the release
* There are synchronization primitives, look for: Lock, RLock, Semaphore, BoundedSemaphore, Event, and Condition


In [None]:
import threading
        
lock = threading.Lock()     # mutex protecting every update of k

M = 1000000                 # number of updates performed by each thread
k = 0                       # counter shared by both threads

def up():
    """Increment the global counter ``k`` ``M`` times, one locked step at a time.

    The global ``lock`` is acquired before each update and released afterwards,
    so an update cannot interleave with one made by another thread that uses
    the same lock.
    """
    global k
    for _ in range(M):
        lock.acquire()
        try:
            k += 1
        finally:
            # Release even if the protected statement raises; otherwise every
            # other thread waiting on the lock would block forever.
            lock.release()

def down():
    """Decrement the global counter ``k`` ``M`` times, one locked step at a time.

    The global ``lock`` is acquired before each update and released afterwards,
    so an update cannot interleave with one made by another thread that uses
    the same lock.
    """
    global k
    for _ in range(M):
        lock.acquire()
        try:
            k -= 1
        finally:
            # Release even if the protected statement raises; otherwise every
            # other thread waiting on the lock would block forever.
            lock.release()

# One thread performs M locked increments and the other M locked decrements.
t1 = threading.Thread(target=up)
t2 = threading.Thread(target=down)

t1.start()
t2.start()

# Wait for both threads so that k is read only after all updates are done.
t1.join()
t2.join()
print(k)

Using the `with` statement

In [None]:
import threading
        
lock = threading.Lock()     # mutex protecting every update of k

M = 1000000                 # number of updates performed by each thread
k = 0                       # counter shared by both threads

def up():
    """Add one to the global ``k`` ``M`` times, holding ``lock`` for each update."""
    global k
    for _ in range(M):
        # The with-block acquires the lock on entry and releases it on exit.
        with lock:
            k = k + 1
            
def down():
    """Subtract one from the global ``k`` ``M`` times, holding ``lock`` each time."""
    global k
    for _ in range(M):
        # The with-block acquires the lock on entry and releases it on exit.
        with lock:
            k = k - 1

# One thread performs M locked increments and the other M locked decrements.
t1 = threading.Thread(target=up)
t2 = threading.Thread(target=down)

t1.start()
t2.start()

# Wait for both threads so that k is read only after all updates are done.
t1.join()
t2.join()
print(k)

## Queues

In [None]:
import urllib.request, time, threading
from queue import Queue
from urls import url_list           # url_list - tuple of urls 

In [None]:
N = 20                              # number of urls to fetch (the first N entries of url_list)

In [None]:
""" count the chars in a list of web pages"""  # SEQUENTIAL VERSION

chars_total = 0                     # chars counter, updated by get_page_size()

def get_page_size(url):             # sum the chars of each page
    """Fetch ``url`` and add the length of its body to the global ``chars_total``.

    A failed fetch is reported on stdout and leaves ``chars_total`` unchanged.

    Args:
        url: address passed to ``urllib.request.urlopen``.
    """
    global chars_total
    try:
        with urllib.request.urlopen(url) as response:
            size = len(response.read())
    # ``Exception`` rather than a bare ``except`` so that KeyboardInterrupt and
    # SystemExit still propagate and a slow run can be interrupted.
    except Exception:
        print('error reading {}'.format(url))
    else:
        chars_total += size

s = time.time()                     # start of the timed section
for url in url_list[:N]:            # fetch the first N pages one after another
    get_page_size(url)

print(chars_total)
print('took {} seconds'.format(time.time() - s))

In [None]:
''' count the chars in a list of web pages '''  # THREADED VERSION

def queued_get_page_size():
    """Worker: take URLs from the global queue ``q`` and total their sizes.

    Each URL is fetched with ``urllib.request.urlopen`` and the length of its
    body is added to the global ``chars_total`` while holding the global
    ``lock``. The worker returns as soon as it finds the queue empty.

    A failed fetch is reported on stdout. ``q.task_done()`` is called for
    every URL taken from the queue, whether or not the fetch succeeded, so
    that ``q.join()`` can return.
    """
    global chars_total
    from queue import Empty         # local import: the file only imports Queue

    while True:
        try:
            # Non-blocking get. Testing q.empty() and then calling the
            # blocking q.get() lets another worker take the last job in
            # between, which leaves this thread waiting forever.
            url = q.get_nowait()    # get a 'job' from the Queue
        except Empty:
            return
        try:
            with urllib.request.urlopen(url) as response:
                # Download without holding the lock so fetches can overlap.
                size = len(response.read())
            with lock:              # guard only the shared-counter update
                chars_total += size
        except Exception:
            print('error reading {}'.format(url))
        finally:
            q.task_done()           # Signal that work is done


chars_total = 0                     # counter shared by all workers

s = time.time()                     # start of the timed section

q = Queue()                         # job queue: one entry per url to fetch
for url in url_list[:N]:
    q.put(url)

lock = threading.Lock()             # protects chars_total

# Build the pool of 10 worker threads, then set them all running.
workers = [threading.Thread(target=queued_get_page_size) for _ in range(10)]
for w in workers:
    w.start()

q.join()                            # block until every job is marked task_done()

for w in workers:                   # then wait for the worker threads to exit
    w.join()

print(chars_total)
print('took {} seconds'.format(time.time() - s))