Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
twose committed Apr 2, 2019
2 parents 66c31ee + df6e2d1 commit 0c5f9ba
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 179 deletions.
14 changes: 11 additions & 3 deletions examples/php/buf_size.php
Expand Up @@ -6,9 +6,17 @@
$socket = $fds[0];

socket_set_option($socket, SOL_SOCKET, SO_SNDBUF, 8 * 1024 * 1024);
$retval = socket_get_option($socket, SOL_SOCKET, SO_SNDBUF);
var_dump($retval);
$send_buf_size = socket_get_option($socket, SOL_SOCKET, SO_SNDBUF);
echo "send_buf_size=$send_buf_size\n";

$n = $retval - 32;

socket_set_option($socket, SOL_SOCKET, SO_RCVBUF, 8 * 1024);
$recv_buf_size = socket_get_option($socket, SOL_SOCKET, SO_RCVBUF);
echo "recv_buf_size=$recv_buf_size\n";

$n = $send_buf_size - 32;
$ret_n = socket_write($socket, str_repeat('A', $n), $n);
var_dump($ret_n);

$ret_n = socket_read($fds[1], $n);
var_dump($ret_n);
137 changes: 59 additions & 78 deletions examples/server/fixed_header_server.php
@@ -1,105 +1,86 @@
<?php
define('PID_FILE_NAME', '/tmp/swoole_server.pid');

$serv = new FixedHeaderServer();
$serv = new SocketServer();
$serv->run('0.0.0.0', 9504);

class FixedHeaderServer
class SocketServer
{
protected $buffer = array();
protected $length = array();

/**
* @var swoole_server
*/
protected $serv;
protected $serv; //swoole server

const MAX_PACKAGE_LEN = 8000000;
const MAX_PACKAGE_LEN = 8000000; //max data accept

function onPackage($fd, $pkg)
function run($host, $port)
{
$this->current_fd = $fd;
var_dump($pkg);
$resp = "hello world";
$this->serv->send($fd, $resp);
$this->current_fd = '';
register_shutdown_function(array($this, 'errorHandler'));
$this->serv = new swoole_server($host, $port);

$this->serv->set(array(
//'daemonize' => true,
'max_request' => 2000, //reload worker by run xx times
'dispatch_mode' => 3, //who come first who is
'worker_num' => 5, //how much worker will start
'reactor_num' => 2, // depend cpu how much cpu you have
'backlog' => 128, //accept queue
'open_cpu_affinity' => 1, //get cpu more time
'open_tcp_nodelay' => 1, // for small packet to open
'tcp_defer_accept' => 5, //client will accept when not have data
'max_conn' => 10000,
'task_worker_num' => 10,
'task_ipc_mode' => 2, //use queue with "who come first who is"
'message_queue_key' => 0x72000100,
'open_length_check' => true,
'package_max_length' => 999999999,
'package_length_type' => 'N', //see php pack()
'package_length_offset' => 0,
'package_body_offset' => 4,

));

$this->serv->on('receive', array($this, 'onReceive'));
$this->serv->on('close', array($this, 'onClose'));
$this->serv->on('task', array($this, 'onTask'));
$this->serv->on('finish', array($this, 'onFinish'));
$this->serv->start();
}


function onReceive($serv, $fd, $from_id, $data)
{
echo "package".substr($data, -4, 4)." length=". (strlen($data) - 2)."\n";
$packet = json_decode(substr($data,4), true);

//todo::包可能解析失败
$packet["socketfd"] = $fd;
$task_id = $serv->task(json_encode($packet));
//todo::任务可能下发失败
}

function onReceive_unpack_php($serv, $fd, $from_id, $data)
function onTask($serv, $task_id, $from_id, $data)
{
if (empty($this->buffer[$fd]))
{
$this->buffer[$fd] = '';
$this->length[$fd] = 0;
}

$this->buffer[$fd] .= $data;
$buffer = &$this->buffer[$fd];

do
{
if ($this->length[$fd] === 0)
{
$n = unpack('Nlen', substr($buffer, 0, 4));
$this->length[$fd] = $n['len'];
if ($n['len'] > self::MAX_PACKAGE_LEN)
{
$this->serv->close($fd);
return;
}
}

if (strlen($buffer) >= $this->length[$fd])
{
$this->onPackage($fd, substr($buffer, 0, $this->length[$fd]));
$buffer = substr($buffer, $this->length[$fd]);
$this->length[$fd] = 0;
}
else
{
break;
}
} while(strlen($buffer) > 0);
$data = json_decode($data, true);
$fd = $data["socketfd"];

$result = array(
"code" => "0",
"msg" => "ok",
"data" => $data,
);
$serv->send($fd, json_encode($result));
}

function onClose($serv, $fd)
function onFinish($serv, $task_id, $data)
{
unset($this->buffer[$fd], $this->length[$fd]);

}

function run($host, $port)
function onClose($serv, $fd)
{
register_shutdown_function(array($this, 'errorHandler'));
$this->serv = new swoole_server($host, $port);
file_put_contents(PID_FILE_NAME, posix_getpid());

$this->serv->set(array(
'max_request' => 0,
// 'dispatch_mode' => 3,
'open_length_check' => true,
'package_max_length' => 81920,
'package_length_type' => 'n', //see php pack()
'package_length_offset' => 0,
'package_body_offset' => 2,
'worker_num' => 2,
));

$this->serv->on('receive', array($this, 'onReceive'));
$this->serv->on('close', array($this, 'onClose'));
$this->serv->start();
}

function errorHandler()
{
if(!empty($this->current_fd))
{
$rsp = Proxy::shutdown_handler();
$rsp && $this->serv->send($this->current_fd, $rsp);
}
//if (!empty($this->current_fd)) {
// $rsp = Proxy::shutdown_handler();
// $rsp && $this->serv->send($this->current_fd, $rsp);
//}
}
}
86 changes: 0 additions & 86 deletions examples/server/fixed_header_server1.7.3.php

