From e4bfb3f546803d9c0177240774beaebbd13f7dc3 Mon Sep 17 00:00:00 2001 From: Rocco Caputo Date: Mon, 28 Jul 2003 05:14:14 -0400 Subject: [PATCH] After many requests, I have finally added delay_adjust(). This resets a specific timer (as set with delay_set) to be a new number of seconds into the future. This is good for watchdogs and timeouts. --- lib/POE/Kernel.pm | 45 +++++++++++++++++- lib/POE/Queue.pm | 10 +++- lib/POE/Queue/Array.pm | 103 +++++++++++++++++++++++++++++++++++++---- tests/001_queue.t | 85 ++++++++++++++++++++++++---------- 4 files changed, 209 insertions(+), 34 deletions(-) diff --git a/lib/POE/Kernel.pm b/lib/POE/Kernel.pm index a9a631b33..aa8d1de9c 100644 --- a/lib/POE/Kernel.pm +++ b/lib/POE/Kernel.pm @@ -1610,7 +1610,8 @@ sub alarm_remove { } # Move an alarm to a new time. This virtually removes the alarm and -# re-adds it somewhere else. +# re-adds it somewhere else. In reality, adjust_priority() is +# optimized for this sort of thing. sub alarm_adjust { my ($self, $alarm_id, $delta) = @_; @@ -1665,6 +1666,30 @@ sub delay_set { ); } +# Move a delay to a new offset from time(). As with alarm_adjust(), +# this is optimized internally for this sort of activity. + +sub delay_adjust { + my ($self, $alarm_id, $seconds) = @_; + + unless (defined $alarm_id) { + $self->_explain_usage("undefined delay id in delay_adjust()"); + $! = EINVAL; + return; + } + + unless (defined $seconds) { + $self->_explain_usage("undefined delay seconds in delay_abjust()"); + $! = EINVAL; + return; + } + + my $my_delay = sub { + $_[0]->[EV_SESSION] == $kr_active_session; + }; + return $kr_queue->set_priority($alarm_id, $my_delay, time() + $seconds); +} + # Remove all alarms for the current session. sub alarm_remove_all { @@ -2163,6 +2188,9 @@ June 2001 alarm and delay methods: # Adjust an existing alarm by a number of seconds. $kernel->alarm_adjust( $alarm_id, $number_of_seconds ); + # Refresh an existing delay to a number of seconds in the future. + $kernel->delay_adjust( $delay_id, $number_of_seconds_hence ); + # Remove a specific alarm, regardless whether it shares a name with # others. $kernel->alarm_remove( $alarm_id ); @@ -2712,6 +2740,21 @@ get the time the delay will be dispatched. It uses whichever time() POE::Kernel does, which may be Time::HiRes' high-resolution timer, if that's available. +=item delay_adjust DELAY_ID, SECONDS + +delay_adjust adjusts an existing delay to be a number of seconds in +the future. It is useful for refreshing watchdog timers, for +instance. + + # Refresh a delay for 10 seconds into the future. + $new_time = $kernel->delay_adjust( $delay_id, 10 ); + +On failure, it returns false and sets $! to a reason for the failure. +That may be EINVAL if the delay ID or the seconds are bad values. It +could also be ESRCH if the delay doesn't exist (perhaps it already was +dispatched). $! may also contain EPERM if the delay doesn't belong to +the session trying to adjust it. + =back =head2 Numeric Session IDs and Symbolic Session Names (Aliases) diff --git a/lib/POE/Queue.pm b/lib/POE/Queue.pm index 3fcb62192..85160e43b 100644 --- a/lib/POE/Queue.pm +++ b/lib/POE/Queue.pm @@ -38,6 +38,7 @@ POE::Queue - documentation for POE's priority queue interface @items = $q->peek_items(\&filter, $count); # $count is optional $new_priority = $q->adjust_priority($id, \&filter, $delta); + $new_priority = $q->set_priority($id, \&filter, $priority); =head1 DESCRIPTION @@ -147,7 +148,7 @@ returned by peek_items() if $count is too low. =item $new_priority = $q->adjust_priority($id, \&filter, $delta); -Changes the priority of an item by +$delta (which can be negative). +Changes the priority of an item by $delta (which can be negative). The item is identified by its $id, but the change will only happen if the supplied filter function returns true. Returns $new_priority, which is the priority of the item after it has been adjusted. @@ -156,6 +157,13 @@ This filter function allows anything to be removed. sub filter { 1 } +=item $new_priority = $q->set_priority($id, \&filter, $priority); + +Changes the priority of an item to $priority. The item is identified +by its $id, but the change will only happen if the supplied filter +function returns true when applied to the event payload. Returns +$new_priority, which should match $priority. + =back =head1 SEE ALSO diff --git a/lib/POE/Queue/Array.pm b/lib/POE/Queue/Array.pm index 544c19fab..7e55342fa 100644 --- a/lib/POE/Queue/Array.pm +++ b/lib/POE/Queue/Array.pm @@ -10,6 +10,8 @@ use vars qw(@ISA); use POSIX qw(ESRCH EPERM); +sub DEBUG () { 0 } + ### Helpful offsets. sub ITEM_PRIORITY () { 0 } @@ -63,18 +65,21 @@ sub enqueue { # Special case: No items in the queue. The queue IS the item. unless (@$self) { $self->[0] = $item_to_enqueue; + DEBUG and warn $self->_dump_splice(0); return $item_id; } # Special case: The new item belongs at the end of the queue. if ($priority >= $self->[-1]->[ITEM_PRIORITY]) { push @$self, $item_to_enqueue; + DEBUG and warn $self->_dump_splice(@$self-1); return $item_id; } # Special case: The new item belongs at the head of the queue. if ($priority < $self->[0]->[ITEM_PRIORITY]) { unshift @$self, $item_to_enqueue; + DEBUG and warn $self->_dump_splice(0); return $item_id; } @@ -82,6 +87,7 @@ sub enqueue { # naturally belongs between them. if (@$self == 2) { splice @$self, 1, 0, $item_to_enqueue; + DEBUG and warn $self->_dump_splice(1); return $item_id; } @@ -97,6 +103,7 @@ sub enqueue { $priority < $self->[$index-1]->[ITEM_PRIORITY] ); splice @$self, $index, 0, $item_to_enqueue; + DEBUG and warn $self->_dump_splice($index); return $item_id; } @@ -150,6 +157,7 @@ sub _insert_item { # bound point. if ($upper < $lower) { splice @$self, $lower, 0, $item; + DEBUG and warn $self->_dump_splice($lower); return; } @@ -177,6 +185,7 @@ sub _insert_item { ) ); splice @$self, $midpoint, 0, $item; + DEBUG and warn $self->_dump_splice($midpoint); return; } @@ -209,8 +218,11 @@ sub _find_item { my $midpoint = ($upper + $lower) >> 1; # The streams have crossed. That's bad. - die "internal inconsistency: event should have been found" - if $upper < $lower; + if ($upper < $lower) { + my @priorities = map {$_->[ITEM_PRIORITY]} @$self; + warn "internal inconsistency: event should have been found"; + die "these should be in numeric order: @priorities"; + } # The key at the midpoint is too high. The element just below # the midpoint becomes the new upper bound. @@ -304,20 +316,19 @@ sub remove_items { ### Adjust the priority of an item by a relative amount. Adds $delta ### to the priority of the $id'd object (if it matches $filter), and -### moves it in the queue. This tries to be clever by not scanning -### the queue more than necessary. +### moves it in the queue. sub adjust_priority { my ($self, $id, $filter, $delta) = @_; - my $priority = $item_priority{$id}; - unless (defined $priority) { + my $old_priority = $item_priority{$id}; + unless (defined $old_priority) { $! = ESRCH; return; } # Find that darn item. - my $item_index = $self->_find_item($id, $priority); + my $item_index = $self->_find_item($id, $old_priority); # Test the item against the filter. unless ($filter->($self->[$item_index]->[ITEM_PAYLOAD])) { @@ -325,7 +336,9 @@ sub adjust_priority { return; } - # Nothing to do if the delta is zero. + # Nothing to do if the delta is zero. -><- Actually we may need to + # ensure that the item is moved to the end of its current priority + # bucket, since it should have "moved". return $self->[$item_index]->[ITEM_PRIORITY] unless $delta; # Remove the item, and adjust its priority. @@ -333,6 +346,74 @@ sub adjust_priority { my $new_priority = $item->[ITEM_PRIORITY] += $delta; $item_priority{$id} = $new_priority; + $self->_reinsert_item($new_priority, $delta, $item_index, $item); +} + +### Set the priority to a specific amount. Replaces the item's +### priority with $new_priority (if it matches $filter), and moves it +### to the new location in the queue. + +sub set_priority { + my ($self, $id, $filter, $new_priority) = @_; + + my $old_priority = $item_priority{$id}; + unless (defined $old_priority) { + $! = ESRCH; + return; + } + + # Nothing to do if the old and new priorities match. -><- Actually + # we may need to ensure that the item is moved to the end of its + # current priority bucket, since it should have "moved". + return $new_priority if $new_priority == $old_priority; + + # Find that darn item. + my $item_index = $self->_find_item($id, $old_priority); + + # Test the item against the filter. + unless ($filter->($self->[$item_index]->[ITEM_PAYLOAD])) { + $! = EPERM; + return; + } + + # Remove the item, and calculate the delta. + my $item = splice(@$self, $item_index, 1); + my $delta = $new_priority - $old_priority; + $item->[ITEM_PRIORITY] = $item_priority{$id} = $new_priority; + + $self->_reinsert_item($new_priority, $delta, $item_index, $item); +} + +### Sanity-check the results of an item insert. Verify that it +### belongs where it was put. Only called during debugging. + +sub _dump_splice { + my ($self, $index) = @_; + my @return; + my $at = $self->[$index]->[ITEM_PRIORITY]; + if ($index > 0) { + my $before = $self->[$index-1]->[ITEM_PRIORITY]; + push @return, "before($before)"; + Carp::confess "out of order: $before should be < $at" if $before > $at; + } + push @return, "at($at)"; + if ($index < $#$self) { + my $after = $self->[$index+1]->[ITEM_PRIORITY]; + push @return, "after($after)"; + my @priorities = map {$_->[ITEM_PRIORITY]} @$self; + Carp::confess "out of order: $at should be < $after (@priorities)" + if $at >= $after; + } + return "@return"; +} + +### Reinsert an item into the queue. It has just been removed by +### adjust_priority() or set_priority() and needs to be replaced. +### This tries to be clever by not doing more work than necessary. + +sub _reinsert_item { + my ($self, $new_priority, $delta, $item_index, $item) = @_; + # Now insert it back. The special cases are duplicates from # enqueue(), but the small and large queue cases avoid unnecessarily # scanning the queue. @@ -340,18 +421,21 @@ sub adjust_priority { # Special case: No events in the queue. The queue IS the item. unless (@$self) { $self->[0] = $item; + DEBUG and warn $self->_dump_splice(0); return $new_priority; } # Special case: The item belongs at the end of the queue. if ($new_priority >= $self->[-1]->[ITEM_PRIORITY]) { push @$self, $item; + DEBUG and warn $self->_dump_splice(@$self-1); return $new_priority; } # Special case: The item blenogs at the head of the queue. if ($new_priority < $self->[0]->[ITEM_PRIORITY]) { unshift @$self, $item; + DEBUG and warn $self->_dump_splice(0); return $new_priority; } @@ -360,6 +444,7 @@ sub adjust_priority { if (@$self == 2) { splice @$self, 1, 0, $item; + DEBUG and warn $self->_dump_splice(1); return $new_priority; } @@ -376,6 +461,7 @@ sub adjust_priority { $new_priority >= $self->[$index]->[ITEM_PRIORITY] ); splice @$self, $index, 0, $item; + DEBUG and warn $self->_dump_splice($index); return $new_priority; } @@ -387,6 +473,7 @@ sub adjust_priority { $new_priority < $self->[$index-1]->[ITEM_PRIORITY] ); splice @$self, $index, 0, $item; + DEBUG and warn $self->_dump_splice($index); return $new_priority; } diff --git a/tests/001_queue.t b/tests/001_queue.t index 35fe4eecf..114bde6df 100644 --- a/tests/001_queue.t +++ b/tests/001_queue.t @@ -12,7 +12,7 @@ sub POE::Kernel::ASSERT_DEFAULT () { 1 } sub POE::Kernel::TRACE_DEFAULT () { 1 } sub POE::Kernel::TRACE_FILENAME () { "./test-output.err" } -test_setup(1061); +test_setup(2061); use POSIX qw(EPERM ESRCH); @@ -127,8 +127,8 @@ ok_if(60, compare_lists($items[5], [60, 20, "f"])); ok_if(61, $q->get_item_count() == 0); ### Large Queue Tests. The only functions that use large queues are -### enqueue() and adjust_priority(). Large queues are over ~500 -### elements. +### enqueue(), adjust_priority(), and set_priority(). Large queues +### are over ~500 elements. sub shuffled_list { my $limit = shift() - 1; @@ -141,35 +141,72 @@ sub shuffled_list { @list; } -my @ids; -for my $major (shuffled_list(10)) { - for my $minor (shuffled_list(100)) { - my $priority = sprintf("%2d%02d", $major, $minor); - push @ids, $q->enqueue($priority, $priority); +sub is_even { !($_[0] % 2) } +sub is_odd { $_[0] % 2 } + +my $test_index = 62; + +sub verify_queue { + my $start_index = shift; + my $low_priority = -999999; + + while (my ($pri, $id, $item) = $q->dequeue_next()) { + if ($pri < 0) { + ok_if( $test_index++, + ($pri > $low_priority) && ($item - $pri == 1000), + "$item - $pri != 1000" + ); + } + else { + ok_if( $test_index++, + ($pri > $low_priority) && ($pri - $item == 1000), + "$pri - $item != 1000" + ); + } + $low_priority = $pri; } } -sub is_even { !($_[0] % 2) } -sub is_odd { $_[0] % 2 } +# Verify adjust_priority(). + +{ + my @ids; + for my $major (shuffled_list(10)) { + for my $minor (shuffled_list(100)) { + my $priority = sprintf("%2d%02d", $major, $minor); + push @ids, $q->enqueue($priority, $priority); + } + } -foreach my $id (@ids) { $q->adjust_priority($id, \&is_even, -1000); } -foreach my $id (@ids) { $q->adjust_priority($id, \&is_odd, 1000); } + foreach my $id (@ids) { $q->adjust_priority($id, \&is_even, -1000); } + foreach my $id (@ids) { $q->adjust_priority($id, \&is_odd, 1000); } -my $test_index = 62; -my $low_priority = -999999; + verify_queue($test_index); +} + +# Verify set_priority(). -while (my ($pri, $id, $item) = $q->dequeue_next()) { - if ($pri < 0) { - ok_if( $test_index++, - ($pri > $low_priority) && ($pri - $item == -1000) - ); +{ + my @id_recs; + for my $major (shuffled_list(10)) { + for my $minor (shuffled_list(100)) { + my $priority = sprintf("%2d%02d", $major, $minor); + push @id_recs, [ $q->enqueue($priority, $priority), $priority ]; + } } - else { - ok_if( $test_index++, - ($pri > $low_priority) && ($pri - 1000 == $item) - ); + + foreach my $id_rec (@id_recs) { + my ($id, $pri) = @$id_rec; + $q->set_priority($id, \&is_even, $pri - 1000); + } + foreach my $id_rec (@id_recs) { + my ($id, $pri) = @$id_rec; + $q->set_priority($id, \&is_odd, $pri + 1000); } - $low_priority = $pri; + + verify_queue($test_index); } +### And we're done. Do the results thing. + results;