Skip to content

Commit

Permalink
Merge pull request #15009 from mlschroe/master
Browse files Browse the repository at this point in the history
[backend] use standard keepalive mechanism in bs_notifyforward
  • Loading branch information
mlschroe committed Oct 5, 2023
2 parents ab0799d + cbcffc0 commit 0c5436c
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 101 deletions.
31 changes: 19 additions & 12 deletions src/backend/BSRPC.pm
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,7 @@ sub createreq {
@xhdrs = grep {!/^authorization:/i} @xhdrs;
delete $param->{'authenticator'};
}
if (!$param->{'keepalive'} || ($act ne 'GET' && $act ne 'HEAD')) {
unshift @xhdrs, "Connection: close" unless $param->{'noclose'};
}
unshift @xhdrs, "Connection: close" unless $param->{'noclose'} || $param->{'keepalive'};
unshift @xhdrs, "User-Agent: $useragent" unless !defined($useragent) || grep {/^user-agent:/si} @xhdrs;
unshift @xhdrs, "Host: $hostport" unless grep {/^host:/si} @xhdrs;
if (defined $auth) {
Expand Down Expand Up @@ -382,27 +380,34 @@ sub rpc {

# connect to server
my $keepalive;
my $keepalivecookie;
my ($keepalivecookie, $keepalivecount, $keepalivestart);
my $sock;
my $is_ssl;
if (exists($param->{'socket'})) {
$sock = $param->{'socket'};
} else {
die("rpc continuation without socket\n") if $param->{'continuation'};
my $hostaddr = lookuphost($host, $port, \%hostlookupcache);
die("unknown host '$host'\n") unless $hostaddr;
$keepalive = $param->{'keepalive'};
$keepalivecookie = "$hostaddr/".($proxytunnel || '');
if (!$param->{'continuation'} && $keepalive && $keepalive->{'socket'}) {
my $request = $param->{'request'} || 'GET';
if ($keepalive->{'cookie'} eq $keepalivecookie && ($request eq 'GET' || $request eq 'HEAD')) {
if ($keepalive && $keepalive->{'socket'}) {
if (($keepalive->{'cookie'} || '') eq $keepalivecookie) {
$sock = $keepalive->{'socket'};
$keepalivestart = $keepalive->{'start'} || time();
$keepalivecount = ($keepalive->{'count'} || 0) + 1;
%$keepalive = ();
verify_sslpeerfingerprint($sock, $param->{'sslpeerfingerprint'}) if $param->{'sslpeerfingerprint'} && ($proto eq 'https' || $proxytunnel);
} else {
close($keepalive->{'socket'});
%$keepalive = ();
}
}
%$keepalive = () if $keepalive; # clean old data in case we die
if (!$sock) {
if ($keepalive) {
$keepalivestart = time();
$keepalivecount = 1;
}
$sock = opensocket($hostaddr);
connect($sock, $hostaddr) || die("connect to $host:$port: $!\n");
if ($proxytunnel) {
Expand Down Expand Up @@ -479,7 +484,6 @@ sub rpc {
BSHTTP::gethead(\%headers, $headers);

# no keepalive if the server says so
%$keepalive = () if $keepalive;
undef $keepalive if lc($headers{'connection'} || '') eq 'close';
undef $keepalive if !defined($headers{'content-length'}) && lc($headers{'transfer-encoding'} || '') ne 'chunked';

Expand All @@ -499,7 +503,6 @@ sub rpc {
#}
if ($status =~ /^30[27][^\d]/ && ($param->{'ignorestatus'} || 0) != 2) {
close $sock;
%$keepalive = () if $keepalive;
die("error: no redirects allowed\n") unless defined $param->{'maxredirects'};
die("error: status 302 but no 'location' header found\n") unless exists $headers{'location'};
die("error: max number of redirects reached\n") if $param->{'maxredirects'} < 1;
Expand All @@ -515,7 +518,6 @@ sub rpc {
my $auth = $param->{'authenticator'}->($param, $headers{'www-authenticate'}, \%headers);
if ($auth) {
close $sock;
%$keepalive = () if $keepalive;
my %myparam = %$param;
delete $myparam{'authenticator'};
$myparam{'headers'} = [ grep {!/^authorization:/i} @{$myparam{'headers'} || []} ];
Expand All @@ -525,7 +527,6 @@ sub rpc {
}
if (!$param->{'ignorestatus'}) {
close $sock;
%$keepalive = () if $keepalive;
die("$1 remote error: $2 ($uri)\n") if $status =~ /^(\d+) +(.*?)$/;
die("remote error: $status\n");
}
Expand All @@ -543,6 +544,9 @@ sub rpc {
if ($keepalive) {
$keepalive->{'socket'} = $sock;
$keepalive->{'cookie'} = $keepalivecookie;
$keepalive->{'start'} = $keepalivestart;
$keepalive->{'count'} = $keepalivecount;
$keepalive->{'last'} = time();
} else {
close $sock;
undef $sock;
Expand All @@ -563,6 +567,9 @@ sub rpc {
if ($keepalive && $sock) {
$keepalive->{'socket'} = $sock;
$keepalive->{'cookie'} = $keepalivecookie;
$keepalive->{'start'} = $keepalivestart;
$keepalive->{'count'} = $keepalivecount;
$keepalive->{'last'} = time();
} else {
close $sock if $sock;
}
Expand Down
76 changes: 71 additions & 5 deletions src/backend/BSServer.pm
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ sub serverstatus_str {
$state = 'F';
} elsif ($state == 2) {
$state = 'R';
} elsif ($state == 3) {
$state = 'K';
} else {
$state = '?';
}
Expand All @@ -260,6 +262,13 @@ sub serverstatus_str {
return $str;
}

sub serverstatus_close {
if ($serverstatus_ok) {
close(STA);
undef $serverstatus_ok;
}
}

sub maxchildreached {
my ($conf, $what, $group, $full, $data) = @_;
my $now = time();
Expand Down Expand Up @@ -297,6 +306,25 @@ sub dump_child_pids {
}
}

sub wait_for_request {
my ($sock, $timeout) = @_;
while (1) {
my $t;
if (defined($timeout)) {
$t = $timeout - time();
return undef if $t < 0;
}
my $rin = '';
vec($rin, fileno($sock), 1) = 1;
my $r = select($rin, undef, undef, $t);
if (!defined($r) || $r == -1) {
die("select: $!\n") unless $! == POSIX::EINTR;
next;
}
return 1 if $r;
}
}

sub server {
my ($conf) = @_;

Expand Down Expand Up @@ -486,6 +514,8 @@ sub server {
return $req;
}

redo_keepalive:

eval {
do {
local $SIG{'ALRM'} = sub {print "read request timout for peer $req->{'peer'}\n" ; POSIX::_exit(0);};
Expand All @@ -503,11 +533,41 @@ sub server {
}
};
return @{$req->{'returnfromserver'}} if $req->{'returnfromserver'} && !$@;
reply_error($conf, $@) if $@;
close $clnt;
undef $clnt;
delete $req->{'__socket'};
if ($@) {
exit(0) if $req->{'keepalive_count'} && $@ eq "empty query\n";
if ($req->{'replying'}) {
delete $req->{'allow_keepalive'};
delete $req->{'__do_keepalive'};
}
reply_error($conf, $@);
}
if (!$req->{'__do_keepalive'}) {
close $clnt;
undef $clnt;
delete $req->{'__socket'};
}
log_slow_requests($conf, $req) if $conf->{'slowrequestlog'};

if ($req->{'__do_keepalive'} && $clnt) {
# clean up req
if ($serverstatus_ok) {
if (defined(sysseek(STA, $BSServer::slot * 256, Fcntl::SEEK_SET))) {
syswrite(STA, pack("NNCCnZ244", time(), $$, $group, 0, 3, 'keepalive'), 256);
}
}
# wait for the next request
my $keepalive_maxidle = $req->{'keepalive_maxidle'} || 10;
my $r = wait_for_request($clnt, time() + $keepalive_maxidle);
if ($r) {
my %nreq = ( 'peer' => 'unknown', 'conf' => $conf, 'server' => $server, 'starttime' => time(), 'group' => $group, '__socket' => $clnt );
exists($req->{$_}) and $nreq{$_} = $req->{$_} for qw{peer peerip peerport keepalive_count keepalive_start};
$nreq{'keepalive_count'}++;
$nreq{'keepalive_start'} ||= $req->{'starttime'} || time();
%$req = %nreq;
goto redo_keepalive;
}
}

exit(0);
}

Expand All @@ -523,6 +583,12 @@ sub reply {
my ($str, @hdrs) = @_;

my $req = $BSServer::request || {};
delete $req->{'__do_keepalive'};
if ($req->{'allow_keepalive'}) {
if ($req->{'headers'} && ($req->{'headers'}->{'connection'} || 'keep-alive') eq 'keep-alive') {
$req->{'__do_keepalive'} = 1;
}
}
if (@hdrs && $hdrs[0] =~ /^status: ((\d+).*)/i) {
my $msg = $1;
$msg =~ s/:/ /g;
Expand All @@ -533,7 +599,7 @@ sub reply {
$req->{'statuscode'} ||= 200;
}
push @hdrs, "Cache-Control: no-cache" unless grep {/^cache-control:/i} @hdrs;
push @hdrs, "Connection: close";
push @hdrs, "Connection: close" unless $req->{'__do_keepalive'};
push @hdrs, "Content-Length: ".length($str) if defined($str);
my $data = join("\r\n", @hdrs)."\r\n\r\n";
$data .= $str if defined($str) && ($req->{'action'} || '') ne 'HEAD';
Expand Down
70 changes: 9 additions & 61 deletions src/backend/bs_notifyforward
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ BEGIN {
}

use POSIX;
use Data::Dumper;
use Fcntl qw(:DEFAULT :flock);
use XML::Structured ':bytes';
use Getopt::Long ();
Expand Down Expand Up @@ -94,76 +93,25 @@ sub markdone {
syswrite($markfd, "|", 1) == 1 || die("syswrite: $!\n");
}

my $multirequestsock;
my $multirequestcnt = 0;
my $multirequeststart = 0;

sub ansreqreceiver {
my ($ansreq, $param) = @_;
return unless $ansreq->{'headers'}->{'x-multirequest'};
BSHTTP::read_data($ansreq, undef, 1);
die("ansreq contains data: $ansreq->{'__data'}\n") if length($ansreq->{'__data'});
open($multirequestsock, '+<&', $ansreq->{'__socket'});
die("socket dup failed\n") unless $multirequestsock;
print "entering multirequest mode\n";
$multirequestcnt = 0;
$multirequeststart = time();
}

sub multirequestrpc {
my ($sock, $redisdata) = @_;

my $pkg = BSUtil::tostorable($redisdata);
substr($pkg, 0, 0, sprintf("CL%030d", length($pkg)));
BSHTTP::swrite($sock, $pkg);
my $ans = '';
while (1) {
my $r = sysread($sock, $ans, 1);
last if $r;
die("received truncated answer: $!\n") if !defined($r) && $! != POSIX::EINTR && $! != POSIX::EWOULDBLOCK;
die("received truncated answer\n") if defined $r;
}
if ($ans ne '0') {
my ($status) = BSRPC::readanswerheaderblock($sock, $ans);
die("remote error: $status\n");
}
}
my $redis_keepalive = {};

sub forwardredis {
my ($redisdata, $markfd, $markoffs) = @_;

if ($multirequestsock && ($multirequestcnt++ >= 64 || $multirequeststart + 300 < time())) {
close($multirequestsock);
$multirequestsock = undef;
}
if ($multirequestsock) {
local $SIG{'ALRM'} = sub { alarm(0); die("multirequest rpc timeout\n") };
eval {
alarm(300);
multirequestrpc($multirequestsock, $redisdata);
};
alarm(0);
if (!$@) {
markdone($markfd, $_) for @$markoffs;
print "forwarded ".@$markoffs." redis notifications (multirequest mode)\n";
return;
}
warn($@);
close($multirequestsock);
$multirequestsock = undef;
}

%$redis_keepalive = () if $redis_keepalive->{'start'} && ($redis_keepalive->{'count'} >= 64 || $redis_keepalive->{'start'} + 300 < time());
my $param = {
'uri' => "$BSConfig::srcserver/notify/redis",
'request' => 'POST',
'timeout' => 300,
'headers' => [ 'Content-Type: application/octet-stream' ],
'data' => BSUtil::tostorable($redisdata),
'receiver' => \&ansreqreceiver,
'keepalive' => $redis_keepalive,
'verbose' => 1,
};
my @args;
push @args, 'multirequest=1' if $BSConfig::srcserver =~ /^http:/; # sock dup only works for http
BSRPC::rpc($param, undef, @args);
eval { BSRPC::rpc($param, undef, 'keepalive=1') };
if ($@) {
%$redis_keepalive = ();
die($@);
}
markdone($markfd, $_) for @$markoffs;
print "forwarded ".@$markoffs." redis notifications\n";
}
Expand Down
2 changes: 2 additions & 0 deletions src/backend/bs_serverstatus
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ while(1) {
$state = 'F';
} elsif ($state == 2) {
$state = 'R';
} elsif ($state == 3) {
$state = 'K';
} else {
$state = '?';
}
Expand Down
44 changes: 21 additions & 23 deletions src/backend/bs_srcserver
Original file line number Diff line number Diff line change
Expand Up @@ -6947,28 +6947,22 @@ sub add_redis_notifications {
BSUtil::ping("$redisdir/.ping") unless $oldsize > 0;
}

sub redis_multirequest_server {
print "entering multirequest mode\n";
BSStdServer::stdreply($BSStdServer::return_ok, 'X-Multirequest: true');
return undef if xfork();
sub external_notification_redis_dispatch {
my ($conf, $req) = @_;
die("redis keepalive process: must be POST:/notify/redis request\n") if "$req->{'action'}:$req->{'path'}" ne 'POST:/notify/redis';
return external_notification_redis(BSDispatch::parse_cgi_singles($req));
}

sub start_redis_keepalive_process {
my $req = $BSServer::request;
my $sock = $req->{'__socket'};
while (1) {
while (length($req->{'__data'}) < 32) {
my $r = sysread($sock, $req->{'__data'}, 8192, length($req->{'__data'}));
if (!$r && defined($r)) {
print "multirequest process exiting due to eof\n";
exit(0);
}
die("read error\n") if !defined($r) && ($! != POSIX::EINTR && $! != POSIX::EWOULDBLOCK);
}
my $hdr = substr($req->{'__data'}, 0, 32, '');
die("bad content header: $hdr\n") unless ($hdr =~ /^CL(\d{30})$/s) && $1;
$req->{'__cl'} = $1;
delete $req->{'__eof'};
my $payload = BSUtil::fromstorable(BSServer::read_data());
add_redis_notifications($payload);
BSHTTP::swrite($sock, "0");
# background us
print "starting redis keepalive process\n";
if (xfork()) {
delete $req->{'__do_keepalive'}; # just finish normally
} else {
BSServer::serverstatus_close(); # no longer using the slot
delete $req->{'starttime'}; # don't log twice
$req->{'conf'}->{'dispatch'} = \&external_notification_redis_dispatch; # only allow redist notifications
}
}

Expand All @@ -6977,8 +6971,12 @@ sub external_notification_redis {
die("need payload for redis notification\n") unless BSServer::have_content();
my $payload = BSUtil::fromstorable(BSServer::read_data());
add_redis_notifications($payload);
return redis_multirequest_server() if $cgi->{'multirequest'};
return $BSStdServer::return_ok;
my $req = $BSServer::request;
$req->{'allow_keepalive'} = 1 if $cgi->{'keepalive'} && ($req->{'keepalive_count'} || 0) < 20 && ($req->{'keepalive_starttime'} || 0) + 300 < time();
$req->{'keepalive_maxidle'} = 300;
BSStdServer::stdreply($BSStdServer::return_ok);
start_redis_keepalive_process() if $req->{'__do_keepalive'} && !$req->{'keepalive_count'};
return undef;
}

# FIXME improve error handling
Expand Down

0 comments on commit 0c5436c

Please sign in to comment.