Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: yosh/redis
base: 3345ab446a
...
head fork: yosh/redis
compare: f5d19cf0c4
  • 8 commits
  • 10 files changed
  • 0 commit comments
  • 2 contributors
Commits on Jan 07, 2012
@antirez antirez List connected slaves with ip,port,state information in INFO, as requ…
…ested by github issue #219
8b102e0
@antirez antirez Do not propagate DEBUG LOADAOF 8939ff1
@antirez antirez Fixed replication when multiple slaves are attaching at the same time…
…. The output buffer was not copied correctly between slaves. This fixes issue #141.
a5045d5
@antirez antirez Protections against protocol desyncs, leading to infinite query buffe…
…r growing, due to nul-terms in specific bytes of the request or indefinitely long multi bulk or bulk count strings without newlines. This bug is related to Issue #141 as well.
e7b85b3
@antirez antirez Regression tests for protocol desync bug related to Issue #141 5378578
@antirez antirez Redis test: when assertion fails print not just the expression but al…
…so expanded values in the error message.
d4945b2
@antirez antirez Regression test for the main problem causing issue #141. Minor change…
…s/fixes/additions to the test suite itself needed to write the test.
2c2b615
Commits on Jan 08, 2012
@yosh Merge remote-tracking branch 'antirez/2.4' into rdio-2.4 f5d19cf
View
1  src/debug.c
@@ -235,6 +235,7 @@ void debugCommand(redisClient *c) {
addReply(c,shared.err);
return;
}
+ server.dirty = 0; /* Prevent AOF / replication */
redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
View
39 src/networking.c
@@ -1,6 +1,8 @@
#include "redis.h"
#include <sys/uio.h>
+static void setProtocolError(redisClient *c, int pos);
+
void *dupClientReplyValue(void *o) {
incrRefCount((robj*)o);
return o;
@@ -388,6 +390,16 @@ void addReplyBulkLongLong(redisClient *c, long long ll) {
addReplyBulkCBuffer(c,buf,len);
}
+/* Copy 'src' client output buffers into 'dst' client output buffers.
+ * The function takes care of freeing the old output buffers of the
+ * destination client. */
+void copyClientOutputBuffer(redisClient *dst, redisClient *src) {
+ listRelease(dst->reply);
+ dst->reply = listDup(src->reply);
+ memcpy(dst->buf,src->buf,src->bufpos);
+ dst->bufpos = src->bufpos;
+}
+
static void acceptCommonHandler(int fd) {
redisClient *c;
if ((c = createClient(fd)) == NULL) {
@@ -668,8 +680,13 @@ int processInlineBuffer(redisClient *c) {
size_t querylen;
/* Nothing to do without a \r\n */
- if (newline == NULL)
+ if (newline == NULL) {
+ if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+ addReplyError(c,"Protocol error: too big inline request");
+ setProtocolError(c,0);
+ }
return REDIS_ERR;
+ }
/* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf);
@@ -719,8 +736,13 @@ int processMultibulkBuffer(redisClient *c) {
/* Multi bulk length cannot be read without a \r\n */
newline = strchr(c->querybuf,'\r');
- if (newline == NULL)
+ if (newline == NULL) {
+ if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+ addReplyError(c,"Protocol error: too big mbulk count string");
+ setProtocolError(c,0);
+ }
return REDIS_ERR;
+ }
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
@@ -754,8 +776,13 @@ int processMultibulkBuffer(redisClient *c) {
/* Read bulk length if unknown */
if (c->bulklen == -1) {
newline = strchr(c->querybuf+pos,'\r');
- if (newline == NULL)
+ if (newline == NULL) {
+ if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
+ addReplyError(c,"Protocol error: too big bulk count string");
+ setProtocolError(c,0);
+ }
break;
+ }
/* Buffer should also contain \n */
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
@@ -796,9 +823,9 @@ int processMultibulkBuffer(redisClient *c) {
c->querybuf = sdsrange(c->querybuf,pos,-1);
/* We're done when c->multibulk == 0 */
- if (c->multibulklen == 0) {
- return REDIS_OK;
- }
+ if (c->multibulklen == 0) return REDIS_OK;
+
+ /* Still not read to process the command */
return REDIS_ERR;
}
View
33 src/redis.c
@@ -1352,6 +1352,39 @@ sds genRedisInfoString(void) {
bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC));
}
+ /* List connected slaves */
+ if (listLength(server.slaves)) {
+ int slaveid = 0;
+ listNode *ln;
+ listIter li;
+
+ listRewind(server.slaves,&li);
+ while((ln = listNext(&li))) {
+ redisClient *slave = listNodeValue(ln);
+ char *state = NULL;
+ char ip[32];
+ int port;
+
+ if (anetPeerToString(slave->fd,ip,&port) == -1) continue;
+ switch(slave->replstate) {
+ case REDIS_REPL_WAIT_BGSAVE_START:
+ case REDIS_REPL_WAIT_BGSAVE_END:
+ state = "wait_bgsave";
+ break;
+ case REDIS_REPL_SEND_BULK:
+ state = "send_bulk";
+ break;
+ case REDIS_REPL_ONLINE:
+ state = "online";
+ break;
+ }
+ if (state == NULL) continue;
+ info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
+ slaveid,ip,port,state);
+ slaveid++;
+ }
+ }
+
if (server.masterhost) {
info = sdscatprintf(info,
"master_host:%s\r\n"
View
2  src/redis.h
@@ -49,6 +49,7 @@
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
#define REDIS_SHARED_INTEGERS 10000
#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
+#define REDIS_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
#define REDIS_MAX_LOGMSG_LEN 4096 /* Default maximum length of syslog messages */
#define REDIS_AUTO_AOFREWRITE_PERC 100
#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024)
@@ -705,6 +706,7 @@ void addReplyStatus(redisClient *c, char *status);
void addReplyDouble(redisClient *c, double d);
void addReplyLongLong(redisClient *c, long long ll);
void addReplyMultiBulkLen(redisClient *c, long length);
+void copyClientOutputBuffer(redisClient *dst, redisClient *src);
void *dupClientReplyValue(void *o);
void getClientsMaxBuffers(unsigned long *longest_output_list,
unsigned long *biggest_input_buffer);
View
3  src/replication.c
@@ -122,8 +122,7 @@ void syncCommand(redisClient *c) {
if (ln) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
- listRelease(c->reply);
- c->reply = listDup(slave->reply);
+ copyClientOutputBuffer(c,slave);
c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
View
15 tests/helpers/gen_write_load.tcl
@@ -0,0 +1,15 @@
+source tests/support/redis.tcl
+
+proc gen_write_load {host port seconds} {
+ set start_time [clock seconds]
+ set r [redis $host $port 1]
+ $r select 9
+ while 1 {
+ $r set [expr rand()] [expr rand()]
+ if {[clock seconds]-$start_time > $seconds} {
+ exit 0
+ }
+ }
+}
+
+gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2]
View
68 tests/integration/replication.tcl
@@ -65,3 +65,71 @@ start_server {tags {"repl"}} {
} {0 0}
}
}
+
+proc start_write_load {host port seconds} {
+ exec tclsh8.5 tests/helpers/gen_write_load.tcl $host $port $seconds &
+}
+
+proc stop_write_load {handle} {
+ catch {exec /bin/kill -9 $handle}
+}
+
+start_server {tags {"repl"}} {
+ set master [srv 0 client]
+ set master_host [srv 0 host]
+ set master_port [srv 0 port]
+ set slaves {}
+ set load_handle0 [start_write_load $master_host $master_port 20]
+ set load_handle1 [start_write_load $master_host $master_port 20]
+ set load_handle2 [start_write_load $master_host $master_port 20]
+ set load_handle3 [start_write_load $master_host $master_port 20]
+ set load_handle4 [start_write_load $master_host $master_port 20]
+ after 2000
+ start_server {} {
+ lappend slaves [srv 0 client]
+ start_server {} {
+ lappend slaves [srv 0 client]
+ start_server {} {
+ lappend slaves [srv 0 client]
+ test "Connect multiple slaves at the same time (issue #141)" {
+ [lindex $slaves 0] slaveof $master_host $master_port
+ [lindex $slaves 1] slaveof $master_host $master_port
+ [lindex $slaves 2] slaveof $master_host $master_port
+
+ # Wait for all the three slaves to reach the "online" state
+ set retry 100
+ while {$retry} {
+ set info [r -3 info]
+ if {[string match {*slave0:*,online*slave1:*,online*slave2:*,online*} $info]} {
+ break
+ } else {
+ incr retry -1
+ after 100
+ }
+ }
+ if {$retry == 0} {
+ error "assertion:Slaves not correctly synchronized"
+ }
+ stop_write_load $load_handle0
+ stop_write_load $load_handle1
+ stop_write_load $load_handle2
+ stop_write_load $load_handle3
+ stop_write_load $load_handle4
+ after 1000
+ set digest [$master debug digest]
+ set digest0 [[lindex $slaves 0] debug digest]
+ set digest1 [[lindex $slaves 1] debug digest]
+ set digest2 [[lindex $slaves 2] debug digest]
+ assert {$digest ne 0000000000000000000000000000000000000000}
+ assert {$digest eq $digest0}
+ assert {$digest eq $digest1}
+ assert {$digest eq $digest2}
+ #puts [$master dbsize]
+ #puts [[lindex $slaves 0] dbsize]
+ #puts [[lindex $slaves 1] dbsize]
+ #puts [[lindex $slaves 2] dbsize]
+ }
+ }
+ }
+ }
+}
View
4 tests/support/test.tcl
@@ -4,8 +4,8 @@ set ::num_failed 0
set ::tests_failed {}
proc assert {condition} {
- if {![uplevel 1 expr $condition]} {
- error "assertion:Expected condition '$condition' to be true"
+ if {![uplevel 1 [list expr $condition]]} {
+ error "assertion:Expected condition '$condition' to be true ([uplevel 1 [list subst -nocommands $condition]])"
}
}
View
2  tests/test_helper.tcl
@@ -113,7 +113,7 @@ proc reconnect {args} {
}
# re-set $srv in the servers list
- set ::servers [lreplace $::servers end+$level 1 $srv]
+ lset ::servers end+$level $srv
}
proc redis_deferring_client {args} {
View
31 tests/unit/protocol.tcl
@@ -59,4 +59,35 @@ start_server {tags {"protocol"}} {
reconnect
assert_error "*wrong*arguments*ping*" {r ping x y z}
}
+
+ set c 0
+ foreach seq [list "\x00" "*\x00" "$\x00"] {
+ incr c
+ test "Protocol desync regression test #$c" {
+ set s [socket [srv 0 host] [srv 0 port]]
+ puts -nonewline $s $seq
+ set payload [string repeat A 1024]"\n"
+ set test_start [clock seconds]
+ set test_time_limit 5
+ while 1 {
+ if {[catch {
+ puts -nonewline $s payload
+ flush $s
+ incr payload_size [string length $payload]
+ }]} {
+ set retval [gets $s]
+ close $s
+ break
+ } else {
+ set elapsed [expr {[clock seconds]-$test_start}]
+ if {$elapsed > $test_time_limit} {
+ close $s
+ error "assertion:Redis did not closed connection after protocol desync"
+ }
+ }
+ }
+ set retval
+ } {*Protocol error*}
+ }
+ unset c
}

No commit comments for this range

Something went wrong with that request. Please try again.