Skip to content

Commit

Permalink
Implement 'noreply' option for update commands. (Tomash Brechko <toma…
Browse files Browse the repository at this point in the history
…sh.brechko@gmail.com>)

Commands add, set, replace, append, prepend, cas, delete, incr, decr,
flush_all, verbosity can take last optional parameter, 'noreply',
which instructs the server to not send the reply.

Add benchmark script for noreply parameter.


git-svn-id: http://code.sixapart.com/svn/memcached/trunk/server@708 b0b603af-a30f-0410-a34e-baf09ae79d0b
  • Loading branch information
kroki committed Feb 22, 2008
1 parent 57e3367 commit d9ece78
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 28 deletions.
51 changes: 51 additions & 0 deletions devtools/bench_noreply.pl
@@ -0,0 +1,51 @@
#! /usr/bin/perl
#
use warnings;
use strict;

use IO::Socket::INET;

use FindBin;

@ARGV == 1 or @ARGV == 2
or die "Usage: $FindBin::Script HOST:PORT [COUNT]\n";

# Note that it's better to run the test over the wire, because for
# localhost the task may become CPU bound.
my $addr = $ARGV[0];
my $count = $ARGV[1] || 10_000;

my $sock = IO::Socket::INET->new(PeerAddr => $addr,
Timeout => 3);
die "$!\n" unless $sock;


# By running 'noreply' test first we also ensure there are no reply
# packets left in the network.
foreach my $noreply (1, 0) {
use Time::HiRes qw(gettimeofday tv_interval);

print "'noreply' is ", $noreply ? "enabled" : "disabled", ":\n";
my $param = $noreply ? 'noreply' : '';
my $start = [gettimeofday];
foreach (1 .. $count) {
print $sock "add foo 0 0 1 $param\r\n1\r\n";
scalar<$sock> unless $noreply;
print $sock "set foo 0 0 1 $param\r\n1\r\n";
scalar<$sock> unless $noreply;
print $sock "replace foo 0 0 1 $param\r\n1\r\n";
scalar<$sock> unless $noreply;
print $sock "append foo 0 0 1 $param\r\n1\r\n";
scalar<$sock> unless $noreply;
print $sock "prepend foo 0 0 1 $param\r\n1\r\n";
scalar<$sock> unless $noreply;
print $sock "incr foo 1 $param\r\n";
scalar<$sock> unless $noreply;
print $sock "decr foo 1 $param\r\n";
scalar<$sock> unless $noreply;
print $sock "delete foo $param\r\n";
scalar<$sock> unless $noreply;
}
my $end = [gettimeofday];
printf("update commands: %.2f secs\n\n", tv_interval($start, $end));
}
53 changes: 35 additions & 18 deletions doc/protocol.txt
Expand Up @@ -126,9 +126,10 @@ Storage commands

First, the client sends a command line which looks like this:

<command name> <key> <flags> <exptime> <bytes> [<cas unqiue>]\r\n
<command name> <key> <flags> <exptime> <bytes> [noreply]\r\n
cas <key> <flags> <exptime> <bytes> <cas unqiue> [noreply]\r\n

- <command name> is "set", "add", "replace", "append", "prepend", or "cas"
- <command name> is "set", "add", "replace", "append" or "prepend"

"set" means "store this data".

Expand Down Expand Up @@ -174,6 +175,12 @@ First, the client sends a command line which looks like this:
Clients should use the value returned from the "gets" command
when issuing "cas" updates.

- "noreply" optional parameter instructs the server to not send the
reply. NOTE: if the request line is malformed, the server can't
parse "noreply" option reliably. In this case it may send the error
to the client, and not reading it on the client side will break
things. Client should construct only valid requests.

After this line, the client sends the data block:

<data block>\r\n
Expand Down Expand Up @@ -245,7 +252,7 @@ Deletion

The command "delete" allows for explicit deletion of items:

delete <key> <time>\r\n
delete <key> [<time>] [noreply]\r\n

- <key> is the key of the item the client wishes the server to delete

Expand All @@ -261,6 +268,10 @@ delete <key> <time>\r\n
(which means that the item will be deleted immediately and further
storage commands with this key will succeed).

- "noreply" optional parameter instructs the server to not send the
reply. See the note in Storage commands regarding malformed
requests.

The response line to this command can be one of:

- "DELETED\r\n" to indicate success
Expand All @@ -285,17 +296,21 @@ non-existent key exists with value 0; instead, they will fail.

The client sends the command line:

incr <key> <value>\r\n
incr <key> <value> [noreply]\r\n

or

decr <key> <value>\r\n
decr <key> <value> [noreply]\r\n

