## Dequeue

In [1]:
from collections import deque

### 当作 stack 使用

In [2]:
stack = deque()
stack.append('cat')
stack.append('dog')
stack.append('tiger')
print(stack)

deque(['cat', 'dog', 'tiger'])


In [3]:
stack.pop()

'tiger'

In [4]:
stack.pop()

'dog'

In [5]:
stack.pop()

'cat'

In [6]:
stack.pop()

IndexError: pop from an empty deque

### 当作队列

In [7]:
queue = deque()
queue.append('cat')
queue.append('dog')
queue.append('tiger')
print(queue)

deque(['cat', 'dog', 'tiger'])


In [8]:
queue.popleft()

'cat'

In [9]:
queue.popleft()

'dog'

In [10]:
queue.popleft()

'tiger'

In [11]:
queue.popleft()

IndexError: pop from an empty deque

In [None]:
def tail(filename, n = 10):
    """Return the last n lines of a file"""
    with open(filename) as f:
        return deque(f, n)

In [12]:
d = deque([1, 2, 3, 4, 5])
d.rotate(1)
d

deque([5, 1, 2, 3, 4])

In [23]:
def roundrobin(*iterables):
    """roundrobin('ABC', 'D', 'EF') -> A, D, E, B, F, C"""
    iterators = deque(map(iter, iterables))
    while iterators:
        try:
            while True:
                yield next(iterators[0])
                iterators.rotate(-1)
        except StopIteration:
            # Remove an exhaused iterator
            # print(iterators)
            iterators.popleft()

In [24]:
list(roundrobin('ABC', 'D', 'EF'))

['A', 'D', 'E', 'B', 'F', 'C']

## Queue

In [1]:
from queue import Queue

In [2]:
simpleq = Queue()

In [3]:
simpleq.put('cat')
simpleq.put('dog')
simpleq.put('tiger')
print(simpleq)

<queue.Queue object at 0x000002480DA5E720>


In [4]:
simpleq.get()

'cat'

In [5]:
simpleq.get()

'dog'

In [6]:
simpleq.get()

'tiger'

In [1]:
# 当没有数据时会阻塞，等待有数据输入
simpleq.get()

NameError: name 'simpleq' is not defined

```python
Queue.get(block=True, timeout=None)
```
从队列中移除并返回一个项目(数据)。

