Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mysql procedure #1688

Merged
merged 26 commits into from Jun 5, 2018
Merged
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9f970aa
Remove useless code.
twose Jun 2, 2018
353fb3a
Add fetch_mode option and format.
twose Jun 2, 2018
def0d15
Add fetchAll method and swoole_mysql_coro_statement_free.
twose Jun 2, 2018
1676c60
Add MySQL fetch mode unit test.
twose Jun 2, 2018
316f5ee
Fix.
twose Jun 2, 2018
6ce4c37
Fix zval free.
twose Jun 3, 2018
b5d502c
Add twice tests and use config constant.
twose Jun 3, 2018
5771403
Add fetch method.
twose Jun 3, 2018
01f7c0f
Add single fetch test.
twose Jun 3, 2018
b67a5c6
Add without fetch test.
twose Jun 3, 2018
a5b9ba0
Add mysql_is_over, fix the fetchAll return value.
twose Jun 4, 2018
87a4938
Now we can recv all the responses data.
twose Jun 4, 2018
c560525
Add MYSQL_RESPONSE_BUFFER to separate client and statement buffer.
twose Jun 4, 2018
297c2f2
separate parse_response to swoole_mysql_coro_parse_response.
twose Jun 4, 2018
7f5e6ff
Fix the offset error.
twose Jun 4, 2018
6034d06
Add nextResult and it works successful!
twose Jun 4, 2018
939a5f2
Fix the is over check bug due to the MySQL protocol problem.
twose Jun 5, 2018
a66bdb0
Some patches to fine down the mysql coro: Always check the package co…
twose Jun 5, 2018
3dbb5ba
Tidy up the code.
twose Jun 5, 2018
b8db6e4
Fix the response type check, clear cache attribute totally.
twose Jun 5, 2018
38a139d
Add unit test: procedure in fetch mode
twose Jun 5, 2018
9031ced
Return false when we are not in fetch mode.
twose Jun 5, 2018
c2f356c
Without MySQL client connection, we can also parse the new response i…
twose Jun 5, 2018
58a4d1d
Support procedure without fetch mode.
twose Jun 5, 2018
13ff4ff
Add procedure unit test (without fetch mode).
twose Jun 5, 2018
cf6f439
It's a unsigned int value.
twose Jun 5, 2018
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+803 −177
Diff settings

Always

Just for now

Copy path View file
@@ -666,6 +666,8 @@ static int mysql_parse_prepare_result(mysql_client *client, char *buf, size_t n_
//skip 1 byte
buf += 1;
stmt->warning_count = mysql_uint2korr(buf);
stmt->result = NULL;
stmt->buffer = NULL;
client->statement = stmt;
stmt->client = client;

@@ -1086,6 +1088,7 @@ static sw_inline int mysql_read_eof(mysql_client *client, char *buffer, int n_bu

client->response.warnings = mysql_uint2korr(buffer + 5);
client->response.status_code = mysql_uint2korr(buffer + 7);
MYSQL_RESPONSE_BUFFER->offset += client->response.packet_length + 4;

return SW_OK;
}
@@ -1094,8 +1097,9 @@ static sw_inline int mysql_read_params(mysql_client *client)
{
while (1)
{
char *buffer = client->buffer->str + client->buffer->offset;
uint32_t n_buf = client->buffer->length - client->buffer->offset;
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *t_buffer = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%d, length=%d.", n_buf, client->response.packet_length);

@@ -1115,9 +1119,9 @@ static sw_inline int mysql_read_params(mysql_client *client)
}
// Read and ignore parameter field. Sentence from MySQL source:
// skip parameters data: we don't support it yet
client->response.packet_length = mysql_uint3korr(buffer);
client->response.packet_number = buffer[3];
client->buffer->offset += (client->response.packet_length + 4);
client->response.packet_length = mysql_uint3korr(t_buffer);
client->response.packet_number = t_buffer[3];
buffer->offset += (client->response.packet_length + 4);
client->statement->unreaded_param_count--;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "read param, count=%d.", client->statement->unreaded_param_count);
@@ -1128,9 +1132,8 @@ static sw_inline int mysql_read_params(mysql_client *client)
{
swTraceLog(SW_TRACE_MYSQL_CLIENT, "read eof [2]");

if (mysql_read_eof(client, buffer, n_buf) == 0)
if (mysql_read_eof(client, t_buffer, n_buf) == 0)
{
client->buffer->offset += 9;
return SW_OK;
}
else
@@ -1143,8 +1146,9 @@ static sw_inline int mysql_read_params(mysql_client *client)

static sw_inline int mysql_read_rows(mysql_client *client)
{
char *buffer = client->buffer->str + client->buffer->offset;
uint32_t n_buf = client->buffer->length - client->buffer->offset;
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *t_buffer = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
int ret;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "n_buf=%d", n_buf);
@@ -1158,24 +1162,18 @@ static sw_inline int mysql_read_rows(mysql_client *client)
return SW_ERR;
}
//RecordSet end
else if (mysql_read_eof(client, buffer, n_buf) == 0 && (n_buf == 9 || (n_buf > 9 && (client->response.status_code & SW_MYSQL_SERVER_MORE_RESULTS_EXISTS))))
else if (mysql_read_eof(client, t_buffer, n_buf) == SW_OK)
{
if (n_buf > 9)
{
// buffer may has multi responses
// we can't solve it in execute function so we return
swTraceLog(SW_TRACE_MYSQL_CLIENT, "remaining %d, more results exists", n_buf - 9);
}
if (client->response.columns)
{
mysql_columns_free(client);
}
return SW_OK;
}

client->response.packet_length = mysql_uint3korr(buffer);
client->response.packet_number = buffer[3];
buffer += 4;
client->response.packet_length = mysql_uint3korr(t_buffer);
client->response.packet_number = t_buffer[3];
t_buffer += 4;
n_buf -= 4;

swTraceLog(SW_TRACE_MYSQL_CLIENT, "record size=%d", client->response.packet_length);
@@ -1189,12 +1187,12 @@ static sw_inline int mysql_read_rows(mysql_client *client)

if (client->cmd == SW_MYSQL_COM_STMT_EXECUTE)
{
ret = mysql_decode_row_prepare(client, buffer, client->response.packet_length);
ret = mysql_decode_row_prepare(client, t_buffer, client->response.packet_length);
}
else
{
//decode
ret = mysql_decode_row(client, buffer, client->response.packet_length);
ret = mysql_decode_row(client, t_buffer, client->response.packet_length);
}

if (ret < 0)
@@ -1204,9 +1202,9 @@ static sw_inline int mysql_read_rows(mysql_client *client)

//next row
client->response.num_row++;
buffer += client->response.packet_length;
t_buffer += client->response.packet_length;
n_buf -= client->response.packet_length;
client->buffer->offset += client->response.packet_length + 4;
buffer->offset += client->response.packet_length + 4;
}

