diff --git a/src/Service/IteratePagesService.php b/src/Service/IteratePagesService.php index 98f9797503..6915c75f34 100644 --- a/src/Service/IteratePagesService.php +++ b/src/Service/IteratePagesService.php @@ -6,9 +6,11 @@ use Psr\Http\Message\ResponseInterface; use React\Promise\CancellablePromiseInterface; use RingCentral\Psr7\Request; +use Rx\AsyncSchedulerInterface; use Rx\Disposable\CallbackDisposable; use Rx\Observable; use Rx\ObserverInterface; +use Rx\Scheduler; use Rx\SchedulerInterface; use Rx\Subject\Subject; use function React\Promise\all; @@ -21,32 +23,57 @@ class IteratePagesService */ private $requestService; + /** + * @var AsyncSchedulerInterface + */ + private $scheduler; + /** * @param RequestService $requestService + * @param AsyncSchedulerInterface $scheduler */ - public function __construct(RequestService $requestService) + public function __construct(RequestService $requestService, AsyncSchedulerInterface $scheduler = null) { + $this->scheduler = $scheduler ?: Scheduler::getAsync(); $this->requestService = $requestService; } public function iterate(string $path): Observable { - return Observable::create(function ( - ObserverInterface $observer - ) use ($path) { - $subject = new Subject(); - $subject->asObservable()->subscribe( - [$observer, 'onNext'], - [$observer, 'onError'], - [$observer, 'onCompleted'] - ); - - $this->sendRequest($path, $subject); - - return new CallbackDisposable(function () use ($subject) { - $subject->dispose(); + $paths = new Subject(); + + return Observable::of($path, $this->scheduler) + ->merge($paths) + ->flatMap(function ($path) { + return Observable::fromPromise($this->requestService->handle(new Request('GET', $path))); + }) + ->do(function (ResponseInterface $response) use ($paths) { + if (!$response->hasHeader('link')) { + return; + } + + $links = [ + 'next' => false, + 'last' => false, + ]; + foreach (explode(', ', $response->getHeader('link')[0]) as $link) { + list($url, $rel) = explode('>; rel="', ltrim(rtrim($link, '"'), '<')); + if (isset($links[$rel])) { + $links[$rel] = $url; + } + } + + if ($links['next'] === false || $links['last'] === false) { + return; + } + + $this->scheduler->schedule(function () use ($paths, $links) { + $paths->onNext($links['next']); + }); + }) + ->map(function (ResponseInterface $response) { + return $response->getBody()->getJson(); }); - }); } private function sendRequest(string $path, Subject $subject) diff --git a/tests/Service/IteratePagesServiceTest.php b/tests/Service/IteratePagesServiceTest.php index c5572ca23a..18ee688862 100644 --- a/tests/Service/IteratePagesServiceTest.php +++ b/tests/Service/IteratePagesServiceTest.php @@ -10,6 +10,7 @@ use RingCentral\Psr7\Request; use RingCentral\Psr7\Response; use function React\Promise\resolve; +use Rx\Testing\TestScheduler; final class IteratePagesServiceTest extends TestCase { @@ -54,84 +55,13 @@ public function testHandle() $client->request($thirdRequest, Argument::type('array'))->shouldNotBeCalled(); $requestService = new RequestService($client->reveal()); - $iteratePagesService = new IteratePagesService($requestService); + $testScheduler = new TestScheduler(); + $iteratePagesService = new IteratePagesService($requestService, $testScheduler); $items = []; $completed = false; - $stream = $iteratePagesService->iterate($path)->subscribe( - function ($item) use (&$items, &$stream) { - $items[] = $item; - - if (count($items) == 2) { - $stream->dispose(); - } - }, - function ($t) { - throw $t; - }, - function () use (&$completed) { - $completed = true; - } - ); - - self::assertFalse($completed); - self::assertSame([$firstBody, $secondBody], $items); - } - - public function testHandleTakingTwoWillStillMakeThirdRequest() - { - $path = '/foo.bar'; - - $client = $this->prophesize(ClientInterface::class); - - /** - * First request - */ - $firstRequest = new Request('GET', '/foo.bar'); - $firstBody = ['a']; - $firstStream = new JsonStream($firstBody); - $firstHeaders = [ - 'Link' => [ - '; rel="next", ; rel="last"', - ], - ]; - $firstResponse = new Response(200, $firstHeaders, $firstStream); - $client->request($firstRequest, Argument::type('array'))->shouldBeCalled()->willReturn(resolve($firstResponse)); - - /** - * Second request - */ - $secondRequest = new Request('GET', 'https://api.example.com/1'); - $secondBody = ['b']; - $secondStream = new JsonStream($secondBody); - $secondHeaders = [ - 'Link' => [ - '; rel="next", ; rel="last"', - ], - ]; - $secondResponse = new Response(200, $secondHeaders, $secondStream); - $client->request($secondRequest, Argument::type('array'))->shouldBeCalled()->willReturn(resolve($secondResponse)); - /** - * Third request - */ - $thirdRequest = new Request('GET', 'https://api.example.com/2'); - $thirdBody = ['b']; - $thirdStream = new JsonStream($thirdBody); - $thirdHeaders = [ - 'Link' => [ - '; rel="last"', - ], - ]; - $thirdResponse = new Response(200, $thirdHeaders, $thirdStream); - $client->request($thirdRequest, Argument::type('array'))->shouldBeCalled()->willReturn(resolve($thirdResponse)); - - $requestService = new RequestService($client->reveal()); - $iteratePagesService = new IteratePagesService($requestService); - - $items = []; - $completed = false; - $iteratePagesService->iterate($path)->take(2)->subscribe( + $stream = $iteratePagesService->iterate($path)->take(2)->subscribe( function ($item) use (&$items, &$stream) { $items[] = $item; }, @@ -143,6 +73,8 @@ function () use (&$completed) { } ); + $testScheduler->start(); + self::assertTrue($completed); self::assertSame([$firstBody, $secondBody], $items); }