Skip to content

Commit

Permalink
status and session_tracik callback function:
Browse files Browse the repository at this point in the history
This feature allows client applications to register a callback function,
which is called as soon as the server status changes or session_track
information was sent by the server.

Registration is handled via mysql_optionsv() API function:

mysql_optionsv(mysql, MARIADB_OPT_STATUS_CALLBACK, function, data)

The callback function must be defined as follws:

void status_callback(void *data, enum enum_mariadb_status_info type, ..)

  Parameters:
    - data  Pointer passed with registration of callback function
            (usually a connection handle)
    - type  Information type  STATUS_TYPE or SESSION_TRACK_TYPE

  Variadic Parameters:

  if (type == STATUS_TYPE):
    - server status (unsigned int)

  if (type == SESSION_TRACK_TYPE)
    - enum enum_session_state_type track_type - session track type

    if (track_type == SESSION_TRACK_SYSTEM_VARIABLES)
      - MARIADB_CONST_STRING *key
      - MARIADB_CONST_STRING *value

    else
      - MARIADB_CONST_STRING *value

An example can be found in connection.c (test_status_callback)
  • Loading branch information
9EOR9 committed Aug 2, 2022
1 parent 8eff2a8 commit a8832af
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 34 deletions.
2 changes: 2 additions & 0 deletions include/ma_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ struct st_mysql_options_extension {
char *restricted_auth;
char *rpl_host;
unsigned short rpl_port;
void (*status_callback)(void *ptr, enum enum_mariadb_status_info type, ...);
void *status_data;
};

typedef struct st_connection_handler
Expand Down
7 changes: 7 additions & 0 deletions include/mariadb_com.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ enum enum_mysql_set_option
MYSQL_OPTION_MULTI_STATEMENTS_OFF
};

/* for status callback function */
enum enum_mariadb_status_info
{
STATUS_TYPE= 0,
SESSION_TRACK_TYPE
};

enum enum_session_state_type
{
SESSION_TRACK_SYSTEM_VARIABLES= 0,
Expand Down
3 changes: 2 additions & 1 deletion include/mysql.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ extern const char *SQLSTATE_UNKNOWN;
MARIADB_OPT_IO_WAIT,
MARIADB_OPT_SKIP_READ_RESPONSE,
MARIADB_OPT_RESTRICTED_AUTH,
MARIADB_OPT_RPL_REGISTER_REPLICA
MARIADB_OPT_RPL_REGISTER_REPLICA,
MARIADB_OPT_STATUS_CALLBACK
};

