Skip to content

Commit

Permalink
After many requests, I have finally added delay_adjust(). This resets
Browse files Browse the repository at this point in the history
a specific timer (as set with delay_set) to be a new number of seconds
into the future.  This is good for watchdogs and timeouts.
  • Loading branch information
rcaputo committed Jul 28, 2003
1 parent 084fb93 commit e4bfb3f
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 34 deletions.
45 changes: 44 additions & 1 deletion lib/POE/Kernel.pm
Expand Up @@ -1610,7 +1610,8 @@ sub alarm_remove {
}

# Move an alarm to a new time. This virtually removes the alarm and
# re-adds it somewhere else.
# re-adds it somewhere else. In reality, adjust_priority() is
# optimized for this sort of thing.

sub alarm_adjust {
my ($self, $alarm_id, $delta) = @_;
Expand Down Expand Up @@ -1665,6 +1666,30 @@ sub delay_set {
);
}

# Move a delay to a new offset from time(). As with alarm_adjust(),
# this is optimized internally for this sort of activity.

sub delay_adjust {
my ($self, $alarm_id, $seconds) = @_;

unless (defined $alarm_id) {
$self->_explain_usage("undefined delay id in delay_adjust()");
$! = EINVAL;
return;
}

unless (defined $seconds) {
$self->_explain_usage("undefined delay seconds in delay_abjust()");
$! = EINVAL;
return;
}

my $my_delay = sub {
$_[0]->[EV_SESSION] == $kr_active_session;
};
return $kr_queue->set_priority($alarm_id, $my_delay, time() + $seconds);
}

# Remove all alarms for the current session.

sub alarm_remove_all {
Expand Down Expand Up @@ -2163,6 +2188,9 @@ June 2001 alarm and delay methods:
# Adjust an existing alarm by a number of seconds.
$kernel->alarm_adjust( $alarm_id, $number_of_seconds );
# Refresh an existing delay to a number of seconds in the future.
$kernel->delay_adjust( $delay_id, $number_of_seconds_hence );
# Remove a specific alarm, regardless whether it shares a name with
# others.
$kernel->alarm_remove( $alarm_id );
Expand Down Expand Up @@ -2712,6 +2740,21 @@ get the time the delay will be dispatched. It uses whichever time()
POE::Kernel does, which may be Time::HiRes' high-resolution timer, if
that's available.
=item delay_adjust DELAY_ID, SECONDS
delay_adjust adjusts an existing delay to be a number of seconds in
the future. It is useful for refreshing watchdog timers, for
instance.
# Refresh a delay for 10 seconds into the future.
$new_time = $kernel->delay_adjust( $delay_id, 10 );
On failure, it returns false and sets $! to a reason for the failure.
That may be EINVAL if the delay ID or the seconds are bad values. It
could also be ESRCH if the delay doesn't exist (perhaps it already was
dispatched). $! may also contain EPERM if the delay doesn't belong to
the session trying to adjust it.
=back
=head2 Numeric Session IDs and Symbolic Session Names (Aliases)
Expand Down
10 changes: 9 additions & 1 deletion lib/POE/Queue.pm
Expand Up @@ -38,6 +38,7 @@ POE::Queue - documentation for POE's priority queue interface
@items = $q->peek_items(\&filter, $count); # $count is optional
$new_priority = $q->adjust_priority($id, \&filter, $delta);
$new_priority = $q->set_priority($id, \&filter, $priority);
=head1 DESCRIPTION
Expand Down Expand Up @@ -147,7 +148,7 @@ returned by peek_items() if $count is too low.
=item $new_priority = $q->adjust_priority($id, \&filter, $delta);
Changes the priority of an item by +$delta (which can be negative).
Changes the priority of an item by $delta (which can be negative).
The item is identified by its $id, but the change will only happen if
the supplied filter function returns true. Returns $new_priority,
which is the priority of the item after it has been adjusted.
Expand All @@ -156,6 +157,13 @@ This filter function allows anything to be removed.
sub filter { 1 }
=item $new_priority = $q->set_priority($id, \&filter, $priority);
Changes the priority of an item to $priority. The item is identified
by its $id, but the change will only happen if the supplied filter
function returns true when applied to the event payload. Returns
$new_priority, which should match $priority.
=back
=head1 SEE ALSO
Expand Down
103 changes: 95 additions & 8 deletions lib/POE/Queue/Array.pm
Expand Up @@ -10,6 +10,8 @@ use vars qw(@ISA);

