# Параллельное программирование

## Модели параллельного программирования

* Модель с разделяемой памятью
* Многопоточная модель
* Модель передачи сообщений
* Модель с параллельными данными

## Модели с разделяемой памятью

Преимущества:
* Не надо контролировать взаимодействие между задачами

Недостатки:
* Сложность в управлении локализацией данных

## Многопоточные модели

Преимущества:
* Можно достигать параллелизм даже при нелинейном расположении данных

Недостатки:
* Управление синхронизацией между потоками бывает сложным

## Модель передачи сообщений

Преимущества:
* Простота написания кода

Недостатки:
* В библиотечных функциях, вызываемых для таких моделей, не исключены утечки памяти и баги

## Модель с параллельными данными

Преимущества:
* Каждый процесс работает со своими данными

Недостатки:
* Требуется определить распределение и группировку данных

## GIL - Global Interpreter Lock, Глобальная блокировка интерпретатора

Контроль за выполнением потоков

Гарантирует эксклюзивный доступ к переменным интерпретатора

In [None]:
sys.setcheckinterval()

## Потоки и процессы

|Потоки|Процессы|
|------|--------|
|Совместно используют память|Не разделяют память|
|Запуск/ изменение менее затратны с точки зрения вычислений|Запуск/ изменение затратны с точки зрения вычислений|
|Требуют меньше ресурсов (облегчённые процессы)|Требуют больше вычислительных ресурсов|
|Нуждаются в механизмах синхронизации для корректной обработки данных|Не требуется синхронизация памяти|

# Многопоточное программирование

In [None]:
import random

def do_smth(count, out_list):
    for i in range(count):
        out_list.append(random.randint(0,100))

In [None]:
import time

start_time = time.time()
size = 10000000 
n_exec = 10
for i in range(n_exec):
    out_list = list()
    do_smth(size, out_list)

end_time = time.time()
print("serial time=", end_time - start_time)

In [None]:
len(out_list)

In [None]:
import threading

start_time = time.time()
size = 10000000
threads = 10 
jobs = []
for i in range(threads):
    out_list = list()
    thread = threading.Thread(target=do_smth(size,out_list))
    jobs.append(thread)
    
end_time = time.time()
print("serial time=", end_time - start_time)

In [None]:
len(out_list)

In [None]:
import multiprocessing
from do_smth import do_smth
import time


start_time = time.time()
size = 10000000 
procs = 10 
jobs = []
out_list = []
for i in range(procs):
    out_list = []
    process = multiprocessing.Process(target=do_smth,args=(size,out_list))
    jobs.append(process)
    
for j in jobs:
    j.start()

for j in jobs:
    j.join()

print ("List processing complete.")
end_time = time.time()
print("multiprocesses time=", end_time - start_time)

## Параллельность на основе потоков

Поток (thread) это независимое течение (flow) которое может исполняться параллельно и одновременно с прочими потоками в системе

Возможные состояния потока:
* Готов
* Исполняется
* Блокирован

In [None]:
import threading

In [None]:
def my_function(thread_number):
    return print('Сalled thread {}'.format(thread_number))


threads = []
for i in range(10):
    t = threading.Thread(target=my_function, args=(i,))
    threads.append(t)
    t.start()
    t.join()


In [None]:
import threading
import time

def function_A():
    print (threading.current_thread().name+str(' starting \n'))
    time.sleep(2)
    print (threading.current_thread().name+str( ' exiting \n'))
  
def function_B():
    print (threading.current_thread().name+str(' starting \n'))
    time.sleep(2)
    print (threading.current_thread().name+str( ' exiting \n'))
  
def function_C():
    print (threading.current_thread().name+str(' starting \n'))
    time.sleep(2)
    print (threading.current_thread().name+str( ' exiting \n'))

In [None]:
t1 = threading.Thread(name='function_A', target=function_A)
t2 = threading.Thread(name='function_B', target=function_B)
t3 = threading.Thread(name='function_C',target=function_C) 

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

