Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[discussion] swow是否支持fork操作? #176

Open
chaz6chez opened this issue Mar 21, 2023 · 7 comments
Open

[discussion] swow是否支持fork操作? #176

chaz6chez opened this issue Mar 21, 2023 · 7 comments
Labels
discussion Discuss things in this issue

Comments

@chaz6chez
Copy link

不知道swow是否支持fork操作?
如果fork后,所有的进程是否共用一个协程的调度执行线程?

在我的概念里,PHP自身进程的存在单线程的工作线程,而swow提供了另一个线程+调度器来hook PHP工作线程的一些协程操作,底层是通过libcat来提供事件循环的;libcat基于libuv,在我的概念里是支持fork和多线程的。

如果swow支持fork,那么我可以用sidecar的思想思考整个模型,我fork出来的每一个进程都将包含一个PHP工作线程+swow调度线程,那么实际上这样一个模型就很类似golang的GPM模型,尤其是在systemcall部分,因为golang的systemcall部分协程是在同一条P上进行执行的,并不会被多个P调度执行。

那么如果按照这样最乐观的情况考虑,PHP就可以实现一个类似GPM模型的多进程模型的程序。

@chaz6chez chaz6chez added the discussion Discuss things in this issue label Mar 21, 2023
@twose
Copy link
Member

twose commented Mar 23, 2023

支持 fork,但尽量在程序运行早期进行。

http://docs.libuv.org/en/v1.x/loop.html#c.uv_loop_fork

每个进程都有单独的事件循环,进程间可以通过 IPC 的方式进行协作。
做这个多进程模型也在计划内,但我不建议用 fork,而是通过开启新的进程以兼容 Windows 平台;另一个是 Swow 基于 libcat 基于 libuv,libuv 的异步文件 IO 是带线程的,所以会出现带线程 fork 的问题。

Swow 其实还隐藏了一个实验性功能,就是函数的序列化,你可以通过序列化函数投递给另一个进程实现跨进程任务投递。

@chaz6chez
Copy link
Author

支持 fork,但尽量在程序运行早期进行。

http://docs.libuv.org/en/v1.x/loop.html#c.uv_loop_fork

非常感谢,我认为在waitAll()执行之前进行fork就没有太大问题。

每个进程都有单独的事件循环,进程间可以通过 IPC 的方式进行协作。 做这个多进程模型也在计划内,但我不建议用 fork,而是通过开启新的进程以兼容 Windows 平台;另一个是 Swow 基于 libcat 基于 libuv,libuv 的异步文件 IO 是带线程的,所以会出现带线程 fork 的问题。

与上面同理,其实我在waitAll()执行前仅创建了一些IO的回调事件,在fork发生后我才对每个进程执行waitAll()。

Swow 其实还隐藏了一个实验性功能,就是函数的序列化,你可以通过序列化函数投递给另一个进程实现跨进程任务投递。

这个功能是使用PHP自带的系列化操作对函数进行序列化后通过如socket传递就可以传递函数任务了吗?

@chaz6chez chaz6chez reopened this Mar 23, 2023
@twose
Copy link
Member

twose commented Mar 24, 2023

是的 直接

serialize(static function () {});

就可以。

有一个小技巧,可以看相关的测试:https://github.com/swow/swow/tree/develop/ext/tests/swow_closure

@twose
Copy link
Member

twose commented Mar 24, 2023

能不能 fork 和 waitAll 关系不大。主要是在于 资源 和 多线程。

@chaz6chez
Copy link
Author

PHPUnit 10.0.18 by Sebastian Bergmann and contributors.

Runtime:       PHP 8.1.17

S                                                                   1 / 1 (100%)

Time: 00:01.364, Memory: 8.00 MB

OK, but some tests were skipped!
Tests: 1, Assertions: 0, Skipped: 1.

Fatal error: Kill Coroutine#0 failed when destruct object, reason: Kill coroutine 0 failed, reason: Coroutine is not in executing in Unknown on line 0

Aborted

请问,我是用phpunit进行测试的时候,始终会有这样的问题;我的测试用例及被测试对象中并没有使用任何kill相关的操作,这是怎么回事?

@twose
Copy link
Member

twose commented Mar 29, 2023

能否打包给我一份最小的复现例子? @chaz6chez

@chaz6chez
Copy link
Author

@twose

以下是被测试对象的大概样本,和为workerman提供的swow的event-loop class类似

class SwowLoop extends AbstractLoop
{

    /** @var bool  */
    protected bool $_stopped = false;

    /** @inheritDoc */
    public function getExtName(): string
    {
        return 'swow';
    }

    /** @inheritDoc */
    public function hasExt(): bool
    {
        return extension_loaded($this->getExtName());
    }

