Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Extract POE's queue code into POE::Queue::Array.

  • Loading branch information...
commit cbd936e9b53f517753787c71c752b2208061cc6d 1 parent 46650b6
@rcaputo authored
View
2  MANIFEST
@@ -31,6 +31,7 @@ POE/Pipe.pm
POE/Pipe/OneWay.pm
POE/Pipe/TwoWay.pm
POE/Preprocessor.pm
+POE/Queue/Array.pm
POE/Session.pm
POE/Wheel.pm
POE/Wheel/Curses.pm
@@ -87,6 +88,7 @@ samples/udp.perl
samples/watermarks.perl
samples/wheels.perl
samples/wheels2.perl
+t/001_queue.t
t/00_coverage.t
t/01_sessions.t
t/02_alarms.t
View
619 lib/POE/Kernel.pm
@@ -7,13 +7,14 @@ use strict;
use vars qw($VERSION);
$VERSION = (qw($Revision$ ))[1];
+use POE::Queue::Array;
use POSIX qw(errno_h fcntl_h sys_wait_h);
use Carp qw(carp croak confess);
use Sys::Hostname qw(hostname);
# People expect these to be lexical.
-use vars qw( $poe_kernel $poe_main_window );
+use vars qw($poe_kernel $poe_main_window);
#------------------------------------------------------------------------------
# A cheezy exporter to avoid using Exporter.
@@ -54,14 +55,6 @@ BEGIN {
# Globals, or at least package-scoped things. Data structurse were
# moved into lexicals in 0.1201.
-# Translate event IDs to absolute event due time. This is used by the
-# alarm functions to speed up finding alarms by ID.
-#
-# { $event_id => $event_due_time,
-# ...,
-# }
-my %kr_event_ids;
-
# Translate session IDs to blessed session references. Used for
# session ID to reference lookups in alias_resolve.
#
@@ -111,6 +104,9 @@ my $kr_id_index = 1;
# functions that act on the current session.
my $kr_active_session;
+# The Kernel's master queue.
+my $kr_queue;
+
# Filehandle vector sub-fields. These are used in a few places.
sub VEC_RD () { 0 }
sub VEC_WR () { 1 }
@@ -128,13 +124,12 @@ sub KR_FILENOS () { 1 } # \%kr_filenos,
sub KR_SIGNALS () { 2 } # \%kr_signals,
sub KR_ALIASES () { 3 } # \%kr_aliases,
sub KR_ACTIVE_SESSION () { 4 } # \$kr_active_session,
-sub KR_EVENTS () { 5 } # \@kr_events,
+sub KR_QUEUE () { 5 } # \$kr_queue,
sub KR_ID () { 6 } # $unique_kernel_id,
sub KR_SESSION_IDS () { 7 } # \%kr_session_ids,
sub KR_ID_INDEX () { 8 } # \$kr_id_index,
sub KR_EXTRA_REFS () { 9 } # \$kr_extra_refs,
-sub KR_EVENT_IDS () { 10 } # \%kr_event_ids,
-sub KR_SIZE () { 11 } # XXX UNUSED ???
+sub KR_SIZE () { 10 } # XXX UNUSED ???
# ]
# This flag indicates that POE::Kernel's run() method was called.
@@ -222,27 +217,25 @@ sub HS_PAUSED () { 0x01 } # The file temporarily stopped making events.
sub HS_RUNNING () { 0x02 } # The file is running and can generate events.
#------------------------------------------------------------------------------
-# Events themselves. TODO: Rename them to EV_* instead of the old
-# ST_* "state" names.
+# Events themselves.
-my @kr_events;
-
-sub ST_SESSION () { 0 } # [ $destination_session,
-sub ST_SOURCE () { 1 } # $sender_session,
-sub ST_NAME () { 2 } # $event_name,
-sub ST_TYPE () { 3 } # $event_type,
-sub ST_ARGS () { 4 } # \@event_parameters_arg0_etc,
+sub EV_SESSION () { 0 } # [ $destination_session,
+sub EV_SOURCE () { 1 } # $sender_session,
+sub EV_NAME () { 2 } # $event_name,
+sub EV_TYPE () { 3 } # $event_type,
+sub EV_ARGS () { 4 } # \@event_parameters_arg0_etc,
#
# (These fields go towards the end
# because they are optional in some
# cases. TODO: Is this still true?)
#
-sub ST_TIME () { 5 } # $event_due_time,
-sub ST_OWNER_FILE () { 6 } # $caller_filename_where_enqueued,
-sub ST_OWNER_LINE () { 7 } # $caller_line_where_enqueued,
-sub ST_SEQ () { 8 } # $unique_event_id,
+sub EV_OWNER_FILE () { 5 } # $caller_filename_where_enqueued,
+sub EV_OWNER_LINE () { 6 } # $caller_line_where_enqueued,
# ]
+sub EV_TIME () { 7 } # Maintained by POE::Queue
+sub EV_SEQ () { 8 } # Maintained by POE::Queue
+
# These are the names of POE's internal events. They're in constants
# so we don't mistype them again.
@@ -404,6 +397,7 @@ sub sig_remove {
sub sid {
my $session = shift;
+ confess unless ref($session);
"session " . $session->ID . " (" .
( (keys %{$kr_sessions{$session}->[SS_ALIASES]})
? join(", ", keys(%{$kr_sessions{$session}->[SS_ALIASES]}) )
@@ -414,7 +408,7 @@ sub sid {
sub assert_session_refcount {
my ($session, $refcount_index) = @_;
if (ASSERT_REFCOUNT) {
- die sid($session), " reference count $refcount_index went below zero"
+ confess sid($session), " reference count $refcount_index went below zero"
if $kr_sessions{$session}->[$refcount_index] < 0;
}
}
@@ -556,7 +550,7 @@ sub explain_usage {
sub test_for_idle_poe_kernel {
if (TRACE_REFCOUNT) {
warn( ",----- Kernel Activity -----\n",
- "| Events : ", scalar(@kr_events), "\n",
+ "| Events : ", $kr_queue->get_item_count(), "\n",
"| Files : ", scalar(keys %kr_filenos), "\n",
"| `--> : ", join(', ', sort { $a <=> $b } keys %kr_filenos), "\n",
"| Extra : $kr_extra_refs\n",
@@ -566,15 +560,15 @@ sub test_for_idle_poe_kernel {
);
}
- unless ( @kr_events > 1 or # > 1 for signal poll loop
+ unless ( $kr_queue->get_item_count() > 1 or # > 1 for signal poll loop
scalar(keys %kr_filenos) or
- $kr_extra_refs or
+ $kr_extra_refs or
$kr_child_procs
) {
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL, [ 'IDLE' ],
- time(), __FILE__, __LINE__
+ ( time(),
+ $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ 'IDLE' ],
+ __FILE__, __LINE__
) if keys %kr_sessions;
}
}
@@ -582,21 +576,21 @@ sub test_for_idle_poe_kernel {
sub post_plain_signal {
my ($destination, $signal_name) = @_;
$poe_kernel->_enqueue_event
- ( $destination, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL, [ $signal_name ],
- time(), __FILE__, __LINE__
+ ( time(),
+ $destination, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $signal_name ],
+ __FILE__, __LINE__
);
}
sub dispatch_due_events {
# Pull due events off the queue, and dispatch them.
my $now = time();
- while ( @kr_events and ($kr_events[0]->[ST_TIME] <= $now) ) {
- my $event = shift @kr_events;
- delete $kr_event_ids{$event->[ST_SEQ]};
- ses_refcount_dec2($event->[ST_SESSION], SS_EVCOUNT);
- ses_refcount_dec2($event->[ST_SOURCE], SS_POST_COUNT);
- $poe_kernel->_dispatch_event(@$event);
+ while (defined(my $next_time = $kr_queue->get_next_priority())) {
+ last if $next_time > $now;
+ my ($priority, $id, $event) = $kr_queue->dequeue_next();
+ ses_refcount_dec2($event->[EV_SESSION], SS_EVCOUNT);
+ ses_refcount_dec2($event->[EV_SOURCE], SS_POST_COUNT);
+ $poe_kernel->_dispatch_event(@$event, $priority, $id);
}
}
@@ -615,10 +609,10 @@ sub enqueue_ready_selects {
foreach my $select (@selects) {
$poe_kernel->_enqueue_event
- ( $select->[HSS_SESSION], $select->[HSS_SESSION],
- $select->[HSS_STATE], ET_SELECT,
- [ $select->[HSS_HANDLE], $vector ],
- time(), __FILE__, __LINE__,
+ ( time(),
+ $select->[HSS_SESSION], $select->[HSS_SESSION],
+ $select->[HSS_STATE], ET_SELECT, [ $select->[HSS_HANDLE], $vector ],
+ __FILE__, __LINE__,
);
unless ($kr_fno_vec->[FVC_EV_COUNT]++) {
@@ -704,9 +698,9 @@ sub signal {
}
$self->_enqueue_event
- ( $session, $kr_active_session,
+ ( time(), $session, $kr_active_session,
EN_SIGNAL, ET_SIGNAL, [ $signal, @etc ],
- time(), (caller)[1,2]
+ (caller)[1,2]
);
}
@@ -743,18 +737,20 @@ sub new {
# Be careful, though. Its apples bite back.
unless (defined $poe_kernel) {
+ # Create our master queue.
+ $kr_queue = POE::Queue::Array->new();
+
my $self = $poe_kernel = bless
[ \%kr_sessions, # KR_SESSIONS
\%kr_filenos, # KR_FILENOS
\%kr_signals, # KR_SIGNALS
\%kr_aliases, # KR_ALIASES
\$kr_active_session, # KR_ACTIVE_SESSION
- \@kr_events, # KR_EVENTS
+ \$kr_queue, # KR_QUEUE
undef, # KR_ID
\%kr_session_ids, # KR_SESSION_IDS
\$kr_id_index, # KR_ID_INDEX
\$kr_extra_refs, # KR_EXTRA_REFS
- \%kr_event_ids, # KR_EVENT_IDS
], $type;
# Kernel ID, based on Philip Gwyn's code. I hope he still can
@@ -781,9 +777,8 @@ sub new {
}
sub _get_kr_sessions_ref { \%kr_sessions }
-sub _get_kr_events_ref { \@kr_events }
-sub _get_kr_event_ids_ref { \%kr_event_ids }
sub _get_kr_filenos_ref { \%kr_filenos }
+sub _get_kr_queue_ref { $kr_queue }
#------------------------------------------------------------------------------
# Send an event to a session right now. Used by _disp_select to
@@ -1159,20 +1154,14 @@ sub _dispatch_event {
# Free any events that the departing session has in its queue.
# Also free the events this session has posted.
- my $index = @kr_events;
- while ( $index-- &&
- ( $kr_sessions{$session}->[SS_EVCOUNT]
- or $kr_sessions{$session}->[SS_POST_COUNT]
- )
- ) {
- if ( $kr_events[$index]->[ST_SESSION] == $session
- or $kr_events[$index]->[ST_SOURCE] == $session
- ) {
- ses_refcount_dec2($kr_events[$index]->[ST_SESSION], SS_EVCOUNT);
- ses_refcount_dec2($kr_events[$index]->[ST_SOURCE], SS_POST_COUNT);
- my $removed_event = splice(@kr_events, $index, 1);
- delete $kr_event_ids{$removed_event->[ST_SEQ]};
- }
+ my $my_event = sub {
+ ($_[0]->[EV_SESSION] == $session) || ($_[0]->[EV_SOURCE] == $session)
+ };
+
+ my @removed = $kr_queue->remove_items($my_event);
+ foreach (@removed) {
+ ses_refcount_dec2($_->[ITEM_PAYLOAD]->[EV_SESSION], SS_EVCOUNT);
+ ses_refcount_dec2($_->[ITEM_PAYLOAD]->[EV_SOURCE], SS_POST_COUNT);
}
# Close any lingering extra references.
@@ -1362,9 +1351,7 @@ sub finalize_kernel {
# Let's make sure POE isn't leaking things.
if (ASSERT_GARBAGE) {
- my %kernel_arrays =
- ( kr_events => \@kr_events
- );
+ my %kernel_arrays = ( );
while (my ($array_name, $array_ref) = each(%kernel_arrays)) {
if (my $leaked = @$array_ref) {
@@ -1378,7 +1365,6 @@ sub finalize_kernel {
kr_signals => \%kr_signals,
kr_aliases => \%kr_aliases,
kr_session_ids => \%kr_session_ids,
- kr_event_ids => \%kr_event_ids,
kr_filenos => \%kr_filenos,
);
@@ -1499,9 +1485,8 @@ sub _invoke_state {
warn "POE::Kernel detected SIGCHLD (pid=$pid; exit=$?)\n";
$self->_enqueue_event
- ( $self, $self,
- EN_SIGNAL, ET_SIGNAL, [ 'CHLD', $pid, $? ],
- time(), __FILE__, __LINE__
+ ( time(), $self, $self, EN_SIGNAL, ET_SIGNAL, [ 'CHLD', $pid, $? ],
+ __FILE__, __LINE__
);
}
else {
@@ -1551,7 +1536,11 @@ sub _invoke_state {
# The poll loop is over. Resume slowly polling for signals.
TRACE_SIGNALS and warn "POE::Kernel will poll again after a delay.\n";
- loop_resume_watching_child_signals();
+
+ $poe_kernel->_enqueue_event
+ ( time() + 1, $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
+ ) if keys(%kr_sessions) > 1;
}
# A signal was posted. Because signals propagate depth-first, this
@@ -1560,11 +1549,10 @@ sub _invoke_state {
elsif ($event eq EN_SIGNAL) {
if ($etc->[0] eq 'IDLE') {
- unless (@kr_events > 1 or scalar(keys %kr_filenos)) {
+ unless ($kr_queue->get_item_count() > 1 or scalar(keys %kr_filenos)) {
$self->_enqueue_event
- ( $self, $self,
- EN_SIGNAL, ET_SIGNAL, [ 'ZOMBIE' ],
- time(), __FILE__, __LINE__
+ ( time(), $self, $self, EN_SIGNAL, ET_SIGNAL, [ 'ZOMBIE' ],
+ __FILE__, __LINE__
);
}
}
@@ -1597,9 +1585,8 @@ sub session_alloc {
time(), __FILE__, __LINE__, undef
);
$self->_enqueue_event
- ( $session, $kr_active_session,
- EN_GC, ET_GC, [],
- time(), __FILE__, __LINE__
+ ( time(), $session, $kr_active_session, EN_GC, ET_GC, [],
+ __FILE__, __LINE__
);
}
@@ -1803,8 +1790,9 @@ sub get_active_session {
my $queue_seqnum = 0;
sub _enqueue_event {
- my ( $self, $session, $source_session, $event, $type, $etc, $time,
- $file, $line
+ my ( $self, $time,
+ $session, $source_session, $event, $type, $etc,
+ $file, $line,
) = @_;
if (TRACE_EVENTS) {
@@ -1813,111 +1801,29 @@ sub _enqueue_event {
);
}
- if (exists $kr_sessions{$session}) {
-
- # This is awkward, but faster than enumerating the fields
- # individually.
- my $event_to_enqueue = [ @_[1..8], ++$queue_seqnum ];
-
- # Special case: No events in the queue. Put the new event in the
- # queue, and resume watching time.
- unless (@kr_events) {
- $kr_events[0] = $event_to_enqueue;
- loop_resume_time_watcher($kr_events[0]->[ST_TIME]);
- }
-
- # Special case: The new event belongs at the end of the queue.
- elsif ($time >= $kr_events[-1]->[ST_TIME]) {
- push @kr_events, $event_to_enqueue;
- }
-
- # Special case: New event comes before earliest event. Since
- # there is an active time watcher, it must be reset.
- elsif ($time < $kr_events[0]->[ST_TIME]) {
- unshift @kr_events, $event_to_enqueue;
- loop_reset_time_watcher($kr_events[0]->[ST_TIME]);
- }
-
- # Special case: If there are only two events in the queue, and we
- # failed the last two tests, the new event goes between them.
- elsif (@kr_events == 2) {
- splice @kr_events, 1, 0, $event_to_enqueue;
- }
-
- # Small queue. Perform a reverse linear search on the assumption
- # that (a) a linear search is fast enough on small queues; and (b)
- # most events will be posted for "now" or some future time, which
- # tends to be towards the end of the queue.
- elsif (@kr_events < LARGE_QUEUE_SIZE) {
- my $index = @kr_events;
- $index--
- while ( $index and
- $time < $kr_events[$index-1]->[ST_TIME]
- );
- splice @kr_events, $index, 0, $event_to_enqueue;
- }
-
- # And finally, we have this large queue, and the program has
- # already wasted enough time. -><- It would be neat for POE to
- # determine the break-even point between "large" and "small" event
- # queues at start-up and tune itself accordingly.
- else {
- my $upper = @kr_events - 1;
- my $lower = 0;
- while ('true') {
- my $midpoint = ($upper + $lower) >> 1;
-
- # Upper and lower bounds crossed. No match; insert at the
- # lower bound point.
- if ($upper < $lower) {
- splice @kr_events, $lower, 0, $event_to_enqueue;
- last;
- }
-
- # The key at the midpoint is too high. The element just below
- # the midpoint becomes the new upper bound.
- if ($time < $kr_events[$midpoint]->[ST_TIME]) {
- $upper = $midpoint - 1;
- next;
- }
-
- # The key at the midpoint is too low. The element just above
- # the midpoint becomes the new lower bound.
- if ($time > $kr_events[$midpoint]->[ST_TIME]) {
- $lower = $midpoint + 1;
- next;
- }
-
- # The key matches the one at the midpoint. Scan towards
- # higher keys until the midpoint points to an element with a
- # higher key. Insert the new event before it.
- $midpoint++
- while ( ($midpoint < @kr_events)
- and ($time == $kr_events[$midpoint]->[ST_TIME])
- );
- splice @kr_events, $midpoint, 0, $event_to_enqueue;
- last;
- }
- }
+ unless (exists $kr_sessions{$session}) {
+ warn ">>>>> ", join('; ', keys(%kr_sessions)), " <<<<<\n";
+ croak "can't enqueue event($event) for nonexistent session($session)\a\n";
+ }
- # Manage reference counts.
- ses_refcount_inc2($session, SS_EVCOUNT);
- ses_refcount_inc2($source_session, SS_POST_COUNT);
+ # This is awkward, but faster than enumerating the fields
+ # individually.
+ my $event_to_enqueue = [ @_[2..8], ++$queue_seqnum ];
- # Users know timers by their IDs; the queue knows them by their
- # times. Map the ID to the time so we can binary search the queue
- # for events that will be removed or altered later.
- my $new_event_id = $event_to_enqueue->[ST_SEQ];
- $kr_event_ids{$new_event_id} = $time;
+ my $old_head_priority = $kr_queue->get_next_priority();
+ my $new_id = $kr_queue->enqueue($time, $event_to_enqueue);
- # Return the new event ID. Man, this rocks. I forgot POE was
- # maintaining event sequence numbers.
- return $new_event_id;
+ if ($kr_queue->get_item_count() == 1) {
+ loop_resume_time_watcher($time);
+ }
+ elsif ($time < $old_head_priority) {
+ loop_reset_time_watcher($time);
}
- # This function already has returned if everything went well.
- warn ">>>>> ", join('; ', keys(%kr_sessions)), " <<<<<\n";
- croak "can't enqueue event($event) for nonexistent session($session)\a\n";
+ ses_refcount_inc2($session, SS_EVCOUNT);
+ ses_refcount_inc2($source_session, SS_POST_COUNT);
+
+ return $new_id;
}
#------------------------------------------------------------------------------
@@ -1947,9 +1853,8 @@ sub post {
# time-ordered queue.
$self->_enqueue_event
- ( $session, $kr_active_session,
- $event_name, ET_USER, \@etc,
- time(), (caller)[1,2]
+ ( time(), $session, $kr_active_session, $event_name, ET_USER, \@etc,
+ (caller)[1,2]
);
return 1;
}
@@ -1968,9 +1873,9 @@ sub yield {
};
$self->_enqueue_event
- ( $kr_active_session, $kr_active_session,
+ ( time(), $kr_active_session, $kr_active_session,
$event_name, ET_USER, \@etc,
- time(), (caller)[1,2]
+ (caller)[1,2]
);
undef;
@@ -2037,20 +1942,19 @@ sub call {
# my @pending_timed_events = $kernel->queue_peek_alarms();
sub queue_peek_alarms {
- my ($self) = @_;
- my @pending_alarms;
+ my $self = shift;
my $alarm_count = $kr_sessions{$kr_active_session}->[SS_EVCOUNT];
- foreach my $alarm (@kr_events) {
- last unless $alarm_count;
- next unless $alarm->[ST_SESSION] == $kr_active_session;
- next unless $alarm->[ST_TYPE] & ET_ALARM;
- push @pending_alarms, $alarm->[ST_NAME];
- $alarm_count--;
- }
+ my $my_alarm = sub {
+ return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
+ return 0 unless $_[0]->[EV_SESSION] == $kr_active_session;
+ return 1;
+ };
- @pending_alarms;
+ return( map { $_->[ITEM_PAYLOAD]->[EV_NAME] }
+ $kr_queue->peek_items($my_alarm, $alarm_count)
+ );
}
#==============================================================================
@@ -2072,31 +1976,30 @@ sub alarm {
return EINVAL;
}
- my $index = @kr_events;
- while ($index--) {
- if ( ($kr_events[$index]->[ST_TYPE] & ET_ALARM) &&
- ($kr_events[$index]->[ST_SESSION] == $kr_active_session) &&
- ($kr_events[$index]->[ST_NAME] eq $event_name)
- ) {
- ses_refcount_dec2($kr_active_session, SS_EVCOUNT);
- ses_refcount_dec2($kr_active_session, SS_POST_COUNT);
- my $removed_alarm = splice(@kr_events, $index, 1);
- delete $kr_event_ids{$removed_alarm->[ST_SEQ]};
- }
+ my $my_alarm = sub {
+ return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
+ return 0 unless $_[0]->[EV_SESSION] == $kr_active_session;
+ return 0 unless $_[0]->[EV_NAME] eq $event_name;
+ return 1;
+ };
+
+ foreach ($kr_queue->remove_items($my_alarm)) {
+ ses_refcount_dec2($kr_active_session, SS_EVCOUNT);
+ ses_refcount_dec2($kr_active_session, SS_POST_COUNT);
}
# Add the new alarm if it includes a time. Calling _enqueue_event
# directly is faster than calling alarm_set to enqueue it.
if (defined $time) {
$self->_enqueue_event
- ( $kr_active_session, $kr_active_session,
+ ( $time, $kr_active_session, $kr_active_session,
$event_name, ET_ALARM, [ @etc ],
- $time, (caller)[1,2]
+ (caller)[1,2]
);
}
else {
# The event queue has become empty? Stop the time watcher.
- unless (@kr_events) {
+ unless ($kr_queue->get_item_count()) {
loop_pause_time_watcher();
}
}
@@ -2122,9 +2025,9 @@ sub alarm_add {
}
$self->_enqueue_event
- ( $kr_active_session, $kr_active_session,
+ ( $time, $kr_active_session, $kr_active_session,
$event_name, ET_ALARM, [ @etc ],
- $time, (caller)[1,2]
+ (caller)[1,2]
);
return 0;
@@ -2207,88 +2110,12 @@ sub alarm_set {
}
return $self->_enqueue_event
- ( $kr_active_session, $kr_active_session,
+ ( $time, $kr_active_session, $kr_active_session,
$event_name, ET_ALARM, [ @etc ],
- $time, (caller)[1,2]
+ (caller)[1,2]
);
}
-# This is an event helper: it finds an event in the queue. Special
-# cases don't count here because we assume the event exists. It dies
-# outright if there's a problem because its parameters have been
-# verified good before it's called. Failure is not an option here.
-
-# A lot of the code here is duplicated in _enqueue_event.
-
-# THIS IS A STATIC FUNCTION!
-
-sub _event_find {
- my ($time, $id) = @_;
-
- # Small queue. Find the event with a linear seek on the assumption
- # that the overhead of a binary seek would be more than a linear
- # search at this point. The actual break-even point is unknown, and
- # it probably varies from system to system.
- if (@kr_events < LARGE_QUEUE_SIZE) {
- my $index = @kr_events;
- while ($index--) {
- return $index if $id == $kr_events[$index]->[ST_SEQ];
- }
- die "internal inconsistency: event should have been found";
- }
-
- # Use a binary seek to find events in a large queue.
-
- else {
- my $upper = @kr_events - 1;
- my $lower = 0;
- while ('true') {
- my $midpoint = ($upper + $lower) >> 1;
-
- # The streams have crossed. That's bad.
- die "internal inconsistency: event should have been found"
- if $upper < $lower;
-
- # The key at the midpoint is too high. The element just below
- # the midpoint becomes the new upper bound.
- if ($time < $kr_events[$midpoint]->[ST_TIME]) {
- $upper = $midpoint - 1;
- next;
- }
-
- # The key at the midpoint is too low. The element just above
- # the midpoint becomes the new lower bound.
- if ($time > $kr_events[$midpoint]->[ST_TIME]) {
- $lower = $midpoint + 1;
- next;
- }
-
- # The key (time) matches the one at the midpoint. This may be
- # in the middle of a pocket of events with the same time, so
- # we'll have to search back and forth for one with the ID we're
- # looking for. Unfortunately.
- my $linear_point = $midpoint;
- while ( $linear_point >= 0 and
- $time == $kr_events[$linear_point]->[ST_TIME]
- ) {
- return $linear_point if $kr_events[$linear_point]->[ST_SEQ] == $id;
- $linear_point--;
- }
- $linear_point = $midpoint;
- while ( (++$linear_point < @kr_events) and
- ($time == $kr_events[$linear_point]->[ST_TIME])
- ) {
- return $linear_point if $kr_events[$linear_point]->[ST_SEQ] == $id;
- }
-
- # If we get this far, then the event hasn't been found.
- die "internal inconsistency: event should have been found";
- }
- }
-
- die "this message should never be reached";
-}
-
# Remove an alarm by its ID. -><- Now that alarms and events have
# been recombined, this will remove an event by its ID. However,
# nothing returns an event ID, so nobody knows what to remove.
@@ -2302,27 +2129,14 @@ sub alarm_remove {
return;
}
- my $alarm_time = $kr_event_ids{$alarm_id};
- unless (defined $alarm_time) {
- explain_usage("unknown alarm id in alarm_remove()");
- $! = ESRCH;
- return;
- }
-
- # Find the alarm by time.
- my $alarm_index = _event_find( $alarm_time, $alarm_id );
-
- # Ensure that the alarm belongs to this session, eh?
- if ($kr_events[$alarm_index]->[ST_SESSION] != $kr_active_session) {
- explain_usage("alarm $alarm_id is not for the session");
- $! = EPERM;
- return;
- }
+ my $my_alarm = sub {
+ $_[0]->[EV_SESSION] == $kr_active_session;
+ };
+ my ($time, $id, $event) = $kr_queue->remove_item($alarm_id, $my_alarm);
+ return unless defined $time;
- my $old_alarm = splice( @kr_events, $alarm_index, 1 );
ses_refcount_dec2($kr_active_session, SS_EVCOUNT);
ses_refcount_dec2($kr_active_session, SS_POST_COUNT);
- delete $kr_event_ids{$old_alarm->[ST_SEQ]};
# In a list context, return the alarm that was removed. In a scalar
# context, return a reference to the alarm that was removed. In a
@@ -2330,9 +2144,8 @@ sub alarm_remove {
# value when someone needs something useful from it.
return unless defined wantarray;
- return ( @$old_alarm[ST_NAME, ST_TIME], @{$old_alarm->[ST_ARGS]} )
- if wantarray;
- return [ @$old_alarm[ST_NAME, ST_TIME], @{$old_alarm->[ST_ARGS]} ];
+ return ( $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ) if wantarray;
+ return [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
}
# Move an alarm to a new time. This virtually removes the alarm and
@@ -2353,132 +2166,10 @@ sub alarm_adjust {
return;
}
- my $alarm_time = $kr_event_ids{$alarm_id};
- unless (defined $alarm_time) {
- explain_usage("unknown alarm id in alarm_adjust()");
- $! = ESRCH;
- return;
- }
-
- # Find the alarm by time.
- my $alarm_index = _event_find( $alarm_time, $alarm_id );
-
- # Ensure that the alarm belongs to this session, eh?
- if ($kr_events[$alarm_index]->[ST_SESSION] != $kr_active_session) {
- explain_usage("alarm $alarm_id is not for the session");
- $! = EPERM;
- return;
- }
-
- # Nothing to do if the delta is zero.
- return $kr_events[$alarm_index]->[ST_TIME] unless $delta;
-
- # Remove the old alarm and adjust its time.
- my $old_alarm = splice( @kr_events, $alarm_index, 1 );
- my $new_time = $old_alarm->[ST_TIME] += $delta;
- $kr_event_ids{$alarm_id} = $new_time;
-
- # Now insert it back.
-
- # Special case: No events in the queue. Put the new alarm in the
- # queue, and be done with it.
- unless (@kr_events) {
- $kr_events[0] = $old_alarm;
- }
-
- # Special case: New event belongs at the end of the queue. Push
- # it, and be done with it.
- elsif ($new_time >= $kr_events[-1]->[ST_TIME]) {
- push @kr_events, $old_alarm;
- }
-
- # Special case: New event comes before earliest event. Unshift
- # it, and be done with it.
- elsif ($new_time < $kr_events[0]->[ST_TIME]) {
- unshift @kr_events, $old_alarm;
- }
-
- # Special case: Two events in the queue. The new event enters
- # between them, because it's not before the first one or after the
- # last one.
- elsif (@kr_events == 2) {
- splice @kr_events, 1, 0, $old_alarm;
- }
-
- # Small queue. Perform a reverse linear search on the assumption
- # that (a) a linear search is fast enough on small queues; and (b)
- # most events will be posted for "now" or some future time, which
- # tends to be towards the end of the queue.
- elsif ($delta > 0 and (@kr_events - $alarm_index) < LARGE_QUEUE_SIZE) {
- my $index = $alarm_index;
- $index++
- while ( $index < @kr_events and
- $new_time >= $kr_events[$index]->[ST_TIME]
- );
- splice @kr_events, $index, 0, $old_alarm;
- }
-
- elsif ($delta < 0 and $alarm_index < LARGE_QUEUE_SIZE) {
- my $index = $alarm_index;
- $index--
- while ( $index and
- $new_time < $kr_events[$index-1]->[ST_TIME]
- );
- splice @kr_events, $index, 0, $old_alarm;
- }
-
- # And finally, we have this large queue, and the program has already
- # wasted enough time. -><- It would be neat for POE to determine
- # the break-even point between "large" and "small" alarm queues at
- # start-up and tune itself accordingly.
- else {
- my ($upper, $lower);
- if ($delta > 0) {
- $upper = @kr_events - 1;
- $lower = $alarm_index;
- }
- else {
- $upper = $alarm_index;
- $lower = 0;
- }
-
- while ('true') {
- my $midpoint = ($upper + $lower) >> 1;
-
- # Upper and lower bounds crossed. No match; insert at the
- # lower bound point.
- if ($upper < $lower) {
- splice @kr_events, $lower, 0, $old_alarm;
- last;
- }
-
- # The key at the midpoint is too high. The element just below
- # the midpoint becomes the new upper bound.
- if ($new_time < $kr_events[$midpoint]->[ST_TIME]) {
- $upper = $midpoint - 1;
- next;
- }
-
- # The key at the midpoint is too low. The element just above
- # the midpoint becomes the new lower bound.
- if ($new_time > $kr_events[$midpoint]->[ST_TIME]) {
- $lower = $midpoint + 1;
- next;
- }
-
- # The key matches the one at the midpoint. Scan towards
- # higher keys until the midpoint points to an element with a
- # higher key. Insert the new event before it.
- $midpoint++
- while ( ($midpoint < @kr_events) and
- ($new_time == $kr_events[$midpoint]->[ST_TIME])
- );
- splice @kr_events, $midpoint, 0, $old_alarm;
- last;
- }
- }
-
- return $new_time;
+ my $my_alarm = sub {
+ $_[0]->[EV_SESSION] == $kr_active_session;
+ };
+ return $kr_queue->adjust_priority($alarm_id, $my_alarm, $delta);
}
# A convenient function for setting alarms relative to now. It also
@@ -2507,9 +2198,9 @@ sub delay_set {
}
return $self->_enqueue_event
- ( $kr_active_session, $kr_active_session,
+ ( time() + $seconds, $kr_active_session, $kr_active_session,
$event_name, ET_ALARM, [ @etc ],
- time() + $seconds, (caller)[1,2]
+ (caller)[1,2]
);
}
@@ -2517,7 +2208,6 @@ sub delay_set {
sub alarm_remove_all {
my $self = shift;
- my @removed;
# This should never happen, actually.
croak "unknown session in alarm_remove_all call"
@@ -2526,19 +2216,18 @@ sub alarm_remove_all {
# Free every alarm owned by the session. This code is ripped off
# from the _stop code to flush everything.
- my $index = @kr_events;
- while ($index-- && $kr_sessions{$kr_active_session}->[SS_EVCOUNT]) {
- if ( $kr_events[$index]->[ST_SESSION] == $kr_active_session and
- $kr_events[$index]->[ST_TYPE] & ET_ALARM
- ) {
- ses_refcount_dec2($kr_active_session, SS_EVCOUNT);
- ses_refcount_dec2($kr_active_session, SS_POST_COUNT);
- my $removed_alarm = splice(@kr_events, $index, 1);
- delete $kr_event_ids{$removed_alarm->[ST_SEQ]};
- push( @removed,
- [ @$removed_alarm[ST_NAME, ST_TIME], @{$removed_alarm->[ST_ARGS]} ]
- );
- }
+ my $my_alarm = sub {
+ return 0 unless $_[0]->[EV_TYPE] & ET_ALARM;
+ return 0 unless $_[0]->[EV_SESSION] == $kr_active_session;
+ return 1;
+ };
+
+ my @removed;
+ foreach ($kr_queue->remove_items($my_alarm)) {
+ ses_refcount_dec2($kr_active_session, SS_EVCOUNT);
+ ses_refcount_dec2($kr_active_session, SS_POST_COUNT);
+ my ($time, $id, $event) = @$_;
+ push @removed, [ $event->[EV_NAME], $time, @{$event->[EV_ARGS]} ];
}
return unless defined wantarray;
@@ -2722,20 +2411,16 @@ sub _internal_select {
# Remove any events destined for that handle.
- my $index = @kr_events;
- while ( $kr_fno_vec->[FVC_EV_COUNT] and
- $index-- and
- $kr_sessions{$kr_active_session}->[SS_EVCOUNT]
- ) {
- next unless ( $kr_events[$index]->[ST_SESSION] == $kill_session and
- $kr_events[$index]->[ST_NAME] eq $kill_event
- );
- ses_refcount_dec2($kr_events[$index]->[ST_SESSION], SS_EVCOUNT);
- ses_refcount_dec2($kr_events[$index]->[ST_SOURCE], SS_POST_COUNT);
-
- my $removed_event = splice(@kr_events, $index, 1);
- delete $kr_event_ids{$removed_event->[ST_SEQ]};
+ my $my_select = sub {
+ return 0 unless $_[0]->[EV_SESSION] == $kill_session;
+ return 0 unless $_[0]->[EV_NAME] eq $kill_event;
+ return 1;
+ };
+ foreach ($kr_queue->remove_items($my_select)) {
+ my ($priority, $time, $event) = @$_;
+ ses_refcount_dec2($event->[EV_SESSION], SS_EVCOUNT);
+ ses_refcount_dec2($event->[EV_SOURCE], SS_POST_COUNT);
$kr_fno_vec->[FVC_EV_COUNT]--;
}
View
54 lib/POE/Loop/Event.pm
@@ -29,7 +29,7 @@ my $_watcher_timer;
my @fileno_watcher;
my %signal_watcher;
-my ($kr_sessions, $kr_events);
+my ($kr_sessions, $kr_queue);
#------------------------------------------------------------------------------
# Loop construction and destruction.
@@ -37,7 +37,7 @@ my ($kr_sessions, $kr_events);
sub loop_initialize {
my $kernel = shift;
$kr_sessions = $kernel->_get_kr_sessions_ref();
- $kr_events = $kernel->_get_kr_events_ref();
+ $kr_queue = $kernel->_get_kr_queue_ref();
$_watcher_timer =
Event->timer
@@ -60,31 +60,28 @@ sub loop_finalize {
sub _loop_signal_handler_generic {
TRACE_SIGNALS and warn "\%\%\% Enqueuing generic SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0]->w->signal ],
- time(), __FILE__, __LINE__
+ ( time(),
+ $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0]->w->signal ],
+ __FILE__, __LINE__
);
}
sub _loop_signal_handler_pipe {
TRACE_SIGNALS and warn "\%\%\% Enqueuing PIPE-like SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel->[KR_ACTIVE_SESSION],
- $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0]->w->signal ],
- time(), __FILE__, __LINE__
+ ( time(),
+ $poe_kernel->[KR_ACTIVE_SESSION], $poe_kernel,
+ EN_SIGNAL, ET_SIGNAL, [ $_[0]->w->signal ],
+ __FILE__, __LINE__
);
}
sub _loop_signal_handler_child {
TRACE_SIGNALS and warn "\%\%\% Enqueuing CHLD-like SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time(), __FILE__, __LINE__
+ ( time(),
+ $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
);
}
@@ -101,10 +98,8 @@ sub loop_watch_signal {
# CHLD doesn't exist.
$SIG{$signal} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time() + 1, __FILE__, __LINE__
+ ( time() + 1, $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
) if $signal eq 'CHLD' or not exists $SIG{CHLD};
return;
@@ -129,21 +124,6 @@ sub loop_watch_signal {
);
}
-sub loop_resume_watching_child_signals {
- # For SIGCHLD triggered polling loop.
- # nothing to do
-
- # For constant polling loop.
- $SIG{CHLD} = 'DEFAULT' if exists $SIG{CHLD};
- $SIG{CLD} = 'DEFAULT' if exists $SIG{CLD};
- $poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time() + 1, __FILE__, __LINE__
- ) if keys(%$kr_sessions) > 1;
-}
-
sub loop_ignore_signal {
my $signal = shift;
if (defined $signal_watcher{$signal}) {
@@ -230,8 +210,8 @@ sub _loop_event_callback {
# Register the next timed callback if there are events left.
- if (@$kr_events) {
- $_watcher_timer->at( $kr_events->[0]->[ST_TIME] );
+ if ($kr_queue->get_item_count()) {
+ $_watcher_timer->at($kr_queue->get_next_priority());
$_watcher_timer->start();
# POE::Kernel's signal polling loop always keeps oe event in the
@@ -240,7 +220,7 @@ sub _loop_event_callback {
# vs. kernel events, and GC the kernel when the user events drop
# to 0.
- if (@$kr_events == 1) {
+ if ($kr_queue->get_item_count() == 1) {
test_for_idle_poe_kernel();
}
}
View
48 lib/POE/Loop/Gtk.pm
@@ -29,7 +29,7 @@ sub POE_LOOP () { LOOP_GTK }
my $_watcher_timer;
my @fileno_watcher;
-my ($kr_sessions, $kr_events);
+my ($kr_sessions, $kr_queue);
#------------------------------------------------------------------------------
# Loop construction and destruction.
@@ -37,7 +37,7 @@ my ($kr_sessions, $kr_events);
sub loop_initialize {
my $kernel = shift;
$kr_sessions = $kernel->_get_kr_sessions_ref();
- $kr_events = $kernel->_get_kr_events_ref();
+ $kr_queue = $kernel->_get_kr_queue_ref();
Gtk->init;
}
@@ -55,10 +55,8 @@ sub loop_finalize {
sub _loop_signal_handler_generic {
TRACE_SIGNALS and warn "\%\%\% Enqueuing generic SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_generic;
}
@@ -66,10 +64,8 @@ sub _loop_signal_handler_generic {
sub _loop_signal_handler_pipe {
TRACE_SIGNALS and warn "\%\%\% Enqueuing PIPE-like SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_pipe;
}
@@ -80,10 +76,8 @@ sub _loop_signal_handler_child {
TRACE_SIGNALS and warn "\%\%\% Enqueuing CHLD-like SIG$_[0] event...\n";
$SIG{$_[0]} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
);
}
@@ -103,10 +97,8 @@ sub loop_watch_signal {
# CHLD doesn't exist.
$SIG{$signal} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time() + 1, __FILE__, __LINE__
+ ( time() + 1, $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
) if $signal eq 'CHLD' or not exists $SIG{CHLD};
return;
@@ -128,22 +120,6 @@ sub loop_watch_signal {
$SIG{$signal} = \&_loop_signal_handler_generic;
}
-sub loop_resume_watching_child_signals {
- # For SIGCHLD triggered polling loop.
- # $SIG{CHLD} = \&_loop_signal_handler_child if exists $SIG{CHLD};
- # $SIG{CLD} = \&_loop_signal_handler_child if exists $SIG{CLD};
-
- # For constant polling loop.
- $SIG{CHLD} = 'DEFAULT' if exists $SIG{CHLD};
- $SIG{CLD} = 'DEFAULT' if exists $SIG{CLD};
- $poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time() + 1, __FILE__, __LINE__
- ) if keys(%$kr_sessions) > 1;
-}
-
sub loop_ignore_signal {
my $signal = shift;
$SIG{$signal} = "DEFAULT";
@@ -191,7 +167,7 @@ sub loop_reset_time_watcher {
sub _loop_resume_timer {
Gtk->idle_remove($_watcher_timer);
- loop_resume_time_watcher($poe_kernel->[KR_EVENTS]->[0]->[ST_TIME]);
+ loop_resume_time_watcher($kr_queue->get_next_priority());
}
sub loop_pause_time_watcher {
@@ -296,7 +272,7 @@ sub _loop_event_callback {
undef $_watcher_timer;
# Register the next timeout if there are events left.
- if (@$kr_events) {
+ if ($kr_queue->get_item_count()) {
$_watcher_timer = Gtk->idle_add(\&_loop_resume_timer);
}
View
61 lib/POE/Loop/IO_Poll.pm
@@ -33,7 +33,7 @@ use IO::Poll qw( POLLRDNORM POLLWRNORM POLLRDBAND
sub MINIMUM_POLL_TIMEOUT () { 0 }
-my ($kr_sessions, $kr_events, $kr_event_ids);
+my ($kr_sessions, $kr_queue);
my %poll_fd_masks;
#------------------------------------------------------------------------------
@@ -42,8 +42,7 @@ my %poll_fd_masks;
sub loop_initialize {
my $kernel = shift;
$kr_sessions = $kernel->_get_kr_sessions_ref();
- $kr_events = $kernel->_get_kr_events_ref();
- $kr_event_ids = $kernel->_get_kr_event_ids_ref();
+ $kr_queue = $kernel->_get_kr_queue_ref();
%poll_fd_masks = ();
}
@@ -58,10 +57,8 @@ sub loop_finalize {
sub _loop_signal_handler_generic {
TRACE_SIGNALS and warn "\%\%\% Enqueuing generic SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_generic;
}
@@ -69,10 +66,8 @@ sub _loop_signal_handler_generic {
sub _loop_signal_handler_pipe {
TRACE_SIGNALS and warn "\%\%\% Enqueuing PIPE-like SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_pipe;
}
@@ -83,9 +78,8 @@ sub _loop_signal_handler_child {
TRACE_SIGNALS and warn "\%\%\% Enqueuing CHLD-like SIG$_[0] event...\n";
$SIG{$_[0]} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
);
}
@@ -102,10 +96,8 @@ sub loop_watch_signal {
# CHLD doesn't exist.
$SIG{$signal} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time() + 1, __FILE__, __LINE__
+ ( time() + 1, $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
) if $signal eq 'CHLD' or not exists $SIG{CHLD};
return;
@@ -127,16 +119,6 @@ sub loop_watch_signal {
$SIG{$signal} = \&_loop_signal_handler_generic;
}
-sub loop_resume_watching_child_signals {
- $SIG{CHLD} = 'DEFAULT' if exists $SIG{CHLD};
- $SIG{CLD} = 'DEFAULT' if exists $SIG{CLD};
- $poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time() + 1, __FILE__, __LINE__
- ) if keys(%$kr_sessions) > 1;
-}
-
sub loop_ignore_signal {
my $signal = shift;
$SIG{$signal} = "DEFAULT";
@@ -279,8 +261,8 @@ sub loop_do_timeslice {
my $now = time();
my $timeout;
- if (@$kr_events) {
- $timeout = $kr_events->[0]->[ST_TIME] - $now;
+ if ($kr_queue->get_item_count()) {
+ $timeout = $kr_queue->get_next_priority() - $now;
$timeout = MINIMUM_POLL_TIMEOUT if $timeout < MINIMUM_POLL_TIMEOUT;
}
else {
@@ -293,25 +275,6 @@ sub loop_do_timeslice {
$now-$^T, $timeout, ($now-$^T)+$timeout
)
);
- warn( '*** Event times: ' .
- join( ', ',
- map { sprintf('%d=%.4f',
- $_->[ST_SEQ], $_->[ST_TIME] - $now
- )
- } @$kr_events
- ) .
- "\n"
- );
- }
-
- # Ensure that the event queue remains in time order.
- if (ASSERT_EVENTS and @$kr_events) {
- my $previous_time = $kr_events->[0]->[ST_TIME];
- foreach (@$kr_events) {
- die "event $_->[ST_SEQ] is out of order"
- if $_->[ST_TIME] < $previous_time;
- $previous_time = $_->[ST_TIME];
- }
}
my @filenos = %poll_fd_masks;
View
63 lib/POE/Loop/Select.pm
@@ -35,7 +35,7 @@ BEGIN {
eval "sub MINIMUM_SELECT_TIMEOUT () { $timeout }";
};
-my ($kr_sessions, $kr_events, $kr_event_ids, $kr_filenos);
+my ($kr_sessions, $kr_queue, $kr_filenos);
# select() vectors. They're stored in an array so that the VEC_RD,
# VEC_WR, and VEC_EX offsets work. This saves some code, but it makes
@@ -54,8 +54,7 @@ sub loop_initialize {
my $kernel = shift;
$kr_sessions = $kernel->_get_kr_sessions_ref();
- $kr_events = $kernel->_get_kr_events_ref();
- $kr_event_ids = $kernel->_get_kr_event_ids_ref();
+ $kr_queue = $kernel->_get_kr_queue_ref();
$kr_filenos = $kernel->_get_kr_filenos_ref();
# Initialize the vectors as vectors.
@@ -89,10 +88,8 @@ sub loop_finalize {
sub _loop_signal_handler_generic {
TRACE_SIGNALS and warn "\%\%\% Enqueuing generic SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_generic;
}
@@ -100,10 +97,8 @@ sub _loop_signal_handler_generic {
sub _loop_signal_handler_pipe {
TRACE_SIGNALS and warn "\%\%\% Enqueuing PIPE-like SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_pipe;
}
@@ -114,9 +109,8 @@ sub _loop_signal_handler_child {
TRACE_SIGNALS and warn "\%\%\% Enqueuing CHLD-like SIG$_[0] event...\n";
$SIG{$_[0]} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
);
}
@@ -133,10 +127,8 @@ sub loop_watch_signal {
# CHLD doesn't exist.
$SIG{$signal} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL,
- [ ],
- time() + 1, __FILE__, __LINE__
+ ( time() + 1, $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
) if $signal eq 'CHLD' or not exists $SIG{CHLD};
return;
@@ -158,16 +150,6 @@ sub loop_watch_signal {
$SIG{$signal} = \&_loop_signal_handler_generic;
}
-sub loop_resume_watching_child_signals {
- $SIG{CHLD} = 'DEFAULT' if exists $SIG{CHLD};
- $SIG{CLD} = 'DEFAULT' if exists $SIG{CLD};
- $poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time() + 1, __FILE__, __LINE__
- ) if keys(%$kr_sessions) > 1;
-}
-
sub loop_ignore_signal {
my $signal = shift;
$SIG{$signal} = "DEFAULT";
@@ -263,10 +245,10 @@ sub loop_do_timeslice {
# for some constant number of seconds.
my $now = time();
- my $timeout;
+ my $timeout = $kr_queue->get_next_priority();
- if (@$kr_events) {
- $timeout = $kr_events->[0]->[ST_TIME] - $now;
+ if (defined $timeout) {
+ $timeout -= $now;
$timeout = MINIMUM_SELECT_TIMEOUT if $timeout < MINIMUM_SELECT_TIMEOUT;
}
else {
@@ -279,25 +261,6 @@ sub loop_do_timeslice {
$now-$^T, $timeout, ($now-$^T)+$timeout
)
);
- warn( '*** Event times: ' .
- join( ', ',
- map { sprintf('%d=%.4f',
- $_->[ST_SEQ], $_->[ST_TIME] - $now
- )
- } @$kr_events
- ) .
- "\n"
- );
- }
-
- # Ensure that the event queue remains in time order.
- if (ASSERT_EVENTS and @$kr_events) {
- my $previous_time = $kr_events->[0]->[ST_TIME];
- foreach (@$kr_events) {
- die "event $_->[ST_SEQ] is out of order"
- if $_->[ST_TIME] < $previous_time;
- $previous_time = $_->[ST_TIME];
- }
}
# This is heavy. It determines whether there are any files actually
View
44 lib/POE/Loop/Tk.pm
@@ -32,7 +32,7 @@ sub POE_LOOP () { LOOP_TK }
my $_watcher_timer;
-my ($kr_sessions, $kr_events, $kr_filenos);
+my ($kr_sessions, $kr_queue, $kr_filenos);
#------------------------------------------------------------------------------
# Loop construction and destruction.
@@ -40,7 +40,7 @@ my ($kr_sessions, $kr_events, $kr_filenos);
sub loop_initialize {
my $kernel = shift;
$kr_sessions = $kernel->_get_kr_sessions_ref();
- $kr_events = $kernel->_get_kr_events_ref();
+ $kr_queue = $kernel->_get_kr_queue_ref();
$kr_filenos = $kernel->_get_kr_filenos_ref();
$poe_main_window = Tk::MainWindow->new();
@@ -58,10 +58,8 @@ sub loop_finalize {
sub _loop_signal_handler_generic {
TRACE_SIGNALS and warn "\%\%\% Enqueuing generic SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_generic;
}
@@ -69,10 +67,8 @@ sub _loop_signal_handler_generic {
sub _loop_signal_handler_pipe {
TRACE_SIGNALS and warn "\%\%\% Enqueuing PIPE-like SIG$_[0] event...\n";
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SIGNAL, ET_SIGNAL,
- [ $_[0] ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SIGNAL, ET_SIGNAL, [ $_[0] ],
+ __FILE__, __LINE__
);
$SIG{$_[0]} = \&_loop_signal_handler_pipe;
}
@@ -83,9 +79,8 @@ sub _loop_signal_handler_child {
TRACE_SIGNALS and warn "\%\%\% Enqueuing CHLD-like SIG$_[0] event...\n";
$SIG{$_[0]} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time(), __FILE__, __LINE__
+ ( time(), $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
);
}
@@ -102,9 +97,8 @@ sub loop_watch_signal {
# CHLD doesn't exist.
$SIG{$signal} = 'DEFAULT';
$poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time() + 1, __FILE__, __LINE__
+ ( time() + 1, $poe_kernel, $poe_kernel, EN_SCPOLL, ET_SCPOLL, [ ],
+ __FILE__, __LINE__
) if $signal eq 'CHLD' or not exists $SIG{CHLD};
return;
@@ -126,16 +120,6 @@ sub loop_watch_signal {
$SIG{$signal} = \&_loop_signal_handler_generic;
}
-sub loop_resume_watching_child_signals () {
- $SIG{CHLD} = 'DEFAULT' if exists $SIG{CHLD};
- $SIG{CLD} = 'DEFAULT' if exists $SIG{CLD};
- $poe_kernel->_enqueue_event
- ( $poe_kernel, $poe_kernel,
- EN_SCPOLL, ET_SCPOLL, [ ],
- time() + 1, __FILE__, __LINE__
- ) if keys(%$kr_sessions) > 1;
-}
-
sub loop_ignore_signal {
my $signal = shift;
$SIG{$signal} = "DEFAULT";
@@ -321,7 +305,7 @@ sub _loop_event_callback {
# Register the next timed callback if there are events left.
- if (@$kr_events) {
+ if ($kr_queue->get_item_count()) {
# Cancel the Tk alarm that handles alarms.
@@ -338,8 +322,8 @@ sub _loop_event_callback {
$_watcher_timer->cancel();
undef $_watcher_timer;
- if (@$kr_events) {
- my $next_time = $kr_events->[0]->[ST_TIME] - time();
+ if ($kr_queue->get_item_count()) {
+ my $next_time = $kr_queue->get_next_priority() - time();
$next_time = 0 if $next_time < 0;
$_watcher_timer =
@@ -357,7 +341,7 @@ sub _loop_event_callback {
# vs. kernel events, and GC the kernel when the user events drop
# to 0.
- if (@$kr_events == 1) {
+ if ($kr_queue->get_item_count() == 1) {
test_for_idle_poe_kernel();
}
}
View
169 tests/001_queue.t
@@ -0,0 +1,169 @@
+#!/usr/bin/perl -w
+# $Id$
+
+# Tests basic queue operations.
+
+use strict;
+use lib qw(./lib ../lib);
+use TestSetup;
+&test_setup(1061);
+
+use POSIX qw(EPERM ESRCH);
+
+use POE::Queue::Array;
+
+my $q = POE::Queue::Array->new();
+
+ok_if(1, $q->get_item_count == 0);
+ok_if(2, !defined($q->dequeue_next));
+
+ok_if(3, $q->enqueue(1, "one") == 1);
+ok_if(4, $q->enqueue(3, "tre") == 2);
+ok_if(5, $q->enqueue(2, "two") == 3);
+
+sub compare_lists {
+ my ($one, $two) = @_;
+ return 0 unless @$one == @$two;
+ foreach (@$one) {
+ return 0 if $_ ne shift @$two;
+ }
+ return 1;
+}
+
+ok_if(6, compare_lists([$q->dequeue_next()], [1, 1, "one"]));
+ok_if(8, compare_lists([$q->dequeue_next()], [2, 3, "two"]));
+ok_if(7, compare_lists([$q->dequeue_next()], [3, 2, "tre"]));
+ok_if(9, compare_lists([$q->dequeue_next()], []));
+
+ok_if(10, $q->enqueue(1, "a") == 4);
+ok_if(11, $q->enqueue(3, "c") == 5);
+ok_if(12, $q->enqueue(5, "e") == 6);
+ok_if(13, $q->enqueue(2, "b") == 7);
+ok_if(14, $q->enqueue(4, "d") == 8);
+
+sub always_ok { 1 }
+sub never_ok { 0 }
+
+ok_if(15, compare_lists([$q->remove_item(7, \&always_ok)], [2, 7, "b"]));
+ok_if(16, compare_lists([$q->remove_item(5, \&always_ok)], [3, 5, "c"]));
+ok_if(17, compare_lists([$q->remove_item(8, \&always_ok)], [4, 8, "d"]));
+
+$! = 0;
+ok_if(18, compare_lists([$q->remove_item(6, \&never_ok )], []));
+ok_if(19, $!==EPERM);
+
+$! = 0;
+ok_if(20, compare_lists([$q->remove_item(8, \&always_ok)], []));
+ok_if(21, $!==ESRCH);
+
+ok_if(22, compare_lists([$q->dequeue_next()], [1, 4, "a"]));
+ok_if(23, compare_lists([$q->dequeue_next()], [5, 6, "e"]));
+ok_if(24, compare_lists([$q->dequeue_next()], []));
+
+ok_if(25, $q->enqueue(1, "a") == 9);
+ok_if(26, $q->enqueue(3, "c") == 10);
+ok_if(27, $q->enqueue(5, "e") == 11);
+ok_if(28, $q->enqueue(2, "b") == 12);
+ok_if(29, $q->enqueue(4, "d") == 13);
+ok_if(30, $q->enqueue(6, "f") == 14);
+
+ok_if(31, $q->get_item_count() == 6);
+
+sub odd_letters { $_[0] =~ /[ace]/ }
+sub even_letters { $_[0] =~ /[bdf]/ }
+
+my @items;
+
+@items = $q->remove_items(\&odd_letters, 3);
+ok_if(32, @items == 3);
+ok_if(33, compare_lists($items[0], [1, 9, "a"]));
+ok_if(34, compare_lists($items[1], [3, 10, "c"]));
+ok_if(35, compare_lists($items[2], [5, 11, "e"]));
+
+ok_if(36, $q->get_item_count() == 3);
+
+@items = $q->remove_items(\&odd_letters, 3);
+ok_if(37, @items == 0);
+
+@items = $q->remove_items(\&even_letters, 3);
+ok_if(38, @items == 3);
+ok_if(39, compare_lists($items[0], [2, 12, "b"]));
+ok_if(40, compare_lists($items[1], [4, 13, "d"]));
+ok_if(41, compare_lists($items[2], [6, 14, "f"]));
+
+ok_if(42, $q->enqueue(10, "a") == 15);
+ok_if(43, $q->enqueue(20, "b") == 16);
+ok_if(44, $q->enqueue(30, "c") == 17);
+ok_if(45, $q->enqueue(40, "d") == 18);
+ok_if(46, $q->enqueue(50, "e") == 19);
+ok_if(47, $q->enqueue(60, "f") == 20);
+
+ok_if(48, $q->get_item_count() == 6);
+
+@items = $q->peek_items(\&even_letters);
+ok_if(49, $items[0][2] eq "b");
+ok_if(50, $items[1][2] eq "d");
+ok_if(51, $items[2][2] eq "f");
+
+ok_if(52, $q->adjust_priority(19, \&always_ok, -15) == 35);
+ok_if(53, $q->adjust_priority(16, \&always_ok, +15) == 35);
+
+@items = $q->remove_items(\&always_ok);
+ok_if(54, @items == 6);
+
+ok_if(55, compare_lists($items[0], [10, 15, "a"]));
+ok_if(56, compare_lists($items[1], [30, 17, "c"]));
+ok_if(57, compare_lists($items[2], [35, 19, "e"]));
+ok_if(58, compare_lists($items[3], [35, 16, "b"]));
+ok_if(59, compare_lists($items[4], [40, 18, "d"]));
+ok_if(60, compare_lists($items[5], [60, 20, "f"]));
+
+ok_if(61, $q->get_item_count() == 0);
+
+### Large Queue Tests. The only functions that use large queues are
+### enqueue() and adjust_priority(). Large queues are over ~500
+### elements.
+
+sub shuffled_list {
+ my $limit = shift() - 1;
+ my @list = (0..$limit);
+ my $i = @list;
+ while (--$i) {
+ my $j = int rand($i+1);
+ @list[$i,$j] = @list[$j,$i];
+ }
+ @list;
+}
+
+my @ids;
+for my $major (shuffled_list(10)) {
+ for my $minor (shuffled_list(100)) {
+ my $priority = sprintf("%2d%02d", $major, $minor);
+ push @ids, $q->enqueue($priority, $priority);
+ }
+}
+
+sub is_even { !($_[0] % 2) }
+sub is_odd { $_[0] % 2 }
+
+foreach my $id (@ids) { $q->adjust_priority($id, \&is_even, -1000); }
+foreach my $id (@ids) { $q->adjust_priority($id, \&is_odd, 1000); }
+
+my $test_index = 62;
+my $low_priority = -999999;
+
+while (my ($pri, $id, $item) = $q->dequeue_next()) {
+ if ($pri < 0) {
+ ok_if( $test_index++,
+ ($pri > $low_priority) && ($pri - $item == -1000)
+ );
+ }
+ else {
+ ok_if( $test_index++,
+ ($pri > $low_priority) && ($pri - 1000 == $item)
+ );
+ }
+ $low_priority = $pri;
+}
+
+results;
Please sign in to comment.
Something went wrong with that request. Please try again.