From 48c1b2b2df0206e9b09a12a2a4af33b689d6f198 Mon Sep 17 00:00:00 2001 From: matyhtf Date: Tue, 2 Apr 2019 14:30:52 +0800 Subject: [PATCH] use serv->ipc_max_size --- examples/php/buf_size.php | 4 +- examples/server/fixed_header_client.php | 62 ++++++++++++----------- examples/server/fixed_header_server.php | 66 +++++-------------------- include/server.h | 2 + include/swoole.h | 8 ++- src/server/process.c | 54 +++++++++++++------- src/server/reactor_thread.c | 3 +- src/server/worker.cc | 19 +++++-- 8 files changed, 110 insertions(+), 108 deletions(-) diff --git a/examples/php/buf_size.php b/examples/php/buf_size.php index 83d2c53eceb..393deeffdaa 100644 --- a/examples/php/buf_size.php +++ b/examples/php/buf_size.php @@ -18,5 +18,5 @@ $ret_n = socket_write($socket, str_repeat('A', $n), $n); var_dump($ret_n); -$ret_n = socket_read($fds[1], $n); -var_dump($ret_n); \ No newline at end of file +$data = socket_read($fds[1], $n); +var_dump(strlen($data)); diff --git a/examples/server/fixed_header_client.php b/examples/server/fixed_header_client.php index ae0c427d508..c415a384b8c 100644 --- a/examples/server/fixed_header_client.php +++ b/examples/server/fixed_header_client.php @@ -1,48 +1,54 @@ connect('127.0.0.1', 9504)) -{ +$client->set(array( + 'open_length_check' => true, + 'package_max_length' => 8 * 1024 * 1024, + 'package_length_type' => 'N', //see php pack() + 'package_length_offset' => 0, + 'package_body_offset' => 4, +)); + +if (!$client->connect('127.0.0.1', 9504)) { exit("connect failed\n"); } -for ($l=0; $l < 1; $l++) -{ - $data = ''; - for($i=0; $i< 10; $i++) - { - $len = rand(10000, 20000); - echo "package length=".($len + 4)."\n"; - send_test3($client, $len); +$func = "send_test" . intval(empty($argv[1]) ? 3 : $argv[1]); + +for ($l = 0; $l < 1; $l++) { + $data = ''; + for ($i = 0; $i < 10; $i++) { + $len = rand(100000, 200000); + echo "send : " . ($len + 4) . "\n"; + $func($client, $len); } - //echo 'total send size:', strlen($data),"\n"; - //$client->send($data); sleep(1); } function send_test3($client, $len) { - $data = pack('n', $len + 4); - $data .= str_repeat('A', $len).rand(1000, 9999); - - $chunks = str_split($data, 4000); - - foreach($chunks as $ch) - { - $client->send($ch); - } - echo "package: ".substr($data, -4, 4)."\n"; + $data = pack('N', $len + 4); + $data .= str_repeat('A', $len) . rand(1000, 9999); + $chunks = str_split($data, 4000); + foreach ($chunks as $ch) { + $client->send($ch); + } + $data = $client->recv(); + echo "recv : " . strlen($data) . "\n"; } function send_test2($client, $len) { - $data = pack('n', $len + 4); - $data .= str_repeat('A', $len).rand(1000, 9999); - $client->send($data); + $data = pack('N', $len + 4); + $data .= str_repeat('A', $len) . rand(100000, 999999); + $client->send($data); + + $data = $client->recv(); } function send_test1($client, $len) { - $client->send(pack('n', $len + 4)); - usleep(10); - $client->send(str_repeat('A', $len).rand(1000, 9999)); + $client->send(pack('N', $len + 4)); + usleep(10); + $client->send(str_repeat('A', $len) . rand(1000, 9999)); + $data = $client->recv(); } diff --git a/examples/server/fixed_header_server.php b/examples/server/fixed_header_server.php index 3694ae53350..c7e61a4273a 100644 --- a/examples/server/fixed_header_server.php +++ b/examples/server/fixed_header_server.php @@ -10,77 +10,35 @@ class SocketServer function run($host, $port) { - register_shutdown_function(array($this, 'errorHandler')); $this->serv = new swoole_server($host, $port); $this->serv->set(array( - //'daemonize' => true, - 'max_request' => 2000, //reload worker by run xx times - 'dispatch_mode' => 3, //who come first who is - 'worker_num' => 5, //how much worker will start - 'reactor_num' => 2, // depend cpu how much cpu you have - 'backlog' => 128, //accept queue - 'open_cpu_affinity' => 1, //get cpu more time - 'open_tcp_nodelay' => 1, // for small packet to open - 'tcp_defer_accept' => 5, //client will accept when not have data - 'max_conn' => 10000, - 'task_worker_num' => 10, - 'task_ipc_mode' => 2, //use queue with "who come first who is" - 'message_queue_key' => 0x72000100, + 'enable_coroutine' => false, +// 'dispatch_mode' => 3, //who come first who is + 'worker_num' => 1, //how much worker will start 'open_length_check' => true, - 'package_max_length' => 999999999, + 'package_max_length' => 8 * 1024 * 1024, 'package_length_type' => 'N', //see php pack() 'package_length_offset' => 0, 'package_body_offset' => 4, - )); $this->serv->on('receive', array($this, 'onReceive')); - $this->serv->on('close', array($this, 'onClose')); - $this->serv->on('task', array($this, 'onTask')); - $this->serv->on('finish', array($this, 'onFinish')); $this->serv->start(); } - - function onReceive($serv, $fd, $from_id, $data) + function onReceive($serv, $fd, $tid, $data) { - $packet = json_decode(substr($data,4), true); - - //todo::包可能解析失败 - $packet["socketfd"] = $fd; - $task_id = $serv->task(json_encode($packet)); - //todo::任务可能下发失败 - } - - function onTask($serv, $task_id, $from_id, $data) - { - $data = json_decode($data, true); - $fd = $data["socketfd"]; - + echo "recv " . strlen($data) . " bytes\n"; + $packet = substr($data, 4); $result = array( "code" => "0", "msg" => "ok", - "data" => $data, + "data" => $packet, ); - $serv->send($fd, json_encode($result)); - } - - function onFinish($serv, $task_id, $data) - { - - } - - function onClose($serv, $fd) - { - - } - - function errorHandler() - { - //if (!empty($this->current_fd)) { - // $rsp = Proxy::shutdown_handler(); - // $rsp && $this->serv->send($this->current_fd, $rsp); - //} + $resp = json_encode($result); + $send_data = pack('N', strlen($resp)) . $resp; + echo "send " . strlen($send_data) . " bytes\n"; + $serv->send($fd, $send_data); } } diff --git a/include/server.h b/include/server.h index 5e1c4a615cd..4579f025c1f 100644 --- a/include/server.h +++ b/include/server.h @@ -481,6 +481,8 @@ struct _swServer uint32_t buffer_output_size; uint32_t buffer_input_size; + uint32_t ipc_max_size; + void *ptr2; void *private_data_3; diff --git a/include/swoole.h b/include/swoole.h index 3d4a6b7e525..d503369b0b3 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -848,7 +848,7 @@ enum _swEventData_flag typedef struct _swDataHead { int fd; - uint16_t len; + uint32_t len; int16_t from_id; uint8_t type; uint8_t flags; @@ -872,6 +872,12 @@ typedef struct _swEventData char data[SW_IPC_BUFFER_SIZE]; } swEventData; +typedef struct _swSendBuffer +{ + swDataHead info; + char data[0]; +} swSendBuffer; + typedef struct _swDgramPacket { swSocketAddress info; diff --git a/src/server/process.c b/src/server/process.c index 8963e365286..3e6efad790b 100644 --- a/src/server/process.c +++ b/src/server/process.c @@ -22,6 +22,7 @@ typedef struct _swFactoryProcess { swPipe *pipes; + swSendBuffer *buffer; } swFactoryProcess; static int swFactoryProcess_start(swFactory *factory); @@ -126,6 +127,23 @@ static int swFactoryProcess_start(swFactory *factory) swServer_store_pipe_fd(serv, serv->workers[i].pipe_object); } + int bufsize; + socklen_t _len = sizeof(bufsize); + /** + * Get the maximum ipc[unix socket with dgram] transmission length + */ + if (getsockopt(serv->workers[0].pipe_master, SOL_SOCKET, SO_SNDBUF, &bufsize, &_len) != 0) + { + bufsize = SW_IPC_MAX_SIZE; + } + // - dgram header [32 byte] + serv->ipc_max_size = bufsize - 32; + object->buffer = sw_malloc(serv->ipc_max_size); + if (object->buffer == NULL) + { + swError("malloc[sndbuf] failed. Error: %s [%d]", strerror(errno), errno); + return SW_ERR; + } /** * The manager process must be started first, otherwise it will have a thread fork */ @@ -155,6 +173,7 @@ static int swFactoryProcess_notify(swFactory *factory, swDataHead *ev) */ static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task) { + swFactoryProcess *object = (swFactoryProcess *) factory->object; swServer *serv = (swServer *) factory->ptr; int fd = task->info.fd; @@ -215,41 +234,42 @@ static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task) uint32_t send_n = task->length; uint32_t offset = 0; - swEventData buf; char *data = task->data; + swSendBuffer *buf = object->buffer; + uint32_t ipc_size = serv->ipc_max_size - sizeof(buf->info); - buf.info = task->info; + buf->info = task->info; - if (send_n <= SW_IPC_BUFFER_SIZE) + if (send_n <= ipc_size) { - buf.info.flags = 0; - buf.info.len = send_n; - memcpy(buf.data, data, buf.info.len); - return swReactorThread_send2worker(serv, worker, &buf, sizeof(buf.info) + buf.info.len); + buf->info.flags = 0; + buf->info.len = send_n; + memcpy(buf->data, data, buf->info.len); + return swReactorThread_send2worker(serv, worker, buf, sizeof(buf->info) + buf->info.len); } - buf.info.flags = SW_EVENT_DATA_CHUNK; + buf->info.flags = SW_EVENT_DATA_CHUNK; while (send_n > 0) { - if (send_n > SW_IPC_BUFFER_SIZE) + if (send_n > ipc_size) { - buf.info.len = SW_IPC_BUFFER_SIZE; + buf->info.len = ipc_size; } else { - buf.info.flags |= SW_EVENT_DATA_END; - buf.info.len = send_n; + buf->info.flags |= SW_EVENT_DATA_END; + buf->info.len = send_n; } - memcpy(buf.data, data + offset, buf.info.len); + memcpy(buf->data, data + offset, buf->info.len); - send_n -= buf.info.len; - offset += buf.info.len; + send_n -= buf->info.len; + offset += buf->info.len; - swTrace("dispatch, type=%d|len=%d", buf.info.type, buf.info.len); + swTrace("dispatch, type=%d|len=%d", buf->info.type, buf.info.len); - if (swReactorThread_send2worker(serv, worker, &buf, sizeof(buf.info) + buf.info.len) < 0) + if (swReactorThread_send2worker(serv, worker, &buf, sizeof(buf->info) + buf->info.len) < 0) { return SW_ERR; } diff --git a/src/server/reactor_thread.c b/src/server/reactor_thread.c index 0ada7dfa9f1..629fdce49ea 100644 --- a/src/server/reactor_thread.c +++ b/src/server/reactor_thread.c @@ -1134,8 +1134,7 @@ int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length) swServer *serv = SwooleG.serv; swSendData task; - task.info.len = 0; - task.info.flags = 0; + bzero(&task.info, sizeof(task.info)); task.info.from_fd = conn->from_fd; task.info.from_id = conn->from_id; task.info.type = SW_EVENT_TCP; diff --git a/src/server/worker.cc b/src/server/worker.cc index a3ecd8e3fb8..b37049577e8 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -29,6 +29,8 @@ static int swWorker_onStreamRead(swReactor *reactor, swEvent *event); static int swWorker_onStreamPackage(swConnection *conn, char *data, uint32_t length); static int swWorker_onStreamClose(swReactor *reactor, swEvent *event); +static swSendBuffer *g_buffer = nullptr; + void swWorker_free(swWorker *worker) { if (worker->send_shm) @@ -716,6 +718,16 @@ int swWorker_loop(swServer *serv, int worker_id) serv->stream_protocol.package_max_length = INT_MAX; serv->stream_protocol.onPackage = swWorker_onStreamPackage; serv->buffer_pool = swLinkedList_new(0, NULL); + if (serv->buffer_pool == nullptr) + { + return SW_ERR; + } + } + + g_buffer = (swSendBuffer *) sw_malloc(serv->ipc_max_size); + if (g_buffer == nullptr) + { + return SW_ERR; } swWorker_onStart(serv); @@ -753,21 +765,20 @@ int swWorker_send2reactor(swServer *serv, swEventData *ev_data, size_t sendn, in */ static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event) { - swEventData task; swServer *serv = (swServer *) reactor->ptr; swFactory *factory = &serv->factory; int ret; read_from_pipe: - if (read(event->fd, &task, sizeof(task)) > 0) + if (read(event->fd, g_buffer, serv->ipc_max_size) > 0) { - ret = swWorker_onTask(factory, &task); + ret = swWorker_onTask(factory, (swEventData *) g_buffer); #ifndef SW_WORKER_RECV_AGAIN /** * Big package */ - if (task.info.flags & SW_EVENT_DATA_CHUNK) + if (g_buffer->info.flags & SW_EVENT_DATA_CHUNK) #endif { //no data