Skip to content

Commit

Permalink
proxy: support to-be-closed for result objects
Browse files Browse the repository at this point in the history
if using mcp.internal(r) to fetch keys, but not returning them to the
user, the underlying item references will normally stick around until
they are garbage collected. Memcached is _not_ designed for this:
references must be held for a short and temporary period of time.

If using a res object that you don't intend to send back to the user, it
must be marked with <close>
  • Loading branch information
dormando committed May 24, 2024
1 parent 4a75ac2 commit 0954b53
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 130 deletions.
12 changes: 12 additions & 0 deletions proxy_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,17 @@ static int mcplib_response_gc(lua_State *L) {
return 0;
}

// Note that this can be called multiple times for a single object, as opposed
// to _gc. The cleanup routine is armored against repeat accesses by NULL'ing
// th efields it checks.
static int mcplib_response_close(lua_State *L) {
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_resp_t *r = luaL_checkudata(L, 1, "mcp.response");
mcp_response_cleanup(t, r);

return 0;
}

// NOTE: backends are global objects owned by pool objects.
// Each pool has a "proxy pool object" distributed to each worker VM.
// proxy pool objects are held at the same time as any request exists on a
Expand Down Expand Up @@ -1685,6 +1696,7 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
{"line", mcplib_response_line},
{"elapsed", mcplib_response_elapsed},
{"__gc", mcplib_response_gc},
{"__close", mcplib_response_close},
{NULL, NULL}
};

Expand Down
109 changes: 4 additions & 105 deletions t/proxyinternal.lua
Original file line number Diff line number Diff line change
@@ -1,111 +1,10 @@
function mcp_config_pools(oldss)
mcp.backend_read_timeout(0.5)
mcp.backend_connect_timeout(5)

local srv = mcp.backend

-- Single backend for zones to ease testing.
-- For purposes of this config the proxy is always "zone 1" (z1)
local b1 = srv('b1', '127.0.0.1', 11611)
local b2 = srv('b2', '127.0.0.1', 11612)
local b3 = srv('b3', '127.0.0.1', 11613)

local b1z = {b1}
local b2z = {b2}
local b3z = {b3}

-- convert the backends to pools.
-- as per a normal full config see simple.lua or t/startfile.lua
local zones = {
z1 = mcp.pool(b1z),
z2 = mcp.pool(b2z),
z3 = mcp.pool(b3z),
}

return zones
end

-- WORKER CODE:

-- Using a very simple route handler only to allow testing the three
-- workarounds in the same configuration file.
function prefix_factory(pattern, list, default)
local p = pattern
local l = list
local d = default
return function(r)
local route = l[string.match(r:key(), p)]
if route == nil then
return d(r)
end
return route(r)
end
end

-- just for golfing the code in mcp_config_routes()
function toproute_factory(pfx, label)
local err = "SERVER_ERROR no " .. label .. " route\r\n"
return prefix_factory("^/(%a+)/", pfx, function(r) return err end)
function mcp_config_pools()
return true
end

-- Do specialized testing based on the key prefix.
function mcp_config_routes(zones)
local pfx_get = {}
local pfx_set = {}
local pfx_touch = {}
local pfx_gets = {}
local pfx_gat = {}
local pfx_gats = {}
local pfx_cas = {}
local pfx_add = {}
local pfx_delete = {}
local pfx_incr = {}
local pfx_decr = {}
local pfx_append = {}
local pfx_prepend = {}
local pfx_mg = {}
local pfx_ms = {}
local pfx_md = {}
local pfx_ma = {}