return SW_ERR;
@@ -1412,8 +1410,9 @@ static int mysql_decode_field(char *buf, int len, mysql_field *col)

static int mysql_read_columns(mysql_client *client)
{
char *buffer = client->buffer->str + client->buffer->offset;
uint32_t n_buf = client->buffer->length - client->buffer->offset;
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *t_buffer = buffer->str + buffer->offset;
uint32_t n_buf = buffer->length - buffer->offset;
int ret;

for (; client->response.index_column < client->response.num_column; client->response.index_column++)
@@ -1425,24 +1424,24 @@ static int mysql_read_columns(mysql_client *client)
return SW_ERR;
}

client->response.packet_length = mysql_uint3korr(buffer);
client->response.packet_length = mysql_uint3korr(t_buffer);

//no enough data
if (n_buf - 4 < client->response.packet_length)
{
return SW_ERR;
}

client->response.packet_number = buffer[3];
buffer += 4;
client->response.packet_number = t_buffer[3];
t_buffer += 4;
n_buf -= 4;

ret = mysql_decode_field(buffer, client->response.packet_length, &client->response.columns[client->response.index_column]);
ret = mysql_decode_field(t_buffer, client->response.packet_length, &client->response.columns[client->response.index_column]);
if (ret > 0)
{
buffer += client->response.packet_length;
t_buffer += client->response.packet_length;
n_buf -= client->response.packet_length;
client->buffer->offset += (client->response.packet_length + 4);
buffer->offset += (client->response.packet_length + 4);
}
else
{
@@ -1451,12 +1450,12 @@ static int mysql_read_columns(mysql_client *client)
}
}

if (mysql_read_eof(client, buffer, n_buf) < 0)
if (mysql_read_eof(client, t_buffer, n_buf) < 0)
{
return SW_ERR;
}

buffer += 9;
t_buffer += 9;
n_buf -= 9;

if (client->cmd != SW_MYSQL_COM_STMT_PREPARE)
@@ -1470,20 +1469,119 @@ static int mysql_read_columns(mysql_client *client)
}
}

client->buffer->offset += buffer - (client->buffer->str + client->buffer->offset);
buffer->offset += t_buffer - (buffer->str + buffer->offset);

return SW_OK;
}