use POSIX qw(ESRCH EPERM);

sub DEBUG () { 0 }

### Helpful offsets.

sub ITEM_PRIORITY () { 0 }
Expand Down Expand Up @@ -63,25 +65,29 @@ sub enqueue {
# Special case: No items in the queue. The queue IS the item.
unless (@$self) {
$self->[0] = $item_to_enqueue;
DEBUG and warn $self->_dump_splice(0);
return $item_id;
}

# Special case: The new item belongs at the end of the queue.
if ($priority >= $self->[-1]->[ITEM_PRIORITY]) {
push @$self, $item_to_enqueue;
DEBUG and warn $self->_dump_splice(@$self-1);
return $item_id;
}

# Special case: The new item belongs at the head of the queue.
if ($priority < $self->[0]->[ITEM_PRIORITY]) {
unshift @$self, $item_to_enqueue;
DEBUG and warn $self->_dump_splice(0);
return $item_id;
}

# Special case: There are only two items in the queue. This item
# naturally belongs between them.
if (@$self == 2) {
splice @$self, 1, 0, $item_to_enqueue;
DEBUG and warn $self->_dump_splice(1);
return $item_id;
}

Expand All @@ -97,6 +103,7 @@ sub enqueue {
$priority < $self->[$index-1]->[ITEM_PRIORITY]
);
splice @$self, $index, 0, $item_to_enqueue;
DEBUG and warn $self->_dump_splice($index);
return $item_id;
}

Expand Down Expand Up @@ -150,6 +157,7 @@ sub _insert_item {
# bound point.
if ($upper < $lower) {
splice @$self, $lower, 0, $item;
DEBUG and warn $self->_dump_splice($lower);
return;
}

Expand Down Expand Up @@ -177,6 +185,7 @@ sub _insert_item {
)
);
splice @$self, $midpoint, 0, $item;
DEBUG and warn $self->_dump_splice($midpoint);
return;
}

