Skip to content

swoole4.4-task消息队列模式,外部程序投递任务,接收字符串数据会截断前4个字节 #2872

@ghost

Description

我用运行了官方提供的消息队列示例代码
服务端:

<?php
$serv = new swoole_server("0.0.0.0", 7777, SWOOLE_BASE);

$serv->set(array(
    'task_worker_num' => 4,
    'task_ipc_mode' => 3,
    'message_queue_key' => 0x70,
    'task_tmpdir' => '/tmp/',
));

$serv->on('Receive', function(swoole_server $serv, $fd, $reactor_id, $data) {
});

$serv->on('Task', function (swoole_server $serv, $task_id, $reactor_id, $data) {
    echo "#{$serv->worker_id}\tonTask: [PID={$serv->worker_pid}]: TASK_ID=$task_id]\n";
    var_dump($data);
});

$serv->on('Finish', function (swoole_server $serv, $task_id, $data) {
    echo "Task#$task_id finished, data_len=".strlen($data).PHP_EOL;
});

$serv->on('workerStart', function($serv, $worker_id) {
    global $argv;
    if($worker_id >= $serv->setting['worker_num']) {
        swoole_set_process_name("php {$argv[0]}: task_worker");
    } else {
        swoole_set_process_name("php {$argv[0]}: worker");
    }
});

$serv->start();

客户端:

<?php

class SwooleTask
{
    protected $queueId;
    protected $workerId;
    protected $taskId = 1;

    const     SW_TASK_TMPFILE = 1;  //tmp file
    const   SW_TASK_SERIALIZE = 2;  //php serialize
    const    SW_TASK_NONBLOCK = 4;  //task

    const SW_EVENT_TASK = 7;

    /**
     * SwooleTask constructor.
     * @param $key
     * @param int $workerId
     * @throws Exception
     */
    function __construct($key, $workerId = 0)
    {
        $this->queueId = msg_get_queue($key);
        if ($this->queueId === false)
        {
            throw new \Exception("msg_get_queue() failed.");
        }
        $this->workerId = $workerId;
    }

    protected static function pack($taskId, $data)
    {
        $flags = self::SW_TASK_NONBLOCK;
        $type = self::SW_EVENT_TASK;
        if (!is_string($data))
        {
            $data = serialize($data);
            $flags |= self::SW_TASK_SERIALIZE;
        }
        if (strlen($data) >= 8180)
        {
            $tmpFile = tempnam('/tmp/', 'swoole.task');
            file_put_contents($tmpFile, $data);
            $data = pack('l', strlen($data)) . $tmpFile . "\0";
            $flags |= self::SW_TASK_TMPFILE;
            $len = 128 + 24;
        }
        else
        {
            $len = strlen($data);
        }

        return pack('lSsCCS', $taskId, $len, 0, $type, 0, $flags) . $data;
    }

    function dispatch($data)
    {
        $taskId = $this->taskId++;
        if (!msg_send($this->queueId, 2, self::pack($taskId, $data), false))
        {
            return false;
        }
        else
        {
            return $taskId;
        }
    }
}

echo "Sending text to msg queue.\n";
$task = new SwooleTask(0x70, 1);
//普通字符串
$task->dispatch("Hello from PHP!");`

运行结果:
image

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions