# wprowadzenie i GIL

Upraszcza implementację interpretera kosztem jednowątkowości w jego obrębie - na raz może zostac wykonana tylko jedna instrukcja.


# Wątki

pakiet threading - do programowania wielowątkowego


Uruchomienie funkcji w nowym wątku jest proste:

In [None]:
from random import randint
from time import sleep
from threading import Thread

# Thread(group=None, target=None, name=None, args=(), kwargs={})
# group - zarezerwowane na przyszłość
# target - funkcja do wywołania
# name - unikalna nazwa wątku
# args, kwargs - argumenty przekazywane do funkcji


def f(*args, **kwargs):
    print("f({} ,{})".format(args, kwargs))
    sleep(randint(0, 3))
    return


for i in range(5):
    t = Thread(target=f, args=(i,), kwargs={'kwa': 'rg'})
    t.start()  # uruchomienie wątku 
    #t.join()  # oczekiwanie przez wątek wywołujący na wykonanie wątku wywoływanego
    
print("asdf")

Wątek można stworzyć dziedzicząc po klasie Thread

In [None]:
from threading import Thread


class NewThread(Thread):
    def __init__(self, *args, **kwargs):
        super(NewThread, self).__init__()
        self.args = args
        self.kwargs = kwargs
        
    def run(self):
        # metoda wywoływana przy wywołaniu .start()
        print('thread is running')
        print(self.args)
        print(self.kwargs)
    
    
t = NewThread(1, 2, 3, 4, 5, a=1, b=3)
t.start()
t.join()

## Współdzielenie zasobów

ograniczenie dostępu do seksji krytycznej przez pechanizm:
* Lock/RLock
* Semafor
* Event
* Barrier (Python3) - dzieli program na fazy. Przejście do kolejnej fazy jest konieczne tylko po zakończeniu wszystkich wątków z poprzeniej fazy

### Lock/RLock

Lock i RLock działają podobnie - jedyna różnica to ta, że RLock może być zwolniony tylko przez wątek, który go zajął

In [None]:
from random import randint
from time import sleep
from threading import Thread, Lock

def print_thread(thread):
    print('Wykonuje watek {}'.format(thread))
    sleep(randint(0, 2))  # sekcja krytyczna
    print('Konczy watek {}'.format(thread))
    
    
t1 = Thread(target=print_thread, args=(1,))
t2 = Thread(target=print_thread, args=(2,))
t3 = Thread(target=print_thread, args=(3,))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print(50 * '=')
def print_thread_lock(thread, lock):
    #try:  # eksperyment - zwolanianie locka przez inny wątek - na klasie Lock DZIAŁA
    #    lock.release()
    #except:
    #    pass
    lock.acquire()  # lock, zostaje przechwycony
    print('Wykonuje watek {}'.format(thread))
    sleep(randint(0, 2))  # sekcja krytyczna
    print('Konczy watek {}'.format(thread))
    lock.release()  # lock zostaje zwolniony - nie zwolnienie blokady zablokuje inne wątki
    

lock = Lock()    
t1 = Thread(target=print_thread_lock, args=(1, lock))
t2 = Thread(target=print_thread_lock, args=(2, lock))
t3 = Thread(target=print_thread_lock, args=(3, lock))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print(50 * '=' + '\n\n')
def print_thread_lock_with(thread, lock):
    with lock:  # to samo co wcześniej tylko z managerem kontekstu
        print('Wykonuje watek {}'.format(thread))
        sleep(randint(0, 2))  # sekcja krytyczna
        print('Konczy watek {}'.format(thread))
    

lock = Lock()    
t1 = Thread(target=print_thread_lock_with, args=(1, lock))
t2 = Thread(target=print_thread_lock_with, args=(2, lock))
t3 = Thread(target=print_thread_lock_with, args=(3, lock))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

In [None]:
# RLock

from random import randint
from time import sleep
from threading import Thread, RLock


