Skip to content

Commit

Permalink
[backend] use standard keepalive mechanism in bs_notifyforward
Browse files Browse the repository at this point in the history
This should make it possible to have a persistent connection
over a reverse proxy like haproxy.
  • Loading branch information
mlschroe committed Oct 5, 2023
1 parent fa2418f commit 09e8f48
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 84 deletions.
70 changes: 9 additions & 61 deletions src/backend/bs_notifyforward
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ BEGIN {
}

use POSIX;
use Data::Dumper;
use Fcntl qw(:DEFAULT :flock);
use XML::Structured ':bytes';
use Getopt::Long ();
Expand Down Expand Up @@ -94,76 +93,25 @@ sub markdone {
syswrite($markfd, "|", 1) == 1 || die("syswrite: $!\n");
}

my $multirequestsock;
my $multirequestcnt = 0;
my $multirequeststart = 0;

sub ansreqreceiver {
my ($ansreq, $param) = @_;
return unless $ansreq->{'headers'}->{'x-multirequest'};
BSHTTP::read_data($ansreq, undef, 1);
die("ansreq contains data: $ansreq->{'__data'}\n") if length($ansreq->{'__data'});
open($multirequestsock, '+<&', $ansreq->{'__socket'});
die("socket dup failed\n") unless $multirequestsock;
print "entering multirequest mode\n";
$multirequestcnt = 0;
$multirequeststart = time();
}

sub multirequestrpc {
my ($sock, $redisdata) = @_;

my $pkg = BSUtil::tostorable($redisdata);
substr($pkg, 0, 0, sprintf("CL%030d", length($pkg)));
BSHTTP::swrite($sock, $pkg);
my $ans = '';
while (1) {
my $r = sysread($sock, $ans, 1);
last if $r;
die("received truncated answer: $!\n") if !defined($r) && $! != POSIX::EINTR && $! != POSIX::EWOULDBLOCK;
die("received truncated answer\n") if defined $r;
}
if ($ans ne '0') {
my ($status) = BSRPC::readanswerheaderblock($sock, $ans);
die("remote error: $status\n");
}
}
my $redis_keepalive = {};

sub forwardredis {
my ($redisdata, $markfd, $markoffs) = @_;

if ($multirequestsock && ($multirequestcnt++ >= 64 || $multirequeststart + 300 < time())) {
close($multirequestsock);
$multirequestsock = undef;
}
if ($multirequestsock) {
local $SIG{'ALRM'} = sub { alarm(0); die("multirequest rpc timeout\n") };
eval {
alarm(300);
multirequestrpc($multirequestsock, $redisdata);
};
alarm(0);
if (!$@) {
markdone($markfd, $_) for @$markoffs;
print "forwarded ".@$markoffs." redis notifications (multirequest mode)\n";
return;
}
warn($@);
close($multirequestsock);
$multirequestsock = undef;
}

%$redis_keepalive = () if $redis_keepalive->{'start'} && ($redis_keepalive->{'count'} >= 64 || $redis_keepalive->{'start'} + 300 < time());
my $param = {
'uri' => "$BSConfig::srcserver/notify/redis",
'request' => 'POST',
'timeout' => 300,
'headers' => [ 'Content-Type: application/octet-stream' ],
'data' => BSUtil::tostorable($redisdata),
'receiver' => \&ansreqreceiver,
'keepalive' => $redis_keepalive,
'verbose' => 1,
};
my @args;
push @args, 'multirequest=1' if $BSConfig::srcserver =~ /^http:/; # sock dup only works for http
BSRPC::rpc($param, undef, @args);
eval { BSRPC::rpc($param, undef, 'keepalive=1') };
if ($@) {
%$redis_keepalive = ();
die($@);
}
markdone($markfd, $_) for @$markoffs;
print "forwarded ".@$markoffs." redis notifications\n";
}
Expand Down
44 changes: 21 additions & 23 deletions src/backend/bs_srcserver
Original file line number Diff line number Diff line change
Expand Up @@ -6947,28 +6947,22 @@ sub add_redis_notifications {
BSUtil::ping("$redisdir/.ping") unless $oldsize > 0;
}

sub redis_multirequest_server {
print "entering multirequest mode\n";
BSStdServer::stdreply($BSStdServer::return_ok, 'X-Multirequest: true');
return undef if xfork();
sub external_notification_redis_dispatch {
my ($conf, $req) = @_;
die("redis keepalive process: must be POST:/notify/redis request\n") if "$req->{'action'}:$req->{'path'}" ne 'POST:/notify/redis';
return external_notification_redis(BSDispatch::parse_cgi_singles($req));
}

sub start_redis_keepalive_process {
my $req = $BSServer::request;
my $sock = $req->{'__socket'};
while (1) {
while (length($req->{'__data'}) < 32) {
my $r = sysread($sock, $req->{'__data'}, 8192, length($req->{'__data'}));
if (!$r && defined($r)) {
print "multirequest process exiting due to eof\n";
exit(0);
}
die("read error\n") if !defined($r) && ($! != POSIX::EINTR && $! != POSIX::EWOULDBLOCK);
}
my $hdr = substr($req->{'__data'}, 0, 32, '');
die("bad content header: $hdr\n") unless ($hdr =~ /^CL(\d{30})$/s) && $1;
$req->{'__cl'} = $1;
delete $req->{'__eof'};
my $payload = BSUtil::fromstorable(BSServer::read_data());
add_redis_notifications($payload);
BSHTTP::swrite($sock, "0");
# background us
print "starting redis keepalive process\n";
if (xfork()) {
delete $req->{'__do_keepalive'}; # just finish normally
} else {
BSServer::serverstatus_close(); # no longer using the slot
delete $req->{'starttime'}; # don't log twice
$req->{'conf'}->{'dispatch'} = \&external_notification_redis_dispatch; # only allow redist notifications
}
}

Expand All @@ -6977,8 +6971,12 @@ sub external_notification_redis {
die("need payload for redis notification\n") unless BSServer::have_content();
my $payload = BSUtil::fromstorable(BSServer::read_data());
add_redis_notifications($payload);
return redis_multirequest_server() if $cgi->{'multirequest'};
return $BSStdServer::return_ok;
my $req = $BSServer::request;
$req->{'allow_keepalive'} = 1 if $cgi->{'keepalive'} && ($req->{'keepalive_count'} || 0) < 20 && ($req->{'keepalive_starttime'} || 0) + 300 < time();
$req->{'keepalive_maxidle'} = 300;
BSStdServer::stdreply($BSStdServer::return_ok);
start_redis_keepalive_process() if $req->{'__do_keepalive'} && !$req->{'keepalive_count'};
return undef;
}

# FIXME improve error handling
Expand Down

0 comments on commit 09e8f48

Please sign in to comment.