New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Queue#peek to fetch the head object in queue, without removing it. #1698

Open
wants to merge 2 commits into
base: trunk
from

Conversation

4 participants
@tagomoris

tagomoris commented Sep 18, 2017

This feature is required for cases like below:

    queue = Queue.new

    10.times do
      Thread.new do
        queue << Task.new(at: Time.now + 30)
      end
    end

    Thread.new do # executor
      while true
        if queue.peek.to_be_fired?(Time.now)
          task = queue.pop
          # processing...
        end
        sleep INTERVAL
      end
    end

I have many use cases to enqueue tasks, which have conditions to be fetched or not. Queue#peek provides possibility to handle such requirement.

@drbrain

This comment has been minimized.

Show comment
Hide comment
@drbrain

drbrain Sep 18, 2017

Member

What if another thread pops the peek’d object and the condition is invalid? I think this is only safe for a single consumer so the documentation should have a note

Member

drbrain commented Sep 18, 2017

What if another thread pops the peek’d object and the condition is invalid? I think this is only safe for a single consumer so the documentation should have a note

@tagomoris

This comment has been minimized.

Show comment
Hide comment
@tagomoris

tagomoris Sep 19, 2017

@drbrain Correct - it's just for a single consumer.

But right now, I have another idea to get a solution for my problem by adding block argument to Queue#pop, instead of adding Queue#peek:

Thread.new do # executor
  while true
    if task = queue.pop{|task| task.to_be_fired?(Time.now) } # block is evaluated in mutex lock
      # processing...
    end
    sleep INTERVAL
  end
end

This idea can be applied for the case of multiple consumers.
But this fix requires to add mutex synchronization everywhere in code of Queue... Hmm.

tagomoris commented Sep 19, 2017

@drbrain Correct - it's just for a single consumer.

But right now, I have another idea to get a solution for my problem by adding block argument to Queue#pop, instead of adding Queue#peek:

Thread.new do # executor
  while true
    if task = queue.pop{|task| task.to_be_fired?(Time.now) } # block is evaluated in mutex lock
      # processing...
    end
    sleep INTERVAL
  end
end

This idea can be applied for the case of multiple consumers.
But this fix requires to add mutex synchronization everywhere in code of Queue... Hmm.

@tagomoris

This comment has been minimized.

Show comment
Hide comment
@tagomoris

tagomoris Sep 26, 2017

The approach to accept block (to pop items) requires too large patch (to rewrite almost entire Queue), so it looks unacceptable.
I pushed another commit on this pull-request to add thread unsafe.

What do you think about it?

tagomoris commented Sep 26, 2017

The approach to accept block (to pop items) requires too large patch (to rewrite almost entire Queue), so it looks unacceptable.
I pushed another commit on this pull-request to add thread unsafe.

What do you think about it?

@matthewd

This comment has been minimized.

Show comment
Hide comment
@matthewd

matthewd Sep 26, 2017

IMO that makes it sound too unsafe... by itself, it's no more "unsafe" than something like q.pop unless q.empty?

The above example is indeed unsafe for multiple consumers, but that's not being included in the documentation. (And could be fixed by the caller adding a separate consumer-only mutex around the peek-check-pop.)

matthewd commented Sep 26, 2017

IMO that makes it sound too unsafe... by itself, it's no more "unsafe" than something like q.pop unless q.empty?

The above example is indeed unsafe for multiple consumers, but that's not being included in the documentation. (And could be fixed by the caller adding a separate consumer-only mutex around the peek-check-pop.)

@eregon

This comment has been minimized.

Show comment
Hide comment
@eregon

eregon Sep 26, 2017

Member

Isn't the non-block version Queue.pop(true) enough for this?
Then you immediately get an exception if the queue was empty, or its pops an element if there was one available.

Member

eregon commented Sep 26, 2017

Isn't the non-block version Queue.pop(true) enough for this?
Then you immediately get an exception if the queue was empty, or its pops an element if there was one available.

@eregon

This comment has been minimized.

Show comment
Hide comment
@eregon

eregon Sep 26, 2017

Member

We could also have a blocking version of Queue#pop with a timeout, that's easy to implement and safer.

Member

eregon commented Sep 26, 2017

We could also have a blocking version of Queue#pop with a timeout, that's easy to implement and safer.

@matthewd

This comment has been minimized.

Show comment
Hide comment
@matthewd

matthewd Sep 26, 2017

@eregon that doesn't put the item back onto the head of the queue if the condition is false

(a timeout on Queue#pop would certainly be nice -- but it seems unrelated to this use case)

matthewd commented Sep 26, 2017

@eregon that doesn't put the item back onto the head of the queue if the condition is false

(a timeout on Queue#pop would certainly be nice -- but it seems unrelated to this use case)

@eregon

This comment has been minimized.

Show comment
Hide comment
@eregon

eregon Sep 26, 2017

Member

Right, I missed that part.

In such a case which only works with one consumer, maybe the simplest is just to use a custom class using an Array and a Mutex, which synchronizes push and pop:

def push(e)
  @mutex.synchronize do
    @array.push(e)
  end
end

def pop
  @mutex.synchronize do
    unless @array.empty?
      if yield @array.first
        @array.shift
      end
    end
  end
end

That seems better than adding a method to Queue that would only make sense when there is only one consumer.

Member

eregon commented Sep 26, 2017

Right, I missed that part.

In such a case which only works with one consumer, maybe the simplest is just to use a custom class using an Array and a Mutex, which synchronizes push and pop:

def push(e)
  @mutex.synchronize do
    @array.push(e)
  end
end

def pop
  @mutex.synchronize do
    unless @array.empty?
      if yield @array.first
        @array.shift
      end
    end
  end
end

That seems better than adding a method to Queue that would only make sense when there is only one consumer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment