Skip to content
Browse files

Matt Sergeant contributed POE::Kernel::Poll, an event loop substrate

based on IO::Poll.  IO::Poll is potentially faster than select() in
large scale servers (and other multi-connection programs).  I also
tweaked a description in the 5005 Makefile so the intent of some
modules is more clear.  Test 27_poll.t is neat, and I'm glad that it
works.  It reruns test 04_selects.t with IO::Poll loaded, forcing
POE::Kernel::Poll to be used instead of POE::Kernel::Select.
  • Loading branch information...
1 parent d02c44e commit da6a2f20a260a910c2ccdc452fa170d8059eebf9 @rcaputo committed May 30, 2002
Showing with 486 additions and 11 deletions.
  1. +2 −0 MANIFEST
  2. +7 −2 lib/POE.pm
  3. +27 −6 lib/POE/Kernel.pm
  4. +427 −0 lib/POE/Loop/IO_Poll.pm
  5. +5 −1 mylib/Makefile-5005.pm
  6. +2 −2 tests/08_errors.t
  7. +16 −0 tests/27_poll.t
View
2 MANIFEST
@@ -22,6 +22,7 @@ POE/Filter/Stream.pm
POE/Kernel.pm
POE/Kernel/Event.pm
POE/Kernel/Gtk.pm
+POE/Kernel/Poll.pm
POE/Kernel/Select.pm
POE/Kernel/Tk.pm
POE/NFA.pm
@@ -105,3 +106,4 @@ t/23_nfa.t
t/24_filter_stack.t
t/25_detach.t
t/26_comp_tcp.t
+t/27_poll.t
View
9 lib/POE.pm
@@ -754,8 +754,8 @@ from this list, please let Rocco know.
Ann Barcomb is <kudra@domaintje.com>, aka C<kudra>. Ann contributed
large portions of POE::Simple and the code that became the ReadWrite
-support in POE::Component::Server::TCP. Her ideas were also used in
-the Client::TCP component introduced in version 0.1702.
+support in POE::Component::Server::TCP. Her ideas also inspired
+Client::TCP component, introduced in version 0.1702.
=item Artur Bergman
@@ -857,6 +857,11 @@ corruption bug that POE tickled in earlier Perl versions. In the end,
his work produced a simple compile-time hack that worked around a
problem relating to anonymous subs, scope and @{} processing.
+=item Matt Sergeant
+
+Matt contributed POE::Kernel::Poll, a more efficient way to watch
+multiple files than select().
+
=item Richard Soderberg
Richard Soderberg is <poe@crystalflame.net>, aka C<coral>. Richard is
View
33 lib/POE/Kernel.pm
@@ -633,13 +633,15 @@ macro enqueue_ready_selects (<fileno>,<vector>) {
sub SUBSTRATE_NAME_EVENT () { 'Event.pm' }
sub SUBSTRATE_NAME_GTK () { 'Gtk.pm' }
+sub SUBSTRATE_NAME_POLL () { 'Poll.pm' }
sub SUBSTRATE_NAME_SELECT () { 'select()' }
sub SUBSTRATE_NAME_TK () { 'Tk.pm' }
sub SUBSTRATE_EVENT () { 0x01 }
sub SUBSTRATE_GTK () { 0x02 }
-sub SUBSTRATE_SELECT () { 0x04 }
-sub SUBSTRATE_TK () { 0x08 }
+sub SUBSTRATE_POLL () { 0x04 }
+sub SUBSTRATE_SELECT () { 0x08 }
+sub SUBSTRATE_TK () { 0x10 }
BEGIN {
if (exists $INC{'Gtk.pm'}) {
@@ -657,6 +659,11 @@ BEGIN {
POE::Kernel::Event->import();
}
+ if (exists $INC{'IO/Poll.pm'}) {
+ require POE::Kernel::Poll;
+ POE::Kernel::Poll->import();
+ }
+
unless (defined &POE_SUBSTRATE) {
require POE::Kernel::Select;
POE::Kernel::Select->import();
@@ -3331,11 +3338,11 @@ written entirely in Perl. To use it, simply:
use POE;
-POE's event loop will also work cooperatively with Gtk's, Tk's or
-Event's. POE will see one of these three modules if it's used first
-and change its behavior accordingly.
+POE can adapt itself to work with other event loops and I/O multiplex
+systems. Currently it adapts to Gtk, Tk, Event.pm, or IO::Poll when
+one of those modules is used before POE::Kernel.
- use Gtk; # or use Tk; or use Event;
+ use Gtk; # Or Tk, Event, or IO::Poll;
use POE;
Methods to manage the process' global Kernel instance:
@@ -4623,11 +4630,25 @@ allows it to implement safe signals.
This loop allows POE to work in graphical programs using the Gtk-Perl
library.
+ use Gtk;
+ use POE;
+
+=item IO::Poll
+
+IO::Poll is potentially more efficient than POE's default select()
+code in large scale clients and servers.
+
+ use IO::Poll;
+ use POE;
+
=item Tk's Event Loop
This loop allows POE to work in graphical programs using the Tk-Perl
library.
+ use Event;
+ use POE;
+
=back
External event loops expect plain coderefs as callbacks. POE::Session
View
427 lib/POE/Loop/IO_Poll.pm
@@ -0,0 +1,427 @@
+# $Id#
+
+# IO::Poll substrate for POE::Kernel. The theory is that this will be
+# faster for large scale applications. This file is contributed by
+# Matt Sergeant (baud).
+
+# Empty package to appease perl.
+package POE::Kernel::Poll;
+
+use vars qw($VERSION);
+$VERSION = (qw($Revision$ ))[1];
+
+# Everything plugs into POE::Kernel;
+package POE::Kernel;
+use POE::Preprocessor;
+
+use strict;
+
+# Ensure that no other substrate module has been loaded.
+BEGIN {
+ die( "POE can't use IO::Poll and " . &POE_SUBSTRATE_NAME . "\n" )
+ if defined &POE_SUBSTRATE;
+};
+
+# Declare the substrate we're using.
+sub POE_SUBSTRATE () { SUBSTRATE_POLL }
+sub POE_SUBSTRATE_NAME () { SUBSTRATE_NAME_POLL }
+
+use IO::Poll qw(POLLRDNORM POLLWRNORM POLLIN POLLOUT POLLERR POLLHUP);
+
+sub MINIMUM_POLL_TIMEOUT () { 0 }
+sub POLL_ALL () { POLLIN | POLLOUT | POLLERR }
+
+#------------------------------------------------------------------------------
+# Signal handlers.
+
+sub _substrate_signal_handler_generic {
+ TRACE_SIGNALS and warn "\%\%\% Enqueuing generic SIG$_[0] event...\n";
+ $poe_kernel->_enqueue_event
+ ( $poe_kernel, $poe_kernel,
+ EN_SIGNAL, ET_SIGNAL,
+ [ $_[0] ],
+ time(), __FILE__, __LINE__
+ );
+ $SIG{$_[0]} = \&_substrate_signal_handler_generic;
+}
+
+sub _substrate_signal_handler_pipe {
+ TRACE_SIGNALS and warn "\%\%\% Enqueuing PIPE-like SIG$_[0] event...\n";
+ $poe_kernel->_enqueue_event
+ ( $poe_kernel, $poe_kernel,
+ EN_SIGNAL, ET_SIGNAL,
+ [ $_[0] ],
+ time(), __FILE__, __LINE__
+ );
+ $SIG{$_[0]} = \&_substrate_signal_handler_pipe;
+}
+
+# Special handler. Stop watching for children; instead, start a loop
+# that polls for them.
+sub _substrate_signal_handler_child {
+ TRACE_SIGNALS and warn "\%\%\% Enqueuing CHLD-like SIG$_[0] event...\n";
+ $SIG{$_[0]} = 'DEFAULT';
+ $poe_kernel->_enqueue_event
+ ( $poe_kernel, $poe_kernel,
+ EN_SCPOLL, ET_SCPOLL, [ ],
+ time(), __FILE__, __LINE__
+ );
+}
+
+#------------------------------------------------------------------------------
+# Signal handler maintenance macros.
+
+macro substrate_watch_signal {
+ # Child process has stopped.
+ if ($signal eq 'CHLD' or $signal eq 'CLD') {
+
+ # Begin constant polling loop. Only start it on CHLD or on CLD if
+ # CHLD doesn't exist.
+ $SIG{$signal} = 'DEFAULT';
+ $poe_kernel->_enqueue_event
+ ( $poe_kernel, $poe_kernel,
+ EN_SCPOLL, ET_SCPOLL,
+ [ ],
+ time() + 1, __FILE__, __LINE__
+ ) if $signal eq 'CHLD' or not exists $SIG{CHLD};
+
+ next;
+ }
+
+ # Broken pipe.
+ if ($signal eq 'PIPE') {
+ $SIG{$signal} = \&_substrate_signal_handler_pipe;
+ next;
+ }
+
+ # Artur Bergman (sky) noticed that xterm resizing can generate a LOT
+ # of WINCH signals. That rapidly crashes perl, which, with the help
+ # of most libc's, can't handle signals well at all. We ignore
+ # WINCH, therefore.
+ next if $signal eq 'WINCH';
+
+ # Everything else.
+ $SIG{$signal} = \&_substrate_signal_handler_generic;
+}
+
+macro substrate_resume_watching_child_signals {
+ $SIG{CHLD} = 'DEFAULT' if exists $SIG{CHLD};
+ $SIG{CLD} = 'DEFAULT' if exists $SIG{CLD};
+ $poe_kernel->_enqueue_event
+ ( $poe_kernel, $poe_kernel,
+ EN_SCPOLL, ET_SCPOLL, [ ],
+ time() + 1, __FILE__, __LINE__
+ ) if keys(%kr_sessions) > 1;
+}
+
+#------------------------------------------------------------------------------
+# Event watchers and callbacks.
+
+### Time.
+
+macro substrate_resume_time_watcher {
+ # does nothing
+}
+
+macro substrate_reset_time_watcher {
+ # does nothing
+}
+
+macro substrate_pause_time_watcher {
+ # does nothing
+}
+
+sub vec_to_poll {
+ return POLLIN if $_[0] == VEC_RD;
+ return POLLOUT if $_[0] == VEC_WR;
+ return POLLERR if $_[0] == VEC_EX;
+ croak "unknown I/O vector $_[0]";
+}
+
+### Filehandles.
+
+macro substrate_watch_filehandle (<fileno>,<vector>) {
+ # Cheat. $handle comes from the user's scope.
+
+ my $type = vec_to_poll(<vector>);
+ my $current = $POE::Kernel::Poll::KR_Poll->mask($handle) || 0;
+
+ TRACE_SELECT and
+ warn( sprintf( "Watch " . <fileno> .
+ ": Current mask: 0x%02X - combine with 0x%02X = 0x%02X\n",
+ $current, $type, $current | $type
+ )
+ );
+
+ $POE::Kernel::Poll::KR_Poll->mask($handle, $current | $type);
+ $kr_fno_vec->[FVC_ST_ACTUAL] = HS_RUNNING;
+ $kr_fno_vec->[FVC_ST_REQUEST] = HS_RUNNING;
+}
+
+macro substrate_ignore_filehandle (<fileno>,<vector>) {
+ # Cheat. $handle comes from the user's scope.
+
+ my $type = vec_to_poll(<vector>);
+ my $current = $POE::Kernel::Poll::KR_Poll->mask($handle) || 0;
+
+ TRACE_SELECT and
+ warn( sprintf( "Ignore ". <fileno> .
+ ": Current mask: 0x%02X - combine with 0x%02X = 0x%02X\n",
+ $current, $type, $current & ~$type
+ )
+ );
+
+ $POE::Kernel::Poll::KR_Poll->mask($handle, $current & ~$type);
+ $kr_fno_vec->[FVC_ST_ACTUAL] = HS_STOPPED;
+ $kr_fno_vec->[FVC_ST_REQUEST] = HS_STOPPED;
+}
+
+macro substrate_pause_filehandle_watcher (<fileno>,<vector>) {
+ # Cheat. $handle comes from the user's scope.
+
+ my $type = vec_to_poll(<vector>);
+ my $current = $POE::Kernel::Poll::KR_Poll->mask($handle) || 0;
+
+ TRACE_SELECT and
+ warn( sprintf( "Pause " . <fileno> .
+ ": Current mask: 0x%02X - combine with 0x%02X = 0x%02X\n",
+ $current, $type, $current & ~$type
+ )
+ );
+
+ $POE::Kernel::Poll::KR_Poll->mask($handle, $current & ~$type);
+ $kr_fno_vec->[FVC_ST_ACTUAL] = HS_PAUSED;
+}
+
+macro substrate_resume_filehandle_watcher (<fileno>,<vector>) {
+ # Cheat. $handle comes from the user's scope.
+
+ my $type = vec_to_poll(<vector>);
+ my $current = $POE::Kernel::Poll::KR_Poll->mask($handle) || 0;
+
+ TRACE_SELECT and
+ warn( sprintf( "Resume " . <fileno> .
+ ": Current mask: 0x%02X - combine with 0x%02X = 0x%02X\n",
+ $current, $type, $current | $type
+ )
+ );
+
+ $POE::Kernel::Poll::KR_Poll->mask($handle, $current | $type);
+ $kr_fno_vec->[FVC_ST_ACTUAL] = HS_RUNNING;
+}
+
+macro substrate_define_callbacks {
+ # does nothing
+}
+
+#------------------------------------------------------------------------------
+# Main loop management.
+
+macro substrate_init_main_loop {
+ # Initialize the vectors as vectors.
+ $POE::Kernel::Poll::KR_Poll = IO::Poll->new();
+}
+
+macro substrate_do_timeslice {
+ # Check for a hung kernel.
+ {% test_for_idle_poe_kernel %}
+
+ # Set the poll timeout based on current queue conditions. If there
+ # are FIFO events, then the poll timeout is zero and move on.
+ # Otherwise set the poll timeout until the next pending event, if
+ # there are any. If nothing is waiting, set the timeout for some
+ # constant number of seconds.
+
+ my $now = time();
+ my $timeout;
+
+ if (@kr_events) {
+ $timeout = $kr_events[0]->[ST_TIME] - $now;
+ $timeout = MINIMUM_POLL_TIMEOUT if $timeout < MINIMUM_POLL_TIMEOUT;
+ }
+ else {
+ $timeout = 3600;
+ }
+
+ if (TRACE_QUEUE) {
+ warn( '*** Kernel::run() iterating. ' .
+ sprintf("now(%.4f) timeout(%.4f) then(%.4f)\n",
+ $now-$^T, $timeout, ($now-$^T)+$timeout
+ )
+ );
+ warn( '*** Event times: ' .
+ join( ', ',
+ map { sprintf('%d=%.4f',
+ $_->[ST_SEQ], $_->[ST_TIME] - $now
+ )
+ } @kr_events
+ ) .
+ "\n"
+ );
+ }
+
+ # Ensure that the event queue remains in time order.
+ if (ASSERT_EVENTS and @kr_events) {
+ my $previous_time = $kr_events[0]->[ST_TIME];
+ foreach (@kr_events) {
+ die "event $_->[ST_SEQ] is out of order"
+ if $_->[ST_TIME] < $previous_time;
+ $previous_time = $_->[ST_TIME];
+ }
+ }
+
+ my $filenos = $POE::Kernel::Poll::KR_Poll->handles();
+
+ # Avoid looking at filehandles if we don't need to. -><- The added
+ # code to make this sleep is non-optimal. There is a way to do this
+ # in fewer tests.
+
+ if ($timeout or $filenos) {
+
+ # There are filehandles to poll, so do so.
+
+ if ($filenos) {
+ # Check filehandles, or wait for a period of time to elapse.
+ my $hits = $POE::Kernel::Poll::KR_Poll->poll($timeout);
+
+ if (ASSERT_SELECT) {
+ if ($hits < 0) {
+ confess "poll error: $!"
+ unless ( ($! == EINPROGRESS) or
+ ($! == EWOULDBLOCK) or
+ ($! == EINTR)
+ );
+ }
+ }
+
+ if (TRACE_SELECT) {
+ if ($hits > 0) {
+ warn "poll hits = $hits\n";
+ }
+ elsif ($hits == 0) {
+ warn "poll timed out...\n";
+ }
+ }
+
+ # If poll has seen filehandle activity, then gather up the
+ # active filehandles and synchronously dispatch events to the
+ # appropriate handlers.
+
+ if ($hits > 0) {
+
+ # This is where they're gathered.
+
+ my @rd_selects =
+ ( map { fileno($_) }
+ $POE::Kernel::Poll::KR_Poll->handles( POLLIN )
+ );
+ my @wr_selects =
+ ( map { fileno($_) }
+ $POE::Kernel::Poll::KR_Poll->handles( POLLOUT )
+ );
+ my @ex_selects =
+ ( map { fileno($_) }
+ $POE::Kernel::Poll::KR_Poll->handles( POLLERR )
+ );
+
+ if (TRACE_SELECT) {
+ if (@rd_selects) {
+ warn( "found pending rd selects: ",
+ join( ', ', sort { $a <=> $b } @rd_selects ),
+ "\n"
+ );
+ }
+ if (@wr_selects) {
+ warn( "found pending wr selects: ",
+ join( ', ', sort { $a <=> $b } @wr_selects ),
+ "\n"
+ );
+ }
+ if (@ex_selects) {
+ warn( "found pending ex selects: ",
+ join( ', ', sort { $a <=> $b } @ex_selects ),
+ "\n"
+ );
+ }
+ }
+
+ # IO::Poll often returns a $hits that doesn't match the number
+ # of handles that handles() returns. This ASSERT_SELECT has
+ # been disabled since it's not true for IO::Poll.
+ if (0 && ASSERT_SELECT) {
+ unless (@rd_selects or @wr_selects or @ex_selects) {
+ die "found no selects, with $hits hits from poll???\a\n";
+ }
+ }
+
+ # Enqueue the gathered selects, and flag them as temporarily
+ # paused. They'll resume after dispatch.
+
+ foreach my $fileno (@rd_selects) {
+ {% enqueue_ready_selects $fileno, VEC_RD %}
+ }
+
+ foreach my $fileno (@wr_selects) {
+ {% enqueue_ready_selects $fileno, VEC_WR %}
+ }
+
+ foreach my $fileno (@ex_selects) {
+ {% enqueue_ready_selects $fileno, VEC_EX %}
+ }
+ }
+ }
+
+ # No filehandles to poll on. Try to sleep instead. Use sleep()
+ # itself on MSWin32. Use a dummy four-argument select() everywhere
+ # else.
+
+ else {
+ if ($^O eq 'MSWin32') {
+ sleep($timeout);
+ }
+ else {
+ select(undef, undef, undef, $timeout);
+ }
+ }
+ }
+
+ # Dispatch whatever events are due.
+
+ $now = time();
+ while ( @kr_events and ($kr_events[0]->[ST_TIME] <= $now) ) {
+ my $event;
+
+ if (TRACE_QUEUE) {
+ $event = $kr_events[0];
+ warn( sprintf('now(%.4f) ', $now - $^T) .
+ sprintf('sched_time(%.4f) ', $event->[ST_TIME] - $^T) .
+ "seq($event->[ST_SEQ]) " .
+ "name($event->[ST_NAME])\n"
+ );
+ }
+
+ # Pull an event off the queue, and dispatch it.
+ $event = shift @kr_events;
+ delete $kr_event_ids{$event->[ST_SEQ]};
+ {% ses_refcount_dec2 $event->[ST_SESSION], SS_EVCOUNT %}
+ {% ses_refcount_dec2 $event->[ST_SOURCE], SS_POST_COUNT %}
+ $self->_dispatch_event(@$event);
+ }
+}
+
+macro substrate_main_loop {
+ # Run for as long as there are sessions to service.
+ while (keys %kr_sessions) {
+ {% substrate_do_timeslice %}
+ }
+}
+
+macro substrate_stop_main_loop {
+ # does nothing
+}
+
+sub signal_ui_destroy {
+ # does nothing
+}
+
+1;
View
6 mylib/Makefile-5005.pm
@@ -32,10 +32,14 @@ ExtUtils::AutoInstall->import
Socket => '',
'Filter::Util::Call' => 1.04,
],
- "Recommended modules for high-resolution timers." => [
+ "Recommended modules to increase timer/alarm/delay accuracy." => [
-default => 0,
'Time::HiRes' => '',
],
+ "Optional modules to speed up large-scale clients/servers." => [
+ -default => 0,
+ IO::Poll => 0.05,
+ ],
"Optional modules for controlling full-screen programs (e.g. vi)." => [
-default => 0,
'IO::Pty' => '1.02',
View
4 tests/08_errors.t
@@ -20,10 +20,10 @@ BEGIN {
qw( POE_USES_TIME_HIRES
SUBSTRATE_NAME_EVENT SUBSTRATE_NAME_GTK SUBSTRATE_NAME_SELECT
- SUBSTRATE_NAME_TK
+ SUBSTRATE_NAME_TK SUBSTRATE_NAME_POLL
SUBSTRATE_EVENT SUBSTRATE_GTK SUBSTRATE_SELECT SUBSTRATE_TK
- POE_SUBSTRATE POE_SUBSTRATE_NAME
+ SUBSTRATE_POLL POE_SUBSTRATE POE_SUBSTRATE_NAME
_substrate_signal_handler_generic
_substrate_signal_handler_pipe
View
16 tests/27_poll.t
@@ -0,0 +1,16 @@
+#!/usr/bin/perl -w
+# $Id$
+
+# Rerun t/04_selects.t but with IO::Poll instead.
+
+use strict;
+
+BEGIN {
+ eval 'use IO::Poll';
+ &test_setup(0, "need IO::Poll to test POE's support for that module")
+ if length($@) or not exists $INC{'IO/Poll.pm'};
+}
+
+require 't/04_selects.t';
+
+exit;

0 comments on commit da6a2f2

Please sign in to comment.
Something went wrong with that request. Please try again.