def print_thread_lock(thread, lock):
    #try:  # eksperyment - zwolanianie locka przez inny wątek - na RLock NIE DZIAŁA
    #    lock.release()
    #except:
    #    pass
    lock.acquire()  # lock, zostaje przechwycony
    print('Wykonuje watek {}'.format(thread))
    sleep(randint(0, 2))  # sekcja krytyczna
    print('Konczy watek {}'.format(thread))
    lock.release()  # lock zostaje zwolniony - nie zwolnienie blokady zablokuje inne wątki
    

lock = RLock()    
t1 = Thread(target=print_thread_lock, args=(1, lock))
t2 = Thread(target=print_thread_lock, args=(2, lock))
t3 = Thread(target=print_thread_lock, args=(3, lock))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print(50 * '=' + '\n\n')
def print_thread_lock_with(thread, lock):
    with lock:  # to samo co wcześniej tylko z managerem kontekstu
        print('Wykonuje watek {}'.format(thread))
        sleep(randint(0, 2))  # sekcja krytyczna
        print('Konczy watek {}'.format(thread))
    

lock = RLock()    
t1 = Thread(target=print_thread_lock_with, args=(1, lock))
t2 = Thread(target=print_thread_lock_with, args=(2, lock))
t3 = Thread(target=print_thread_lock_with, args=(3, lock))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

### Semafor

Semafor pozwala kilku wątkom na dostęp do jednego zasobu - zawiera wewnętrzna wartosć określającą ile wątków może mieć dostęp do zasobu.

Zajmowanie semafora:
- jeżeli wartość semafora jest > 0, wątek obniża wartosć o 1 i kontynuuje wykonanie swojego kodu
- jeżeli wartosć to 0 to wątek obniża wartosć semafora i czeka, aż wartość semafora będzie nieujemna
- jeżeli wartosć semafora < 0 wątek czeka aż ktoś zwolni dostęp do zasobu


In [None]:
from threading import Semaphore, BoundedSemaphore

semaphore = Semaphore(0)
print(semaphore._value)

# release zwiększa wartość wewnętrzego licznika semafora o 1
semaphore.release()
semaphore.release()
semaphore.release()
# acquire zmniejsza wewnętrzny licznik semafora o 1
semaphore.acquire()
print(semaphore._value)


print(50 * '=')
# BoundedSemaphore nie pozwala na zwiększenie wewnętrzego licznika powyżej początkowego
semaphore = BoundedSemaphore(4)
print(semaphore._value)
semaphore.acquire()
print(semaphore._value)
semaphore.release()
print(semaphore._value)
semaphore.release()


# Semafory mogą byc używane jako managery kontekstu

mutex to semafor zainicjalizowany wartością 1

### Synchronizacja warunkami (Conditions)

Condition wykrywa zmianę stanu aplikacji i powiadamia inny wątek czekajacy na tę zmianę stanu

In [None]:
from time import sleep
from threading import Thread, Condition

def consumer(condition):
    print('wewnatrz consumera; ')
    condition.acquire()  # acquire i release są konieczne do powiadamiania i czekania na powiadomienie
    print('consumer oczekuje na warunek; ')
    condition.wait()  # bez wywołania wait() consumer kontynuuje działanie
    print('consumer kontynuuje; ')
    condition.release()
    print('consumerer konczy; ')
    

def producer(condition):
    print('wewnatrz producera; ')
    sleep(1)
    condition.acquire()
    sleep(1)
    print('producer powiadiamia; ')
    condition.notify()
    print('producer zwalnia condition; ')
    sleep(3)  
    condition.release()  # póki producer nie zwolni warunku, consumer nie będzie mógł kontynuować
    print('producer konczy; ')
    
    
cond = Condition()
t1 = Thread(target=consumer, args=(cond,))
t2 = Thread(target=producer, args=(cond,))

t1.start()
t2.start()

t1.join()
t2.join()

# Warunki mogą być też używane jako managery kontekstu

### Synchronizacja zdarzeniem (Event

Działa podobnie do Condition, z tym że ustawia wewnatrzna flasę na true/false oznaczającą czy inny wątek może kontynuować

In [None]:
from time import sleep
from threading import Thread, Event

# nie przechwytuje się i zwalniania eventu (acquire/release)

