/
tcp_watermarks.perl
162 lines (129 loc) · 4.97 KB
/
tcp_watermarks.perl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/perl -w
# $Id$
# This program tests the high and low watermarks. It merges the
# wheels from wheels.perl and the chargen service from selects.perl to
# create a wheel-based chargen service. It differs from
# watermarks.perl in that it uses a TCP server component.
use strict;
use lib '../lib';
use POE qw(Component::Server::TCP Wheel::ReadWrite Driver::SysRW Filter::Line);
my $chargen_port = 32019;
#==============================================================================
# This is a simple chargen service.
package Chargen::Connection;
use POE::Session;
# Create a new chargen session around a successfully accepted socket.
sub new {
my ($package, $socket) = @_;
POE::Session->create
( inline_states =>
{ _start => \&poe_start,
wheel_got_flush => \&poe_got_flush,
wheel_got_input => \&poe_got_input,
wheel_got_error => \&poe_got_error,
wheel_throttle => \&poe_throttle,
wheel_resume => \&poe_resume,
write_chunk => \&poe_write_chunk,
},
args => [ $socket ],
);
undef;
}
# The session was set up within POE::Kernel, so it's safe to begin
# working. Wrap a ReadWrite wheel around the socket, set up some
# persistent variables, and begin writing chunks.
sub poe_start {
$_[HEAP]->{wheel} = POE::Wheel::ReadWrite->new
( Handle => $_[ARG0],
Driver => POE::Driver::SysRW->new(),
Filter => POE::Filter::Line->new(),
InputEvent => 'wheel_got_input',
ErrorEvent => 'wheel_got_error',
HighMark => 256,
LowMark => 128,
HighEvent => 'wheel_throttle',
LowEvent => 'wheel_resume',
);
$_[HEAP]->{okay_to_send} = 1;
$_[HEAP]->{start_character} = 32;
$_[KERNEL]->yield('write_chunk');
}
# The client sent us input. Rather than leaving it on the socket,
# we've read it to ignore it.
sub poe_got_input {
warn "Chargen session ", $_[SESSION]->ID, " is ignoring some input.\n";
}
# An error occurred. Log it and stop this session. If the parent
# hasn't stopped, then it will continue running.
sub poe_got_error {
warn( "Chargen session ", $_[SESSION]->ID, " encountered ", $_[ARG0],
" error $_[ARG1]: $_[ARG2]\n"
);
$_[HEAP]->{okay_to_send} = 0;
delete $_[HEAP]->{wheel};
}
# Write a chunk of data to the client socket.
sub poe_write_chunk {
# Sometimes a write-chunk event comes in that ought not. This race
# occurs because water-mark events are called synchronously, while
# write-chunk events are posted asynchronously. So it may not be
# okay to write a chunk when we get a write-chunk event.
if ($_[HEAP]->{okay_to_send}) {
# Enqueue chunks until ReadWrite->put() signals that its driver's
# buffer has reached (or exceeded) its high-water mark.
while (1) {
# Create a chargen line. Build a 72-column line of consecutive
# characters, starting with whatever character code we have
# stored. Wrap characters beyond "~" around to " ".
my $chargen_line =
join( '',
map { chr }
($_[HEAP]->{start_character} .. ($_[HEAP]->{start_character}+71))
);
$chargen_line =~ tr[\x7F-\xDD][\x20-\x7E];
# Increment the start character, wrapping \x7F to \x20.
$_[HEAP]->{start_character} = 32
if (++$_[HEAP]->{start_character} > 126);
# Enqueue the line for output. Stop enqueuing lines if the
# buffer's high water mark is reached.
last if $_[HEAP]->{wheel}->put($chargen_line);
}
# Go around again!
$_[KERNEL]->yield('write_chunk');
}
}
# Be impressive. Log that the session has throttled, and set a flag
# so spurious write-chunk events are ignored.
sub poe_throttle {
warn "Chargen session ", $_[SESSION]->ID, " is throttled.\n";
$_[HEAP]->{okay_to_send} = 0;
}
# Be impressive, part two. Log that the session has resumed sending,
# and clear the stop-writing flag. Only bother doing this if there's
# still a handle; that way it doesn't keep looping around after an
# error or something.
sub poe_resume {
if (exists $_[HEAP]->{wheel}) {
warn "Chargen session ", $_[SESSION]->ID, " is resuming.\n";
$_[HEAP]->{okay_to_send} = 1;
$_[KERNEL]->yield('write_chunk');
}
}
#==============================================================================
# Main loop. Create the server, and run it until something stops it.
package main;
print( "*** If all goes well, a watermarked (self-throttling) chargen\n",
"*** service will be listening on localhost port 32019. You can\n",
"*** watch it perform flow control by connecting to it over a slow\n",
"*** connection or with a client you can pause. The server will\n",
"*** throttle itself when its output buffer becomes too large, and\n",
"*** it will resume output when the client receives enough data.\n",
);
POE::Component::Server::TCP->new
( Port => $chargen_port,
Acceptor => sub {
Chargen::Connection->new($_[ARG0]);
},
);
$poe_kernel->run();
exit;