    /** @inheritDoc */
    public function addReadStream($stream, Closure $handler): void
    {
        if(\is_resource($stream) and !isset($this->_readFds[$key = (int)$stream])){
            $this->_reads[$key] = null;
            $this->_readFds[$key] = $stream;
            Coroutine::run(function () use ($handler, $key): void {
                try {
                    $this->_reads[$key] = Coroutine::getCurrent();
                    while (!$this->_stopped) {
                        if (!isset($this->_readFds[$key])) {
                            break;
                        }
                        if ($this->_reads[$key] === null) {
                            continue;
                        }
                        $event = stream_poll_one($stream = $this->_readFds[$key], STREAM_POLLIN | STREAM_POLLHUP);

                        if ($event !== STREAM_POLLNONE) {
                            \call_user_func($handler, $stream);
                        }
                        if ($event !== STREAM_POLLIN) {
                            $this->delReadStream($stream);
                            break;
                        }
                    }
                } catch (\RuntimeException) {
                    $this->delReadStream($stream);
                }
            });
        }
    }

    /** @inheritDoc */
    public function delReadStream($stream): void
    {
        if(
            \is_resource($stream) and
            isset($this->_readFds[$key = (int)$stream]) and
            isset($this->_reads[$key])
        ){
            unset($this->_readFds[$key], $this->_reads[$key]);
        }
    }

    /** @inheritDoc */
    public function addWriteStream($stream, Closure $handler): void
    {
        if(\is_resource($stream) and !isset($this->_writeFds[$key = (int)$stream])) {
            $this->_writes[$key] = null;
            $this->_writeFds[$key] = $stream;
            Coroutine::run(function () use ($handler, $key): void {
                try {
                    $this->_writes[$key] = Coroutine::getCurrent();
                    while (!$this->_stopped) {
                        if (!isset($this->_writeFds[$key])) {
                            break;
                        }
                        if ($this->_writes[$key] === null) {
                            continue;
                        }
                        $event = stream_poll_one($stream = $this->_writeFds[$key], STREAM_POLLOUT | STREAM_POLLHUP);

                        if ($event !== STREAM_POLLNONE) {
                            \call_user_func($handler, $stream);
                        }
                        if ($event !== STREAM_POLLOUT) {
                            $this->delWriteStream($stream);
                            break;
                        }
                    }
                } catch (\RuntimeException) {
                    $this->delWriteStream($stream);
                }
            });
        }
    }

    /** @inheritDoc */
    public function delWriteStream($stream): void
    {
        if(
            \is_resource($stream) and
            isset($this->_writeFds[$key = (int)$stream]) and
            isset($this->_writes[$key])
        ){
            unset($this->_writeFds[$key], $this->_writes[$key]);
        }
    }

    /** @inheritDoc */
    public function addSignal(int $signal, Closure $handler): void
    {
        if(!isset($this->_signals[$signal])){
            // 占位
            $this->_signals[$signal] = null;
            Coroutine::run(function () use ($signal, $handler): void {
                $this->_signals[$signal] = Coroutine::getCurrent();
                while (!$this->_stopped) {
                    try {
                        Signal::wait($signal);
                        if (!isset($this->_signals[$signal])) {
                            break;
                        }
                        if ($this->_signals[$signal] === null) {
                            continue;
                        }
                        \call_user_func($handler, $signal);
                    } catch (SignalException) {}
                }
            });
        }
    }

    /** @inheritDoc */
    public function delSignal(int $signal): void
    {
        if(isset($this->_signals[$signal])){
            unset($this->_signals[$signal]);
        }
    }

    /** @inheritDoc */
    public function addTimer(float $delay, float|false $repeat, Closure $handler): string
    {
        $delay = $this->_floatToInt($delay);
        $repeat = $this->_floatToInt($repeat);
        $coroutine = Coroutine::run(function () use ($delay, $repeat, $handler): void {
            $first = true;
            while (!$this->_stopped) {
                if($repeat === false){
                    $this->_storage->del(spl_object_hash(Coroutine::getCurrent()));
                    break;
                }
                if($first){
                    msleep($delay);
                }else{
                    msleep($repeat);
                }
                \call_user_func($handler);
                $first = false;
            }
        });
        return $this->_storage->add(spl_object_hash($coroutine), $coroutine->getId());
    }

    /** @inheritDoc */
    public function delTimer(string $timerId): void
    {
        $id = $this->_storage->get($timerId);
        if($id !== null){
            Coroutine::get($id)->kill();
        }
        $this->_storage->del($timerId);
    }

    /** @inheritDoc */
    public function run(): void
    {
        $this->_stopped = false;
        waitAll();
    }

    /** @inheritDoc */
    public function stop(): void
    {
        $this->_stopped = true;
        Coroutine::killAll();
    }

    /** @inheritDoc */
    public function destroy(): void
    {
        $this->stop();
        $this->clear();
    }

