Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 3 additions & 39 deletions examples/task/msg_push.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
<?php


echo "Sending text to msg queue.\n";

class SwooleTask
{
protected $queueId;
protected $workerId;
protected $taskId = 0;

function __construct($key, $workerId = 0)
function __construct($key, $workerId)
{
$this->queueId = msg_get_queue($key);
if ($this->queueId === false)
Expand All @@ -19,42 +16,9 @@ function __construct($key, $workerId = 0)
$this->workerId = $workerId;
}

protected function pack($data)
{
$fromFd = 0;
$type = 7;
if (!is_string($data))
{
$data = serialize($data);
$fromFd |= 2;
}
if (strlen($data) >= 8180)
{
$tmpFile = tempnam('/tmp/', 'swoole.task');
file_put_contents($tmpFile, $data);
$data = pack('l', strlen($data)) . $tmpFile . "\0";
$fromFd |= 1;
$len = 128 + 24;
}
else
{
$len = strlen($data);
}
//typedef struct _swDataHead
//{
// int fd;
// uint16_t len;
// int16_t reactor_id;
// uint8_t type;
// uint8_t flags;
// uint16_t server_fd;
//} swDataHead;
return pack('lSsCCS', $this->taskId++, $len, $this->workerId, $type, 0, $fromFd) . $data;
}

function dispatch($data)
{
if (!msg_send($this->queueId, $this->workerId + 1, $this->pack($data), false))
if (!msg_send($this->queueId, $this->workerId + 1, Swoole\Server\Task::pack($data), false))
{
return false;
}
Expand All @@ -65,7 +29,7 @@ function dispatch($data)
}
}

$task = new SwooleTask(0x70001001);
$task = new SwooleTask(0x70001001, 0);
//普通字符串
$task->dispatch("Hello from PHP!");
//数组
Expand Down
25 changes: 24 additions & 1 deletion swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_server_finish, 0, 0, 1)
ZEND_ARG_INFO(0, data)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_server_task_pack, 0, 0, 1)
ZEND_ARG_INFO(0, data)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_server_reload, 0, 0, 0)
ZEND_END_ARG_INFO()

Expand Down Expand Up @@ -314,6 +318,7 @@ static PHP_METHOD(swoole_connection_iterator, __destruct);
* Server\Task
*/
static PHP_METHOD(swoole_server_task, finish);
static PHP_METHOD(swoole_server_task, pack);

static zend_function_entry swoole_server_methods[] = {
PHP_ME(swoole_server, __construct, arginfo_swoole_server__construct, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -384,6 +389,7 @@ static const zend_function_entry swoole_connection_iterator_methods[] =
static const zend_function_entry swoole_server_task_methods[] =
{
PHP_ME(swoole_server_task, finish, arginfo_swoole_server_finish, ZEND_ACC_PUBLIC)
PHP_ME(swoole_server_task, pack, arginfo_swoole_server_task_pack, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_FE_END
};

Expand All @@ -405,7 +411,6 @@ static void php_swoole_onWorkerStop(swServer *, int worker_id);
static void php_swoole_onWorkerExit(swServer *serv, int worker_id);
static void php_swoole_onUserWorkerStart(swServer *serv, swWorker *worker);
static int php_swoole_onTask(swServer *, swEventData *task);
static int php_swoole_onTaskCo(swServer *, swEventData *task);
static int php_swoole_onFinish(swServer *, swEventData *task);
static void php_swoole_onWorkerError(swServer *serv, int worker_id, pid_t worker_pid, int exit_code, int signo);
static void php_swoole_onManagerStart(swServer *serv);
Expand Down Expand Up @@ -3686,6 +3691,24 @@ static PHP_METHOD(swoole_server_task, finish)
SW_CHECK_RETURN(php_swoole_task_finish(serv, data, (swEventData* )info));
}

static PHP_METHOD(swoole_server_task, pack)
{
swEventData buf = {0};
zval *data;

ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_ZVAL(data)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

if (php_swoole_task_pack(&buf, data) < 0)
{
RETURN_FALSE;
}
swTask_type(&buf) |= (SW_TASK_NONBLOCK | SW_TASK_NOREPLY);

RETURN_STRINGL((char* )&buf, sizeof(buf.info) + buf.info.len);
}

static PHP_METHOD(swoole_server, bind)
{
zend_long fd = 0;
Expand Down
96 changes: 96 additions & 0 deletions tests/swoole_server/task/task_pack.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
--TEST--
Server/task: task pack
--SKIPIF--
<?php require __DIR__ . '/../../include/skipif.inc';
skip_if_function_not_exist('msg_get_queue');
?>
--FILE--
<?php
require __DIR__ . '/../../include/bootstrap.php';
const MSGQ_KEY = 0x70001001;

use Swoole\Server;

$pm = new SwooleTest\ProcessManager;
$result = new Swoole\Atomic(0);

$pm->parentFunc = function ($pid) use ($pm) {
$task = new class(MSGQ_KEY, 0) {
protected $queueId;
protected $workerId;

function __construct($key, $workerId) {
$this->queueId = msg_get_queue($key);
if ($this->queueId === false) {
throw new \Swoole\Exception("msg_get_queue() failed.");
}
$this->workerId = $workerId;
}

function dispatch($data) {
if (!msg_send($this->queueId, $this->workerId + 1, Swoole\Server\Task::pack($data), false)) {
return false;
} else {
return true;
}
}
};
//数组
$task->dispatch(array('data' => str_repeat('A', 1024), 'type' => 1));
//大包
$task->dispatch(array('data' => str_repeat('B', 1024 * 32), 'type' => 2));
//普通字符串
$task->dispatch(str_repeat('C', 512));
};

$pm->childFunc = function () use ($pm) {
ini_set('swoole.display_errors', 'Off');
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_BASE);
$serv->set(array(
"worker_num" => 1,
'task_worker_num' => 1,
'task_ipc_mode' => 3,
'message_queue_key' => MSGQ_KEY,
'log_file' => '/dev/null',
));
$serv->on("WorkerStart", function (Server $serv) use ($pm) {
$pm->wakeup();
});
$serv->on('receive', function (Server $serv, $fd, $rid, $data) {

});
$serv->on('task', function (Server $serv, $task_id, $worker_id, $data) {
global $result;
switch ($task_id) {
case 0:
Assert::isArray($data);
Assert::eq($data['type'], 1);
Assert::length($data['data'], 1024);
$result->add(1);
break;
case 1:
Assert::isArray($data);
Assert::eq($data['type'], 2);
Assert::length($data['data'], 1024 * 32);
$result->add(1);
break;
case 2:
Assert::assert(is_string($data));
Assert::length($data, 512);
$result->add(1);
$serv->shutdown();
break;
default:
break;
}
});

$serv->start();
};

$pm->childFirst();
$pm->run();

Assert::eq($result->get(), 3);
?>
--EXPECT--