def consumer(event):
    print('wewnatrz consumera; ')
    print('consumer oczekuje na zdarzenie; ')
    event.wait()  # oczekuje na flagę eventu będącą true
    print('consumer kontynuuje; ')
    event.clear()
    print('consumerer konczy; ')
    

def producer(event):
    print('wewnatrz producera; ')
    sleep(1)
    print('producer ustawia flage; ')
    event.set()  # inny wątek oczekujacy wykona się od razu
    print('producer ustawil flage; ')
    sleep(3)  
    print('producer konczy; ')
    
    
evt = Event()
t1 = Thread(target=consumer, args=(evt,))
t2 = Thread(target=producer, args=(evt,))

t1.start()
t2.start()

t1.join()
t2.join()

### Komunikacja przy użyciu kolejki (queue)

najważńiejsze 4 metody:
* put - dodaje element do kolejki
* get - pobiera element z kolejki
* task_done - oznacza zakończenie przetwarzania elementu (musi być wywołane)
* join - oczekuje na opróżnienie kolejki

In [None]:
from queue import Queue 
from random import randint
from time import sleep
from threading import Thread


def producer(name, queue):
    x = randint(1, 14)
    print('producer {} puts {}; '.format(name, x))
    queue.put(x)  # jezeli kolejka jest pełna to wyrzuca wyjatek - chyba że dodamy block=True
    print('producer {} finishes; '.format(name))
    
    
def consumer(name, queue):
    print('consumer {} starts; '.format(name))
    x = queue.get()  # jeżeli kolejka jest pusta to czeka aż się coś pojawi
    sleep(randint(0, 2))
    print('consumer {} got {}; '.format(name, x))
    queue.task_done()
    print('consumer {} finishes; '.format(name))
    

queue = Queue()
producers = [Thread(target=producer, args=(i, queue)) for i in range(5)]
consumers = [Thread(target=consumer, args=(i, queue)) for i in range(5)]

for producer, consumer in zip(producers, consumers):
    producer.start()
    consumer.start()
    

# Procesy

pakiet multiprocessing - zezwala na wspóldzielenie pamięci

interfejs modułu multiprocessing jest bardzo podobny do interfejsu modułu threading

In [None]:
# multiprocessing/ex1.py
import multiprocessing
import time

def f():
    time.sleep(1)
    print("running function f")
    

proc = multiprocessing.Process(target=f)
proc.start()  # uruchomienie procesu
proc.join()  # oczekiwanie na zakończenie procesu

print("koniec programu")

In [None]:
# multiprocessing/ex2.py
# żeby uruchomić proces w tle trzeba ustawić atrybut daemon na True 
# daemon jest zabijany, jezeli jego proces-rodzic skończy działanie
import multiprocessing
import time

def f():
    time.sleep(1)
    open('daemon-log', 'w').close()  # zostawianie pliku jako ślad po uruchomieniu daemona
    print("running function f")     # jezeli proces-rodzic zakończy się wcześniej niż daemon
                                     # ta linijka nie będzie wywołana i plik nie powstanie

    
proc = multiprocessing.Process(target=f)
proc.daemon = True
proc.start()
# proc.join()  # dodanie join oczekuje na zakończenie daemona

print("koniec programu")

In [None]:
# multiprocessing/ex3.py
# Daemon nie może tworzyć procesów potomnych

import multiprocessing
import time

def daemon_child():
    time.sleep(1)
    open('daemon-child-log', 'w').close()


def daemon():
    child = multiprocessing.Process(target=daemon_child)
    child.daemon = True
    child.start()
    time.sleep(2)
    open('daemon-log', 'w').close()
    print("running function f")

    
proc = multiprocessing.Process(target=daemon)
proc.daemon = True
proc.start()
time.sleep(0.1)

print("koniec programu")

to ręcznego ubicia procesu służy metoda .terminate() a do sprawdzenia czy proces jest uruchomiony służy metoda .is_alive().

kod ze statusem procesu można uzyskać z parametru .exitcode
status == 0 oznacza, że proces zakończył sie sukcesem
status > 0 oznacza, zę proces zakończył sie z tym statusem
status < 0 oznacza numer sygnału jaki został wysąłny do procesu

