From d2685ff13a39b78e0360cb992c9c47f168e6f8f5 Mon Sep 17 00:00:00 2001 From: Bruno Sutic Date: Thu, 24 Dec 2020 20:44:53 +0100 Subject: [PATCH] Enable enqueuing multiple items to Async::Queue --- lib/async/queue.rb | 25 +++++++++++++++--- spec/async/queue_spec.rb | 55 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/lib/async/queue.rb b/lib/async/queue.rb index 53eb5f2b..3629ac41 100644 --- a/lib/async/queue.rb +++ b/lib/async/queue.rb @@ -42,13 +42,17 @@ def empty? @items.empty? end - def enqueue(item) - @items.push(item) + def <<(item) + @items << item self.signal unless self.empty? end - alias << enqueue + def enqueue(*items) + @items.concat(items) + + self.signal unless self.empty? + end def dequeue while @items.empty? @@ -87,7 +91,7 @@ def limited? @items.size >= @limit end - def enqueue item + def <<(item) while limited? @full.wait end @@ -95,6 +99,19 @@ def enqueue item super end + def enqueue *items + while !items.empty? + while limited? + @full.wait + end + + available = @limit - @items.size + @items.concat(items.shift(available)) + + self.signal unless self.empty? + end + end + def dequeue item = super diff --git a/spec/async/queue_spec.rb b/spec/async/queue_spec.rb index 1e62a00c..e08d7073 100644 --- a/spec/async/queue_spec.rb +++ b/spec/async/queue_spec.rb @@ -42,6 +42,18 @@ end end + it 'can enqueue multiple items' do + items = Array.new(10) { rand(10) } + + reactor.async do |task| + subject.enqueue(*items) + end + + items.each do |item| + expect(subject.dequeue).to be == item + end + end + it 'can dequeue items asynchronously' do reactor.async do |task| subject << 1 @@ -53,6 +65,14 @@ end end + describe '#<<' do + it 'adds an item to the queue' do + subject << :item + expect(subject.size).to be == 1 + expect(subject.dequeue).to be == :item + end + end + describe '#size' do it 'returns queue size' do reactor.async do |task| @@ -120,7 +140,17 @@ subject.enqueue(10) expect(subject).to be_limited end - + + it 'enqueues items up to a limit' do + items = Array.new(2) { rand(10) } + reactor.async do + subject.enqueue(*items) + end + + expect(subject.size).to be 1 + expect(subject.dequeue).to be == items.first + end + it 'should resume waiting tasks in order' do total_resumed = 0 total_dequeued = 0 @@ -141,4 +171,27 @@ expect(total_resumed).to be == total_dequeued end end + + describe '#<<' do + context 'when queue is limited' do + before do + subject << :item1 + expect(subject.size).to be == 1 + expect(subject).to be_limited + end + + it 'waits until a queue is dequeued' do + reactor.async do + subject << :item2 + end + + reactor.async do |task| + task.sleep 0.01 + expect(subject.items).to contain_exactly :item1 + subject.dequeue + expect(subject.items).to contain_exactly :item2 + end + end + end + end end