Skip to content

Commit

Permalink
Make redis-cli support PSYNC command (#11647)
Browse files Browse the repository at this point in the history
The current redis-cli does not support the real PSYNC command, the older
version of redis-cli can support PSYNC is because that we actually issue
the SYNC command instead of PSYNC, so it act like SYNC (always full-sync).
Noted that in this case we will send the SYNC first (triggered by sendSync),
then send the PSYNC (the one in redis-cli input).

Didn't bother to find which version that the order changed, we send PSYNC
first (the one in redis-cli input), and then send the SYNC (the one triggered
by sendSync). So even full-sync is not working anymore, and it will result
this output (mentioned in issue #11246):
```
psync dummy 0
Entering replica output mode...  (press Ctrl-C to quit)
SYNC with master, discarding bytes of bulk transfer until EOF marker...
Error reading RDB payload while SYNCing
```

This PR adds PSYNC support to redis-cli, which can handle +FULLRESYNC and
+CONTINUE responses, and some examples will follow.


Co-authored-by: Oran Agra <oran@redislabs.com>
  • Loading branch information
enjoy-binbin and oranagra committed Jan 4, 2023
1 parent c805212 commit 4ef4c4a
Showing 1 changed file with 74 additions and 22 deletions.
96 changes: 74 additions & 22 deletions src/redis-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static struct pref {

static volatile sig_atomic_t force_cancel_loop = 0;
static void usage(int err);
static void slaveMode(void);
static void slaveMode(int send_sync);
char *redisGitSHA1(void);
char *redisGitDirty(void);
static int cliConnect(int flags);
Expand Down Expand Up @@ -1848,7 +1848,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) {

if (config.slave_mode) {
printf("Entering replica output mode... (press Ctrl-C to quit)\n");
slaveMode();
slaveMode(0);
config.slave_mode = 0;
zfree(argvlen);
return REDIS_ERR; /* Error = slaveMode lost connection to master */
Expand Down Expand Up @@ -7695,19 +7695,36 @@ static ssize_t readConn(redisContext *c, char *buf, size_t len)

/* Sends SYNC and reads the number of bytes in the payload. Used both by
* slaveMode() and getRDB().
* returns 0 in case an EOF marker is used. */
unsigned long long sendSync(redisContext *c, char *out_eof) {
*
* send_sync if 1 means we will explicitly send SYNC command. If 0 means
* we will not send SYNC command, will send the command that in c->obuf.
*
* Returns the size of the RDB payload to read, or 0 in case an EOF marker is used and the size
* is unknown, also returns 0 in case a PSYNC +CONTINUE was found (no RDB payload).
*
* The out_full_mode parameter if 1 means this is a full sync, if 0 means this is partial mode. */
unsigned long long sendSync(redisContext *c, int send_sync, char *out_eof, int *out_full_mode) {
/* To start we need to send the SYNC command and return the payload.
* The hiredis client lib does not understand this part of the protocol
* and we don't want to mess with its buffers, so everything is performed
* using direct low-level I/O. */
char buf[4096], *p;
ssize_t nread;

/* Send the SYNC command. */
if (cliWriteConn(c, "SYNC\r\n", 6) != 6) {
fprintf(stderr,"Error writing to master\n");
exit(1);
if (out_full_mode) *out_full_mode = 1;

if (send_sync) {
/* Send the SYNC command. */
if (cliWriteConn(c, "SYNC\r\n", 6) != 6) {
fprintf(stderr,"Error writing to master\n");
exit(1);
}
} else {
/* We have written the command into c->obuf before. */
if (cliWriteConn(c, "", 0) != 0) {
fprintf(stderr,"Error writing to master\n");
exit(1);
}
}

/* Read $<payload>\r\n, making sure to read just up to "\n" */
Expand All @@ -7720,46 +7737,81 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
}
if (*p == '\n' && p != buf) break;
if (*p != '\n') p++;
if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */
}
*p = '\0';
if (buf[0] == '-') {
fprintf(stderr, "SYNC with master failed: %s\n", buf);
exit(1);
}

/* Handling PSYNC responses.
* Read +FULLRESYNC <replid> <offset>\r\n, after that is the $<payload> or the $EOF:<40 bytes delimiter>
* Read +CONTINUE <replid>\r\n or +CONTINUE\r\n, after that is the command stream */
if (!strncmp(buf, "+FULLRESYNC", 11) ||
!strncmp(buf, "+CONTINUE", 9))
{
int sync_partial = !strncmp(buf, "+CONTINUE", 9);
fprintf(stderr, "PSYNC replied %s\n", buf);
p = buf;
while(1) {
nread = readConn(c,p,1);
if (nread <= 0) {
fprintf(stderr,"Error reading bulk length while PSYNCing\n");
exit(1);
}
if (*p == '\n' && p != buf) break;
if (*p != '\n') p++;
if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */
}
*p = '\0';

if (sync_partial) {
if (out_full_mode) *out_full_mode = 0;
return 0;
}
}

if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
return 0;
}
return strtoull(buf+1,NULL,10);
}

static void slaveMode(void) {
static void slaveMode(int send_sync) {
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(context,eofmark);
static int out_full_mode;
unsigned long long payload = sendSync(context, send_sync, eofmark, &out_full_mode);
char buf[1024];
int original_output = config.output;
char *info = out_full_mode ? "Full resync" : "Partial resync";

if (payload == 0) {
if (out_full_mode == 1 && payload == 0) {
/* SYNC with EOF marker or PSYNC +FULLRESYNC with EOF marker. */
payload = ULLONG_MAX;
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
usemark = 1;
fprintf(stderr,"SYNC with master, discarding "
"bytes of bulk transfer until EOF marker...\n");
} else {
fprintf(stderr,"SYNC with master, discarding %llu "
"bytes of bulk transfer...\n", payload);
fprintf(stderr, "%s with master, discarding "
"bytes of bulk transfer until EOF marker...\n", info);
} else if (out_full_mode == 1 && payload != 0) {
/* SYNC without EOF marker or PSYNC +FULLRESYNC. */
fprintf(stderr, "%s with master, discarding %llu "
"bytes of bulk transfer...\n", info, payload);
} else if (out_full_mode == 0 && payload == 0) {
/* PSYNC +CONTINUE (no RDB payload). */
fprintf(stderr, "%s with master...\n", info);
}


/* Discard the payload. */
while(payload) {
ssize_t nread;

nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
if (nread <= 0) {
fprintf(stderr,"Error reading RDB payload while SYNCing\n");
fprintf(stderr,"Error reading RDB payload while %sing\n", info);
exit(1);
}
payload -= nread;
Expand All @@ -7780,12 +7832,12 @@ static void slaveMode(void) {

if (usemark) {
unsigned long long offset = ULLONG_MAX - payload;
fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
fprintf(stderr,"%s done after %llu bytes. Logging commands from master.\n", info, offset);
/* put the slave online */
sleep(1);
sendReplconf("ACK", "0");
} else
fprintf(stderr,"SYNC done. Logging commands from master.\n");
fprintf(stderr,"%s done. Logging commands from master.\n", info);

/* Now we can use hiredis to read the incoming protocol. */
config.output = OUTPUT_CSV;
Expand Down Expand Up @@ -7814,7 +7866,7 @@ static void getRDB(clusterManagerNode *node) {
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
unsigned long long payload = sendSync(s, eofmark);
unsigned long long payload = sendSync(s, 1, eofmark, NULL);
char buf[4096];

if (payload == 0) {
Expand Down Expand Up @@ -9027,7 +9079,7 @@ int main(int argc, char **argv) {
if (cliConnect(0) == REDIS_ERR) exit(1);
sendCapa();
sendReplconf("rdb-filter-only", "");
slaveMode();
slaveMode(1);
}

/* Get RDB/functions mode. */
Expand Down

0 comments on commit 4ef4c4a

Please sign in to comment.