### Proces jako klasa

Tak jak w przypadku wątków

In [None]:
import multiprocessing


class NewProcess(multiprocessing.Process):
    def __init__(self, *args, **kwargs):  # __init__ nie jest konieczny
        super(NewProcess, self).__init__()
        # kod
        
    def run(self):
        print('Proces sie uruchomil')
        
proc = NewProcess()
proc.start()
proc.join()

## Komunikacja pomiędzy procesami

kolejki (Queues) i Potoki (Pipes)

### kolejki

In [None]:
# multiprocessing/ex4.py
import multiprocessing
import time
from random import randint


def producer(queue):
    print('producer startuje')
    for i in range(5):
        time.sleep(0.5)
        x = randint(1, 5)
        print('producer wyprodukowal: {}'.format(x))
        queue.put(x)
    print('producer konczy')
    
    
def consumer(queue):
    print('consumer startuje')
    while not queue.empty():  # pobiera póki kolejka nie będzie pusta
        x = queue.get()  # nie trzeba sygnalizować przetworzenia elementu
        time.sleep(1.3)
        # time.sleep(0.3)  # szybsza konsumpcja niz produkcja doprowadzi do opróznienia kolejki
                           # i zakończenia działania nawet jak potem jeszcze bedą produkowane
                           # elementy
        print('consumer przetworzyl {}'.format(x))
    print('consumer konczy')
    

queue = multiprocessing.Queue()
prod = multiprocessing.Process(target=producer, args=(queue,))
cons = multiprocessing.Process(target=consumer, args=(queue,))

prod.start()
time.sleep(0.5)
cons.start()

prod.join()
cons.join()
print('koniec programu')

In [None]:
# multiprocessing/ex5.py
# JoinableQueue posiada dodatkowe metody .task_done i .join
# do synchronizacji przetwarzania elementów


import multiprocessing
import time
import Queue
from random import randint


def producer(queue):
    print('producer startuje')
    for i in range(5):
        time.sleep(0.5)
        x = randint(1, 5)
        print('producer wyprodukowal: {}'.format(x))
        queue.put(x)
        queue.join()  # oczekiwanie na zakończenie przetwarzania
    print('producer konczy')
    
    
def consumer(queue):
    print('consumer startuje')
    while True:
        try:
            x = queue.get(timeout=2)
        except Queue.Empty:
            break
        time.sleep(1.3)
        queue.task_done()  # sygnalizacja zakończenia przetwarzania
        print('consumer przetworzyl {}'.format(x))
    print('consumer konczy')
    

queue = multiprocessing.JoinableQueue()
prod = multiprocessing.Process(target=producer, args=(queue,))
cons = multiprocessing.Process(target=consumer, args=(queue,))

prod.start()
time.sleep(0.5)
cons.start()

prod.join()
cons.join()
print('koniec programu')

### Potoki (Pipe)

Zwraca parę obiektów połaczonych przez potok. Do transmisji służą metody .send() i .recv()

In [None]:
import multiprocessing


koniec1, koniec2 = multiprocessing.Pipe()

koniec1.send('hello world')
msg = koniec2.recv()
print(msg)


# domyśłnie potoki są dwukierunkowe
koniec2.send('hello world')
msg = koniec1.recv()
print(msg)

koniec1.send('witam')
koniec1.send('serdecznie')
msg = koniec2.recv()  # odbiera tylko jedną wiadomość
print(msg)
msg = koniec2.recv()
print(msg)

In [None]:
# multiprocessing/ex6.py
import multiprocessing
import time
from random import randint


def producer(pipe):
    print('producer startuje')
    for i in range(5):
        time.sleep(0.5)
        x = randint(1, 5)
        print('producer wyprodukowal: {}'.format(x))
        pipe.send(x)
    print('producer konczy')
    
    
def consumer(pipe):
    print('consumer startuje')
    while pipe.poll(3):  # sprawdzanie czy coś jest - max 3 sekundy
        x = pipe.recv()
        time.sleep(1.3)
        print('consumer przetworzyl {}'.format(x))
    print('consumer konczy')
    

