Skip to content

Commit

Permalink
Fix for CONC-659:
Browse files Browse the repository at this point in the history
When checking for a semi sync indication header, we need also check if
the undocumented session variable @rpl_semi_sync_slave was set.
Otherwise the timestamp of the event could contain values which match
the 2 bytes of the semi sync header.

Since the variable rpl_semi_sync_slave and it's behavior is not documented,
a new option MARIADB_RPL_SEMI_SYNC was added.
  • Loading branch information
9EOR9 committed Aug 11, 2023
1 parent cd59c70 commit c8ca891
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 25 deletions.
1 change: 1 addition & 0 deletions include/errmsg.h
Expand Up @@ -113,6 +113,7 @@ extern const char *mariadb_client_errors[]; /* Error messages */
#define CR_UNKNOWN_BINLOG_EVENT 5020
#define CR_BINLOG_ERROR 5021
#define CR_BINLOG_INVALID_FILE 5022
#define CR_BINLOG_SEMI_SYNC_ERROR 5023

/* Always last, if you add new error codes please update the
value for CR_MARIADB_LAST_ERROR */
Expand Down
6 changes: 4 additions & 2 deletions include/mariadb_rpl.h
Expand Up @@ -86,7 +86,8 @@ enum mariadb_rpl_option {
MARIADB_RPL_UNCOMPRESS,
MARIADB_RPL_HOST,
MARIADB_RPL_PORT,
MARIADB_RPL_EXTRACT_VALUES
MARIADB_RPL_EXTRACT_VALUES,
MARIADB_RPL_SEMI_SYNC,
};

/* Event types: From MariaDB Server sql/log_event.h */
Expand Down Expand Up @@ -340,7 +341,8 @@ typedef struct st_mariadb_rpl {
uint8_t extract_values;
char nonce[12];
uint8_t encrypted;
} MARIADB_RPL;
uint8_t is_semi_sync;
}MARIADB_RPL;

typedef struct st_mariadb_rpl_value {
enum enum_field_types field_type;
Expand Down
1 change: 1 addition & 0 deletions libmariadb/ma_errmsg.c
Expand Up @@ -117,6 +117,7 @@ const char *mariadb_client_errors[] =
/* 5020 */ "Binary log error (File: %.*s start_pos=%ld): Unknown event type (%d) with flag 'not_ignorable'.",
/* 5021 */ "Binary log error (File: %.*s start_pos=%ld): %s.",
/* 5022 */ "File '%s' is not a binary log file",
/* 5023 */ "Semi sync request error: %s",
""
};

Expand Down
115 changes: 96 additions & 19 deletions libmariadb/mariadb_rpl.c
Expand Up @@ -33,6 +33,8 @@

#ifdef WIN32
#include <malloc.h>
#undef alloca
#define alloca _malloca
#endif

#define RPL_EVENT_HEADER_SIZE 19
Expand Down Expand Up @@ -690,9 +692,9 @@ MARIADB_RPL * STDCALL mariadb_rpl_init_ex(MYSQL *mysql, unsigned int version)