    /** 获取小数点位数 */
    protected function _floatToInt(float|false $float): int|false
    {
        if($float === false){
            return false;
        }
        $float = $float * 1000;
        if($float < 0.0){
            throw new InvalidArgumentException('Minimum support 0.001');
        }
        return (int)($float);
    }
}

AbstractLoop

abstract class AbstractLoop
{
    /** @var resource[] */
    protected array $_readFds = [];

    /** @var resource[] */
    protected array $_writeFds = [];

    /** @var array All listeners for read event. */
    protected array $_reads = [];

    /** @var array All listeners for write event. */
    protected array $_writes = [];

    /** @var array Event listeners of signal. */
    protected array $_signals = [];

    /** @var Storage 定时器容器 */
    protected Storage $_storage;

    /**
     * @throws DriverExtNotFoundException
     */
    public function __construct()
    {
        if(!$this->hasExt()) {
            $extName = $this->getExtName();
            throw new DriverExtNotFoundException("php-ext: $extName not found. ");
        }
        $this->_storage = new Storage();
    }

    /**
     * @return resource[]
     */
    public function getReadFds(): array
    {
        return $this->_readFds;
    }

    /**
     * @return resource[]
     */
    public function getWriteFds(): array
    {
        return $this->_writeFds;
    }

    /**
     * @return array
     */
    public function getReads(): array
    {
        return $this->_reads;
    }

    /**
     * @return array
     */
    public function getWrites(): array
    {
        return $this->_writes;
    }

    /**
     * @return array
     */
    public function getSignals(): array
    {
        return $this->_signals;
    }

    /**
     * @return Storage
     */
    public function getStorage(): Storage
    {
        return $this->_storage;
    }

    /**
     * @return void
     */
    public function clear(): void
    {
        $this->_storage = new Storage();
        $this->_writeFds = [];
        $this->_readFds = [];
        $this->_writes = [];
        $this->_reads = [];
    }
}

Storage

final class Storage
{
    /** @var int  */
    private int $_count = 0;

    /** @var array storage */
    private array $_storage = [];

    /**
     * @param string $key
     * @param mixed|null $value
     * @return string
     */
    public function add(string $key, mixed $value): string
    {
        $this->_storage[$key] = $value;
        $this->_count ++;
        return $key;
    }

    /**
     * @param string $key
     * @param mixed|null $value
     * @return string
     */
    public function set(string $key, mixed $value): string
    {
        if($this->exist($key)){
            $this->_storage[$key] = $value;
        }
        return $key;
    }

    /**
     * @param string $key
     */
    public function del(string $key): void
    {
        unset($this->_storage[$key]);
        if ($this->_count > 0){
            $this->_count --;
        }
    }

    /**
     * @param string $key
     * @return mixed|null
     */
    public function get(string $key): mixed
    {
        return $this->exist($key) ? $this->_storage[$key] : null;
    }

    /**
     * @return int
     */
    public function count(): int
    {
        return $this->_count;
    }

    /**
     * @param string $key
     * @return bool
     */
    public function exist(string $key): bool
    {
        return isset($this->_storage[$key]);
    }

    /**
     * @return bool
     */
    public function isEmpty(): bool
    {
        return empty($this->_storage);
    }
}

以下是单元测试样本

/**
     * 测试@Future 的创建及自动销毁
     * More Tag @see LoopInterface::addTimer()
     *
     * @return void
     */
    public function testFuture(): void
    {
        $loop = new SwowLoop();
        $count = 0;
        // before create @Future
        $this->assertEquals(0, $loop->getStorage()->count());
        // create @Future
        $loop->addTimer(0.0, false, function () use(&$count) {
            $count ++;
        });
        // before loop start
        $this->assertEquals(0, $count);
        $this->assertEquals(1, $loop->getStorage()->count());
        // loop start
        $loop->addTimer(0.5, false, function () use(&$loop) {
            $loop->stop();
        });
        $loop->run();
        $this->assertEquals(1, $count);
        $this->assertEquals(0, $this->getLoop()->getStorage()->count());
    }

以下是我的php -m的信息

[PHP Modules]
Core
ctype
curl
date
dom
ds
ev
event
FFI
fileinfo
filter
ftp
hash
iconv
json
libxml
mbstring
mysqlnd
openssl
pcntl
pcre
PDO
pdo_sqlite
Phar
posix
readline
Reflection
session
SimpleXML
sockets
sodium
SPL
sqlite3
standard
tokenizer
xdebug
xml
xmlreader
xmlwriter
Zend OPcache
zip
zlib

[Zend Modules]
Xdebug
Zend OPcache

我在PHPunit中的setUp()函数中使用了dl('swow')来动态加载 ext-swow,没有使用php -d运行;

我的PHPunit版本是10.0.18

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
discussion Discuss things in this issue
Projects
None yet
Development

No branches or pull requests

2 participants