Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

file 198 lines (161 sloc) 6.191 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
#!/usr/bin/perl -w

# This program tests the high and low watermarks. It merges the
# wheels from wheels.perl and the chargen service from selects.perl to
# create a wheel-based chargen service.

use strict;
use lib '../lib';
use POE qw(Wheel::SocketFactory Wheel::ReadWrite Driver::SysRW Filter::Line);

my $chargen_port = 32100;

#==============================================================================
# This is a simple TCP server. It answers connections and passes them
# to new chargen service sessions.

package Chargen::Server;
use POE::Session;

# Constructor-shaped entry point for the chargen server. Nothing is
# blessed here: the call spawns a POE session whose event handlers
# are the subs in this package, and then returns undef rather than an
# object.
sub new {
  my %handler_for = (
    _start   => \&poe_start,
    accepted => \&poe_accepted,
    error    => \&poe_error,
  );

  POE::Session->create( inline_states => \%handler_for );

  # Callers get undef back, never a session or object reference.
  return undef;
}

# _start handler for the server session. Builds a SocketFactory wheel
# bound to $chargen_port and keeps it in the session heap under
# "listener". The factory reports through the session's "accepted"
# and "error" events.
sub poe_start {
  my $heap = $_[HEAP];

  my %listener_options = (
    SuccessEvent => 'accepted',
    FailureEvent => 'error',
    BindPort     => $chargen_port,
    Reuse        => 'yes',
  );

  $heap->{listener} = POE::Wheel::SocketFactory->new(%listener_options);
}

# "accepted" handler. ARG0 carries the freshly accepted client socket;
# hand it to a new Chargen::Connection session.
sub poe_accepted {
  my $client_socket = $_[ARG0];
  Chargen::Connection->new($client_socket);
}

# "error" handler for the listening socket. Logs the failed
# operation (ARG0) together with the error number (ARG1) and message
# (ARG2), then drops the listener wheel so the server stops accepting
# connections. Client sessions that are already running are not
# touched here.
sub poe_error {
  my ($heap, $operation, $errnum, $errstr) = @_[HEAP, ARG0, ARG1, ARG2];

  warn "Chargen::Server encountered $operation error $errnum: $errstr\n";
  delete $heap->{listener};
}

#==============================================================================
# This is a simple chargen service.

package Chargen::Connection;
use POE::Session;

# Create a new chargen session around a successfully accepted socket.
#
# Arguments: the package name (ignored) and the accepted client
# socket. The socket is forwarded to the session's _start handler as
# ARG0 through the "args" parameter.
#
# Returns undef; like Chargen::Server->new, this spawns a session
# instead of building an object.
#
# Only events that have a handler defined in this package are
# registered. The wheel created in poe_start sets no flush event, so
# there is no flush state here.
sub new {
  my (undef, $socket) = @_;

  POE::Session->create
    ( inline_states =>
      { _start          => \&poe_start,
        wheel_got_input => \&poe_got_input,
        wheel_got_error => \&poe_got_error,
        wheel_throttle  => \&poe_throttle,
        wheel_resume    => \&poe_resume,
        write_chunk     => \&poe_write_chunk,
      },
      args => [ $socket ],
    );

  return undef;
}

# _start handler for a connection session. ARG0 is the client socket
# passed in by new(). Wraps the socket in a ReadWrite wheel with a
# high mark of 256 and a low mark of 128, initializes the per-session
# state kept in the heap, and queues the first write_chunk event.
sub poe_start {
  my ($kernel, $heap, $client_socket) = @_[KERNEL, HEAP, ARG0];

  $heap->{wheel} = POE::Wheel::ReadWrite->new(
    Handle     => $client_socket,
    Driver     => POE::Driver::SysRW->new(),
    Filter     => POE::Filter::Line->new(),

    # Ordinary input and error notifications.
    InputEvent => 'wheel_got_input',
    ErrorEvent => 'wheel_got_error',

    # Watermarks and the events emitted when they are crossed.
    HighMark   => 256,
    LowMark    => 128,
    HighEvent  => 'wheel_throttle',
    LowEvent   => 'wheel_resume',
  );

  # Sending is allowed from the start, and the first line begins with
  # character code 32 (a space).
  @{$heap}{qw(okay_to_send start_character)} = (1, 32);

  $kernel->yield('write_chunk');
}