local basic = function(r)
mcp.attach(mcp.CMD_ANY_STORAGE, function(r)
return mcp.internal(r)
end

pfx_get["b"] = basic
pfx_set["b"] = basic
pfx_touch["b"] = basic
pfx_gets["b"] = basic
pfx_gat["b"] = basic
pfx_gats["b"] = basic
pfx_cas["b"] = basic
pfx_add["b"] = basic
pfx_delete["b"] = basic
pfx_incr["b"] = basic
pfx_decr["b"] = basic
pfx_append["b"] = basic
pfx_prepend["b"] = basic
pfx_mg["b"] = basic
pfx_ms["b"] = basic
pfx_md["b"] = basic
pfx_ma["b"] = basic

mcp.attach(mcp.CMD_GET, toproute_factory(pfx_get, "get"))
mcp.attach(mcp.CMD_SET, toproute_factory(pfx_set, "set"))
mcp.attach(mcp.CMD_TOUCH, toproute_factory(pfx_touch, "touch"))
mcp.attach(mcp.CMD_GETS, toproute_factory(pfx_gets, "gets"))
mcp.attach(mcp.CMD_GAT, toproute_factory(pfx_gat, "gat"))
mcp.attach(mcp.CMD_GATS, toproute_factory(pfx_gats, "gats"))
mcp.attach(mcp.CMD_CAS, toproute_factory(pfx_cas, "cas"))
mcp.attach(mcp.CMD_ADD, toproute_factory(pfx_add, "add"))
mcp.attach(mcp.CMD_DELETE, toproute_factory(pfx_delete, "delete"))
mcp.attach(mcp.CMD_INCR, toproute_factory(pfx_incr, "incr"))
mcp.attach(mcp.CMD_DECR, toproute_factory(pfx_decr, "decr"))
mcp.attach(mcp.CMD_APPEND, toproute_factory(pfx_append, "append"))
mcp.attach(mcp.CMD_PREPEND, toproute_factory(pfx_prepend, "prepend"))
mcp.attach(mcp.CMD_MG, toproute_factory(pfx_mg, "mg"))
mcp.attach(mcp.CMD_MS, toproute_factory(pfx_ms, "ms"))
mcp.attach(mcp.CMD_MD, toproute_factory(pfx_md, "md"))
mcp.attach(mcp.CMD_MA, toproute_factory(pfx_ma, "ma"))

end)
end
25 changes: 0 additions & 25 deletions t/proxyinternal.t
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,10 @@ sub check_version {
like(<$ps>, qr/VERSION /, "version received");
}

my @mocksrvs = ();
#diag "making mock servers";
for my $port (11611, 11612, 11613) {
my $srv = mock_server($port);
ok(defined $srv, "mock server created");
push(@mocksrvs, $srv);
}

my $p_srv = new_memcached("-o proxy_config=./t/proxyinternal.lua,ext_item_size=500,ext_item_age=1,ext_path=$ext_path:64m,ext_max_sleep=100000 -t 1");
my $ps = $p_srv->sock;
$ps->autoflush(1);

# set up server backend sockets.
# uncomment when needed. currently they get thrown out so this can hang.
#my @mbe = ();
#diag "accepting mock backends";
#for my $msrv (@mocksrvs) {
# my $be = $msrv->accept();
# $be->autoflush(1);
# ok(defined $be, "mock backend created");
# push(@mbe, $be);
#}

#diag "validating backends";
#for my $be (@mbe) {
# like(<$be>, qr/version/, "received version command");
# print $be "VERSION 1.0.0-mock\r\n";
#}