if ((rpl->mysql= mysql))
{
MYSQL_RES *result;
if (!mysql_query(mysql, "select @@binlog_checksum"))
{
MYSQL_RES *result;
if ((result= mysql_store_result(mysql)))
{
MYSQL_ROW row= mysql_fetch_row(result);
Expand Down Expand Up @@ -795,6 +797,35 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
uint32_t replica_id= rpl->server_id;
ptr= buf= (unsigned char *)alloca(rpl->filename_length + 11);

if (!ptr)
{
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
}

if (rpl->is_semi_sync)
{
if (mysql_query(rpl->mysql, "SET @rpl_semi_sync_slave=1"))
{
rpl_set_error(rpl, mysql_errno(rpl->mysql), 0, mysql_error(rpl->mysql));
return 1;
}
}
else {
MYSQL_RES* result;
MYSQL_ROW row;
if (mysql_query(rpl->mysql, "SELECT @rpl_semi_sync_slave=1"))
{
rpl_set_error(rpl, mysql_errno(rpl->mysql), 0, mysql_error(rpl->mysql));
return 1;
}
if ((result = mysql_store_result(rpl->mysql)))
{
if ((row = mysql_fetch_row(result)))
rpl->is_semi_sync = (row[0] != NULL && row[0][0] == '1');
mysql_free_result(result);
}
}

int4store(ptr, (unsigned int)rpl->start_position);
ptr+= 4;
int2store(ptr, rpl->flags);
Expand All @@ -812,6 +843,9 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
char *buf[RPL_BINLOG_MAGIC_SIZE];
MYSQL mysql;

/* Semi sync doesn't work when processing files */
rpl->is_semi_sync = 0;

if (rpl->fp)
ma_close(rpl->fp);

Expand Down Expand Up @@ -909,6 +943,50 @@ static uint32_t get_compression_info(const unsigned char *buf,
return len;
}

static uint8_t mariadb_rpl_send_semisync_ack(MARIADB_RPL* rpl, MARIADB_RPL_EVENT* event)
{
size_t buf_size = 0;
uchar* buf;

if (!rpl)
return 1;

if (!event)
{
rpl_set_error(rpl, CR_BINLOG_SEMI_SYNC_ERROR, 0, "Invalid event");
return 1;
}

if (!rpl->is_semi_sync)
{
rpl_set_error(rpl, CR_BINLOG_SEMI_SYNC_ERROR, 0, "semi synchronous replication is not enabled");
return 1;
}
if (!event->is_semi_sync || !event->semi_sync_flags != SEMI_SYNC_ACK_REQ)
{
rpl_set_error(rpl, CR_BINLOG_SEMI_SYNC_ERROR, 0, "This event doesn't require to send semi synchronous acknoledgement");
return 1;
}

buf_size = rpl->filename_length + 9;
buf = alloca(buf_size);

buf[0] = SEMI_SYNC_INDICATOR;
int8store(buf + 1, event->next_event_pos);
memcpy(buf + 9, rpl->filename, rpl->filename_length);

ma_net_clear(&rpl->mysql->net);

if (ma_net_write(&rpl->mysql->net, buf, buf_size) ||
(ma_net_flush(&rpl->mysql->net)))
{
rpl_set_error(rpl, CR_CONNECTION_ERROR, 0);
return 1;
}

return 0;
}

MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVENT *event)
{
unsigned char *ev= 0;
Expand Down Expand Up @@ -1032,7 +1110,7 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
rpl_event->ok= *ev++;

/* CONC-470: add support for semi snychronous replication */
if ((rpl_event->is_semi_sync= (*ev == SEMI_SYNC_INDICATOR)))
if (rpl->is_semi_sync && (rpl_event->is_semi_sync= (*ev == SEMI_SYNC_INDICATOR)))
{
RPL_CHECK_POS(ev, ev_end, 1);
ev++;
Expand Down Expand Up @@ -1826,19 +1904,11 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
if (rpl_event->is_semi_sync &&
rpl_event->semi_sync_flags == SEMI_SYNC_ACK_REQ)
{
size_t buf_size= rpl->filename_length + 1 + 8;
uchar *buffer= alloca(buf_size);

buffer[0]= SEMI_SYNC_INDICATOR;
int8store(buffer + 1, (int64_t)rpl_event->next_event_pos);
memcpy(buffer + 9, rpl->filename, rpl->filename_length);

/* clear network buffer before sending the reply packet*/
ma_net_clear(&rpl->mysql->net);

if (ma_net_write(&rpl->mysql->net, buffer, buf_size) ||
(ma_net_flush(&rpl->mysql->net)))
goto net_error;
if (mariadb_rpl_send_semisync_ack(rpl, rpl_event))
{
/* ACK failed and rpl->error was set */
return rpl_event;
}
}

if (rpl->use_checksum && !rpl_event->checksum)
Expand All @@ -1865,10 +1935,6 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
mariadb_free_rpl_event(rpl_event);
rpl_set_error(rpl, CR_OUT_OF_MEMORY, 0);
return 0;
net_error:
mariadb_free_rpl_event(rpl_event);
rpl_set_error(rpl, CR_CONNECTION_ERROR, 0);
return 0;
malformed_packet:
rpl_set_error(rpl, CR_BINLOG_ERROR, 0, RPL_ERR_POS(rpl),
"Malformed packet");
Expand Down Expand Up @@ -1961,6 +2027,11 @@ int STDCALL mariadb_rpl_optionsv(MARIADB_RPL *rpl,
rpl->extract_values= (uint8_t)va_arg(ap, uint32_t);
break;
}
case MARIADB_RPL_SEMI_SYNC:
{
rpl->is_semi_sync = (uint8_t)va_arg(ap, uint32_t);
break;
}
default:
rc= -1;
goto end;
Expand Down Expand Up @@ -2009,6 +2080,12 @@ int STDCALL mariadb_rpl_get_optionsv(MARIADB_RPL *rpl,
*start= rpl->start_position;
break;
}
case MARIADB_RPL_SEMI_SYNC:
{
unsigned int* semi_sync = va_arg(ap, unsigned int*);
*semi_sync = rpl->is_semi_sync;
}

default:
va_end(ap);
return 1;
Expand Down
7 changes: 3 additions & 4 deletions unittest/libmariadb/rpl_api.c
Expand Up @@ -225,7 +225,6 @@ static int test_conc467(MYSQL *my __attribute__((unused)))
mysql_query(mysql, "SET @slave_gtid_ignore_duplicates=1");
mysql_query(mysql, "SET NAMES utf8");
mysql_query(mysql, "SET @master_binlog_checksum= @@global.binlog_checksum");
mysql_query(mysql, "SET @rpl_semi_sync_slave=1");
rpl->server_id= 12;
rpl->start_position= 4;
rpl->flags= MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS;
Expand Down Expand Up @@ -343,9 +342,9 @@ static int test_conc592(MYSQL *my __attribute__((unused)))
}

struct my_tests_st my_tests[] = {
{"test_conc592", test_conc592, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_rpl_async", test_rpl_async, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_rpl_semisync", test_rpl_semisync, TEST_CONNECTION_NEW, 0, NULL, NULL},
// {"test_conc592", test_conc592, TEST_CONNECTION_NEW, 0, NULL, NULL},
//{"test_rpl_async", test_rpl_async, TEST_CONNECTION_NEW, 0, NULL, NULL},
//{"test_rpl_semisync", test_rpl_semisync, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_conc467", test_conc467, TEST_CONNECTION_NEW, 0, NULL, NULL},
{NULL, NULL, 0, 0, NULL, NULL}
};
Expand Down

0 comments on commit c8ca891

Please sign in to comment.