Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions benchmark/server/http.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
<?php
$http = new swoole_http_server("127.0.0.1", 9501, SWOOLE_BASE);

use Swoole\Http\Server;
use Swoole\Http\Response;
use Swoole\Http\Request;

$http = new Server("127.0.0.1", 9501, SWOOLE_BASE);

$pool = [];

$http->set([
'worker_num' => 4,
// 'worker_num' => 4,
'hook_flags' => SWOOLE_HOOK_ALL,
]);

$http->on('request', function ($request, swoole_http_response $response) {
$response->header('Last-Modified', 'Thu, 18 Jun 2015 10:24:27 GMT');
$response->header('E-Tag', '55829c5b-17');
$response->header('Accept-Ranges', 'bytes');
$response->end("<h1>\nHello Swoole.\n</h1>");
$http->on('request', function (Request $request, Response $response) use (&$pool) {
// var_dump($request->server['request_uri']);
if ($request->server['request_uri'] == '/') {
$response->header('Last-Modified', 'Thu, 18 Jun 2015 10:24:27 GMT');
$response->header('E-Tag', '55829c5b-17');
$response->header('Accept-Ranges', 'bytes');
$response->end("<h1>\nHello Swoole.\n</h1>");
} elseif ($request->server['request_uri'] == '/redis') {
$redis = new redis;
$redis->connect('127.0.0.1', 6379);
$value = $redis->get('key');
$redis->close();
$pool[] = $redis;
$response->end("<h1>Value=" . $value . "</h1>");
} elseif ($request->server['request_uri'] == '/redis') {
$response->end("<pre>" . var_export($pool, 1) . "</pre>\n");
}
});

$http->start();
1 change: 1 addition & 0 deletions library/core/Constant.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Constant
const EVENT_WORKER_EXIT = 'workerExit';
const EVENT_WORKER_ERROR = 'workerError';
const EVENT_WORKER_STOP = 'workerStop';
const EVENT_PIPE_MESSAGE = 'pipeMessage';
const EVENT_MANAGER_START = 'managerStart';
const EVENT_MANAGER_STOP = 'managerStop';
const EVENT_ERROR = 'error';
Expand Down
1 change: 1 addition & 0 deletions php_swoole_library.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ static const char* swoole_library_source_core_constant =
" const EVENT_WORKER_EXIT = 'workerExit';\n"
" const EVENT_WORKER_ERROR = 'workerError';\n"
" const EVENT_WORKER_STOP = 'workerStop';\n"
" const EVENT_PIPE_MESSAGE = 'pipeMessage';\n"
" const EVENT_MANAGER_START = 'managerStart';\n"
" const EVENT_MANAGER_STOP = 'managerStop';\n"
" const EVENT_ERROR = 'error';\n"
Expand Down
31 changes: 24 additions & 7 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1307,21 +1307,38 @@ static int swServer_tcp_close(swServer *serv, int session_id, int reset)
conn->close_actively = 1;
swTraceLog(SW_TRACE_CLOSE, "session_id=%d, fd=%d", session_id, conn->session_id);

int ret;
if (!swIsWorker())
int retval;
swWorker *worker;
swDataHead ev = { 0 };

if (swServer_dispatch_mode_is_mod(serv))
{
int worker_id = swServer_worker_schedule(serv, conn->fd, nullptr);
if (worker_id != (int) SwooleWG.id)
{
worker = swServer_get_worker(serv, worker_id);
goto _notify;
}
else
{
goto _close;
}
}
else if (!swIsWorker())
{
swWorker *worker = swServer_get_worker(serv, conn->fd % serv->worker_num);
swDataHead ev = {0};
worker = swServer_get_worker(serv, conn->fd % serv->worker_num);
_notify:
ev.type = SW_SERVER_EVENT_CLOSE;
ev.fd = session_id;
ev.reactor_id = conn->reactor_id;
ret = swWorker_send2worker(worker, &ev, sizeof(ev), SW_PIPE_MASTER);
retval = swWorker_send2worker(worker, &ev, sizeof(ev), SW_PIPE_MASTER);
}
else
{
ret = serv->factory.end(&serv->factory, session_id);
_close:
retval = serv->factory.end(&serv->factory, session_id);
}
return ret;
return retval;
}

void swServer_signal_init(swServer *serv)
Expand Down
27 changes: 3 additions & 24 deletions src/server/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,34 +376,13 @@ static int process_send_packet(swServer *serv, swPipeBuffer *buf, swSendData *re

static bool inline process_is_supported_send_yield(swServer *serv, swConnection *conn)
{
if (serv->dispatch_mode == SW_DISPATCH_FDMOD)
if (!swServer_dispatch_mode_is_mod(serv))
{
return conn->fd % serv->worker_num == SwooleWG.id;
}
else if (serv->dispatch_mode == SW_DISPATCH_IPMOD)
{
uint32_t key;
//IPv4
if (conn->socket_type == SW_SOCK_TCP)
{
key = conn->info.addr.inet_v4.sin_addr.s_addr;
}
//IPv6
else
{
#ifdef HAVE_KQUEUE
key = *(((uint32_t *) &conn->info.addr.inet_v6.sin6_addr) + 3);
#elif defined(_WIN32)
key = conn->info.addr.inet_v6.sin6_addr.u.Word[3];
#else
key = conn->info.addr.inet_v6.sin6_addr.s6_addr32[3];
#endif
}
return key % serv->worker_num == SwooleWG.id;
return false;
}
else
{
return false;
return swServer_worker_schedule(serv, conn->fd, nullptr) == SwooleWG.id;
}
}

Expand Down
68 changes: 68 additions & 0 deletions tests/swoole_server/close_in_non_current_worker.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
--TEST--
swoole_server: close in non-current worker
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use Swoole\Server;
use Swoole\Constant;

$pm = new SwooleTest\ProcessManager;

$socket = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_DGRAM, 0);

$pm->parentFunc = function ($pid) use ($pm) {
go(function () use ($pm) {
$client = new Swoole\Coroutine\Client(SWOOLE_SOCK_TCP );
if (!$client->connect('127.0.0.1', $pm->getFreePort())) {
exit("connect failed\n");
}
$client->send("close");
$data = $client->recv();
Assert::string($data);
Assert::length($data, 0);
echo "DONE\n";
});
Swoole\Event::wait();
$pm->kill();

global $socket;
$result[] = fgets($socket[1]);
$result[] = fgets($socket[1]);

Assert::eq($result[0], $result[1]);
};

$pm->childFunc = function () use ($pm) {
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_PROCESS);
$serv->set([
'worker_num' => 2,
'log_file' => TEST_LOG_FILE,
]);
$serv->on("workerStart", function ($serv) use ($pm) {
$pm->wakeup();
});
$serv->on('receive', function (Server $serv, $fd, $reactor_id, $data) {
global $socket;
fwrite($socket[0], $serv->worker_id."\n");
$serv->sendMessage(['close_fd' => $fd, 'worker_id' => $serv->worker_id], 1 - $serv->worker_id);
});
$serv->on(Constant::EVENT_CLOSE, function (Server $serv, $fd, $reactor_id) {
global $socket;
fwrite($socket[0], $serv->worker_id . "\n");
});
$serv->on(Constant::EVENT_PIPE_MESSAGE, function (Server $serv, $workerId, $msg) {
$serv->close($msg['close_fd']);
});
$serv->start();
};