This file was deleted.

2 changes: 2 additions & 0 deletions src/server/process.c
Expand Up @@ -116,6 +116,8 @@ static int swFactoryProcess_start(swFactory *factory)
{
if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0)
{
sw_free(object->pipes);
object->pipes = NULL;
return SW_ERR;
}
serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], SW_PIPE_MASTER);
Expand Down
29 changes: 17 additions & 12 deletions thirdparty/plain_wrapper.c
Expand Up @@ -61,7 +61,7 @@ extern int php_get_gid_by_name(const char *name, gid_t *gid);

static size_t php_stdiop_write(php_stream *stream, const char *buf, size_t count);
static size_t php_stdiop_read(php_stream *stream, char *buf, size_t count);
static int php_stdiop_close(php_stream *stream, int close_handle);
static int sw_php_stdiop_close(php_stream *stream, int close_handle);
static int php_stdiop_stat(php_stream *stream, php_stream_statbuf *ssb);
static int php_stdiop_flush(php_stream *stream);
static int php_stdiop_seek(php_stream *stream, zend_off_t offset, int whence, zend_off_t *newoffset);
Expand Down Expand Up @@ -179,7 +179,7 @@ typedef struct {
static php_stream_ops sw_php_stream_stdio_ops = {
php_stdiop_write,
php_stdiop_read,
php_stdiop_close,
sw_php_stdiop_close,
php_stdiop_flush,
"STDIO/coroutine",
php_stdiop_seek,
Expand Down Expand Up @@ -312,7 +312,7 @@ static size_t php_stdiop_read(php_stream *stream, char *buf, size_t count)
return ret;
}

static int php_stdiop_close(php_stream *stream, int close_handle)
static int sw_php_stdiop_close(php_stream *stream, int close_handle)
{
int ret;
php_stdio_stream_data *data = (php_stdio_stream_data*)stream->abstract;
Expand Down Expand Up @@ -350,15 +350,20 @@ static int php_stdiop_close(php_stream *stream, int close_handle)
ret = fclose(data->file);
data->file = NULL;
}
} else if (data->fd != -1) {
if (data->lock_flag) {
swoole_coroutine_flock_ex(stream->orig_path, data->fd, LOCK_UN);
}
ret = close(data->fd);
data->fd = -1;
} else {
return 0; /* everything should be closed already -> success */
}
}
else if (data->fd != -1)
{
if (data->lock_flag == LOCK_EX || data->lock_flag == LOCK_SH)
{
swoole_coroutine_flock_ex(stream->orig_path, data->fd, LOCK_UN);
}
ret = close(data->fd);
data->fd = -1;
}
else
{
return 0; /* everything should be closed already -> success */
}
if (data->temp_name) {
#ifdef PHP_WIN32
php_win32_ioutil_unlink(ZSTR_VAL(data->temp_name));
Expand Down

0 comments on commit 0c5f9ba

Please sign in to comment.