Skip to content

Commit

Permalink
Merge pull request #1688 from twose/patch_mysql_procedure
Browse files Browse the repository at this point in the history
Mysql procedure
  • Loading branch information
matyhtf committed Jun 5, 2018
2 parents 3bb6e1b + cf6f439 commit bd4681f
Show file tree
Hide file tree
Showing 9 changed files with 803 additions and 177 deletions.
192 changes: 155 additions & 37 deletions swoole_mysql.c
Expand Up @@ -666,6 +666,8 @@ static int mysql_parse_prepare_result(mysql_client *client, char *buf, size_t n_
//skip 1 byte
buf += 1;
stmt->warning_count = mysql_uint2korr(buf);
stmt->result = NULL;
stmt->buffer = NULL;
client->statement = stmt;
stmt->client = client;

Expand Down Expand Up @@ -1086,6 +1088,7 @@ static sw_inline int mysql_read_eof(mysql_client *client, char *buffer, int n_bu

client->response.warnings = mysql_uint2korr(buffer + 5);
client->response.status_code = mysql_uint2korr(buffer + 7);
MYSQL_RESPONSE_BUFFER->offset += client->response.packet_length + 4;

return SW_OK;
}
Expand All @@ -1094,8 +1097,9 @@ static sw_inline int mysql_read_params(mysql_client *client)
{
while (1)
{
char *buffer = client->buffer->str + client->buffer->offset;
uint32_t n_buf = client->buffer->length - client->buffer->offset;
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *t_buffer = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%d, length=%d.", n_buf, client->response.packet_length);

Expand All @@ -1115,9 +1119,9 @@ static sw_inline int mysql_read_params(mysql_client *client)
}
// Read and ignore parameter field. Sentence from MySQL source:
// skip parameters data: we don't support it yet
client->response.packet_length = mysql_uint3korr(buffer);
client->response.packet_number = buffer[3];
client->buffer->offset += (client->response.packet_length + 4);
client->response.packet_length = mysql_uint3korr(t_buffer);
client->response.packet_number = t_buffer[3];
buffer->offset += (client->response.packet_length + 4);
client->statement->unreaded_param_count--;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "read param, count=%d.", client->statement->unreaded_param_count);
Expand All @@ -1128,9 +1132,8 @@ static sw_inline int mysql_read_params(mysql_client *client)
{
swTraceLog(SW_TRACE_MYSQL_CLIENT, "read eof [2]");

if (mysql_read_eof(client, buffer, n_buf) == 0)
if (mysql_read_eof(client, t_buffer, n_buf) == 0)
{
client->buffer->offset += 9;
return SW_OK;
}
else
Expand All @@ -1143,8 +1146,9 @@ static sw_inline int mysql_read_params(mysql_client *client)

static sw_inline int mysql_read_rows(mysql_client *client)
{
char *buffer = client->buffer->str + client->buffer->offset;
uint32_t n_buf = client->buffer->length - client->buffer->offset;
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *t_buffer = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
int ret;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%d", n_buf);
Expand All @@ -1158,24 +1162,18 @@ static sw_inline int mysql_read_rows(mysql_client *client)
return SW_ERR;
}
//RecordSet end
else if (mysql_read_eof(client, buffer, n_buf) == 0 && (n_buf == 9 || (n_buf > 9 && (client->response.status_code & SW_MYSQL_SERVER_MORE_RESULTS_EXISTS))))
else if (mysql_read_eof(client, t_buffer, n_buf) == SW_OK)
{
if (n_buf > 9)
{
// buffer may has multi responses
// we can't solve it in execute function so we return
swTraceLog(SW_TRACE_MYSQL_CLIENT, "remaining %d, more results exists", n_buf - 9);
}
if (client->response.columns)
{
mysql_columns_free(client);
}
return SW_OK;
}

client->response.packet_length = mysql_uint3korr(buffer);
client->response.packet_number = buffer[3];
buffer += 4;
client->response.packet_length = mysql_uint3korr(t_buffer);
client->response.packet_number = t_buffer[3];
t_buffer += 4;
n_buf -= 4;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "record size=%d", client->response.packet_length);
Expand All @@ -1189,12 +1187,12 @@ static sw_inline int mysql_read_rows(mysql_client *client)

if (client->cmd == SW_MYSQL_COM_STMT_EXECUTE)
{
ret = mysql_decode_row_prepare(client, buffer, client->response.packet_length);
ret = mysql_decode_row_prepare(client, t_buffer, client->response.packet_length);
}
else
{
//decode
ret = mysql_decode_row(client, buffer, client->response.packet_length);
ret = mysql_decode_row(client, t_buffer, client->response.packet_length);
}

if (ret < 0)
Expand All @@ -1204,9 +1202,9 @@ static sw_inline int mysql_read_rows(mysql_client *client)

//next row
client->response.num_row++;
buffer += client->response.packet_length;
t_buffer += client->response.packet_length;
n_buf -= client->response.packet_length;
client->buffer->offset += client->response.packet_length + 4;
buffer->offset += client->response.packet_length + 4;
}