$pm->childFirst();
$pm->run();
?>
--EXPECT--
DONE
69 changes: 69 additions & 0 deletions tests/swoole_server/close_in_task_worker.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
--TEST--
swoole_server: close in task worker
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use Swoole\Server;
use Swoole\Constant;

$pm = new SwooleTest\ProcessManager;

$socket = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_DGRAM, 0);

$pm->parentFunc = function ($pid) use ($pm) {
go(function () use ($pm) {
$client = new Swoole\Coroutine\Client(SWOOLE_SOCK_TCP );
if (!$client->connect('127.0.0.1', $pm->getFreePort())) {
exit("connect failed\n");
}
$client->send("close");
$data = $client->recv();
Assert::string($data);
Assert::length($data, 0);
echo "DONE\n";
});
Swoole\Event::wait();
$pm->kill();

global $socket;
$result[] = fgets($socket[1]);
$result[] = fgets($socket[1]);

Assert::eq($result[0], $result[1]);
};

$pm->childFunc = function () use ($pm) {
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_PROCESS);
$serv->set([
'worker_num' => 1,
'task_worker_num' => 1,
'log_file' => TEST_LOG_FILE,
]);
$serv->on("workerStart", function ($serv) use ($pm) {
$pm->wakeup();
});
$serv->on('receive', function (Server $serv, $fd, $reactor_id, $data) {
global $socket;
fwrite($socket[0], $serv->worker_id . "\n");
$serv->task(['close_fd' => $fd, 'worker_id' => $serv->worker_id]);
});
$serv->on(Constant::EVENT_CLOSE, function (Server $serv, $fd, $reactor_id) {
global $socket;
fwrite($socket[0], $serv->worker_id . "\n");
});
$serv->on(Constant::EVENT_TASK, function (Server $serv, $task_id, $worker_id, $msg) {
$serv->close($msg['close_fd']);
});
$serv->start();
};

$pm->childFirst();
$pm->run();
?>
--EXPECT--
DONE