{
print $ps "ms /b/a 2\r\nhi\r\n";
is(scalar <$ps>, "HD\r\n", "bare ms command works");
Expand Down
21 changes: 21 additions & 0 deletions t/proxyinternal2.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
function mcp_config_pools()
return true
end

local result_leak = {}
-- Do specialized testing based on the key prefix.
function mcp_config_routes(zones)
mcp.attach(mcp.CMD_ANY_STORAGE, function(r)
local cmd = r:command()
if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then
-- marking the object as <close> will clean up its internal
-- references as soon as it drops out of scope.
-- it is an error to try to use this 'res' outside of this 'if'
-- statement!
local res <close> = mcp.internal(r)
-- uncomment to test effects of leaking a res obj
table.insert(result_leak, res)
end
return mcp.internal(r)
end)
end
118 changes: 118 additions & 0 deletions t/proxyinternal2.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env perl

use strict;
use warnings;
use Test::More;
use FindBin qw($Bin);
use lib "$Bin/lib";
use Carp qw(croak);
use MemcachedTest;
use IO::Socket qw(AF_INET SOCK_STREAM);
use IO::Select;
use Data::Dumper qw/Dumper/;

if (!supports_proxy()) {
plan skip_all => 'proxy not enabled';
exit 0;
}

# Set up some server sockets.
sub mock_server {
my $port = shift;
my $srv = IO::Socket->new(
Domain => AF_INET,
Type => SOCK_STREAM,
Proto => 'tcp',
LocalHost => '127.0.0.1',
LocalPort => $port,
ReusePort => 1,
Listen => 5) || die "IO::Socket: $@";
return $srv;
}

# Put a version command down the pipe to ensure the socket is clear.
# client version commands skip the proxy code
sub check_version {
my $ps = shift;
print $ps "version\r\n";
like(<$ps>, qr/VERSION /, "version received");
}

my $p_srv = new_memcached("-o proxy_config=./t/proxyinternal2.lua,slab_chunk_max=32 -t 1");
my $ps = $p_srv->sock;
$ps->autoflush(1);

subtest 'basic large item' => sub {
my $data = 'x' x 500000;
print $ps "set /b/beeeg 0 0 500000\r\n$data\r\n";
is(scalar <$ps>, "STORED\r\n", "big item stored");

print $ps "get /b/beeeg\r\n";
is(scalar <$ps>, "VALUE /b/beeeg 0 500000\r\n", "got large response");
is(scalar <$ps>, "$data\r\n", "got data portion back");
is(scalar <$ps>, "END\r\n", "saw END");

print $ps "delete /b/beeeg\r\n";
is(scalar <$ps>, "DELETED\r\n");
check_version($ps);
};

subtest 'basic chunked item' => sub {
my $data = 'x' x 900000;
print $ps "set /b/chunked 0 0 900000\r\n$data\r\n";
is(scalar <$ps>, "STORED\r\n", "big item stored");

print $ps "get /b/chunked\r\n";
is(scalar <$ps>, "VALUE /b/chunked 0 900000\r\n", "got large response");
is(scalar <$ps>, "$data\r\n", "got data portion back");
is(scalar <$ps>, "END\r\n", "saw END");

print $ps "delete /b/chunked\r\n";
is(scalar <$ps>, "DELETED\r\n");
check_version($ps);
};

subtest 'flood memory' => sub {
# ensure we don't have a basic reference counter leak
my $data = 'x' x 500000;
for (1 .. 200) {
print $ps "set /b/$_ 0 0 500000\r\n$data\r\n";
is(scalar <$ps>, "STORED\r\n", "flood set");
}
for (1 .. 200) {
print $ps "ms /b/$_ 500000 T30\r\n$data\r\n";
is(scalar <$ps>, "HD\r\n", "flood ms");
}

# overwrite the same value a bunch of times.
for (1 .. 200) {
print $ps "ms BOOM 500000 T30\r\n$data\r\n";
is(scalar <$ps>, "HD\r\n", "flood ms");
# fetch to attempt to leak objects
mem_get_is($ps, "BOOM", $data);
}
print $ps "md BOOM\r\n";
like(scalar <$ps>, qr/HD|NF/, "deleted");

check_version($ps);
};

subtest 'check stats' => sub {
# delete things manually since we can't easily call flush_all
for (1 .. 200) {
print $ps "md /b/$_\r\n";
like(scalar <$ps>, qr/HD|NF/, "deleted");
}
# everything else should've been pushed out of memory by the flood

my $s = mem_stats($ps, 'slabs');
for my $k (keys %$s) {
if ($k =~ m/(\d+):used/) {
is($s->{$k}, 0, "class " . $k . " is empty")
#print STDERR $k, " => ", $s->{$k}, "\n";
}
}
#print STDERR "DUMP:", Dumper($s), "\n";
};

done_testing();

0 comments on commit 0954b53

Please sign in to comment.