In [12]:
# Код для выполнения в независимом потоке
import time
from threading import Thread

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(1)

# Создать и запустить поток
t = Thread(target=countdown, args=(5,))
t.start()
    


T-minus 5


T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [18]:
class CountdownTask:
    def __init__(self):
        self._running = True
        
    def terminate(self):
        self._running = False
        
    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(1)
            

c = CountdownTask()
t = Thread(target=c.run, args=(5,))
t.start()

# c.terminate() # Сигнал завершения
t.join() # Ждать реального завершения (если необходимо)

T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [23]:
from threading import Thread

class CountdownThread(Thread):
    def __init__(self, n):
        super().__init__()
        self.n = n

    def run(self):
        while self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(1)
            

c = CountdownThread(5)
c.start()

T-minus 5


T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [28]:
from threading import Thread, Event
import time

# Код для выполнения в независимом потоке
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(1)
        
# Создать объект события, который будет использован для сигнала о запуске
started_evt = Event()

# Запустить поток и передать событие запуска
print('Launching countdown')
t = Thread(target=countdown, args=(5, started_evt))
t.start()

# Ждать запуска потока
started_evt.wait()
print('countdown is running')

Launching countdown
countdown starting
T-minus 5
countdown is running


T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [31]:
import threading
import time

class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()
        
    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True
        t.start()
        
    def run(self):
        '''
        Запустить таймер и уведомлять ждущие потоки
        после каждого интервала
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1
                self._cv.notify_all()
                
    def wait_for_tick(self):
        '''Ждать следующего срабатывания таймера'''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()
                

# Пример использования таймера
ptimer = PeriodicTimer(1)
ptimer.start()

# Два потока, синхронизирующихся по таймеру
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus ', nticks)
        nticks -= 1
        
def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting ', n)
        n += 1
        

threading.Thread(target=countdown, args=(6,)).start()
threading.Thread(target=countup, args=(5,)).start()

T-minus Counting  0
 6
Counting  1
T-minus  5
T-minus Counting  2
 4
Counting T-minus  3
 3
Counting T-minus  2
 4
T-minus  1


In [34]:
import threading

# Поток-воркер
def worker(n, sema):
    # Ждет сигнала
    sema.acquire()
    # Выполняет работу
    print('Working', n)
    
# Создаем несколько потоков
sema = threading.Semaphore(0)
nworkers = 10

for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()
    
sema.release()
sema.release()

Working 0
Working 1


In [1]:
from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)
# Отправить работу в пул
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Получить результаты
x = a.result()
y = b.result()

print(x)
print(y)

b'<!DOCTYPE html>\n<html \\ prefix="\n        og: http://ogp.me/ns# article: http://ogp.me/ns/article#\n    " vocab="http://ogp.me/ns" lang="en">\n<head>\n<meta charset="utf-8">\n<meta name="viewport" content="width=device-width, initial-scale=1">\n<title>PyPy</title>\n<link href="assets/css/rst_base.css" rel="stylesheet" type="text/css">\n<link href="assets/css/nikola_rst.css" rel="stylesheet" type="text/css">\n<link href="assets/css/code.css" rel="stylesheet" type="text/css">\n<link href="assets/css/theme.css" rel="stylesheet" type="text/css">\n<link href="assets/css/styles.css" rel="stylesheet" type="text/css">\n<meta name="theme-color" content="#5670d4">\n<meta name="generator" content="Nikola (getnikola.com)">\n<link rel="alternate" type="application/rss+xml" title="RSS" hreflang="en" href="rss.xml">\n<link rel="canonical" href="https://www.pypy.org/">\n<link rel="icon" href="favicon2.ico" sizes="16x16">\n<link rel="icon" href="favicon32x32.ico" sizes="32x32">\n<!--[if lt IE 9]><s

In [2]:
from queue import Queue
from threading import Thread, Event

# Страж, использующийся для отключения
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()
        
    def send(self, msg):
        '''Посылает сообщение в актор'''
        self._mailbox.put(msg)
        
    def recv(self):
        '''Получает входящее сообщение'''
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg
    
    def close(self):
        '''Закрывает актор и отключает его'''
        self.send(ActorExit)
        
    def start(self):
        '''Запускает конкурентное выполнение'''
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()
        
    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()
            
    def join(self):
        self._terminated.wait()
        
    def run(self):
        '''Запускает метод, который реализует пользователь'''
        while True:
            self.recv()
            

# Пример ActorTask
class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)
            

# Пример использования
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()   

Got: Hello
Got: World


In [5]:
class TaggedActor(Actor):
    def run(self):
        while True:
            tag, *payload = self.recv()
            getattr(self, 'do_' + tag)(*payload)
    
    # Методы, соответствующие различным тегам сообщений
    def do_A(self, x):
        print('Running A', x)

    def do_B(self, x, y):
        print('Running B', x, y)
        
# Пример
a = TaggedActor()
a.start()
a.send(('A', 1))  
a.send(('B', 2, 3))  

Running A 1
Running B 2 3


In [6]:
from threading import Event

class Result:
    def __init__(self):
        self._evt = Event()
        self._result = None

    def set_result(self, value):
        self._result = value
        self._evt.set()

    def result(self):
        self._evt.wait()
        return self._result
    

class Worker(Actor):
    def submit(self, func, *args, **kwargs):
        r = Result()
        self.send((func, args, kwargs, r))
        return r
    
    def run(self):
        while True:
            func, args, kwargs, r = self.recv()
            r.set_result(func(*args, **kwargs))
            

# Пример использования
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
print(r.result())

8


In [8]:
from collections import deque

# Два простых генератора
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')

def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1
        

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()
    
    def new_task(self, task):
        '''Допускает новую запущенную задачу в планировщик'''
        self._task_queue.append(task)
        
    def run(self):
        '''Работает, пока не останется задач'''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Работает до следующей инструкции yield
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Генератор более не выполняется
                pass

# Пример использования
sched = TaskScheduler()
sched.new_task(countdown(5))
sched.new_task(countdown(3))
sched.new_task(countup(10))
sched.run()

T-minus 5
T-minus 3
Counting up 0
T-minus 4
T-minus 2
Counting up 1
T-minus 3
T-minus 1
Counting up 2
T-minus 2
Blastoff!
Counting up 3
T-minus 1
Counting up 4
Blastoff!
Counting up 5
Counting up 6
Counting up 7
Counting up 8
Counting up 9


In [11]:
from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {} # Отображение имен на акторы
        self._msg_queue = deque() # Очередь сообщений
        
    def new_actor(self, name, actor):
        '''Допускает новый запущенный актор в планировщик и дает ему имя'''
        self._msg_queue.append((actor, None))
        self._actors[name] = actor
        
    def send(self, name, msg):
        '''Посылает сообщение актору с соответствующим именем'''
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))
            
    def run(self):
        '''Работает до тех пор, пока в очереди есть сообщения'''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                actor.send(msg)
            except StopIteration:
                pass
            

def printer():
    while True:
        msg = yield
        print('Got:', msg)
        
def counter(sched):
    while True:
        # Получить текущий счет
        n = yield
        if n == 0:
            break
        # Послать задаче-принтеру
        sched.send('printer', n)
        # Послать следующий счет задаче-счетчику (рекурсивно)
        sched.send('counter', n-1)
        
        
sched = ActorScheduler()
# Создать первоначальные акторы
sched.new_actor('printer', printer())
sched.new_actor('counter', counter(sched))

# Послать начальное сообщение в счетчик для инициализации
sched.send('counter', 5)
sched.run()

Got: 5
Got: 4
Got: 3
Got: 2
Got: 1


In [3]:
import threading, queue
import time

def washer(dishes, dish_queue):
    for dish in dishes:
        print ("Washing", dish)
        time.sleep(5)
        dish_queue.put(dish)

def dryer(dish_queue):
    while True:
        dish = dish_queue.get()
        print (" Drying", dish)
        time.sleep(10)
        dish_queue.task_done()
        
dish_queue = queue.Queue()

for n in range(2):
    dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()

dishes = ['salad', 'bread', 'entree', 'desert']
washer(dishes, dish_queue)
dish_queue.join()

Washing salad
Washing bread
 Drying salad
Washing entree
 Drying bread
Washing Drying entree
 desert
 Drying desert
