Skip to content

hyperf/single-flight-incubator

Repository files navigation

Hyperf Concurrent Tools

一个hyperf常用并发工具库,包括single-flight、barrier、semaphore、worker-pool

安装

composer require hyperf/single-flight-incubtor

基本使用

所有例子都在examples目录下,更多用法请参考tests目录

single-flight

$ret = [];
$barrierKey = uniqid();

run(static function () use (&$ret, $barrierKey) {
    for ($i = 0; $i < 10; ++$i) {
        go(static function () use (&$ret, $barrierKey, $i) {
            $ret[] = SingleFlight::do($barrierKey, static function () use ($i) {
                // ensure that other coroutines can be scheduled at the same time
                usleep(1000);
                return [Coroutine::getCid() => $i];
            });
        });
    }
});

if (count(array_unique($ret)) === 1) {
    $ret = var_export($ret, true);
    printf("%s\n只有一个协程会执行闭包逻辑,其他协程等待其结果进行复用\n", $ret);
}

barrier

$parties = 10;
$barrier = new CounterBarrier($parties);
$sleepMs = 5;

run(static function () use ($parties, $barrier, $sleepMs) {
    for ($i = 0; $i < $parties - 1; ++$i) {
        go(static function () use ($barrier) {
            $waitAt = microtime(true);
            $barrier->await();
            // your biz logic here
            $resumeAt = microtime(true);
            $elapsed = ($resumeAt - $waitAt) * 1000;
            printf("协程 [%d] 等待 %.2f 毫秒后,恢复执行\n", Coroutine::getCid(), $elapsed);
        });
    }

    go(static function () use ($barrier, $sleepMs) {
        usleep($sleepMs * 1000);
        printf("协程 [%d] 作为最后一个协程,等待 %d 毫秒后加入屏障,同其他协程一起执行\n", Coroutine::getCid(), $sleepMs);
        $barrier->await();
        // your biz logic here
    });
});

semaphore

run(static function () {
    $sema = new Semaphore(3);

    go(static function () use ($sema) {
        $sleepSec = 1;
        $tokens = 1;
        defer(static function () use ($sema, $sleepSec, $tokens) {
            $sema->release($tokens);
            printf("协程 [%d] 占用信号量 %d 秒后释放\n", Coroutine::getCid(), $sleepSec);
        });

        $acquireAt = time();
        $sema->acquire($tokens);
        $resumedAt = time();
        $elapsed = $resumedAt - $acquireAt;
        printf("协程 [%d] 于 %d 秒后获取信号量成功\n", Coroutine::getCid(), $elapsed);
        sleep($sleepSec);
    });

    $chan = new Channel();
    go(static function () use ($sema, $chan) {
        $sleepSec = 2;
        $tokens = 2;
        defer(static function () use ($sema, $sleepSec, $tokens) {
            $sema->release($tokens);
            printf("协程 [%d] 占用信号量 %d 秒后释放\n", Coroutine::getCid(), $sleepSec);
        });

        $acquireAt = microtime(true);
        $sema->acquire($tokens);
        // 唤醒下面一个协程
        $chan->close();
        $resumedAt = microtime(true);
        $elapsed = ($resumedAt - $acquireAt) * 1000;
        printf("协程 [%d] 于 %d 秒后获取信号量成功\n", Coroutine::getCid(), $elapsed);
        sleep($sleepSec);
    });

    go(static function () use ($sema, $chan) {
        // 确保此协程在前一个协程后尝试获取信号量
        $chan->pop();
        $tokens = 3;
        defer(static function () use ($sema, $tokens) {
            $sema->release($tokens);
            printf("协程 [%d] 释放信号量\n", Coroutine::getCid());
        });

        $acquireAt = time();
        $sema->acquire($tokens);
        $resumedAt = time();
        $elapsed = $resumedAt - $acquireAt;
        printf("协程 [%d] 于 %d 秒后获取信号量成功\n", Coroutine::getCid(), $elapsed);
    });
});

worker-pool

run(static function () {
    $config = new Config();
    $config->setCapacity(5);
    $pool = new WorkerPool($config);
    defer(static fn () => $pool->stop());

    // 投递异步任务
    $mockBiz = static fn () => Coroutine::getCid();
    $pool->submit($mockBiz);

    // 投递同步任务,可直接获取结果
    $ret = $pool->submit($mockBiz, sync: true);
    if (Coroutine::getCid() != $ret) {
        printf("同步任务投递到worker-pool中的工作协程执行\n");
    }

    // 投递异步任务,通过waitResult方法获取结果
    $task = new Task($mockBiz(...), sync: false);
    $pool->submitTask($task);
    $ret = $task->waitResult();
    if (Coroutine::getCid() != $ret) {
        printf("异步任务投递到worker-pool中的工作协程执行\n");
    }
});

About

No description, website, or topics provided.

Resources

License

Security policy

Stars

Watchers

Forks

Releases

No releases published

Sponsor this project

Packages

No packages published

Languages