Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions lib/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ def empty?
@items.empty?
end

def enqueue(item)
@items.push(item)
def <<(item)
@items << item

self.signal unless self.empty?
end

def <<(item)
enqueue(item)
def enqueue(*items)
@items.concat(items)

self.signal unless self.empty?
end

def dequeue
Expand Down Expand Up @@ -91,14 +93,27 @@ def limited?
@items.size >= @limit
end

def enqueue item
def <<(item)
while limited?
@full.wait
end

super
end

def enqueue *items
while !items.empty?
while limited?
@full.wait
end

available = @limit - @items.size
@items.concat(items.shift(available))

self.signal unless self.empty?
end
end

def dequeue
item = super

Expand Down
55 changes: 54 additions & 1 deletion spec/async/queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@
end
end

it 'can enqueue multiple items' do
items = Array.new(10) { rand(10) }

reactor.async do |task|
subject.enqueue(*items)
end

items.each do |item|
expect(subject.dequeue).to be == item
end
end

it 'can dequeue items asynchronously' do
reactor.async do |task|
subject << 1
Expand All @@ -53,6 +65,14 @@
end
end

describe '#<<' do
it 'adds an item to the queue' do
subject << :item
expect(subject.size).to be == 1
expect(subject.dequeue).to be == :item
end
end

describe '#size' do
it 'returns queue size' do
expect(subject.size).to be == 0
Expand Down Expand Up @@ -117,7 +137,17 @@
subject.enqueue(10)
expect(subject).to be_limited
end


it 'enqueues items up to a limit' do
items = Array.new(2) { rand(10) }
reactor.async do
subject.enqueue(*items)
end

expect(subject.size).to be 1
expect(subject.dequeue).to be == items.first
end

it 'should resume waiting tasks in order' do
total_resumed = 0
total_dequeued = 0
Expand All @@ -138,4 +168,27 @@
expect(total_resumed).to be == total_dequeued
end
end

describe '#<<' do
context 'when queue is limited' do
before do
subject << :item1
expect(subject.size).to be == 1
expect(subject).to be_limited
end

it 'waits until a queue is dequeued' do
reactor.async do
subject << :item2
end

reactor.async do |task|
task.sleep 0.01
expect(subject.items).to contain_exactly :item1
subject.dequeue
expect(subject.items).to contain_exactly :item2
end
end
end
end
end