Skip to content

Commit

Permalink
[Messenger] log messages that are consumed by the command
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-grekas committed Nov 24, 2018
1 parent 4c1e8bd commit ca482d8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
22 changes: 11 additions & 11 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Expand Up @@ -94,15 +94,15 @@ protected function configure(): void
*/
protected function interact(InputInterface $input, OutputInterface $output)
{
$style = new SymfonyStyle($input, $output);
$io = new SymfonyStyle($input, $output);

if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$style->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
$io->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
}
Expand All @@ -111,17 +111,17 @@ protected function interact(InputInterface $input, OutputInterface $output)
$busName = $input->getOption('bus');
if ($this->busNames && !$this->busLocator->has($busName)) {
if (null === $busName) {
$style->block('Missing bus argument.', null, 'error', ' ', true);
$input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames));
$io->block('Missing bus argument.', null, 'error', ' ', true);
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
$style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);

if (1 === \count($alternatives)) {
if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
$input->setOption('bus', $alternatives[0]);
}
} else {
$input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
}
}
}
Expand Down Expand Up @@ -156,7 +156,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
}

$worker = new Worker($receiver, $bus);
$worker->run();
$worker->run($output);
}

private function convertToBytes(string $memoryLimit): int
Expand Down
21 changes: 18 additions & 3 deletions src/Symfony/Component/Messenger/Worker.php
Expand Up @@ -11,13 +11,17 @@

namespace Symfony\Component\Messenger;

use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*
* @experimental in 4.2
*
* @final
*/
class Worker
{
Expand All @@ -33,20 +37,31 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu
/**
* Receive the messages and dispatch them to the bus.
*/
public function run()
public function run(OutputInterface $output = null)
{
if (\function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, function () {
$this->receiver->stop();
});
}

$this->receiver->receive(function (?Envelope $envelope) {
$this->receiver->receive(function (?Envelope $envelope) use ($output) {
if (null === $envelope) {
return;
}

$this->bus->dispatch($envelope->with(new ReceivedStamp()));
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));

if (!$output) {
return;
}

$handlers = array();
foreach ($envelope->all(HandledStamp::class) as $stamp) {
$handlers[] = $stamp->getHandlerAlias() ?? $stamp->getCallableName();
}

$output->writeln(sprintf('Message "<comment>%s</>" handled by "<comment>%s</>"', \get_class($envelope->getMessage()), implode('", "', $handlers ?? array('???'))));
});
}
}

0 comments on commit ca482d8

Please sign in to comment.