- <key> is the key of the item the client wishes to change

- <value> is the amount by which the client wants to increase/decrease
the item. It is a decimal representation of a 64-bit unsigned integer.

- "noreply" optional parameter instructs the server to not send the
reply. See the note in Storage commands regarding malformed
requests.

The response will be one of:

- "NOT_FOUND\r\n" to indicate the item with this value was not found
Expand Down Expand Up @@ -399,16 +414,17 @@ Other commands
--------------

"flush_all" is a command with an optional numeric argument. It always
succeeds, and the server sends "OK\r\n" in response. Its effect is to
invalidate all existing items immediately (by default) or after the
expiration specified. After invalidation none of the items will be returned
in response to a retrieval command (unless it's stored again under the
same key *after* flush_all has invalidated the items). flush_all doesn't
actually free all the memory taken up by existing items; that will
happen gradually as new items are stored. The most precise definition
of what flush_all does is the following: it causes all items whose
update time is earlier than the time at which flush_all was set to be
executed to be ignored for retrieval purposes.
succeeds, and the server sends "OK\r\n" in response (unless "noreply"
is given as the last parameter). Its effect is to invalidate all
existing items immediately (by default) or after the expiration
specified. After invalidation none of the items will be returned in
response to a retrieval command (unless it's stored again under the
same key *after* flush_all has invalidated the items). flush_all
doesn't actually free all the memory taken up by existing items; that
will happen gradually as new items are stored. The most precise
definition of what flush_all does is the following: it causes all
items whose update time is earlier than the time at which flush_all
was set to be executed to be ignored for retrieval purposes.

The intent of flush_all with a delay, was that in a setting where you
have a pool of memcached servers, and you need to flush all content,
Expand All @@ -431,9 +447,10 @@ In response, the server sends
"VERSION <version>\r\n", where <version> is the version string for the
server.

"verbosity" is a command with a numeric argument. It always
succeeds, and the server sends "OK\r\n" in response. Its effect is to
set the verbosity level of the logging output.
"verbosity" is a command with a numeric argument. It always succeeds,
and the server sends "OK\r\n" in response (unless "noreply" is given
as the last parameter). Its effect is to set the verbosity level of
the logging output.

"quit" is a command with no arguments:

