In [1]:
class MyQueue:
    def __init__(self):
        self.que = []
    
    def enqueue(self, value):
        self.que.append(value)
    

    def dequeue(self):
        return self.que.pop(0)
    

    def __str__(self):
        return str(self.que)

In [2]:
q = MyQueue()

q.enqueue('a')
print(q)
q.enqueue('b')
print(q)
q.enqueue('c')
print(q)

val1 = q.dequeue()
print(q)
val2 = q.dequeue()
print(q)
val3 = q.dequeue()
print(q)

print(val1, val2, val3)

['a']
['a', 'b']
['a', 'b', 'c']
['b', 'c']
['c']
[]
a b c


## 固定長配列の場合のqueueの実装

In [3]:
class MyQueue:
    def __init__(self, N):
        self.N = N
        self.que = [None] * N
        self.head = 0 # キューの先頭
        self.tail = 0 # キューの末尾
        self.count = 0
    

    def enqueue(self, value):
        if self.count == self.N:
            # キューがいっぱいの場合はエンキュー失敗
            print("キューがいっぱいなのでエンキューできません")
            return
        
        # キューの最後尾に値を設定
        self.que[self.tail] = value
        
        # データ件数の追加
        self.count += 1

        # キューの最後尾の位置を一つ後ろにずらす
        self.tail = (self.tail + 1) % self.N

    def dequeue(self):
        # キューに要素がない場合はデキュー失敗
        if self.count == 0:
            print("キューに要素がないのででキューできません")
            return 
        
        # キューの先頭からデータを取り出す
        value = self.que[self.head]
        self.que[self.head] = None

        # データ件数を-1
        self.count -= 1

        # 先頭の位置を一つ進める
        self.head = (self.head + 1) % self.N

        # 取り出した値を返す
        return value
    

    def __str__(self):
        return f"(head:{self.head}, tail:{self.tail}), {self.que}"

In [4]:
q = MyQueue(4)
q.enqueue("a")
print(q)
q.enqueue("b")
print(q)
q.enqueue("c")
print(q)

a = q.dequeue()
print(q)

b = q.dequeue()
print(q)

q.enqueue("d")
q.enqueue("e")
print(q)

c = q.dequeue()
d = q.dequeue()
print(q)

(head:0, tail:1), ['a', None, None, None]
(head:0, tail:2), ['a', 'b', None, None]
(head:0, tail:3), ['a', 'b', 'c', None]
(head:1, tail:3), [None, 'b', 'c', None]
(head:2, tail:3), [None, None, 'c', None]
(head:2, tail:1), ['e', None, 'c', 'd']
(head:0, tail:1), ['e', None, None, None]


## 非同期処理でキューを渡す(マルチスレッド)

In [5]:
import threading
import queue
import time

q = queue.Queue()

def producer():
    """キューに処理対象データを追加"""
    print("producer: データ追加処理開始")
    # データをエンキュー
    # (nは勝利に要する時間で順番に実行すると処理時間は14秒かかる)
    q.put({"task": "データ1", "n": 2})
    q.put({"task": "データ2", "n": 3})
    q.put({"task": "データ3", "n": 5})
    q.put({"task": "データ4", "n": 4})
    print("producer: データ追加完了")


def worker():
    """キューからデータを取り出して処理を実行"""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"worker, {item.get("task")}の処理を開始します")
        time.sleep(item.get("n")) # ダミー処理
        print(f"worker, {item.get("task")}の処理を終了します")
        q.task_done()


def main():
    print("全体処理開始")
    start_time = time.perf_counter()

    # producerのスレッドを起動
    pt = threading.Thread(target=producer)
    pt.start()

    # workerのスレッドを2つ起動
    wts: list[threading.Thread] = []
    for _ in range(2):
        t = threading.Thread(target=worker)
        t.start()
        wts.append(t)
    
    # キューがからになるまで処理をブロック
    q.join()
    # thread数分Noneをpushして各workerのthreadを終了
    for _ in range(len(wts)):
        q.put(None)
    
    # 各スレッドが終了するまで待機
    pt.join()
    [t.join() for t in wts]

    # 処理時間を表示
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"全体処理終了: {elapsed_time}")

main()

全体処理開始
producer: データ追加処理開始
producer: データ追加完了
worker, データ1の処理を開始します
worker, データ2の処理を開始します
worker, データ1の処理を終了します
worker, データ3の処理を開始します
worker, データ2の処理を終了します
worker, データ4の処理を開始します
worker, データ3の処理を終了します
worker, データ4の処理を終了します
全体処理終了: 7.01495320815593


## 両端キューの使用(データ列の両側からデータの追加, 取り出しが可能なデータ構造 → stack, queueどちらにも対応)

In [6]:
from collections import deque

my_deque = deque()
my_deque.append(1)
my_deque.append(2)
my_deque.appendleft(3)
my_deque.appendleft(4)

print(my_deque)
x = my_deque.pop()
y = my_deque.popleft()

print(x, y)

deque([4, 3, 1, 2])
2 4