# "wheel_got_input" handler. Input from the client is read by the
# wheel so it does not sit on the socket, but its content is
# discarded; only a note is logged.
sub poe_got_input {
  my $session_id = $_[SESSION]->ID;
  warn "Chargen session $session_id is ignoring some input.\n";
}

# "wheel_got_error" handler. Logs the failed operation (ARG0), error
# number (ARG1) and error message (ARG2), marks the session as not
# okay to send, and drops the wheel. Only this session's heap is
# modified.
sub poe_got_error {
  my ($heap, $session, $operation, $errnum, $errstr) =
    @_[HEAP, SESSION, ARG0, ARG1, ARG2];

  my $session_id = $session->ID;
  warn "Chargen session $session_id encountered $operation",
       " error $errnum: $errstr\n";

  $heap->{okay_to_send} = 0;
  delete $heap->{wheel};
}

# "write_chunk" handler. Feeds chargen lines into the wheel until
# put() returns true, which signals that the driver's buffer has
# reached (or exceeded) its high-water mark, then queues another
# write_chunk event.
sub poe_write_chunk {
  my ($kernel, $heap) = @_[KERNEL, HEAP];

  # Water-mark events are called synchronously while write_chunk
  # events are posted asynchronously, so a write_chunk may arrive
  # after sending has been switched off. Ignore such stragglers.
  return unless $heap->{okay_to_send};

  my $high_mark_reached = 0;
  until ($high_mark_reached) {

    # Build a 72-column line of consecutive characters beginning at
    # the stored start code.
    my $first_code = $heap->{start_character};
    my $last_code  = $first_code + 71;
    my $chargen_line = join( '', map { chr $_ } ($first_code .. $last_code) );

    # Codes that ran past "~" wrap around to " " and onward.
    $chargen_line =~ tr[\x7F-\xDD][\x20-\x7E];

    # The next line starts one character later; after "~" (126) the
    # start code wraps back to a space (32).
    my $next_code = $first_code + 1;
    $heap->{start_character} = $next_code > 126 ? 32 : $next_code;

    # Enqueue the line. A true result ends the loop.
    $high_mark_reached = $heap->{wheel}->put($chargen_line);
  }

  # Go around again. If the session has been throttled in the
  # meantime, the guard at the top discards this event.
  $kernel->yield('write_chunk');
}

# "wheel_throttle" handler, registered as the wheel's HighEvent.
# Logs that output is throttled and clears okay_to_send so that any
# write_chunk events still in flight are ignored.
sub poe_throttle {
  my ($heap, $session) = @_[HEAP, SESSION];

  my $session_id = $session->ID;
  warn "Chargen session $session_id is throttled.\n";

  $heap->{okay_to_send} = 0;
}

# "wheel_resume" handler, registered as the wheel's LowEvent. When
# the session still owns a wheel, logs the resumption, re-enables
# sending and queues a write_chunk event. Without a wheel (for
# example after an error removed it) nothing happens, so the session
# does not keep looping.
sub poe_resume {
  my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];

  return unless exists $heap->{wheel};

  my $session_id = $session->ID;
  warn "Chargen session $session_id is resuming.\n";

  $heap->{okay_to_send} = 1;
  $kernel->yield('write_chunk');
}

#==============================================================================
# Main loop. Create the server, and run it until something stops it.

package main;

# Announce what the program does, spawn the server session, and run
# the POE kernel until no sessions remain. The port in the banner is
# interpolated from $chargen_port so it always matches the port the
# listener is asked to bind.
print( "*** If all goes well, a watermarked (self-throttling) chargen\n",
       "*** service will be listening on localhost port $chargen_port. You can\n",
       "*** watch it perform flow control by connecting to it over a slow\n",
       "*** connection or with a client you can pause. The server will\n",
       "*** throttle itself when its output buffer becomes too large, and\n",
       "*** it will resume output when the client receives enough data.\n",
     );
Chargen::Server->new;
$poe_kernel->run();

exit;
Something went wrong with that request. Please try again.