```block=Ture```: 意味着没有获取到数据时会发生阻塞，直到获取到数据。如果此时设置了 ```timeout``, 则会最多阻塞 timeout 秒，超过了这个时间将引发 Empty 异常。

```block=False```: 时如果没有获取到数据，立即引发 Empty 异常。

```python
Queue.task_done()
```
表示前面排队的任务已经被完成，被队列的消费者线程使用。

每个get()被用于获取一个任务，后续调用 task_done() 告诉队列，该任务的处理已经完成。

如果 join()当前正在阻塞，在所有条目都被处理后，将接触阻塞（意味着每个 put() 进队列的条目的 task_done() 都被收到）。
如果被调用的次数多于放入队列中的项目数量，将引发 ValueError 异常。

```python
Queue.join()
```
阻塞至队列中所有的元素都被接收和处理完毕。当条目添加到队列的时候，未完成的任务的计数就会增加。每当消费者线程调用 task_done 表示这个条目已经被回收，该条目所有工作已经完成，未完成计数就会减少。当未完成计数降到零的时候，join()阻塞被解除。

In [1]:
import threading
import queue

def worker(q):
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

q = queue.Queue()
threading.Thread(target=worker, args=(q, ), daemon=True).start()

for item in range(10):
    q.put(item)
q.join()
print('All work done.')

Working on 0
Finished 0
Working on 1
Finished 1
Working on 2
Finished 2
Working on 3
Finished 3
Working on 4
Finished 4
Working on 5
Finished 5
Working on 6
Finished 6
Working on 7
Finished 7
Working on 8
Finished 8
Working on 9
Finished 9
All work done.


In [10]:
from queue import Queue
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

def worker(q, idx):
    print('--- Exrcuteing Job ---')
    data = q.get()
    time.sleep(3)
    print('Done', idx)
    return f'{data} [H]'

q = Queue()
start_time = datetime.now()
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(worker, q, idx): idx for idx in range(12)
    }

    # 生产者制造数据
    for i in range(11):
        q.put(i)

    # print("制造数据完成")
    # 归并结果
    for f in as_completed(futures):
        print('RESULT: ', f.result())

end_time = datetime.now()
elapsed = end_time - start_time
print(f'elapsed {elapsed}')

--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
DoneDone 3
--- Exrcuteing Job ---
Done 2
--- Exrcuteing Job ---
Done 1
--- Exrcuteing Job ---
 0
--- Exrcuteing Job ---
RESULT:  3 [H]
RESULT:  2 [H]
RESULT:  1 [H]
RESULT:  0 [H]
Done 7
--- Exrcuteing Job ---
RESULT:  7 [H]
Done 5
--- Exrcuteing Job ---
RESULT:  5 [H]
Done 6
--- Exrcuteing Job ---
RESULT:  6 [H]
Done 4
--- Exrcuteing Job ---
RESULT:  4 [H]
Done 10
RESULT:  10 [H]
Done 8
RESULT:  8 [H]
Done 9
RESULT:  9 [H]


: 

: 

## LifoQueue

In [1]:
from queue import LifoQueue

In [2]:
squeue = LifoQueue()
squeue.put('cat')
squeue.put('dog')
squeue.put('tiger')
print(squeue)

<queue.LifoQueue object at 0x000001C4FFD438C0>


In [3]:
squeue.get()

'tiger'

In [4]:
squeue.put('fox')

In [5]:
squeue.get()

'fox'

In [6]:
squeue.get()

'dog'

In [7]:
squeue.get()

'cat'

Lifo 队列为空时继续取数据，会发生阻塞。直到新的数据进入。

In [None]:
squeue.get()

: 

: 

In [None]:
squeue.put('x')

In [9]:
import threading
import queue
import time

def worker(q):
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finisher {item}')
        q.task_done()

q = queue.LifoQueue()
threading.Thread(target=worker, args=(q, ), daemon=True).start()
for item in range(10):
    q.put(item)
    # time.sleep(3)
q.join()
print('All work done.')

Working on 9
Finisher 9
Working on 8
Finisher 8
Working on 7
Finisher 7
Working on 6
Finisher 6
Working on 5
Finisher 5
Working on 4
Finisher 4
Working on 3
Finisher 3
Working on 2
Finisher 2
Working on 1
Finisher 1
Working on 0
Finisher 0
All work done.


## Muiltiprocessing 中 Queue 作为共享作业队列
解决 python 环境中多线程的 GIL(global interpreter lock)

```python
multiprocessing.Queue
```
进程间共享数据， 跨进程工作， 解决 GIL限制

In [1]:
from multiprocessing import Queue

In [12]:
multiq = Queue()
multiq.put('cat')
multiq.put('dog')
multiq.put('tiger')

In [13]:
print(multiq)

<multiprocessing.queues.Queue object at 0x00000209A5A4C6B0>


In [14]:
multiq.get()

'cat'

In [15]:
multiq.get()

'dog'

In [16]:
multiq.get()

'tiger'

In [None]:
multiq.get()

In [1]:
import time
import random

from multiprocessing import Process, Queue, current_process

def worker(input: Queue, output: Queue) -> None:
    for func, args in iter(input.get, 'QUIT'):
        result = calculate(func, args)
        output.put(result)

def calculate(func, args):
    result = func(*args)
    return f'{current_process().name}: {func.__name__}{args} = {result}'

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def main():
    NUMBER_OF_PROCESS = 4
    COUNT_OF_TASK1 = 20
    COUNT_OF_TASK2 = 10
    TASK1 = [(mul, (i, 10)) for i in range(COUNT_OF_TASK1)]
    TASK2 = [(plus, (i, 20)) for i in range(COUNT_OF_TASK2)]

    task_queue = Queue()
    done_queue = Queue()

    for task in TASK1:
        task_queue.put(task)

    for i in range(NUMBER_OF_PROCESS):
        Process(target=worker, args=(task_queue, done_queue)).start()

    print('result:')
    for i in range(len(TASK1)):
        print('\t', done_queue.get())

    for task in TASK2:
        task_queue.put(task)

    for i in range(len(TASK2)):
        print('\t', done_queue.get())

    # Send STOP message to stop the child process
    for i in range(NUMBER_OF_PROCESS):
        task_queue.put('QUIT')

In [None]:
if __name__ == '__main__':
    main()

result:


In [1]:
%%writefile multiprocess_queue.py
import time
import random

from multiprocessing import Process, Queue, current_process

def worker(input: Queue, output: Queue) -> None:
    for func, args in iter(input.get, 'QUIT'):
        result = calculate(func, args)
        output.put(result)

def calculate(func, args):
    result = func(*args)
    return f'{current_process().name}: {func.__name__}{args} = {result}'

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def main():
    NUMBER_OF_PROCESS = 4
    COUNT_OF_TASK1 = 20
    COUNT_OF_TASK2 = 10
    TASK1 = [(mul, (i, 10)) for i in range(COUNT_OF_TASK1)]
    TASK2 = [(plus, (i, 20)) for i in range(COUNT_OF_TASK2)]

    task_queue = Queue()
    done_queue = Queue()

    for task in TASK1:
        task_queue.put(task)

    for i in range(NUMBER_OF_PROCESS):
        Process(target=worker, args=(task_queue, done_queue)).start()

    print('result:')
    for i in range(len(TASK1)):
        print('\t', done_queue.get())

    for task in TASK2:
        task_queue.put(task)

    for i in range(len(TASK2)):
        print('\t', done_queue.get())

    # Send STOP message to stop the child process
    for i in range(NUMBER_OF_PROCESS):
        task_queue.put('QUIT')

if __name__ == "__main__":
    main()

Overwriting multiprocess_queue.py


In [2]:
%run multiprocess_queue.py

result:
	 Process-4: mul(3, 10) = 30
	 Process-4: mul(4, 10) = 40
	 Process-3: mul(2, 10) = 20
	 Process-1: mul(0, 10) = 0
	 Process-1: mul(7, 10) = 70
	 Process-2: mul(1, 10) = 10
	 Process-3: mul(6, 10) = 60
	 Process-2: mul(9, 10) = 90
	 Process-2: mul(11, 10) = 110
	 Process-3: mul(10, 10) = 100
	 Process-4: mul(5, 10) = 50
	 Process-1: mul(8, 10) = 80
	 Process-2: mul(12, 10) = 120
	 Process-1: mul(15, 10) = 150
	 Process-3: mul(13, 10) = 130
	 Process-2: mul(16, 10) = 160
	 Process-4: mul(14, 10) = 140
	 Process-2: mul(19, 10) = 190
	 Process-1: mul(17, 10) = 170
	 Process-3: mul(18, 10) = 180
	 Process-1: plus(2, 20) = 22
	 Process-2: plus(1, 20) = 21
	 Process-3: plus(3, 20) = 23
	 Process-3: plus(6, 20) = 26
	 Process-4: plus(0, 20) = 20
	 Process-1: plus(4, 20) = 24
	 Process-4: plus(8, 20) = 28
	 Process-2: plus(5, 20) = 25
	 Process-3: plus(7, 20) = 27
	 Process-1: plus(9, 20) = 29


## ProorityQueue
内部实现为 heapq

O($log_{n}$)

In [3]:
import heapq

items = []

heapq.heappush(items, (2, 'dog'))
heapq.heappush(items, (1, 'cat'))
heapq.heappush(items, (3, 'tiger'))

while items:
    nex_thing = heapq.heappop(items)
    print(nex_thing)

(1, 'cat')
(2, 'dog')
(3, 'tiger')


In [4]:
from queue import PriorityQueue

In [6]:
pq = PriorityQueue()
pq.put((2, 'dog'))
pq.put((3, 'tiger'))
pq.put((1, 'cat'))

In [7]:
while not pq.empty():
    next_element = pq.get()
    print(next_element)

(1, 'cat')
(2, 'dog')
(3, 'tiger')


In [4]:
from queue import PriorityQueue
import time
import random
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def worker(input_queue: PriorityQueue, output_queue: PriorityQueue, idx):
    print('--- Exrcuteing Job ---')
    data = input_queue.get()
    time.sleep(3)
    print('Done', idx)
    # output_queue.put(f'{data} [H]')
    output_queue.put(data)

input_queue = PriorityQueue()
output_queue = PriorityQueue()
start_time = datetime.now()
with ThreadPoolExecutor(max_workers=12) as executor:
    futures = {
        executor.submit(
            worker, input_queue, output_queue, idx): idx for idx in range(12)
    }

    # 生产者制造数据
    raw_data = list(range(12))
    random.shuffle(raw_data)
    print('Original Data: ', raw_data)
    for data in raw_data:
        input_queue.put(data * data)

    # print("制造数据完成")
    # 归并结果
    wait(futures, return_when=ALL_COMPLETED)
    result = [output_queue.get() for _ in range(len(raw_data))]
    print('RESULT: ', result)

end_time = datetime.now()
elapsed = end_time - start_time
print(f'elapsed {elapsed}')

--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
--- Exrcuteing Job ---
Original Data:  [4, 8, 7, 3, 9, 6, 11, 10, 0, 2, 1, 5]
Done 8
Done 11
Done 9
Done 7
Done 2
Done 1
Done 6
Done 5
Done 4
Done 0
Done 10
Done 3
RESULT:  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
elapsed 0:00:03.011030