Expand Down Expand Up @@ -209,8 +218,11 @@ sub _find_item {
my $midpoint = ($upper + $lower) >> 1;

# The streams have crossed. That's bad.
die "internal inconsistency: event should have been found"
if $upper < $lower;
if ($upper < $lower) {
my @priorities = map {$_->[ITEM_PRIORITY]} @$self;
warn "internal inconsistency: event should have been found";
die "these should be in numeric order: @priorities";
}

# The key at the midpoint is too high. The element just below
# the midpoint becomes the new upper bound.
Expand Down Expand Up @@ -304,54 +316,126 @@ sub remove_items {

### Adjust the priority of an item by a relative amount. Adds $delta
### to the priority of the $id'd object (if it matches $filter), and
### moves it in the queue. This tries to be clever by not scanning
### the queue more than necessary.
### moves it in the queue.

sub adjust_priority {
my ($self, $id, $filter, $delta) = @_;

my $priority = $item_priority{$id};
unless (defined $priority) {
my $old_priority = $item_priority{$id};
unless (defined $old_priority) {
$! = ESRCH;
return;
}

# Find that darn item.
my $item_index = $self->_find_item($id, $priority);
my $item_index = $self->_find_item($id, $old_priority);

# Test the item against the filter.
unless ($filter->($self->[$item_index]->[ITEM_PAYLOAD])) {
$! = EPERM;
return;
}

# Nothing to do if the delta is zero.
# Nothing to do if the delta is zero. -><- Actually we may need to
# ensure that the item is moved to the end of its current priority
# bucket, since it should have "moved".
return $self->[$item_index]->[ITEM_PRIORITY] unless $delta;

# Remove the item, and adjust its priority.
my $item = splice(@$self, $item_index, 1);
my $new_priority = $item->[ITEM_PRIORITY] += $delta;
$item_priority{$id} = $new_priority;

$self->_reinsert_item($new_priority, $delta, $item_index, $item);
}

### Set the priority to a specific amount. Replaces the item's
### priority with $new_priority (if it matches $filter), and moves it
### to the new location in the queue.

sub set_priority {
my ($self, $id, $filter, $new_priority) = @_;

my $old_priority = $item_priority{$id};
unless (defined $old_priority) {
$! = ESRCH;
return;
}

# Nothing to do if the old and new priorities match. -><- Actually
# we may need to ensure that the item is moved to the end of its
# current priority bucket, since it should have "moved".
return $new_priority if $new_priority == $old_priority;

# Find that darn item.
my $item_index = $self->_find_item($id, $old_priority);

# Test the item against the filter.
unless ($filter->($self->[$item_index]->[ITEM_PAYLOAD])) {
$! = EPERM;
return;
}

# Remove the item, and calculate the delta.
my $item = splice(@$self, $item_index, 1);
my $delta = $new_priority - $old_priority;
$item->[ITEM_PRIORITY] = $item_priority{$id} = $new_priority;

$self->_reinsert_item($new_priority, $delta, $item_index, $item);
}

### Sanity-check the results of an item insert. Verify that it
### belongs where it was put. Only called during debugging.

sub _dump_splice {
my ($self, $index) = @_;
my @return;
my $at = $self->[$index]->[ITEM_PRIORITY];
if ($index > 0) {
my $before = $self->[$index-1]->[ITEM_PRIORITY];
push @return, "before($before)";
Carp::confess "out of order: $before should be < $at" if $before > $at;
}
push @return, "at($at)";
if ($index < $#$self) {
my $after = $self->[$index+1]->[ITEM_PRIORITY];
push @return, "after($after)";
my @priorities = map {$_->[ITEM_PRIORITY]} @$self;
Carp::confess "out of order: $at should be < $after (@priorities)"
if $at >= $after;
}
return "@return";
}

### Reinsert an item into the queue. It has just been removed by
### adjust_priority() or set_priority() and needs to be replaced.
### This tries to be clever by not doing more work than necessary.

sub _reinsert_item {
my ($self, $new_priority, $delta, $item_index, $item) = @_;

# Now insert it back. The special cases are duplicates from
# enqueue(), but the small and large queue cases avoid unnecessarily
# scanning the queue.

# Special case: No events in the queue. The queue IS the item.
unless (@$self) {
$self->[0] = $item;
DEBUG and warn $self->_dump_splice(0);
return $new_priority;
}

# Special case: The item belongs at the end of the queue.
if ($new_priority >= $self->[-1]->[ITEM_PRIORITY]) {
push @$self, $item;
DEBUG and warn $self->_dump_splice(@$self-1);
return $new_priority;
}

# Special case: The item blenogs at the head of the queue.
if ($new_priority < $self->[0]->[ITEM_PRIORITY]) {
unshift @$self, $item;
DEBUG and warn $self->_dump_splice(0);
return $new_priority;
}

Expand All @@ -360,6 +444,7 @@ sub adjust_priority {

if (@$self == 2) {
splice @$self, 1, 0, $item;
DEBUG and warn $self->_dump_splice(1);
return $new_priority;
}

Expand All @@ -376,6 +461,7 @@ sub adjust_priority {
$new_priority >= $self->[$index]->[ITEM_PRIORITY]
);
splice @$self, $index, 0, $item;
DEBUG and warn $self->_dump_splice($index);
return $new_priority;
}

Expand All @@ -387,6 +473,7 @@ sub adjust_priority {
$new_priority < $self->[$index-1]->[ITEM_PRIORITY]
);
splice @$self, $index, 0, $item;
DEBUG and warn $self->_dump_splice($index);
return $new_priority;
}

Expand Down

0 comments on commit e4bfb3f

Please sign in to comment.