koniec1, koniec2 = multiprocessing.Pipe()
prod = multiprocessing.Process(target=producer, args=(koniec1,))
cons = multiprocessing.Process(target=consumer, args=(koniec2,))

prod.start()
time.sleep(0.5)
cons.start()

prod.join()
cons.join()
print('koniec programu')

## Synchronizacja

Synchronizacja wygląda podobnie jak dla wątków:
* Lock/RLock
* Event
* Condition
* Semaphore
* Barrier (Python3) - dzieli program na fazy - przejście do kolejnej fazy jest możliwe tylko po zakończeniu wszystkich procesów z poprzedniej fazy

## Pula procesów

Do zrównoleglania wykonania funkcji można użyc Puli procesów

In [None]:
from multiprocessing import Pool
import time

def f(x):
    print('funkcja startuje, argument {}'.format(x))
    time.sleep(x/10)
    return x

    
pool = Pool(processes=4)  # tworzy póle 4 procesów

result = pool.apply_async(f, (3,))  # wywołuję funkcję f z argumentem 3 i nie blokuje się
print(result.get(timeout=0.9))  # próbuje pobrać wartość po wykonaniu funkcji, ale daje na to
                                # tylko 0.9 sekundy

print(50 * '=')
print(pool.map(f, range(10)))   # wywołuje funkcję f na liscie 10 elementów - blokujące

print(50 * '=')
it = pool.imap(f, range(11))   # działa jak map tylko nie blokuje się i zwraca iterator
print(it.next())  # pobierany następny element z listy wyników
print(it.next())
print(it.next(timeout=0.9))  # pobieranie następnego elementu, ale z timeoutem
for i in it:
    print(i)

print(50 * '=')
result = pool.apply_async(time.sleep, (10,))
#print result.get(timeout=0.9)  # TimeoutError


pool.close()  # nie możńa dodawać nowych zadań do puli
#result = pool.apply_async(f, (3,))  # AssertionError

pool.join()  # oczekuje na zakończenie wszystkich pracujących procesów z puli

### od Pythona 3.2 funkcjonuje moduł concurrent.futures, który pomaga tworzyć pule procesów i wątków

In [None]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from random import randint
from time import sleep


def loop(x):
    for i in range(10):
        print(f'{x}: {i}')
        sleep(randint(1,5)/10)
    return x

pool = ProcessPoolExecutor(max_workers=2)
results = pool.map(loop, [1, 2])

print(list(results))


print(50 * '=')
pool = ThreadPoolExecutor(max_workers=2)
results = pool.map(loop, [1, 2])

print(list(results))

# Asynchroniczność

- Twisted (2.7)
- Tornado
- Asyncio (3.4+)

Asynchroniczność działa podobnie jak wątki, tylko przełączaniem pomiędzy wykonywanymi zadaniami zarządza petla event loop. Programista ma większy wpływ na kontrolę kiedy jakie zadanie jest przerywane

od pythona 3.4 w bibliotece standardowej funkcjonuje moduł asyncio a od 3.5 są dodatkowe słowa kluczowe dla wsparica programowania asynchronicznego

In [None]:
# python 3.4
import asyncio
import random

@asyncio.coroutine  # ten dekorator oznacza zę bedzie to korutyna, która ma być wywoływana
def coro(a):        # w pętli
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
        yield       # yield oznacza przerwanie wywołania - w przyszłości pozwoli to
                    # na działanie innym korutynom

            
print(coro)  # pokazuje, że to jest funkcja
print(coro(1))  # pokazuje, zę to jest generator

c = coro(1)  # działa jak zwykły generator
next(c)
next(c)

In [None]:
# python 3.4
import asyncio
import random

@asyncio.coroutine  # ten dekorator oznacza zę bedzie to korutyna, która ma być wywoływana
def coro(a):        # w pętli
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
        yield


# korutyny można odpalać w pętli eventów        
loop = asyncio.get_event_loop()  # pobieramy pętle eventów z systemu
loop.run_until_complete(coro(1))  # uruchomienie 1 korutyny

