Skip to content

Commit

Permalink
CONC-470: Support for semi synchronous replication
Browse files Browse the repository at this point in the history
Beside already supported asynchronous replication
the replication/binlog API now supports semi
synchronous replication:

If an event contains a semi synchronous indicator (0xEF)
behind status byte and acknowledgement flag is set,
mariadb_rpl_fetch() automatically sends an acknowledge
message to the connected primary server.
  • Loading branch information
9EOR9 committed Oct 9, 2021
1 parent 52934a1 commit 004f9d4
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 8 deletions.
8 changes: 7 additions & 1 deletion include/mariadb_rpl.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2018 MariaDB Corporation AB
/* Copyright (C) 2018-2021 MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
Expand Down Expand Up @@ -38,6 +38,9 @@ extern "C" {

#define LOG_EVENT_ARTIFICIAL_F 0x20

/* SEMI SYNCHRONOUS REPLICATION */
#define SEMI_SYNC_INDICATOR 0xEF
#define SEMI_SYNC_ACK_REQ 0x01

/* Options */
enum mariadb_rpl_option {
Expand Down Expand Up @@ -266,6 +269,9 @@ typedef struct st_mariadb_rpl_event
unsigned int event_length;
unsigned int next_event_pos;
unsigned short flags;
/* Added in C/C 3.3.0 */
uint8_t is_semi_sync;
uint8_t semi_sync_flags;
/****************/
union {
struct st_mariadb_rpl_rotate_event rotate;
Expand Down
40 changes: 38 additions & 2 deletions libmariadb/mariadb_rpl.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/************************************************************************************
Copyright (C) 2018 MariaDB Corpoeation AB
Copyright (C) 2018-2021 MariaDB Corpoeation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
Expand Down Expand Up @@ -93,7 +93,7 @@ int STDCALL mariadb_rpl_open(MARIADB_RPL *rpl)
*/
ptr= buf=
#ifdef WIN32
(unsigned char *)_alloca(rpl->filename_length + 11);
(unsigned char *)_alloca(rpl->filename_length + 11);
#else
(unsigned char *)alloca(rpl->filename_length + 11);
#endif
Expand Down Expand Up @@ -171,6 +171,14 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
rpl_event->checksum= uint4korr(rpl->buffer + rpl->buffer_size - 4);

rpl_event->ok= rpl->buffer[0];

/* CONC-470: add support for semi snychronous replication */
if ((rpl_event->is_semi_sync= (rpl->buffer[1] == SEMI_SYNC_INDICATOR)))
{
rpl_event->semi_sync_flags= rpl->buffer[2];
rpl->buffer+= 2;
}

rpl_event->timestamp= uint4korr(rpl->buffer + 1);
rpl_event->event_type= (unsigned char)*(rpl->buffer + 5);
rpl_event->server_id= uint4korr(rpl->buffer + 6);
Expand Down Expand Up @@ -201,6 +209,11 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
ev+= 4;
if (rpl_alloc_string(rpl_event, &rpl_event->event.checkpoint.filename, ev, len))
goto mem_error;
free(rpl->filename);
if (!(rpl->filename= (char *)malloc(len)))
goto mem_error;
memcpy(rpl->filename, ev, len);
rpl->filename_length= len;
break;
case FORMAT_DESCRIPTION_EVENT:
rpl_event->event.format_description.format = uint2korr(ev);
Expand Down Expand Up @@ -391,12 +404,35 @@ MARIADB_RPL_EVENT * STDCALL mariadb_rpl_fetch(MARIADB_RPL *rpl, MARIADB_RPL_EVEN
return NULL;
break;
}

/* check if we have to send acknoledgement to primary
when semi sync replication is used */
if (rpl_event->is_semi_sync &&
rpl_event->semi_sync_flags == SEMI_SYNC_ACK_REQ)
{
size_t buf_size= rpl->filename_length + 1 + 9;
uchar *buffer= alloca(buf_size);

buffer[0]= SEMI_SYNC_INDICATOR;
int8store(buffer + 1, (int64_t)rpl_event->next_event_pos);
memcpy(buffer + 9, rpl->filename, rpl->filename_length);
buffer[buf_size - 1]= 0;

if (ma_net_write(&rpl->mysql->net, buffer, buf_size) ||
(ma_net_flush(&rpl->mysql->net)))
goto net_error;
}

return rpl_event;
}
mem_error:
free(rpl_event);
SET_CLIENT_ERROR(rpl->mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
return 0;
net_error:
free(rpl_event);
SET_CLIENT_ERROR(rpl->mysql, CR_CONNECTION_ERROR, SQLSTATE_UNKNOWN, 0);
return 0;
}

void STDCALL mariadb_rpl_close(MARIADB_RPL *rpl)
Expand Down
119 changes: 114 additions & 5 deletions unittest/libmariadb/rpl_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,47 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "my_test.h"
#include "mariadb_rpl.h"

static int test_rpl_01(MYSQL *mysql)
static int test_rpl_async(MYSQL *my __attribute__((unused)))
{
MYSQL *mysql= mysql_init(NULL);
MYSQL_RES *result;
MYSQL_ROW row;
MARIADB_RPL_EVENT *event= NULL;
MARIADB_RPL *rpl= mariadb_rpl_init(mysql);
MARIADB_RPL *rpl;
int events= 0, rc;

SKIP_SKYSQL;
SKIP_MAXSCALE;

if (!is_mariadb)
return SKIP;

if (!my_test_connect(mysql, hostname, username,
password, schema, port, socketname, 0))
{
diag("Error: %s", mysql_error(mysql));
mysql_close(mysql);
return FAIL;
}

rc= mysql_query(mysql, "SELECT @@log_bin");
check_mysql_rc(rc, mysql);

result= mysql_store_result(mysql);
row= mysql_fetch_row(result);
if (!atoi(row[0]))
rc= SKIP;
mysql_free_result(result);

if (rc == SKIP)
{
diag("binary log disabled -> skip");
mysql_close(mysql);
return SKIP;
}

rpl = mariadb_rpl_init(mysql);

mysql_query(mysql, "SET @mariadb_slave_capability=4");
mysql_query(mysql, "SET NAMES latin1");
mysql_query(mysql, "SET @slave_gtid_strict_mode=1");
Expand All @@ -45,18 +82,90 @@ static int test_rpl_01(MYSQL *mysql)
if (mariadb_rpl_open(rpl))
return FAIL;

while((event= mariadb_rpl_fetch(rpl, event)))
/* We run rpl_api as very last test, too make sure
binary log contains > 10000 events.
*/
while((event= mariadb_rpl_fetch(rpl, event)) && events < 10000)
{
diag("event: %d\n", event->event_type);
events++;
}
mariadb_free_rpl_event(event);
mariadb_rpl_close(rpl);
mysql_close(mysql);
return OK;
}

static int test_rpl_semisync(MYSQL *my __attribute__((unused)))
{
MYSQL *mysql= mysql_init(NULL);
MYSQL_RES *result;
MYSQL_ROW row;
MARIADB_RPL_EVENT *event= NULL;
MARIADB_RPL *rpl;
int events= 0, rc;

SKIP_SKYSQL;
SKIP_MAXSCALE;

if (!is_mariadb)
return SKIP;

if (!my_test_connect(mysql, hostname, username,
password, schema, port, socketname, 0))
{
diag("Error: %s", mysql_error(mysql));
mysql_close(mysql);
return FAIL;
}

rc= mysql_query(mysql, "SELECT @@log_bin");
check_mysql_rc(rc, mysql);

result= mysql_store_result(mysql);
row= mysql_fetch_row(result);
if (!atoi(row[0]))
rc= SKIP;
mysql_free_result(result);

if (rc == SKIP)
{
diag("binary log disabled -> skip");
mysql_close(mysql);
return SKIP;
}

rpl = mariadb_rpl_init(mysql);

mysql_query(mysql, "SET @mariadb_slave_capability=4");
mysql_query(mysql, "SET NAMES latin1");
mysql_query(mysql, "SET @slave_gtid_strict_mode=1");
mysql_query(mysql, "SET @slave_gtid_ignore_duplicates=1");
mysql_query(mysql, "SET NAMES utf8");
mysql_query(mysql, "SET @master_binlog_checksum= @@global.binlog_checksum");
mysql_query(mysql, "SET @rpl_semi_sync_slave=1");
rpl->server_id= 12;
rpl->start_position= 4;
rpl->flags= MARIADB_RPL_BINLOG_SEND_ANNOTATE_ROWS;

if (mariadb_rpl_open(rpl))
return FAIL;

/* We run rpl_api as very last test, too make sure
binary log contains > 10000 events.
*/
while((event= mariadb_rpl_fetch(rpl, event)) && events < 10000)
{
events++;
}
mariadb_free_rpl_event(event);
mariadb_rpl_close(rpl);
mysql_close(mysql);
return OK;
}

struct my_tests_st my_tests[] = {
{"test_rpl_01", test_rpl_01, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"test_rpl_async", test_rpl_async, TEST_CONNECTION_NEW, 0, NULL, NULL},
{"test_rpl_semisync", test_rpl_semisync, TEST_CONNECTION_NEW, 0, NULL, NULL},
{NULL, NULL, 0, 0, NULL, NULL}
};

Expand Down

0 comments on commit 004f9d4

Please sign in to comment.