Skip to content

Commit

Permalink
Fixed #2208.
Browse files Browse the repository at this point in the history
  • Loading branch information
twose committed Dec 17, 2018
1 parent 57fc849 commit 76179fa
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 48 deletions.
193 changes: 149 additions & 44 deletions swoole_mysql.c
Expand Up @@ -53,6 +53,7 @@ static zend_object_handlers swoole_mysql_exception_handlers;

#define UTF8_MB4 "utf8mb4"
#define UTF8_MB3 "utf8"
#define DATETIME_MAX_SIZE 20

typedef struct _mysql_charset
{
Expand All @@ -61,6 +62,17 @@ typedef struct _mysql_charset
const char *collation;
} mysql_charset;

typedef struct _mysql_big_data_info {
// for used:
ulong_t len; // data length
ulong_t remaining_size; // max remaining size that can be read
uint32_t currrent_packet_remaining_size; // remaining size of current packet
char *read_p; // where to start reading data
// for result:
uint32_t ext_header_len; // extra packet header length
uint64_t ext_packet_len; // extra packet length (body only)
} mysql_big_data_info;

static const mysql_charset swoole_mysql_charsets[] =
{
{ 1, "big5", "big5_chinese_ci" },
Expand Down Expand Up @@ -436,7 +448,7 @@ int mysql_get_charset(char *name)
int mysql_get_result(mysql_connector *connector, char *buf, int len)
{
char *tmp = buf;
int packet_length = mysql_uint3korr(tmp);
uint32_t packet_length = mysql_uint3korr(tmp);
if (len < packet_length + SW_MYSQL_PACKET_HEADER_SIZE)
{
return 0;
Expand Down Expand Up @@ -753,7 +765,7 @@ int mysql_auth_switch(mysql_connector *connector, char *buf, int len)

int next_state = SW_MYSQL_HANDSHAKE_WAIT_RESULT;

int packet_length = mysql_uint3korr(tmp);
uint32_t packet_length = mysql_uint3korr(tmp);
//continue to wait for data
if (len < packet_length + SW_MYSQL_PACKET_HEADER_SIZE)
{
Expand Down Expand Up @@ -811,7 +823,7 @@ int mysql_auth_switch(mysql_connector *connector, char *buf, int len)
int mysql_parse_auth_signature(swString *buffer, mysql_connector *connector)
{
char *tmp = buffer->str;
int packet_length = mysql_uint3korr(tmp);
uint32_t packet_length = mysql_uint3korr(tmp);
//continue to wait for data
if (buffer->length < packet_length + SW_MYSQL_PACKET_HEADER_SIZE)
{
Expand Down Expand Up @@ -861,7 +873,7 @@ int mysql_parse_rsa(mysql_connector *connector, char *buf, int len)

char *tmp = buf;

int packet_length = mysql_uint3korr(tmp);
uint32_t packet_length = mysql_uint3korr(tmp);
//continue to wait for data
if (len < packet_length + SW_MYSQL_PACKET_HEADER_SIZE)
{
Expand Down Expand Up @@ -991,25 +1003,62 @@ static int mysql_parse_prepare_result(mysql_client *client, char *buf, size_t n_
return SW_OK;
}

static int mysql_decode_row(mysql_client *client, char *buf, int packet_len)
static zend_string* mysql_decode_big_data(mysql_big_data_info *mbdi)
{
// through ext_packet_num to calc read_n += ?
mbdi->ext_header_len = SW_MYSQL_PACKET_HEADER_SIZE * (((mbdi->len - mbdi->currrent_packet_remaining_size) / SW_MYSQL_MAX_PACKET_BODY_SIZE) + 1);
if (mbdi->ext_header_len + mbdi->len > mbdi->remaining_size)
{
return NULL;
}
else
{
// optimization: allocate a complete piece of memory at once
zend_string* zstring = zend_string_alloc(mbdi->len, 0);;
size_t write_s = 0, write_n = 0;
char *read_p, *write_p;
read_p = mbdi->read_p;
write_p = ZSTR_VAL(zstring);
// copy the remaining data of the current package
write_s = mbdi->currrent_packet_remaining_size;
memcpy(write_p, read_p, write_s);
read_p += write_s;
write_p += write_s;
write_n += write_s;
while (write_n < mbdi->len) // copy the next... package
{
uint32_t _packet_len = mysql_uint3korr(read_p);
mbdi->ext_packet_len += _packet_len;
write_s = MIN(_packet_len, mbdi->len - write_n);
memcpy(write_p, read_p + SW_MYSQL_PACKET_HEADER_SIZE, write_s);
read_p += SW_MYSQL_PACKET_HEADER_SIZE + write_s;
write_p += write_s;
write_n += write_s;
}
ZSTR_VAL(zstring)[mbdi->len] = '\0';
SW_ASSERT(ZSTR_VAL(zstring) + mbdi->len == write_p);
return zstring;
}
}

static ssize_t mysql_decode_row(mysql_client *client, char *buf, uint64_t packet_len, size_t n_buf)
{
int read_n = 0, i;
int i;
int tmp_len;
ulong_t len;
char nul;

mysql_row row;
char value_buffer[32];
bzero(&row, sizeof(row));
char *error;
//unused
//char mem;

ssize_t read_n = 0;
zend_string *zstring = NULL;
zval *result_array = client->response.result_array;
zval *row_array = sw_malloc_zval();

bzero(&row, sizeof(row));
array_init(row_array);

swTraceLog(SW_TRACE_MYSQL_CLIENT, "mysql_decode_row begin, num_column=%ld, packet_len=%d.", client->response.num_column, packet_len);
swTraceLog(SW_TRACE_MYSQL_CLIENT, "mysql_decode_row begin, num_column=%ld, packet_len=%u.", client->response.num_column, packet_len);

mysql_field *field = NULL;

Expand All @@ -1022,26 +1071,34 @@ static int mysql_decode_row(mysql_client *client, char *buf, int packet_len)
read_n = -SW_MYSQL_ERR_BAD_LCB;
goto _error;
}

read_n += tmp_len;
if (read_n + len > packet_len)

// WARNING: data may be longer than single package (0x00fffff => 16M)
if (unlikely(len > packet_len - read_n))
{
swWarn("mysql response parse error: length over buffer, read_n=%d, len=%lu, packet_len=%u", read_n, len, packet_len);
read_n = -SW_MYSQL_ERR_LEN_OVER_BUFFER;
goto _error;
mysql_big_data_info mbdi = { len, n_buf - read_n, packet_len - read_n, buf + read_n, 0, 0 };
if ((zstring = mysql_decode_big_data(&mbdi)))
{
read_n += mbdi.ext_header_len;
packet_len += mbdi.ext_header_len + mbdi.ext_packet_len;
}
else
{
read_n = SW_AGAIN;
goto _error;
}
}

field = &client->response.columns[i];

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n=%d, field_name=%s, name_length=%d", i, field->name, field->name_length);

if (nul == 1)
{
swTraceLog(SW_TRACE_MYSQL_CLIENT, "column#%d: name=%.*s, type=null", i, field->name_length, field->name);
add_assoc_null(row_array, field->name);
continue;
}

swTraceLog(SW_TRACE_MYSQL_CLIENT, "value: name=%s, type=%d, value=%s, len=%ld", field->name, field->type, swoole_strndup(buf + read_n, len), len);
swTraceLog(SW_TRACE_MYSQL_CLIENT, "column#%d: name=%.*s, type=%d, value=%.*s, len=%lu", i, field->name_length, field->name, field->type, (int) len, buf + read_n, len);

switch (field->type)
{
Expand All @@ -1067,7 +1124,17 @@ static int mysql_decode_row(mysql_client *client, char *buf, int packet_len)
case SW_MYSQL_TYPE_DATETIME:
case SW_MYSQL_TYPE_DATE:
case SW_MYSQL_TYPE_JSON:
add_assoc_stringl(row_array, field->name, buf + read_n, len);
if (unlikely(zstring))
{
zval _zdata, *zdata = &_zdata;
ZVAL_STR(zdata, zstring);
add_assoc_zval(row_array, field->name, zdata);
zstring = NULL;
}
else
{
add_assoc_stringl(row_array, field->name, buf + read_n, len);
}
break;
/* Integer */
case SW_MYSQL_TYPE_TINY:
Expand Down Expand Up @@ -1189,8 +1256,6 @@ static int mysql_decode_row(mysql_client *client, char *buf, int packet_len)
return read_n;
}

#define DATETIME_MAX_SIZE 20

static int mysql_decode_datetime(char *buf, char *result)
{
uint16_t y = 0;
Expand Down Expand Up @@ -1260,18 +1325,19 @@ static void mysql_decode_year(char *buf, char *result)
snprintf(result, DATETIME_MAX_SIZE, "%04d", y);
}

static int mysql_decode_row_prepare(mysql_client *client, char *buf, int packet_len)
static ssize_t mysql_decode_row_prepare(mysql_client *client, char *buf, uint64_t packet_len, size_t n_buf)
{
int read_n = 0, i;
int i;
int tmp_len;
ulong_t len = 0;
char nul;
ssize_t read_n = 0;

unsigned int null_count = ((client->response.num_column + 9) / 8) + 1;
buf += null_count;
packet_len -= null_count;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "null_count=%d", null_count);
swTraceLog(SW_TRACE_MYSQL_CLIENT, "null_count=%u", null_count);

char datetime_buffer[DATETIME_MAX_SIZE];
mysql_row row;
Expand All @@ -1294,7 +1360,7 @@ static int mysql_decode_row_prepare(mysql_client *client, char *buf, int packet_
continue;
}

swTraceLog(SW_TRACE_MYSQL_CLIENT, "value: name=%s, type=%d, len=%lu", field->name, field->type, field->length);
swTraceLog(SW_TRACE_MYSQL_CLIENT, "column#%d: name=%s, type=%d, size=%lu", i, field->name, field->type, field->length);

switch (field->type)
{
Expand Down Expand Up @@ -1350,8 +1416,30 @@ static int mysql_decode_row_prepare(mysql_client *client, char *buf, int packet_
goto _error;
}
read_n += tmp_len;
add_assoc_stringl(row_array, field->name, buf + read_n, len);
swTraceLog(SW_TRACE_MYSQL_CLIENT, "%s=%s", field->name, swoole_strndup(buf + read_n, len));
// WARNING: data may be longer than single package (0x00fffff => 16M)
if (unlikely(len > packet_len - read_n))
{
zend_string *zstring;
mysql_big_data_info mbdi = { len, n_buf - read_n, packet_len - read_n, buf + read_n, 0, 0 };
if ((zstring = mysql_decode_big_data(&mbdi)))
{
zval _zdata, *zdata = &_zdata;
ZVAL_STR(zdata, zstring);
add_assoc_zval(row_array, field->name, zdata);
read_n += mbdi.ext_header_len;
packet_len += mbdi.ext_header_len + mbdi.ext_packet_len;
}
else
{
read_n = SW_AGAIN;
goto _error;
}
}
else
{
add_assoc_stringl(row_array, field->name, buf + read_n, len);
}
swTraceLog(SW_TRACE_MYSQL_CLIENT, "len=%lu, %s=%.*s", len, field->name, (int) len, buf + read_n);
break;

/* Integer */
Expand Down Expand Up @@ -1600,7 +1688,7 @@ static sw_inline int mysql_read_params(mysql_client *client)
{
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *p = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
size_t n_buf = buffer->length - buffer->offset;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%d, length=%d.", n_buf, client->response.packet_length);

Expand Down Expand Up @@ -1633,14 +1721,19 @@ static sw_inline int mysql_read_params(mysql_client *client)
}
}

/**
* @var char* p => package beginning point
* @var size_t n_buf => remaining buffer length
* @var ssize_t read_n => already read buffer len
*/
static sw_inline int mysql_read_rows(mysql_client *client)
{
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *p = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
int ret;
size_t n_buf = buffer->length - buffer->offset;
ssize_t read_n = 0;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%d", n_buf);
swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%ju", (uintmax_t) n_buf);

//RecordSet parse
while (n_buf > 0)
Expand Down Expand Up @@ -1670,35 +1763,46 @@ static sw_inline int mysql_read_rows(mysql_client *client)

swTraceLog(SW_TRACE_MYSQL_CLIENT, "record size=%d", client->response.packet_length);

// to packege body
p += SW_MYSQL_PACKET_HEADER_SIZE;
n_buf -= SW_MYSQL_PACKET_HEADER_SIZE;
buffer->offset += SW_MYSQL_PACKET_HEADER_SIZE;

if (client->cmd == SW_MYSQL_COM_STMT_EXECUTE)
{
// ProtocolBinary::ResultsetRow
swMysqlPacketDump(p, client->response.packet_length + SW_MYSQL_PACKET_HEADER_SIZE, "ProtocolBinary::ResultsetRow");

ret = mysql_decode_row_prepare(client, p + SW_MYSQL_PACKET_HEADER_SIZE, client->response.packet_length);
read_n = mysql_decode_row_prepare(client, p, client->response.packet_length, n_buf);
}
else
{
// ProtocolText::ResultsetRow
swMysqlPacketDump(p, client->response.packet_length + SW_MYSQL_PACKET_HEADER_SIZE, "ProtocolText::ResultsetRow");

ret = mysql_decode_row(client, p + SW_MYSQL_PACKET_HEADER_SIZE, client->response.packet_length);
read_n = mysql_decode_row(client, p, client->response.packet_length, n_buf);
}

if (ret < 0)
if (read_n < 0)
{
// TODO: handle all decode error here
if (read_n == SW_AGAIN)
{
swWarn("mysql packet is incomplete.");
}
mysql_columns_free(client);
break;
return read_n;
}

//next row
// next row
p += read_n;
n_buf -= read_n;
buffer->offset += read_n;
client->response.num_row++;
p += client->response.packet_length + SW_MYSQL_PACKET_HEADER_SIZE;
n_buf -= client->response.packet_length + SW_MYSQL_PACKET_HEADER_SIZE;
buffer->offset += client->response.packet_length + SW_MYSQL_PACKET_HEADER_SIZE;
}

return ret;
// missing eof or err packet
return SW_AGAIN;
}

static int mysql_decode_field(char *buf, int len, mysql_field *col)
Expand Down Expand Up @@ -1903,7 +2007,7 @@ static int mysql_read_columns(mysql_client *client)
{
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *p = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
size_t n_buf = buffer->length - buffer->offset;
int ret;

for (; client->response.index_column < client->response.num_column; client->response.index_column++)
Expand Down Expand Up @@ -1935,7 +2039,7 @@ static int mysql_read_columns(mysql_client *client)
else
{
swWarn("mysql_decode_field failed, code=%d.", ret);
break;
return ret < 0 ? ret : SW_ERR;
}
}

Expand All @@ -1950,6 +2054,7 @@ static int mysql_read_columns(mysql_client *client)

if (mysql_read_eof(client, p, n_buf) != SW_OK)
{
swWarn("unexpected mysql non-eof packet.");
return SW_ERR;
}

Expand Down

0 comments on commit 76179fa

Please sign in to comment.