From c20ff88c1c64a6a5fd1b0290d7f927e41b1ba819 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Mon, 13 Nov 2017 15:36:01 +0100 Subject: [PATCH] Fixed `loop->watchers[w->fd] == w` and only run listeners that have events --- src/LibUvLoop.php | 78 ++++++++++++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/src/LibUvLoop.php b/src/LibUvLoop.php index 5b3ae453..3001caeb 100644 --- a/src/LibUvLoop.php +++ b/src/LibUvLoop.php @@ -16,6 +16,7 @@ class LibUvLoop implements LoopInterface private $futureTickQueue; private $timerEvents; private $events = []; + private $flags = []; private $listeners = []; private $running; private $streamListener; @@ -65,10 +66,7 @@ public function removeReadStream($stream) unset($this->listeners[(int) $stream]['read']); - if (!isset($this->listeners[(int) $stream]['read']) - && !isset($this->listeners[(int) $stream]['write'])) { - unset($this->events[(int) $stream]); - } + $this->____removeStream($stream); } /** @@ -82,10 +80,7 @@ public function removeWriteStream($stream) unset($this->listeners[(int) $stream]['write']); - if (!isset($this->listeners[(int) $stream]['read']) - && !isset($this->listeners[(int) $stream]['write'])) { - unset($this->events[(int) $stream]); - } + $this->____removeStream($stream); } /** @@ -94,12 +89,10 @@ public function removeWriteStream($stream) public function removeStream($stream) { if (isset($this->events[(int) $stream])) { - - \uv_poll_stop($this->events[(int) $stream]); - unset($this->listeners[(int) $stream]['read']); unset($this->listeners[(int) $stream]['write']); - unset($this->events[(int) $stream]); + + $this->____removeStream($stream); } } @@ -207,6 +200,39 @@ public function stop() } private function addStream($stream) + { + // Run in tick or else things epically fail with loop->watchers[w->fd] == w + $this->futureTick(function () use ($stream) { + if (!isset($this->events[(int) $stream])) { + $this->events[(int) $stream] = \uv_poll_init_socket($this->uv, $stream); + } + + $this->pollStream($stream); + }); + } + + // To do: get latest changes in from react:master so we can use this method name internally + private function ____removeStream($stream) + { + // Run in tick or else things epically fail with loop->watchers[w->fd] == w + $this->futureTick(function () use ($stream) { + if (!isset($this->events[(int) $stream])) { + return; + } + + if (!isset($this->listeners[(int) $stream]['read']) + && !isset($this->listeners[(int) $stream]['write'])) { + \uv_poll_stop($this->events[(int) $stream]); + unset($this->events[(int) $stream]); + unset($this->flags[(int) $stream]); + return; + } + + $this->pollStream($stream); + }); + } + + private function pollStream($stream) { $flags = 0; if (isset($this->listeners[(int) $stream]['read'])) { @@ -217,14 +243,13 @@ private function addStream($stream) $flags |= \UV::WRITABLE; } - if (!isset($this->events[(int) $stream])) { - $event = \uv_poll_init_socket($this->uv, $stream); - $this->events[(int) $stream] = $event; - } else { - $event = $this->events[(int) $stream]; + if (isset($this->flags[(int) $stream]) && $this->flags[(int) $stream] == $flags) { + return; } - \uv_poll_start($event, $flags, $this->streamListener); + $this->flags[(int) $stream] = $flags; + + \uv_poll_start($this->events[(int) $stream], $flags, $this->streamListener); } /** @@ -236,24 +261,15 @@ private function createStreamListener() { $callback = function ($event, $status, $events, $stream) { if ($status !== 0) { - - $flags = 0; - if (isset($this->listeners[(int) $stream]['read'])) { - $flags |= \UV::READABLE; - } - - if (isset($this->listeners[(int) $stream]['write'])) { - $flags |= \UV::WRITABLE; - } - - \uv_poll_start($event, $flags, $this->streamListener); + unset($this->flags[(int) $stream]); + $this->pollStream($stream); } - if (isset($this->listeners[(int) $stream]['read'])) { + if (isset($this->listeners[(int) $stream]['read']) && $events & \UV::READABLE) { call_user_func($this->listeners[(int) $stream]['read'], $stream); } - if (isset($this->listeners[(int) $stream]['write'])) { + if (isset($this->listeners[(int) $stream]['write']) && $events & \UV::WRITABLE) { call_user_func($this->listeners[(int) $stream]['write'], $stream); } };