return SW_ERR;
Expand Down Expand Up @@ -1412,8 +1410,9 @@ static int mysql_decode_field(char *buf, int len, mysql_field *col)

static int mysql_read_columns(mysql_client *client)
{
char *buffer = client->buffer->str + client->buffer->offset;
uint32_t n_buf = client->buffer->length - client->buffer->offset;
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *t_buffer = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
int ret;

for (; client->response.index_column < client->response.num_column; client->response.index_column++)
Expand All @@ -1425,24 +1424,24 @@ static int mysql_read_columns(mysql_client *client)
return SW_ERR;
}

client->response.packet_length = mysql_uint3korr(buffer);
client->response.packet_length = mysql_uint3korr(t_buffer);

//no enough data
if (n_buf - 4 < client->response.packet_length)
{
return SW_ERR;
}

client->response.packet_number = buffer[3];
buffer += 4;
client->response.packet_number = t_buffer[3];
t_buffer += 4;
n_buf -= 4;

ret = mysql_decode_field(buffer, client->response.packet_length, &client->response.columns[client->response.index_column]);
ret = mysql_decode_field(t_buffer, client->response.packet_length, &client->response.columns[client->response.index_column]);
if (ret > 0)
{
buffer += client->response.packet_length;
t_buffer += client->response.packet_length;
n_buf -= client->response.packet_length;
client->buffer->offset += (client->response.packet_length + 4);
buffer->offset += (client->response.packet_length + 4);
}
else
{
Expand All @@ -1451,12 +1450,12 @@ static int mysql_read_columns(mysql_client *client)
}
}

if (mysql_read_eof(client, buffer, n_buf) < 0)
if (mysql_read_eof(client, t_buffer, n_buf) < 0)
{
return SW_ERR;
}

buffer += 9;
t_buffer += 9;
n_buf -= 9;

if (client->cmd != SW_MYSQL_COM_STMT_PREPARE)
Expand All @@ -1470,20 +1469,119 @@ static int mysql_read_columns(mysql_client *client)
}
}

client->buffer->offset += buffer - (client->buffer->str + client->buffer->offset);
buffer->offset += t_buffer - (buffer->str + buffer->offset);

return SW_OK;
}

// this function is used to check if multi responses has received over.
int mysql_is_over(mysql_client *client)
{
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *p;
if (client->check_offset == buffer->length)
{
// have already check all of the data
goto again;
}
size_t n_buf = buffer->length - client->check_offset; // remaining buffer size
uint32_t temp;

while (1)
{
p = buffer->str + client->check_offset; // where to start checking now
if (unlikely(buffer->length - buffer->offset < 5))
{
break;
}
temp = mysql_uint3korr(p); //package length
// add header
p += 4;
n_buf -= 4;
if (unlikely(n_buf < temp)) //package is incomplete
{
break;
}
else
{
client->check_offset += 4;
}

client->check_offset += temp; // add package length

if (client->check_offset >= buffer->length) // if false: more packages exist, skip the current one
{
switch ((uint8_t) p[0])
{
case 0xfe: // eof
{
// +type +warning
p += 3;
swDebug("meet eof and flag=%d", mysql_uint2korr(p));
goto check_flag;
}
case 0x00: // ok
{

// if (temp < 7)
// {
// break;
// }
ulong_t val = 0;
char nul;
int retcode;
int t_nbuf = n_buf;

//+type
p++;
t_nbuf--;

retcode = mysql_lcb_ll(p, &val, &nul, t_nbuf); //affecr rows
t_nbuf -= retcode;
p += retcode;

retcode = mysql_lcb_ll(p, &val, &nul, t_nbuf); //insert id
t_nbuf -= retcode;
p += retcode;

check_flag:
if ((mysql_uint2korr(p) & SW_MYSQL_SERVER_MORE_RESULTS_EXISTS) == 0)
{
over:
client->response.wait_recv = 0;
client->check_offset = 0;
return SW_OK;
}
break;
}
case 0xff: // response type = error
{
goto over;
}
}
}

n_buf -= temp;
if (n_buf == 0)
{
break;
}
}

again:
client->response.wait_recv = 2;
return SW_AGAIN;
}