In [None]:
import time
import os
from random import randint
from threading import Thread

class MyThreadClass(Thread):
    def __init__(self, name, duration):
        Thread.__init__(self)
        self.name = name
        self.duration = duration 

    def run(self):
        print (self.name + " running, process ID " + str(os.getpid()) + "\n")
        time.sleep(self.duration)
        print (self.name + " over\n")

In [None]:
start_time = time.time()

# Создание потока
thread1 = MyThreadClass("Thread#1 ", randint(1,10))
thread2 = MyThreadClass("Thread#2 ", randint(1,10))
thread3 = MyThreadClass("Thread#3 ", randint(1,10))
thread4 = MyThreadClass("Thread#4 ", randint(1,10))
thread5 = MyThreadClass("Thread#5 ", randint(1,10))
thread6 = MyThreadClass("Thread#6 ", randint(1,10))
thread7 = MyThreadClass("Thread#7 ", randint(1,10))
thread8 = MyThreadClass("Thread#8 ", randint(1,10)) 
thread9 = MyThreadClass("Thread#9 ", randint(1,10))

# Запуск потока
thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()
thread6.start()
thread7.start()
thread8.start()
thread9.start()

# Присоединение потока
thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()
thread6.join()
thread7.join()
thread8.join()
thread9.join()


In [None]:
import threading
import time
import os
from threading import Thread
from random import randint

threadLock = threading.Lock()

class MyThreadClass(Thread):
    def __init__(self, name, duration):
        Thread.__init__(self)
        self.name = name
        self.duration = duration
    def run(self):
        threadLock.acquire()
        print (self.name + " running, process ID " + str(os.getpid()) + "\n")
        time.sleep(self.duration)
        print (self.name + " over\n")
        threadLock.release()

In [None]:
thread1 = MyThreadClass("Thread#1 ", randint(1,10))
thread2 = MyThreadClass("Thread#2 ", randint(1,10))
thread3 = MyThreadClass("Thread#3 ", randint(1,10))
thread4 = MyThreadClass("Thread#4 ", randint(1,10))
thread5 = MyThreadClass("Thread#5 ", randint(1,10))
thread6 = MyThreadClass("Thread#6 ", randint(1,10))
thread7 = MyThreadClass("Thread#7 ", randint(1,10))
thread8 = MyThreadClass("Thread#8 ", randint(1,10))
thread9 = MyThreadClass("Thread#9 ", randint(1,10))

thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()
thread6.start()
thread7.start()
thread8.start()
thread9.start()

thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()
thread6.join()
thread7.join()
thread8.join()
thread9.join()

In [None]:
import threading
import time
import os
from threading import Thread
from random import randint

threadLock = threading.Lock()

class MyThreadClass (Thread):
    def __init__(self, name, duration):
        Thread.__init__(self)
        self.name = name
        self.duration = duration
    def run(self):
        threadLock.acquire() 
        print (self.name + " running, process ID " + str(os.getpid()) + "\n")
        threadLock.release()
        time.sleep(self.duration)
        print (self.name + " over\n")

In [None]:
thread1 = MyThreadClass("Thread#1 ", randint(1,10))
thread2 = MyThreadClass("Thread#2 ", randint(1,10))
thread3 = MyThreadClass("Thread#3 ", randint(1,10))
thread4 = MyThreadClass("Thread#4 ", randint(1,10))
thread5 = MyThreadClass("Thread#5 ", randint(1,10))
thread6 = MyThreadClass("Thread#6 ", randint(1,10))
thread7 = MyThreadClass("Thread#7 ", randint(1,10))
thread8 = MyThreadClass("Thread#8 ", randint(1,10))
thread9 = MyThreadClass("Thread#9 ", randint(1,10))

thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()
thread6.start()
thread7.start()
thread8.start()
thread9.start()

thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()
thread6.join()
thread7.join()
thread8.join()
thread9.join()


In [None]:
import threading
import time
import random