// this function is used to check if multi responses has received over.
int mysql_is_over(mysql_client *client)
{
swString *buffer = MYSQL_RESPONSE_BUFFER;
char *p;
if (client->check_offset == buffer->length)
{
// have already check all of the data
goto again;
}
size_t n_buf = buffer->length - client->check_offset; // remaining buffer size
uint32_t temp;

while (1)
{
p = buffer->str + client->check_offset; // where to start checking now
if (unlikely(buffer->length - buffer->offset < 5))
{
break;
}
temp = mysql_uint3korr(p); //package length
// add header
p += 4;
n_buf -= 4;
if (unlikely(n_buf < temp)) //package is incomplete
{
break;
}
else
{
client->check_offset += 4;
}

client->check_offset += temp; // add package length

if (client->check_offset >= buffer->length) // if false: more packages exist, skip the current one
{
switch ((uint8_t) p[0])
{
case 0xfe: // eof
{
// +type +warning
p += 3;
swDebug("meet eof and flag=%d", mysql_uint2korr(p));
goto check_flag;
}
case 0x00: // ok
{

// if (temp < 7)
// {
// break;
// }
ulong_t val = 0;
char nul;
int retcode;
int t_nbuf = n_buf;

//+type
p++;
t_nbuf--;

retcode = mysql_lcb_ll(p, &val, &nul, t_nbuf); //affecr rows
t_nbuf -= retcode;
p += retcode;

retcode = mysql_lcb_ll(p, &val, &nul, t_nbuf); //insert id
t_nbuf -= retcode;
p += retcode;

check_flag:
if ((mysql_uint2korr(p) & SW_MYSQL_SERVER_MORE_RESULTS_EXISTS) == 0)
{
over:
client->response.wait_recv = 0;
client->check_offset = 0;
return SW_OK;
}
break;
}
case 0xff: // response type = error
{
goto over;
}
}
}

n_buf -= temp;
if (n_buf == 0)
{
break;
}
}

again:
client->response.wait_recv = 2;
return SW_AGAIN;
}


int mysql_response(mysql_client *client)
{
swString *buffer = client->buffer;
swString *buffer = MYSQL_RESPONSE_BUFFER;

char *p = buffer->str + buffer->offset;
int ret;
char nul;
int n_buf = buffer->length - buffer->offset;
size_t n_buf = buffer->length - buffer->offset;

while (n_buf > 0)
{
@@ -1597,7 +1695,7 @@ int mysql_response(mysql_client *client)
{
return SW_ERR;
}
client->buffer->offset += (4 + ret);
buffer->offset += (4 + ret);
client->response.columns = ecalloc(client->response.num_column, sizeof(mysql_field));
client->state = SW_MYSQL_STATE_READ_FIELD;
break;
@@ -1884,6 +1982,26 @@ static PHP_METHOD(swoole_mysql, connect)
connector->strict_type = 0;
}

if (php_swoole_array_get_value(_ht, "fetch_mode", value))
{
#if PHP_MAJOR_VERSION < 7
if(Z_TYPE_P(value) == IS_BOOL && Z_BVAL_P(value) == 1)
#else
if (Z_TYPE_P(value) == IS_TRUE)
#endif
{
connector->fetch_mode = 1;
}
else
{
connector->fetch_mode = 0;
}
}
else
{
connector->fetch_mode = 0;
}

swClient *cli = emalloc(sizeof(swClient));
int type = SW_SOCK_TCP;

Copy path View file
@@ -19,7 +19,6 @@
#ifndef SWOOLE_MYSQL_H_
#define SWOOLE_MYSQL_H_

//#define SW_MYSQL_STRICT_TYPE
//#define SW_MYSQL_DEBUG

enum mysql_command
@@ -225,6 +224,7 @@ typedef struct
char *password;
char *database;
zend_bool strict_type;
zend_bool fetch_mode;

zend_size_t host_len;
zend_size_t user_len;
@@ -296,6 +296,8 @@ typedef struct
uint16_t unreaded_param_count;
struct _mysql_client *client;
zval *object;
swString *buffer; /* save the mysql multi responses data */
zval *result; /* save the zval array result */
} mysql_statement;

typedef struct
@@ -339,7 +341,6 @@ typedef struct _mysql_client
int fd;
uint32_t transaction :1;
uint32_t connected :1;
uint32_t strict;

mysql_connector connector;
mysql_statement *statement;
@@ -352,8 +353,8 @@ typedef struct _mysql_client
zval _onClose;
#endif

off_t check_offset;
mysql_response_t response; /* single response */
swLinkedList *response_list; /* multi responses (in fetch mode) */

} mysql_client;

@@ -409,12 +410,15 @@ typedef struct _mysql_client
mysql_int4store((T),def_temp); \
mysql_int4store((T+4),def_temp2); } while (0)

#define MYSQL_RESPONSE_BUFFER (client->cmd == SW_MYSQL_COM_STMT_EXECUTE ? client->statement->buffer : client->buffer)

int mysql_get_result(mysql_connector *connector, char *buf, int len);
int mysql_get_charset(char *name);
int mysql_handshake(mysql_connector *connector, char *buf, int len);
int mysql_request(swString *sql, swString *buffer);
int mysql_prepare(swString *sql, swString *buffer);
int mysql_response(mysql_client *client);
int mysql_is_over(mysql_client *client);

#ifdef SW_MYSQL_DEBUG
void mysql_client_info(mysql_client *client);
Oops, something went wrong.
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.