int mysql_response(mysql_client *client)
{
swString *buffer = client->buffer;
swString *buffer = MYSQL_RESPONSE_BUFFER;

char *p = buffer->str + buffer->offset;
int ret;
char nul;
int n_buf = buffer->length - buffer->offset;
size_t n_buf = buffer->length - buffer->offset;

while (n_buf > 0)
{
Expand Down Expand Up @@ -1597,7 +1695,7 @@ int mysql_response(mysql_client *client)
{
return SW_ERR;
}
client->buffer->offset += (4 + ret);
buffer->offset += (4 + ret);
client->response.columns = ecalloc(client->response.num_column, sizeof(mysql_field));
client->state = SW_MYSQL_STATE_READ_FIELD;
break;
Expand Down Expand Up @@ -1884,6 +1982,26 @@ static PHP_METHOD(swoole_mysql, connect)
connector->strict_type = 0;
}

if (php_swoole_array_get_value(_ht, "fetch_mode", value))
{
#if PHP_MAJOR_VERSION < 7
if(Z_TYPE_P(value) == IS_BOOL && Z_BVAL_P(value) == 1)
#else
if (Z_TYPE_P(value) == IS_TRUE)
#endif
{
connector->fetch_mode = 1;
}
else
{
connector->fetch_mode = 0;
}
}
else
{
connector->fetch_mode = 0;
}

swClient *cli = emalloc(sizeof(swClient));
int type = SW_SOCK_TCP;

Expand Down
10 changes: 7 additions & 3 deletions swoole_mysql.h
Expand Up @@ -19,7 +19,6 @@
#ifndef SWOOLE_MYSQL_H_
#define SWOOLE_MYSQL_H_

//#define SW_MYSQL_STRICT_TYPE
//#define SW_MYSQL_DEBUG

enum mysql_command
Expand Down Expand Up @@ -225,6 +224,7 @@ typedef struct
char *password;
char *database;
zend_bool strict_type;
zend_bool fetch_mode;

zend_size_t host_len;
zend_size_t user_len;
Expand Down Expand Up @@ -296,6 +296,8 @@ typedef struct
uint16_t unreaded_param_count;
struct _mysql_client *client;
zval *object;
swString *buffer; /* save the mysql multi responses data */
zval *result; /* save the zval array result */
} mysql_statement;

typedef struct
Expand Down Expand Up @@ -339,7 +341,6 @@ typedef struct _mysql_client
int fd;
uint32_t transaction :1;
uint32_t connected :1;
uint32_t strict;

mysql_connector connector;
mysql_statement *statement;
Expand All @@ -352,8 +353,8 @@ typedef struct _mysql_client
zval _onClose;
#endif

off_t check_offset;
mysql_response_t response; /* single response */
swLinkedList *response_list; /* multi responses (in fetch mode) */

} mysql_client;

Expand Down Expand Up @@ -409,12 +410,15 @@ typedef struct _mysql_client
mysql_int4store((T),def_temp); \
mysql_int4store((T+4),def_temp2); } while (0)

#define MYSQL_RESPONSE_BUFFER (client->cmd == SW_MYSQL_COM_STMT_EXECUTE ? client->statement->buffer : client->buffer)

int mysql_get_result(mysql_connector *connector, char *buf, int len);
int mysql_get_charset(char *name);
int mysql_handshake(mysql_connector *connector, char *buf, int len);
int mysql_request(swString *sql, swString *buffer);
int mysql_prepare(swString *sql, swString *buffer);
int mysql_response(mysql_client *client);
int mysql_is_over(mysql_client *client);

#ifdef SW_MYSQL_DEBUG
void mysql_client_info(mysql_client *client);
Expand Down

0 comments on commit bd4681f

Please sign in to comment.