Conversation

@bruno- (Contributor) commented Dec 24, 2020

This PR adds a new feature (and a performance improvement) that enables enqueuing multiple items to Async::Queue.

Use case: I'm using Async::Queue and adding ~10,000 new items to it at a time.
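
For illustration, with the proposed interface a bulk enqueue looks roughly like this (a sketch; the 10_000 placeholder items are made up for the example):

require 'async'
require 'async/queue'

Async do
	queue = Async::Queue.new
	items = Array.new(10_000) { |i| i } # placeholder work items

	# Previously: one #enqueue call per item.
	# items.each { |item| queue.enqueue(item) }

	# With this change: a single call enqueues them all.
	queue.enqueue(*items)
end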

Types of Changes

  • Bug fix.
  • New feature.
  • Performance improvement.

Testing

  • I added new tests for my changes.
  • I ran all the tests locally.

Benchmark

TL;DR: this feature is more than 10x faster.

# file: benchmark/queue.rb

require 'async'
require_relative '../lib/async/queue'
require 'benchmark/ips'

Async do
	Benchmark.ips do |benchmark|
		benchmark.report("single") do |count|
			queue = Async::Queue.new
			1000.times do |i|
				queue.enqueue(i)
			end
		end
		
		benchmark.report("multi") do |count|
			queue = Async::Queue.new
			array = 1000.times.map(&:itself) # unnecessary iteration to make examples as similar as possible
			queue.enqueue(*array)
		end
		
		benchmark.compare!
	end
end

Results:

Warming up --------------------------------------
              single   284.059B i/100ms
               multi   958.698B i/100ms
Calculating -------------------------------------
              single    686.066T (±15.1%) i/s -      3.284Q in   4.994371s
               multi      8.676Q (±14.0%) i/s -     41.593Q in   4.984121s

Comparison:
               multi: 8675609615004795.0 i/s
              single: 686065965589408.9 i/s - 12.65x  (± 0.00) slower


-	def enqueue(item)
-		@items.push(item)
+	def enqueue(*items)
@ioquatix (Member) commented on the diff:


Can you please check if this allocates a new array? I like this interface, but I just want to confirm we aren't duplicating the argument by doing this.

@bruno- (Contributor, Author) replied:


Yes, a single new Array is allocated here. My assumption is that the new array is used as a local variable inside the method.
From what I can see, the method arguments (the MyObject instances in the script below) are not duplicated.
Do you think this is a problem?


Here's the script I used to test this:

require 'async'
require_relative 'lib/async/queue'
require 'memory'

class MyObject
end

Async do
	array = Array.new(3) { MyObject.new }
	queue = Async::Queue.new

	Memory.report {
		queue.enqueue(*array)
	}.print
end

Here are the results:

# Memory Profile

- Total Allocated: (40.00 B in 1 allocations)
- Total Retained: (0 B in 0 allocations)

## By Gem (40.00 B in 1 allocations)

- (40.00 B in 1 allocations)    other

## By File (40.00 B in 1 allocations)

- (40.00 B in 1 allocations)    test.rb

## By Location (40.00 B in 1 allocations)

- (40.00 B in 1 allocations)    test.rb:12

## By Class (40.00 B in 1 allocations)

- (40.00 B in 1 allocations)    Array

## Strings By Gem

## Strings By Location

@ioquatix (Member) commented:

Also, should we consider a way to bulk dequeue items?

@ioquatix (Member) commented Dec 25, 2020

i.e.

queue.dequeue(10)

perhaps a similar interface to Array#pop(n) or Array#last(n)

@bruno- (Contributor, Author) commented Dec 25, 2020

Also, should we consider a way to bulk dequeue items?
i.e.
queue.dequeue(10)

Hmm, a tricky scenario: a queue has 5 elements and a user dequeues 10 - what happens? Do we, like Array#pop(10), return the existing 5 elements so the user does not wait? In that case, whenever #dequeue is given an integer argument the user never waits - even if the queue is completely empty?

I don't have an immediate need for this, but I can get involved if you think we should explore this feature.
I propose opening another issue for this.
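
For the sake of discussion, one possible semantic - mirroring Array#pop(n): wait only while the queue is empty, then return up to n of the currently available items - could be sketched like this (dequeue_up_to is a hypothetical name, not an existing API):

require 'async'
require 'async/queue'

class Async::Queue
	# Hypothetical bulk dequeue: blocks only until at least one item is
	# available, then drains up to `count` items without further waiting.
	# Assumes count >= 1.
	def dequeue_up_to(count)
		first = dequeue # existing behaviour: waits while the queue is empty
		[first, *@items.shift(count - 1)]
	end
end

Async do
	queue = Async::Queue.new
	queue.enqueue(1, 2, 3, 4, 5)
	queue.dequeue_up_to(10) # => [1, 2, 3, 4, 5] - returns what's available, doesn't wait for 10
end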

@ioquatix (Member) commented:

There is one other option...

queue.concat(items) # No temporary array.

vs

queue.enqueue(*items) # Allocates temporary copy.

I know the ergonomics aren't as good.

But I prefer avoiding the extra allocation.
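
To make the trade-off concrete from the caller's side (a sketch; concat is only the proposed name here, not an existing Async::Queue method):

require 'async'
require 'async/queue'

Async do
	queue = Async::Queue.new
	items = Array.new(100_000) { |i| i }

	# Splat: Ruby collects the arguments into a fresh Array for the call,
	# so a second 100_000-element array is allocated.
	queue.enqueue(*items)

	# Proposed alternative: append the caller's array directly, no copy.
	# queue.concat(items)
end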

@bruno- force-pushed the queue-enqueue-multiple-items branch 2 times, most recently from cf634c4 to 3e01ce5 on December 27, 2020 17:37
@bruno- (Contributor, Author) commented Dec 27, 2020

@ioquatix here's a follow-up on our Slack discussion.

TL;DR:

Async::LimitedQueue#enqueue is a bit tricky. Performance depends on the limit argument:

  • If limit is small (e.g. 1), the Array#concat optimization doesn't bring any gains - even when the enqueued array is large.
  • If limit is close to the enqueued array's size, the gains from using Array#concat become more obvious.

In the end I went with the Async::LimitedQueue#enqueue implementation that offers the best overall benchmark results.
The downside of the proposed implementation is that it allocates a lot of temporary arrays.

Alternatively, I think we could go with the naive implementation items.each { |item| self.<<(item) }. Yes, it's very slow when enqueuing large arrays and the limit is large, but in that case we could advise users to implement their own solution that's optimized for their use case.

What are your thoughts?

Follow-up tasks

  • Add efficient Async::Queue#<<
  • Bugfix Async::LimitedQueue#enqueue, add specs
  • Add efficient Async::LimitedQueue#<<
  • Check LimitedQueue performance before/after for one item and 10_000 items vs the naive approach
require 'async'
require_relative '../lib/async/queue'
require 'benchmark/ips'

class Async::LimitedQueue
	# Naive solution.
	def enqueue0(*items)
		items.each do |item|
			self.<<(item)
		end
	end

	def enqueue1(*items)
		while !items.empty?
			while limited?
				@full.wait
			end
			
			available = @limit - @items.size
			# One array allocation per cycle.
			@items.concat(items.shift(available))
			
			self.signal unless self.empty?
		end
	end

	# Optimizes best case scenario with Array#concat
	def enqueue2(*items)
		available = @limit - @items.size

		if available > 0 && available >= items.size
			@items.concat(items)

			self.signal unless self.empty?
		else
			items.each do |item|
				self.<<(item)
			end
		end
	end

	def enqueue3(*items)
		available = @limit - @items.size

		if available > 0 && available >= items.size
			# optimizes best case scenario with Array#concat
			@items.concat(items)

			self.signal unless self.empty?
		else
			# First addition is always bulk.
			# Three arrays are allocated in this block: another 'enqueue3' call,
			# 'items.slice' and 'items.drop'.
			enqueue3(*items.slice(0, available))

			items.drop(available).each do |item|
				self.<<(item)
			end
		end
	end

	def enqueue4(*items)
		available = @limit - @items.size

		if available > 0 && available >= items.size
			# optimizes best case scenario with Array#concat
			@items.concat(items)

			self.signal unless self.empty?
		else
			# same approach as in #enqueue1
			while !items.empty?
				while limited?
					@full.wait
				end
				
				available = @limit - @items.size
				# One array allocation per cycle.
				@items.concat(items.shift(available))
				
				self.signal unless self.empty?
			end
		end
	end
end
		
Async do |task|
	ARRAY_SIZE = 100_000
	LIMIT = 99_999

	Benchmark.ips do |benchmark|
		array = Array.new(ARRAY_SIZE) { rand(10) }

		benchmark.report("#enqueue0") do |count|
			queue = Async::LimitedQueue.new(LIMIT)

			task.async do
				queue.enqueue0(*array)
			end

			task.async do
				ARRAY_SIZE.times { queue.dequeue }
			end
		end
		
		benchmark.report("#enqueue1") do |count|
			queue = Async::LimitedQueue.new(LIMIT)

			task.async do
				queue.enqueue1(*array)
			end

			task.async do
				ARRAY_SIZE.times { queue.dequeue }
			end
		end

		benchmark.report("#enqueue2") do |count|
			queue = Async::LimitedQueue.new(LIMIT)

			task.async do
				queue.enqueue2(*array)
			end

			task.async do
				ARRAY_SIZE.times { queue.dequeue }
			end
		end
		
		benchmark.report("#enqueue3") do |count|
			queue = Async::LimitedQueue.new(LIMIT)

			task.async do
				queue.enqueue3(*array)
			end

			task.async do
				ARRAY_SIZE.times { queue.dequeue }
			end
		end

		benchmark.report("#enqueue4") do |count|
			queue = Async::LimitedQueue.new(LIMIT)

			task.async do
				queue.enqueue4(*array)
			end

			task.async do
				ARRAY_SIZE.times { queue.dequeue }
			end
		end

		benchmark.compare!
	end
end

Benchmark results:

limited_queue_benchmark_results.txt

@ioquatix merged commit 426c42e into socketry:main on May 4, 2022
@ioquatix (Member) commented May 4, 2022

Sorry for taking so long to process this PR. I'll also backport it to Async 1.x

ioquatix added a commit that referenced this pull request May 4, 2022
Co-authored-by: Samuel Williams <samuel.williams@oriontransfer.co.nz>
@bruno- deleted the queue-enqueue-multiple-items branch on May 4, 2022 at 10:58