diff --git a/src/Clients/Consumer/KafkaConsumer.php b/src/Clients/Consumer/KafkaConsumer.php index 9c0bc00..04ebebe 100644 --- a/src/Clients/Consumer/KafkaConsumer.php +++ b/src/Clients/Consumer/KafkaConsumer.php @@ -146,6 +146,11 @@ public function shutdown(): void { $this->logger->debug('Shutting down'); + $this->stop(); + } + + public function stop(): void + { $this->shouldRun = false; } @@ -160,7 +165,9 @@ private function doStart( callable|null $onPartitionEof = null, callable|null $onTimedOut = null, ): void { - $this->registerSignals($this->shouldRun); + $this->shouldRun = true; + $terminationCallback = fn () => $this->stop(); + $this->registerSignals($terminationCallback); while ($this->shouldRun) { $message = $this->consume($timeoutMs); diff --git a/src/Clients/Consumer/WithSignalControl.php b/src/Clients/Consumer/WithSignalControl.php index fba7346..4c9677a 100644 --- a/src/Clients/Consumer/WithSignalControl.php +++ b/src/Clients/Consumer/WithSignalControl.php @@ -4,6 +4,8 @@ namespace SimPod\Kafka\Clients\Consumer; +use Safe\Exceptions\PcntlException; + use function Safe\pcntl_signal; use function Safe\pcntl_sigprocmask; @@ -22,12 +24,13 @@ private function setupInternalTerminationSignal(ConsumerConfig $config): void $config->set('internal.termination.signal', SIGIO); } - private function registerSignals(bool &$shouldRun): void + /** + * @param callable():mixed $terminationCallback + * + * @throws PcntlException + */ + private function registerSignals(callable $terminationCallback): void { - $terminationCallback = static function () use (&$shouldRun): void { - $shouldRun = false; - }; - pcntl_signal(SIGTERM, $terminationCallback); pcntl_signal(SIGINT, $terminationCallback); pcntl_signal(SIGHUP, $terminationCallback);