# generatory też - chyba, że cos yieldują
def gen():
    for i in range(10):
        print(i)
        yield

loop.run_until_complete(gen())
def gen():
    for i in range(10):
        yield i

loop.run_until_complete(gen())  # RuntimeError - koturyny nie mogą yieldować wartości - 
                                # yield korutynom słuzy do innej rzeczy

w korutynach można wywoływać funkcje blokujace - ale wstrzymuje to cała pętle

In [None]:
# python 3.4
import asyncio
import random
import time

@asyncio.coroutine  # ten dekorator oznacza zę bedzie to korutyna, która ma być wywoływana
def coro(a):        # w pętli
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
        st = random.random() / 2
        time.sleep(st)  # zwykłe wywołanie funkcji blokującej sleep


task = asyncio.wait([coro(1), coro(2), coro(3)])  # asyncio.wait wykonuje korutyny z listy
                                                  # i czeka aż wszystkie się zakończą

loop.run_until_complete(task)  # nie ma przeplotu wywołań -
                               # zawsze wykonuje się jedna korutyna na raz

yield słuzy korutynom do przerwania działania - wtedy kontrolna nad przebiegiem programu jest "w rękach" pętli co pozwala na wykonanie innych zadań

In [None]:
import asyncio
import random
import time

@asyncio.coroutine
def coro(a): 
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
        st = random.random() / 2
        time.sleep(st)
        yield


task = asyncio.wait([coro(1), coro(2), coro(3)])  

loop.run_until_complete(task)

żeby wywołać inną korutynę używa się yield from

In [None]:
import asyncio
import random
import time

@asyncio.coroutine
def coro(a): 
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
        st = random.random() / 2
        yield from asyncio.sleep(st)


task = asyncio.wait([coro(1), coro(2), coro(3)])  

loop.run_until_complete(task)

od Pythona 3.5 są dodatkowe słowa kluczowe *await* zastępuje *yield from* a *async* zastępuje dekorator

In [None]:
# python 3.5+
import asyncio
import random

async def coro(a):
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
        st = random.random() / 2
        await asyncio.sleep(st)


task = asyncio.wait([coro(1), coro(2), coro(3)])
loop.run_until_complete(task)

In [None]:
import asyncio
import random

async def coro(a):
    for i in range(10):
        print('coro-{}: {}'.format(a, i))
    return 33

print(coro)  # funkcja 
c = coro(1)
print(c)  # coroutine object

try:
    print('next(c)')
    next(c)  # nie można
except TypeError:
    print('TypeError')
    
    

try:    
    c.send(None)  # odpali subrutynę ale bedzie StopIteration na końcu
                  # nie działa to też jezeli bedą wywoływane inne corutyny - bo wywołujemy
                  # tylko jedną korutynę
except StopIteration as e:
    print('zwrócono:', e.value)

In [None]:
def run(c):
    try:
        c.send(None)
    except StopIteration as e:
        return e.value

W pythonie 3.6 nowe są 
- asynchroniczne petle
- asynchroniczne dekoratory
- asynchroniczne comprehensions
- asynchroniczne managery kontekstu

In [None]:
# asynchroniczny dekorator

import asyncio

async def gen():  # wewnątrz takiego generatora można używać await
    for i in range(10):
        print('gen')
        yield i
        
print(gen)
g = gen()
print(g)

async def aiter(g):
    x = []
    async for i in g:
        x.append(i)
    return x

val = run(aiter(g))
print(val)

In [None]:
# asynchroniczny dekorator
async def gen():
    for i in range(10):
        print('gen')
        yield i


async def aiter(g):
    return {i: i**2 async for i in g}
    return {i async for i in g}
    return [i async for i in g]
    return (i async for i in g)  # asynchroniczny generator - można użyc tylko z async for


val = run(aiter(gen()))
print(val)

In [None]:
class Manager:
    async def __aenter__(self):
        print('aenter')
        return 3
    
    async def __aexit__(self, *exc_info):
        print('aexit')
        return True
    
    
async def amanager(man):
    async with man as x:
        print('wewnątrz kontekstu')
        print(x)

run(amanager(Manager()))