Skip to content

Commit

Permalink
use serv->ipc_max_size
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Apr 2, 2019
1 parent 0c5f9ba commit 48c1b2b
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 108 deletions.
4 changes: 2 additions & 2 deletions examples/php/buf_size.php
Expand Up @@ -18,5 +18,5 @@
$ret_n = socket_write($socket, str_repeat('A', $n), $n);
var_dump($ret_n);

$ret_n = socket_read($fds[1], $n);
var_dump($ret_n);
$data = socket_read($fds[1], $n);
var_dump(strlen($data));
62 changes: 34 additions & 28 deletions examples/server/fixed_header_client.php
@@ -1,48 +1,54 @@
<?php
$client = new swoole_client(SWOOLE_SOCK_TCP);
if(!$client->connect('127.0.0.1', 9504))
{
$client->set(array(
'open_length_check' => true,
'package_max_length' => 8 * 1024 * 1024,
'package_length_type' => 'N', //see php pack()
'package_length_offset' => 0,
'package_body_offset' => 4,
));

if (!$client->connect('127.0.0.1', 9504)) {
exit("connect failed\n");
}

for ($l=0; $l < 1; $l++)
{
$data = '';
for($i=0; $i< 10; $i++)
{
$len = rand(10000, 20000);
echo "package length=".($len + 4)."\n";
send_test3($client, $len);
$func = "send_test" . intval(empty($argv[1]) ? 3 : $argv[1]);

for ($l = 0; $l < 1; $l++) {
$data = '';
for ($i = 0; $i < 10; $i++) {
$len = rand(100000, 200000);
echo "send : " . ($len + 4) . "\n";
$func($client, $len);
}
//echo 'total send size:', strlen($data),"\n";
//$client->send($data);
sleep(1);
}

function send_test3($client, $len)
{
$data = pack('n', $len + 4);
$data .= str_repeat('A', $len).rand(1000, 9999);

$chunks = str_split($data, 4000);

foreach($chunks as $ch)
{
$client->send($ch);
}
echo "package: ".substr($data, -4, 4)."\n";
$data = pack('N', $len + 4);
$data .= str_repeat('A', $len) . rand(1000, 9999);
$chunks = str_split($data, 4000);
foreach ($chunks as $ch) {
$client->send($ch);
}
$data = $client->recv();
echo "recv : " . strlen($data) . "\n";
}

function send_test2($client, $len)
{
$data = pack('n', $len + 4);
$data .= str_repeat('A', $len).rand(1000, 9999);
$client->send($data);
$data = pack('N', $len + 4);
$data .= str_repeat('A', $len) . rand(100000, 999999);
$client->send($data);

$data = $client->recv();
}

function send_test1($client, $len)
{
$client->send(pack('n', $len + 4));
usleep(10);
$client->send(str_repeat('A', $len).rand(1000, 9999));
$client->send(pack('N', $len + 4));
usleep(10);
$client->send(str_repeat('A', $len) . rand(1000, 9999));
$data = $client->recv();
}
66 changes: 12 additions & 54 deletions examples/server/fixed_header_server.php
Expand Up @@ -10,77 +10,35 @@ class SocketServer

function run($host, $port)
{
register_shutdown_function(array($this, 'errorHandler'));
$this->serv = new swoole_server($host, $port);

$this->serv->set(array(
//'daemonize' => true,
'max_request' => 2000, //reload worker by run xx times
'dispatch_mode' => 3, //who come first who is
'worker_num' => 5, //how much worker will start
'reactor_num' => 2, // depend cpu how much cpu you have
'backlog' => 128, //accept queue
'open_cpu_affinity' => 1, //get cpu more time
'open_tcp_nodelay' => 1, // for small packet to open
'tcp_defer_accept' => 5, //client will accept when not have data
'max_conn' => 10000,
'task_worker_num' => 10,
'task_ipc_mode' => 2, //use queue with "who come first who is"
'message_queue_key' => 0x72000100,
'enable_coroutine' => false,
// 'dispatch_mode' => 3, //who come first who is
'worker_num' => 1, //how much worker will start
'open_length_check' => true,
'package_max_length' => 999999999,
'package_max_length' => 8 * 1024 * 1024,
'package_length_type' => 'N', //see php pack()
'package_length_offset' => 0,
'package_body_offset' => 4,

));

$this->serv->on('receive', array($this, 'onReceive'));
$this->serv->on('close', array($this, 'onClose'));
$this->serv->on('task', array($this, 'onTask'));
$this->serv->on('finish', array($this, 'onFinish'));
$this->serv->start();
}


function onReceive($serv, $fd, $from_id, $data)
function onReceive($serv, $fd, $tid, $data)
{
$packet = json_decode(substr($data,4), true);

//todo::包可能解析失败
$packet["socketfd"] = $fd;
$task_id = $serv->task(json_encode($packet));
//todo::任务可能下发失败
}

function onTask($serv, $task_id, $from_id, $data)
{
$data = json_decode($data, true);
$fd = $data["socketfd"];

echo "recv " . strlen($data) . " bytes\n";
$packet = substr($data, 4);
$result = array(
"code" => "0",
"msg" => "ok",
"data" => $data,
"data" => $packet,
);
$serv->send($fd, json_encode($result));
}

function onFinish($serv, $task_id, $data)
{

}

function onClose($serv, $fd)
{

}

function errorHandler()
{
//if (!empty($this->current_fd)) {
// $rsp = Proxy::shutdown_handler();
// $rsp && $this->serv->send($this->current_fd, $rsp);
//}
$resp = json_encode($result);
$send_data = pack('N', strlen($resp)) . $resp;
echo "send " . strlen($send_data) . " bytes\n";
$serv->send($fd, $send_data);
}
}
2 changes: 2 additions & 0 deletions include/server.h
Expand Up @@ -481,6 +481,8 @@ struct _swServer
uint32_t buffer_output_size;
uint32_t buffer_input_size;

uint32_t ipc_max_size;

void *ptr2;
void *private_data_3;

Expand Down
8 changes: 7 additions & 1 deletion include/swoole.h
Expand Up @@ -848,7 +848,7 @@ enum _swEventData_flag
typedef struct _swDataHead
{
int fd;
uint16_t len;
uint32_t len;
int16_t from_id;
uint8_t type;
uint8_t flags;
Expand All @@ -872,6 +872,12 @@ typedef struct _swEventData
char data[SW_IPC_BUFFER_SIZE];
} swEventData;

typedef struct _swSendBuffer
{
swDataHead info;
char data[0];
} swSendBuffer;

typedef struct _swDgramPacket
{
swSocketAddress info;
Expand Down
54 changes: 37 additions & 17 deletions src/server/process.c
Expand Up @@ -22,6 +22,7 @@
typedef struct _swFactoryProcess
{
swPipe *pipes;
swSendBuffer *buffer;
} swFactoryProcess;

static int swFactoryProcess_start(swFactory *factory);
Expand Down Expand Up @@ -126,6 +127,23 @@ static int swFactoryProcess_start(swFactory *factory)
swServer_store_pipe_fd(serv, serv->workers[i].pipe_object);
}

int bufsize;
socklen_t _len = sizeof(bufsize);
/**
* Get the maximum ipc[unix socket with dgram] transmission length
*/
if (getsockopt(serv->workers[0].pipe_master, SOL_SOCKET, SO_SNDBUF, &bufsize, &_len) != 0)
{
bufsize = SW_IPC_MAX_SIZE;
}
// - dgram header [32 byte]
serv->ipc_max_size = bufsize - 32;
object->buffer = sw_malloc(serv->ipc_max_size);
if (object->buffer == NULL)
{
swError("malloc[sndbuf] failed. Error: %s [%d]", strerror(errno), errno);
return SW_ERR;
}
/**
* The manager process must be started first, otherwise it will have a thread fork
*/
Expand Down Expand Up @@ -155,6 +173,7 @@ static int swFactoryProcess_notify(swFactory *factory, swDataHead *ev)
*/
static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task)
{
swFactoryProcess *object = (swFactoryProcess *) factory->object;
swServer *serv = (swServer *) factory->ptr;
int fd = task->info.fd;

Expand Down Expand Up @@ -215,41 +234,42 @@ static int swFactoryProcess_dispatch(swFactory *factory, swSendData *task)

uint32_t send_n = task->length;
uint32_t offset = 0;
swEventData buf;
char *data = task->data;
swSendBuffer *buf = object->buffer;
uint32_t ipc_size = serv->ipc_max_size - sizeof(buf->info);

buf.info = task->info;
buf->info = task->info;

if (send_n <= SW_IPC_BUFFER_SIZE)
if (send_n <= ipc_size)
{
buf.info.flags = 0;
buf.info.len = send_n;
memcpy(buf.data, data, buf.info.len);
return swReactorThread_send2worker(serv, worker, &buf, sizeof(buf.info) + buf.info.len);
buf->info.flags = 0;
buf->info.len = send_n;
memcpy(buf->data, data, buf->info.len);
return swReactorThread_send2worker(serv, worker, buf, sizeof(buf->info) + buf->info.len);
}

buf.info.flags = SW_EVENT_DATA_CHUNK;
buf->info.flags = SW_EVENT_DATA_CHUNK;

while (send_n > 0)
{
if (send_n > SW_IPC_BUFFER_SIZE)
if (send_n > ipc_size)
{
buf.info.len = SW_IPC_BUFFER_SIZE;
buf->info.len = ipc_size;
}
else
{
buf.info.flags |= SW_EVENT_DATA_END;
buf.info.len = send_n;
buf->info.flags |= SW_EVENT_DATA_END;
buf->info.len = send_n;
}

memcpy(buf.data, data + offset, buf.info.len);
memcpy(buf->data, data + offset, buf->info.len);

send_n -= buf.info.len;
offset += buf.info.len;
send_n -= buf->info.len;
offset += buf->info.len;

swTrace("dispatch, type=%d|len=%d", buf.info.type, buf.info.len);
swTrace("dispatch, type=%d|len=%d", buf->info.type, buf.info.len);

if (swReactorThread_send2worker(serv, worker, &buf, sizeof(buf.info) + buf.info.len) < 0)
if (swReactorThread_send2worker(serv, worker, &buf, sizeof(buf->info) + buf->info.len) < 0)
{
return SW_ERR;
}
Expand Down
3 changes: 1 addition & 2 deletions src/server/reactor_thread.c
Expand Up @@ -1134,8 +1134,7 @@ int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length)
swServer *serv = SwooleG.serv;
swSendData task;

task.info.len = 0;
task.info.flags = 0;
bzero(&task.info, sizeof(task.info));
task.info.from_fd = conn->from_fd;
task.info.from_id = conn->from_id;
task.info.type = SW_EVENT_TCP;
Expand Down
19 changes: 15 additions & 4 deletions src/server/worker.cc
Expand Up @@ -29,6 +29,8 @@ static int swWorker_onStreamRead(swReactor *reactor, swEvent *event);
static int swWorker_onStreamPackage(swConnection *conn, char *data, uint32_t length);
static int swWorker_onStreamClose(swReactor *reactor, swEvent *event);

static swSendBuffer *g_buffer = nullptr;

void swWorker_free(swWorker *worker)
{
if (worker->send_shm)
Expand Down Expand Up @@ -716,6 +718,16 @@ int swWorker_loop(swServer *serv, int worker_id)
serv->stream_protocol.package_max_length = INT_MAX;
serv->stream_protocol.onPackage = swWorker_onStreamPackage;
serv->buffer_pool = swLinkedList_new(0, NULL);
if (serv->buffer_pool == nullptr)
{
return SW_ERR;
}
}

g_buffer = (swSendBuffer *) sw_malloc(serv->ipc_max_size);
if (g_buffer == nullptr)
{
return SW_ERR;
}

swWorker_onStart(serv);
Expand Down Expand Up @@ -753,21 +765,20 @@ int swWorker_send2reactor(swServer *serv, swEventData *ev_data, size_t sendn, in
*/
static int swWorker_onPipeReceive(swReactor *reactor, swEvent *event)
{
swEventData task;
swServer *serv = (swServer *) reactor->ptr;
swFactory *factory = &serv->factory;
int ret;

read_from_pipe:

if (read(event->fd, &task, sizeof(task)) > 0)
if (read(event->fd, g_buffer, serv->ipc_max_size) > 0)
{
ret = swWorker_onTask(factory, &task);
ret = swWorker_onTask(factory, (swEventData *) g_buffer);
#ifndef SW_WORKER_RECV_AGAIN
/**
* Big package
*/
if (task.info.flags & SW_EVENT_DATA_CHUNK)
if (g_buffer->info.flags & SW_EVENT_DATA_CHUNK)
#endif
{
//no data
Expand Down

0 comments on commit 48c1b2b

Please sign in to comment.