class Bag:
    def __init__(self):
        self.lock = threading.RLock()
        self.total_items = 0

    def execute(self, value):
        with self.lock:
            self.total_items += value

    def add(self):
        with self.lock:
            self.execute(1)

    def remove(self):
        with self.lock:
            self.execute(-1)
            
def adder(bag, items):
    print("{} items to ADD \n".format(items))
    while items:
        bag.add()
        time.sleep(1)
        items -= 1
        print("ADDED one item in result {} item to ADD \n".format(items))

def remover(bag, items):
    print("{} items to REMOVE\n".format(items))
    while items:
        bag.remove()
        time.sleep(1)
        items -= 1
        print("REMOVED one item in result {} item to REMOVE\n".format(items))
        

items = 10
bag = Bag()

t1 = threading.Thread(target=adder, \
                      args=(bag, random.randint(10,20)))
t2 = threading.Thread(target=remover, \
                      args=(bag, random.randint(2,10)))

t1.start()
t2.start()

t1.join()
t2.join()

In [None]:
bag.total_items

In [None]:
import threading
import time
import random

semaphore = threading.Semaphore(0)
item = 0

def getter():
    print('Getter is waiting')
    semaphore.acquire()
    print(f'Getter item number {item}')

def creator():
    global item
    time.sleep(3)
    item = random.randint(0, 100)
    print(f'creator item number {item}')
    semaphore.release()

    
for i in range(5):
    t1 = threading.Thread(target=getter)
    t2 = threading.Thread(target=creator)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

In [None]:
import threading
import time

items = []
condition = threading.Condition()


class Getter(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def get(self):
        with condition:
            if len(items) == 0:
                print('no items to get')
                condition.wait()

            items.pop()
            print('got 1 item')
            condition.notify()

    def run(self):
        for i in range(20):
            time.sleep(2)
            self.get()


class Creator(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def create(self):

        with condition:
            if len(items) == 10:
                print('items created {}. Stopped'.format(len(items)))
                condition.wait()

            items.append(1)
            print('total items {}'.format(len(items)))

            condition.notify()

    def run(self):
        for i in range(20):
            time.sleep(0.5)
            self.create()

In [None]:
t1 = Getter()
t2 = Creator()

t1.start()
t2.start()

t1.join()
t2.join()

In [None]:
items = []

event = threading.Event()

class Getter(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self):
        while True:
            time.sleep(2)
            event.wait()
            item = items.pop()
            print(f'Getter {item} popped by {self.name}')
            
            
class Creator(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
    def run(self):
        for i in range(5):
            time.sleep(2)
            item = random.randint(0, 100)
            items.append(item)
            print(f'Creator item {item} appended by {self.name}')
            event.set()
        event.clear()

In [None]:
t1 = Creator()
t2 = Getter()

t1.start()
t2.start()

t1.join()
t2.join()

In [None]:
from random import randrange
from threading import Barrier, Thread
from time import ctime, sleep

num_runners = 3
finish_line = Barrier(num_runners)
runners = ['Huey', 'Dewey', 'Louie']

def runner():
    name = runners.pop()
    sleep(randrange(2, 5))
    print('%s reached barrier: %s \n' % (name, ctime()))
    finish_line.wait()

threads = []
print('START!!!!')
for i in range(num_runners):
    threads.append(Thread(target=runner))
    threads[-1].start()
for thread in threads:
    thread.join()
print('Over!')

In [None]:
from threading import Thread
from queue import Queue
import time
import random

class Creator(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    def run(self):
        for i in range(5):
            item = random.randint(0, 256)
            self.queue.put(item)
            print('Creator : %d appended to queue by %s\n' % (item, self.name))
            time.sleep(1)

class Getter(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get()
            print('Getter : %d popped from queue by %s' % (item, self.name))
            self.queue.task_done()

queue = Queue()
t1 = Creator(queue)
t2 = Getter(queue)
t3 = Getter(queue)
t4 = Getter(queue)

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()
