diff --git a/lib/async/queue.rb b/lib/async/queue.rb index 3751e20f..9c81872b 100644 --- a/lib/async/queue.rb +++ b/lib/async/queue.rb @@ -43,14 +43,16 @@ def empty? @items.empty? end - def enqueue(item) - @items.push(item) + def <<(item) + @items << item self.signal unless self.empty? end - def <<(item) - enqueue(item) + def enqueue(*items) + @items.concat(items) + + self.signal unless self.empty? end def dequeue @@ -91,7 +93,7 @@ def limited? @items.size >= @limit end - def enqueue item + def <<(item) while limited? @full.wait end @@ -99,6 +101,19 @@ def enqueue item super end + def enqueue *items + while !items.empty? + while limited? + @full.wait + end + + available = @limit - @items.size + @items.concat(items.shift(available)) + + self.signal unless self.empty? + end + end + def dequeue item = super diff --git a/spec/async/queue_spec.rb b/spec/async/queue_spec.rb index 205dcb15..9a1b2533 100644 --- a/spec/async/queue_spec.rb +++ b/spec/async/queue_spec.rb @@ -42,6 +42,18 @@ end end + it 'can enqueue multiple items' do + items = Array.new(10) { rand(10) } + + reactor.async do |task| + subject.enqueue(*items) + end + + items.each do |item| + expect(subject.dequeue).to be == item + end + end + it 'can dequeue items asynchronously' do reactor.async do |task| subject << 1 @@ -53,6 +65,14 @@ end end + describe '#<<' do + it 'adds an item to the queue' do + subject << :item + expect(subject.size).to be == 1 + expect(subject.dequeue).to be == :item + end + end + describe '#size' do it 'returns queue size' do expect(subject.size).to be == 0 @@ -117,7 +137,17 @@ subject.enqueue(10) expect(subject).to be_limited end - + + it 'enqueues items up to a limit' do + items = Array.new(2) { rand(10) } + reactor.async do + subject.enqueue(*items) + end + + expect(subject.size).to be 1 + expect(subject.dequeue).to be == items.first + end + it 'should resume waiting tasks in order' do total_resumed = 0 total_dequeued = 0 @@ -138,4 +168,27 @@ expect(total_resumed).to be == total_dequeued end end + + describe '#<<' do + context 'when queue is limited' do + before do + subject << :item1 + expect(subject.size).to be == 1 + expect(subject).to be_limited + end + + it 'waits until a queue is dequeued' do + reactor.async do + subject << :item2 + end + + reactor.async do |task| + task.sleep 0.01 + expect(subject.items).to contain_exactly :item1 + subject.dequeue + expect(subject.items).to contain_exactly :item2 + end + end + end + end end