[Enhancement] Simplify Adapter and queue implementation #200

Closed
g41797 opened this issue May 13, 2024 · 3 comments

g41797 commented May 13, 2024

The main purpose of this proposal is to simplify adding a new broker to the Yii3 queue, and to simplify the queue implementation in general:

  • queue/worker/adapter code will be completely generic, smaller and simpler
  • BrokerInterface covers only the communication details of a specific broker
    • it has no knowledge of queue/worker/etc. internals
interface BrokerInterface
{
    public function withChannel(string $channel): self;

    public function push(MessageInterface $job): ?IdEnvelope;

    public function jobStatus(IdEnvelope $job): ?JobStatus;

    public function pull(float $timeout): ?IdEnvelope;

    public function done(IdEnvelope $job): bool;
}
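
To make the shape of this interface concrete, here is a minimal in-memory sketch of a broker with the same five methods. It is only an illustration: `Message`, `Envelope` and `Status` are simplified, hypothetical stand-ins for `MessageInterface`, `IdEnvelope` and `JobStatus`, and `InMemoryBroker` is not part of yiisoft/queue or queue-nats.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for the real yiisoft/queue types, so the sketch runs on its own.
enum Status
{
    case Waiting;
    case Reserved;
    case Done;
}

final class Message
{
    public function __construct(
        public readonly string $handlerName,
        public readonly mixed $data,
    ) {
    }
}

final class Envelope
{
    public function __construct(
        public readonly Message $message,
        public readonly string $id,
    ) {
    }
}

// Same five methods as the proposed BrokerInterface, with the simplified types above.
final class InMemoryBroker
{
    /** @var list<Envelope> waiting jobs, oldest first */
    private array $waiting = [];
    /** @var array<string, Status> status of every job seen so far, by id */
    private array $statuses = [];
    private int $lastId = 0;

    public function __construct(private string $channel = 'default')
    {
    }

    public function withChannel(string $channel): self
    {
        // A fresh, empty broker: in this sketch channels do not share jobs.
        return new self($channel);
    }

    public function push(Message $job): ?Envelope
    {
        $envelope = new Envelope($job, $this->channel . '-' . ++$this->lastId);
        $this->waiting[] = $envelope;
        $this->statuses[$envelope->id] = Status::Waiting;

        return $envelope;
    }

    public function jobStatus(Envelope $job): ?Status
    {
        return $this->statuses[$job->id] ?? null;
    }

    public function pull(float $timeout): ?Envelope
    {
        // Nothing can arrive from outside this process, so the timeout is ignored.
        $envelope = array_shift($this->waiting);
        if ($envelope === null) {
            return null;
        }
        $this->statuses[$envelope->id] = Status::Reserved;

        return $envelope;
    }

    public function done(Envelope $job): bool
    {
        // Only a job that was pulled (reserved) can be acknowledged.
        if (($this->statuses[$job->id] ?? null) !== Status::Reserved) {
            return false;
        }
        $this->statuses[$job->id] = Status::Done;

        return true;
    }
}

$broker = (new InMemoryBroker())->withChannel('emails');
$pushed = $broker->push(new Message('send-email', ['to' => 'user@example.com']));
$pulled = $broker->pull(1.0);
$acknowledged = $broker->done($pulled);
```

Note that nothing in the class refers to workers, loops or handlers; that is the separation the proposal is aiming for.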

The adapter code is completely generic and rather straightforward; see the POC below:

class Adapter implements AdapterInterface
{
    public function __construct(
        private BrokerFactoryInterface $brokerFactory,
        private string              $channelName = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
        private array               $brokerConfiguration = [],
        private ?LoggerInterface    $logger = null,
        private ?LoopInterface      $loop = null,
        private float               $timeout = 1.0,
    ) {
        if (null === $loop) {
            // Assign to the promoted property, not to the local parameter.
            $this->loop = new NullLoop();
        }

        if (null === $logger) {
            $this->logger = new NullLogger();
        }

        $this->getBroker();
    }

    public function push(MessageInterface $message): MessageInterface
    {
        return $this->broker->push($message);
    }

    public function status(int|string $id): JobStatus
    {
        $jobStatus = $this->broker->jobStatus($id);

        if ($jobStatus === null) {
            throw new \InvalidArgumentException('job does not exist');
        }

        return $jobStatus;
    }

    public function runExisting(callable $handlerCallback): void
    {
        $this->processJobs($handlerCallback, continueOnEmptyQueue: false);
    }

    public function subscribe(callable $handlerCallback): void
    {
        $this->processJobs($handlerCallback, continueOnEmptyQueue: true);
    }

    private function processJobs(callable $handlerCallback, bool $continueOnEmptyQueue): void
    {
        $result = true;
        while ($this->loop->canContinue() && $result === true) {
            $job = $this->broker->pull($this->timeout);
            if (null === $job) {
                if ($continueOnEmptyQueue) {
                    continue;
                }
                break;
            }
            $result = $handlerCallback($job);
            // Acknowledge the job; BrokerInterface::done() takes the envelope, not its id.
            $this->broker->done($job);
        }
    }

For the rest of the code, see the queue-nats repo.

samdark (Member) commented May 14, 2024

@viktorprogger what do you think?

xepozz (Contributor) commented May 14, 2024

I have some thoughts about the idea:

withConfig() – delete

I think it must be encapsulated in the broker/driver/adapter. Just use the __construct() method.

isConnected() – delete

Not all brokers can support a real-time connection. Some just open a connection, do the job (push/pull) and close it (so-called connectionless operation), like a filesystem or an HTTP-based driver.
Moreover, there is no reason to keep the method in the interface because there are no connect()/disconnect() methods.
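
As a rough illustration of the connectionless case, the sketch below stores each message as a file in a directory. Every call opens a file, does its work and closes it again, so there is no connection state that an isConnected() method could report. `DirectoryQueue` and its file naming scheme are hypothetical and not taken from any existing driver, and the code assumes a single consumer (it is not safe for concurrent workers).

```php
<?php

declare(strict_types=1);

// Hypothetical filesystem-backed queue: one file per message, no persistent connection.
final class DirectoryQueue
{
    private int $counter = 0;

    public function __construct(private string $dir)
    {
    }

    public function push(string $payload): string
    {
        // Zero-padded timestamp plus a per-instance counter keeps names unique and sortable.
        $name = sprintf('%020d-%06d.job', hrtime(true), ++$this->counter);
        file_put_contents($this->dir . '/' . $name, $payload);

        return $name;
    }

    public function pull(): ?string
    {
        // glob() returns names sorted alphabetically, so the oldest file comes first.
        $files = glob($this->dir . '/*.job');
        if ($files === false || $files === []) {
            return null;
        }
        $payload = file_get_contents($files[0]);
        unlink($files[0]);

        return $payload === false ? null : $payload;
    }
}

$dir = sys_get_temp_dir() . '/queue-demo-' . uniqid();
mkdir($dir);

$queue = new DirectoryQueue($dir);
$queue->push('first');
$queue->push('second');

$a = $queue->pull(); // oldest message
$b = $queue->pull();
$c = $queue->pull(); // null once the directory is empty

rmdir($dir);
```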

submit() / next() – rename to push() / pull(), which already exist

See https://github.com/yiisoft/queue/blob/master/src/Adapter/AdapterInterface.php#L34, https://github.com/yiisoft/queue/blob/master/src/Adapter/AdapterInterface.php#L18

jobStatus() – move

See https://github.com/yiisoft/queue/blob/master/src/Adapter/AdapterInterface.php#L29

It's also present in another interface: https://github.com/yiisoft/queue/blob/master/src/QueueInterface.php#L46

next() – refactor

One interesting thing is that next() (or pull()) can return a message object from the queue. The current implementation accepts a callable and processes the message inside.
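
The difference between the two styles can be sketched in a few lines. This is a simplified illustration, not the yiisoft/queue API: messages are plain strings, `$pull` is a closure over an array, and `consumeAll()` is a hypothetical helper showing that the callback style can be layered on top of a pull-style call.

```php
<?php

declare(strict_types=1);

/**
 * Callback-style consumption built on a pull-style source.
 *
 * @param callable(): ?string     $pull    returns the next message, or null when the queue is empty
 * @param callable(string): bool  $handler returns false to stop processing
 * @return int number of messages handed to the handler
 */
function consumeAll(callable $pull, callable $handler): int
{
    $handled = 0;
    while (($message = $pull()) !== null) {
        $handled++;
        if ($handler($message) === false) {
            break;
        }
    }

    return $handled;
}

$queue = ['a', 'b', 'c'];
$pull = function () use (&$queue): ?string {
    return array_shift($queue);
};

// Pull style: the caller owns the loop and gets the message object back.
$first = $pull();

// Callback style: the loop lives inside the function, the caller only supplies a handler.
$seen = [];
$count = consumeAll($pull, function (string $message) use (&$seen): bool {
    $seen[] = $message;

    return true;
});
```

With a pull-style method available, the caller can choose either style; with only the callback style, the loop is always owned by the adapter.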

update() – delete

Not all drivers can update status, because it's not just a property of an object.
If you want to change the status from "in process" to "waiting", you must ask the worker that processes the job to stop its work and return the job to the queue. In some drivers that is impossible, and the only way is to push the job as a new message.
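
A small sketch of why "push it as a new message" is not the same as updating a status, assuming a hypothetical `TakeOnlyQueue` that only supports adding and taking messages: the requeued job comes back under a new id, so anything that tracked the original id loses it.

```php
<?php

declare(strict_types=1);

// Hypothetical queue that, like some brokers, only lets you add and take messages.
final class TakeOnlyQueue
{
    /** @var array<int, string> payloads by message id, oldest first */
    private array $messages = [];
    private int $nextId = 1;

    public function push(string $payload): int
    {
        $id = $this->nextId++;
        $this->messages[$id] = $payload;

        return $id;
    }

    /** @return array{int, string}|null [id, payload] of the oldest message */
    public function take(): ?array
    {
        $id = array_key_first($this->messages);
        if ($id === null) {
            return null;
        }
        $payload = $this->messages[$id];
        unset($this->messages[$id]);

        return [$id, $payload];
    }

    public function has(int $id): bool
    {
        return isset($this->messages[$id]);
    }
}

$queue = new TakeOnlyQueue();
$originalId = $queue->push('resize image 42');

// A worker takes the job, then has to give it back.
[$takenId, $payload] = $queue->take();

// There is no "set status back to waiting": the payload is pushed again under a new id.
$newId = $queue->push($payload);

$originalStillQueued = $queue->has($originalId); // the original id is gone
$requeuedIsWaiting = $queue->has($newId);
```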

Conclusion

I don't see any reason to accept the proposal; there is no gain in UX / DX.

But I think it could be useful to have a next($timeout) method to retrieve a message.

g41797 changed the title from "[Enhancement] Simplify Adapter interface and queue implementation in general" to "[Enhancement] Simplify Adapter and queue implementation" on May 14, 2024
xepozz (Contributor) commented May 20, 2024

@g41797 still no changes; all of this is available now.
You made a snippet for userland. There is a place for this.

g41797 closed this as not planned on May 21, 2024