-
-
Notifications
You must be signed in to change notification settings - Fork 166
Closed
Labels
Description
I'm using the browser http client against a react http server that sends and receives NDJson feeds in a stream to stream fashion (like a packet of Json query for the DB that stands behind, the DB handle each query and feeds the results).
But at one point the input stream is paused by the request stream but the input stream is never resumed (the drain event is not triggered). I've stripped and sorted all interfering parts (haproxy, timeouts, ...).
I have found a workaround with an auto resume timeout in the input stream, it is working fine...
But my question : what am I doing wrong ?
Here is the kitchen side:
...
/**
* @return ResponseInterface
*/
public function ndjsonPipeQuery(string $method, string $path, callable $processItem, array $query = [], $body = '')
{
$loop = Loop::get();
if ($body instanceof \Generator) {
$body = new GeneratorStream($body, $loop, [
'auto_resume_timeout' => 0.5,
'memory_throttle' => 10 * 1024 * 1024,
]);
}
$l = 0;
$res = null;
$this->reactStream($method, $path, $query, $body)->then(function (ResponseInterface $response) use ($processItem, &$l, &$res) {
$res = $response;
$body = $response->getBody();
if ($body instanceof ReadableStreamInterface) {
$in = new \Clue\React\NDJson\Decoder($body, false, 512, 0, 10 * 1024 * 1024);
$in->on('data', function ($data) use ($processItem, &$l) {
++$l;
// $data = \json_decode($data);
\call_user_func_array($processItem, [$data]);
});
$body->on('error', function (\Exception $e) {
throw $e;
});
$in->on('error', function (\Exception $e) {
throw $e;
});
} else {
throw new \LogicException("D'oh!?");
}
}, function (\RuntimeException $reason) {
throw $reason;
});
$loop->run();
return $res;
}
/**
* @return PromisePromiseInterface<ResponseInterface,\Exception>
*/
public function reactStream(string $method, string $path, array $query, $body = '', array $headers = [])
{
$reactBrowser = new \React\Http\Browser();
$headers['User-Agent'] = 'xxx';
$headers['X-Api-Key'] = $this->assertOption('api_key');
$url = $this->generateUri($path, $query);
return $reactBrowser->requestStreaming($method, $url, $headers, $body);
}
...The input stream is homemade. It's wrapping a generator and handle memory throttling.
<?php
namespace XXX\Stream;
use Evenement\EventEmitterTrait;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;
/**
* Stream to wrap Generator.
*
* @author david
*/
class GeneratorStream implements ReadableStreamInterface
{
use EventEmitterTrait;
protected $cursor;
protected $closed = false;
protected $paused = false;
protected $ticking = false;
protected $loop;
protected $options;
protected $startMemory;
public function __construct(\Generator $cursor, LoopInterface $loop = null, array $options = [])
{
$this->cursor = $cursor;
$this->options = $options + [
'chunk_size' => 100,
'memory_throttle' => 0,
'memory_throttle_interval' => 0.1,
'auto_resume_timeout' => false,
];
$this->loop = $loop ?: Loop::get();
$this->startMemory = \memory_get_usage(true);
$this->loop->futureTick([$this, 'handle']);
}
public function pause()
{
if ($this->closed || $this->paused) {
return;
}
$this->paused = true;
}
public function resume()
{
if (!$this->paused || $this->closed) {
return;
}
$this->paused = false;
$this->handle();
}
public function pipe(WritableStreamInterface $dest, array $options = [])
{
return Util::pipe($this, $dest, $options);
}
public function isReadable()
{
return !$this->closed;
}
public function close()
{
if ($this->closed) {
return;
}
$this->paused = false;
$this->closed = true;
$this->emit('close');
$this->removeAllListeners();
}
/**
* @internal
*/
public function handle()
{
if ($this->closed) {
return;
}
if ($this->paused) {
$this->throttle();
return;
}
$chunkSize = $this->options['chunk_size'];
$memoryThrottle = $this->options['memory_throttle'];
$startMemory = $this->startMemory;
// on va limiter l'emprunte mémoire
$usedMemory = 0;
$i = 0;
while ($this->cursor->valid() && (($chunkSize > 0 && $i < $chunkSize) || 0 == $chunkSize)) {
$raw = $this->cursor->current();
// il faut que l'item soit stringifiable
$data = (string) $raw;
$this->emit('data', [$data]);
$this->cursor->next();
++$i;
if ($memoryThrottle > 0) {
$usedMemory = \memory_get_usage(true) - $startMemory;
if ($usedMemory > $memoryThrottle) {
break;
}
}
}
if ($this->cursor->valid()) {
$this->throttle();
} else {
$this->emit('end');
$this->close();
}
}
protected $pausedAt;
protected function throttle()
{
$memoryThrottle = $this->options['memory_throttle'];
$memoryThrottleInterval = $this->options['memory_throttle_interval'];
$startMemory = $this->startMemory;
$usedMemory = \memory_get_usage(true) - $startMemory;
if ($this->paused) {
$autoResumeTimeout = \floatval($this->options['auto_resume_timeout'] ?? false);
if ($autoResumeTimeout > 0) {
if (empty($this->pausedAt)) {
$this->pausedAt = \microtime(true);
} else {
if (\microtime(true) > $this->pausedAt + $autoResumeTimeout) {
$this->pausedAt = null;
// ugly but effective
$this->resume();
}
}
}
}
if ($memoryThrottle > 0 && $usedMemory > $memoryThrottle && $memoryThrottleInterval > 0) {
$this->loop->addTimer($memoryThrottleInterval, [$this, 'handle']);
} else {
$this->loop->futureTick([$this, 'handle']);
}
}
}composer infos
clue/ndjson-react v1.2.0 Streaming newline-delimited JSON (NDJSON) p...
react/cache v1.1.1 Async, Promise-based cache interface for Re...
react/dns v1.10.0 Async DNS resolver for ReactPHP
react/event-loop v1.3.0 ReactPHP's core reactor event loop that lib...
react/http v1.8.0 Event-driven, streaming HTTP client and ser...
react/promise v2.9.0 A lightweight implementation of CommonJS Pr...
react/promise-stream v1.5.0 The missing link between Promise-land and S...
react/promise-timer v1.9.0 A trivial implementation of timeouts for Pr...
react/socket v1.12.0 Async, streaming plaintext TCP/IP and secur...
react/stream v1.2.0 Event-driven readable and writable streams ...