enum mariadb_value {
Expand Down
156 changes: 123 additions & 33 deletions libmariadb/mariadb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -1419,6 +1419,10 @@ mysql_real_connect(MYSQL *mysql, const char *host, const char *user,
if (!mysql->methods)
mysql->methods= &MARIADB_DEFAULT_METHODS;

/* set default */
if (!mysql->options.extension || !mysql->options.extension->status_callback)
mysql_optionsv(mysql, MARIADB_OPT_STATUS_CALLBACK, NULL, NULL);

/* if host contains a semicolon, we need to parse connection string */
if (host && strchr(host, ';'))
{
Expand Down Expand Up @@ -2430,19 +2434,97 @@ mysql_query(MYSQL *mysql, const char *query)
finish processing it.
*/



int STDCALL
mysql_send_query(MYSQL* mysql, const char* query, unsigned long length)
{
return ma_simple_command(mysql, COM_QUERY, query, length, 1,0);
}

void ma_save_session_track_info(void *ptr, enum enum_mariadb_status_info type, ...)
{
MYSQL *mysql= (MYSQL *)ptr;
enum enum_session_state_type track_type;
va_list ap;

DBUG_ASSERT(mysql != NULL);

/* We only handle SESSION_TRACK_TYPE here */
if (type != SESSION_TRACK_TYPE)
return;

va_start(ap, type);

track_type= va_arg(ap, enum enum_session_state_type);

switch (track_type) {
case SESSION_TRACK_SCHEMA:
case SESSION_TRACK_STATE_CHANGE:
case SESSION_TRACK_TRANSACTION_CHARACTERISTICS:
case SESSION_TRACK_TRANSACTION_STATE:
case SESSION_TRACK_GTIDS:
case SESSION_TRACK_SYSTEM_VARIABLES:
{
LIST *session_item;
MYSQL_LEX_STRING *str;
char *tmp;
MARIADB_CONST_STRING *data1= va_arg(ap, MARIADB_CONST_STRING *);

if (!(session_item= ma_multi_malloc(0,
&session_item, sizeof(LIST),
&str, sizeof(MYSQL_LEX_STRING),
&tmp, data1->length,
NULL)))
goto mem_error;

str->str= tmp;
memcpy(str->str, data1->str, data1->length);
str->length= data1->length;
session_item->data= str;
mysql->extension->session_state[track_type].list= list_add(mysql->extension->session_state[track_type].list,
session_item);
if (track_type == SESSION_TRACK_SYSTEM_VARIABLES)
{
MARIADB_CONST_STRING *data2= va_arg(ap, MARIADB_CONST_STRING *);
if (!(session_item= ma_multi_malloc(0,
&session_item, sizeof(LIST),
&str, sizeof(MYSQL_LEX_STRING),
&tmp, data2->length,
NULL)))
goto mem_error;

str->str= tmp;
memcpy(str->str, data2->str, data2->length);
str->length= data2->length;
session_item->data= str;
mysql->extension->session_state[track_type].list= list_add(mysql->extension->session_state[track_type].list,
session_item);
}
}
break;
}
return;

mem_error:
SET_CLIENT_ERROR(mysql, CR_OUT_OF_MEMORY, SQLSTATE_UNKNOWN, 0);
return;
}

int ma_read_ok_packet(MYSQL *mysql, uchar *pos, ulong length)
{
uchar *end= mysql->net.read_pos+length;
size_t item_len;
unsigned int last_server_status= mysql->server_status;
mysql->affected_rows= net_field_length_ll(&pos);
mysql->insert_id= net_field_length_ll(&pos);
mysql->server_status=uint2korr(pos);

/* callback */
if (mysql->options.extension->status_callback &&
mysql->server_status != last_server_status)
mysql->options.extension->status_callback(mysql->options.extension->status_data,
STATUS_TYPE, mysql->server_status);
pos+=2;
mysql->warning_count=uint2korr(pos);
pos+=2;
Expand All @@ -2466,8 +2548,6 @@ int ma_read_ok_packet(MYSQL *mysql, uchar *pos, ulong length)
int i;
if (pos < end)
{
LIST *session_item;
MYSQL_LEX_STRING *str= NULL;
enum enum_session_state_type si_type;
uchar *old_pos= pos;

Expand All @@ -2483,7 +2563,7 @@ int ma_read_ok_packet(MYSQL *mysql, uchar *pos, ulong length)
while (pos < end)
{
size_t plen;
char *data;
MYSQL_LEX_STRING data1, data2;
si_type= (enum enum_session_state_type)net_field_length(&pos);

switch(si_type) {
Expand All @@ -2505,55 +2585,52 @@ int ma_read_ok_packet(MYSQL *mysql, uchar *pos, ulong length)
plen= net_field_length(&pos);
if (pos + plen > end)
goto corrupted;
if (!(session_item= ma_multi_malloc(0,
&session_item, sizeof(LIST),
&str, sizeof(MYSQL_LEX_STRING),
&data, plen,
NULL)))
goto oom;
str->length= plen;
str->str= data;
memcpy(str->str, (char *)pos, plen);
pos+= plen;
session_item->data= str;
mysql->extension->session_state[si_type].list= list_add(mysql->extension->session_state[si_type].list, session_item);

data1.str= (char *)pos;
data1.length= plen;
if (si_type != SESSION_TRACK_SYSTEM_VARIABLES)
{
mysql->options.extension->status_callback(mysql->options.extension->status_data,
SESSION_TRACK_TYPE, si_type,
&data1);
if (mysql->net.last_errno)
goto oom;
}
pos+= plen;
/* in case schema has changed, we have to update mysql->db */
if (si_type == SESSION_TRACK_SCHEMA)
{
free(mysql->db);
mysql->db= malloc(plen + 1);
memcpy(mysql->db, str->str, plen);
mysql->db[plen]= 0;
memcpy(mysql->db, data1.str, data1.length);
mysql->db[data1.length]= 0;
}
else if (si_type == SESSION_TRACK_SYSTEM_VARIABLES)
{
my_bool set_charset= 0;
/* make sure that we update charset in case it has changed */
if (!strncmp(str->str, "character_set_client", str->length))
if (!strncmp(data1.str, "character_set_client", plen))
set_charset= 1;
plen= net_field_length(&pos);
if (pos + plen > end)
goto corrupted;
if (!(session_item= ma_multi_malloc(0,
&session_item, sizeof(LIST),
&str, sizeof(MYSQL_LEX_STRING),
&data, plen,
NULL)))
goto oom;
str->length= plen;
str->str= data;
memcpy(str->str, (char *)pos, plen);
data2.str= (char *)pos;
data2.length= plen;

mysql->options.extension->status_callback(mysql->options.extension->status_data,
SESSION_TRACK_TYPE, si_type,
&data1, &data2);
if (mysql->net.last_errno)
goto oom;

pos+= plen;
session_item->data= str;
mysql->extension->session_state[si_type].list= list_add(mysql->extension->session_state[si_type].list, session_item);
if (set_charset && str->length < CHARSET_NAME_LEN &&
strncmp(mysql->charset->csname, str->str, str->length) != 0)
if (set_charset && plen < CHARSET_NAME_LEN &&
strncmp(mysql->charset->csname, data2.str, data2.length) != 0)
{
char cs_name[CHARSET_NAME_LEN];
const MARIADB_CHARSET_INFO *cs_info;
memcpy(cs_name, str->str, str->length);
cs_name[str->length]= 0;
memcpy(cs_name, data2.str, data2.length);
cs_name[plen]= 0;
if ((cs_info = mysql_find_charset_name(cs_name)))
mysql->charset= cs_info;
}
Expand Down Expand Up @@ -3663,6 +3740,19 @@ mysql_optionsv(MYSQL *mysql,enum mysql_option option, ...)
OPT_SET_EXTENDED_VALUE(&mysql->options, rpl_port, (ushort)arg2);
}
break;
case MARIADB_OPT_STATUS_CALLBACK:
{
void *arg2= va_arg(ap, void *);
if (arg1)
{
OPT_SET_EXTENDED_VALUE(&mysql->options, status_callback, arg1);
OPT_SET_EXTENDED_VALUE(&mysql->options, status_data, arg2);
} else {
OPT_SET_EXTENDED_VALUE(&mysql->options, status_callback, ma_save_session_track_info);
OPT_SET_EXTENDED_VALUE(&mysql->options, status_data, mysql);
}
}
break;
default:
va_end(ap);
SET_CLIENT_ERROR(mysql, CR_NOT_IMPLEMENTED, SQLSTATE_UNKNOWN, 0);
Expand Down
102 changes: 102 additions & 0 deletions unittest/libmariadb/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -2151,7 +2151,109 @@ static int test_conc365_reconnect(MYSQL *my)
return rc;
}

struct st_callback {
char autocommit;
char database[64];
char charset[64];
};

void my_status_callback(void *ptr, enum enum_mariadb_status_info type, ...)
{
va_list ap;
struct st_callback *data= (struct st_callback *)ptr;
va_start(ap, type);

switch(type) {
case STATUS_TYPE:
{
int status= va_arg(ap, int);
data->autocommit= status & SERVER_STATUS_AUTOCOMMIT;
}
break;
case SESSION_TRACK_TYPE:
{
enum enum_session_state_type track_type= va_arg(ap, enum enum_session_state_type);
switch (track_type) {
case SESSION_TRACK_SCHEMA:
{
MARIADB_CONST_STRING *str= va_arg(ap, MARIADB_CONST_STRING *);
strncpy(data->database, str->str, str->length);
}
break;
case SESSION_TRACK_SYSTEM_VARIABLES:
{
MARIADB_CONST_STRING *key= va_arg(ap, MARIADB_CONST_STRING *);
MARIADB_CONST_STRING *val= va_arg(ap, MARIADB_CONST_STRING *);

if (!strncmp(key->str, "character_set_client", key->length))
{
strncpy(data->charset, val->str, val->length);
}
}
break;
default:
break;
}
}
default:
break;
}
va_end(ap);
}

static int test_status_callback(MYSQL *my __attribute__((unused)))
{
MYSQL *mysql= mysql_init(NULL);
char tmp[64];
int rc;
struct st_callback data= {0,"", ""};

rc= mysql_optionsv(mysql, MARIADB_OPT_STATUS_CALLBACK, my_status_callback, &data);

if (!my_test_connect(mysql, hostname, username,
password, NULL, port, socketname, 0))
{
diag("error1: %s", mysql_error(mysql));
return FAIL;
}

rc= mysql_autocommit(mysql, 0);
check_mysql_rc(rc, mysql);
rc= mysql_autocommit(mysql, 1);
check_mysql_rc(rc, mysql);

if (!data.autocommit)
{
diag("autocommit not set");
return FAIL;
}
diag("-------------------------");

sprintf(tmp, "USE %s", schema);
rc= mysql_query(mysql, tmp);
check_mysql_rc(rc, mysql);

if (strcmp(data.database, schema))
{
diag("Expected database: %s instead of %s", schema, data.database);
return FAIL;
}

rc= mysql_query(mysql, "SET NAMES latin1");
check_mysql_rc(rc, mysql);

if (strcmp(data.charset, "latin1"))
{
diag("Expected charset latin1 instead of %s", data.charset);
return FAIL;
}

mysql_close(mysql);
return OK;
}

struct my_tests_st my_tests[] = {
{"test_status_callback", test_status_callback, TEST_CONNECTION_NONE, 0, NULL, NULL},
{"test_conc365", test_conc365, TEST_CONNECTION_NONE, 0, NULL, NULL},
{"test_conc365_reconnect", test_conc365_reconnect, TEST_CONNECTION_DEFAULT, 0, NULL, NULL},
{"test_conn_str", test_conn_str, TEST_CONNECTION_NONE, 0, NULL, NULL},
Expand Down

0 comments on commit a8832af

Please sign in to comment.