Expand Down
55 changes: 45 additions & 10 deletions memcached.c
Expand Up @@ -372,6 +372,8 @@ conn *conn_new(const int sfd, const int init_state, const int event_flags,
c->bucket = -1;
c->gen = 0;

c->noreply = false;

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
Expand Down Expand Up @@ -733,6 +735,12 @@ static void out_string(conn *c, const char *str) {

assert(c != NULL);

if (c->noreply) {
c->noreply = false;
conn_set_state(c, conn_read);
return;
}

if (settings.verbose > 1)
fprintf(stderr, ">%d %s\n", c->sfd, str);

Expand Down Expand Up @@ -898,7 +906,7 @@ typedef struct token_s {
#define KEY_TOKEN 1
#define KEY_MAX_LENGTH 250

#define MAX_TOKENS 7
#define MAX_TOKENS 8

/*
* Tokenize the command string by replacing whitespace with '\0' and update
Expand Down Expand Up @@ -968,6 +976,23 @@ static void write_and_free(conn *c, char *buf, int bytes) {
}
}

static inline void set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
{
int noreply_index = ntokens - 2;

/*
NOTE: this function is not the first place where we are going to
send the reply. We could send it instead from process_command()
if the request line has wrong number of tokens. However parsing
malformed line for "noreply" option is not reliable anyway, so
it can't be helped.
*/
if (tokens[noreply_index].value
&& strcmp(tokens[noreply_index].value, "noreply") == 0) {
c->noreply = true;
}
}

inline static void process_stats_detail(conn *c, const char *command) {
assert(c != NULL);

Expand Down Expand Up @@ -1347,6 +1372,8 @@ static void process_update_command(conn *c, token_t *tokens, const size_t ntoken

assert(c != NULL);

set_noreply_maybe(c, tokens, ntokens);

if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
Expand Down Expand Up @@ -1418,6 +1445,8 @@ static void process_arithmetic_command(conn *c, token_t *tokens, const size_t nt

assert(c != NULL);

set_noreply_maybe(c, tokens, ntokens);

if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
Expand Down Expand Up @@ -1514,6 +1543,8 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken

assert(c != NULL);

set_noreply_maybe(c, tokens, ntokens);

if (settings.managed) {
int bucket = c->bucket;
if (bucket == -1) {
Expand All @@ -1535,7 +1566,7 @@ static void process_delete_command(conn *c, token_t *tokens, const size_t ntoken
return;
}

if(ntokens == 4) {
if(ntokens == (c->noreply ? 5 : 4)) {
exptime = strtol(tokens[2].value, NULL, 10);

if(errno == ERANGE) {
Expand Down Expand Up @@ -1598,6 +1629,8 @@ static void process_verbosity_command(conn *c, token_t *tokens, const size_t nto

assert(c != NULL);

set_noreply_maybe(c, tokens, ntokens);

level = strtoul(tokens[1].value, NULL, 10);
settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
out_string(c, "OK");
Expand Down Expand Up @@ -1635,7 +1668,7 @@ static void process_command(conn *c, char *command) {

process_get_command(c, tokens, ntokens, false);

} else if (ntokens == 6 &&
} else if ((ntokens == 6 || ntokens == 7) &&
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
Expand All @@ -1644,23 +1677,23 @@ static void process_command(conn *c, char *command) {

process_update_command(c, tokens, ntokens, comm, false);

} else if (ntokens == 7 && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
} else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {

process_update_command(c, tokens, ntokens, comm, true);

} else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
} else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {

process_arithmetic_command(c, tokens, ntokens, 1);

} else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {

process_get_command(c, tokens, ntokens, true);

} else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
} else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {

process_arithmetic_command(c, tokens, ntokens, 0);

} else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
} else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {

process_delete_command(c, tokens, ntokens);

Expand Down Expand Up @@ -1729,11 +1762,13 @@ static void process_command(conn *c, char *command) {

process_stat(c, tokens, ntokens);

} else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
} else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
time_t exptime = 0;
set_current_time();

if(ntokens == 2) {
set_noreply_maybe(c, tokens, ntokens);

if(ntokens == (c->noreply ? 3 : 2)) {
settings.oldest_live = current_time - 1;
item_flush_expired();
out_string(c, "OK");
Expand Down Expand Up @@ -1798,7 +1833,7 @@ static void process_command(conn *c, char *command) {
#else
out_string(c, "CLIENT_ERROR Slab reassignment not supported");
#endif
} else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
} else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
process_verbosity_command(c, tokens, ntokens);
} else {
out_string(c, "ERROR");
Expand Down
1 change: 1 addition & 0 deletions memcached.h
Expand Up @@ -220,6 +220,7 @@ struct conn {
int bucket; /* bucket number for the next command, if running as
a managed instance. -1 (_not_ 0) means invalid. */
int gen; /* generation requested for the bucket */
bool noreply; /* True if the reply should not be sent. */
conn *next; /* Used for generating a list of conn structures */
};

Expand Down
53 changes: 53 additions & 0 deletions t/noreply.t
@@ -0,0 +1,53 @@
#!/usr/bin/perl

use strict;
use Test::More tests => 10;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;


my $server = new_memcached();
my $sock = $server->sock;


# Test that commands can take 'noreply' parameter.
print $sock "flush_all noreply\r\n";
print $sock "flush_all 0 noreply\r\n";

print $sock "verbosity 0 noreply\r\n";

print $sock "add noreply:foo 0 0 1 noreply\r\n1\r\n";
mem_get_is($sock, "noreply:foo", "1");

print $sock "set noreply:foo 0 0 1 noreply\r\n2\r\n";
mem_get_is($sock, "noreply:foo", "2");

print $sock "replace noreply:foo 0 0 1 noreply\r\n3\r\n";
mem_get_is($sock, "noreply:foo", "3");

print $sock "append noreply:foo 0 0 1 noreply\r\n4\r\n";
mem_get_is($sock, "noreply:foo", "34");

print $sock "prepend noreply:foo 0 0 1 noreply\r\n5\r\n";
my @result = mem_gets($sock, "noreply:foo");
ok($result[1] eq "534");

print $sock "cas noreply:foo 0 0 1 $result[0] noreply\r\n6\r\n";
mem_get_is($sock, "noreply:foo", "6");

print $sock "incr noreply:foo 3 noreply\r\n";
mem_get_is($sock, "noreply:foo", "9");

print $sock "decr noreply:foo 2 noreply\r\n";
mem_get_is($sock, "noreply:foo", "7");

print $sock "delete noreply:foo noreply\r\n";
mem_get_is($sock, "noreply:foo");

# Test that delete accepts both <time> and 'noreply'.
print $sock "add noreply:foo 0 0 1 noreply\r\n1\r\n";
print $sock "delete noreply:foo 10 noreply\r\n";
print $sock "add noreply:foo 0 0 1 noreply\r\n1\r\n";
# undef result means we couldn't add an entry because the key is locked.
mem_get_is($sock, "noreply:foo");

0 comments on commit d9ece78

Please sign in to comment.