
Unnecessary 'resume' of an input stream #525

Open · wants to merge 3 commits into base: 1.x

Conversation

skydiablo

I have removed behavior here that I couldn't comprehend. A "resume" at this point causes several streams to be managed by the loop again, even though they were previously removed by a close event. With a high number of client requests, the number of streams grows too large and leads to a crash. The streams added back to the loop here eventually disappear "automagically," but that is not really comprehensible to me. Additionally, there is a significant increase in memory usage.

@SimonFrings
Member

Hey @skydiablo, thanks for your contribution 👍

This is actually a design choice made in #315 which @clue also slightly described in #210. Here's a "short" example that showcases why this decision was made:

When a client sends a request to the server, the server validates this request first and then forwards the incoming data to the application. If the application pauses the data flow, the server needs to notify the client to stop sending data (pause client). If the application then ends the entire data flow completely, no one can tell the server when to let the client continue. The server is also not able to inform the client about this and there's no way to detect the client closing the connection.

So in order for the client to continue sending more requests over the connection (or do anything at all), the previous request, which is currently paused, must first be finished. This also means it is now the server's responsibility to decide when the client should resume. Since nothing meaningful can be done with the pending data from this point on, the server simply lets the client continue until it's finished and discards all incoming data. Once done, the server can send a response and accept following requests.
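As an editorial aside, the "let the client finish and discard the data" idea described above can be sketched in plain PHP. This is a toy, not the ReactPHP implementation; `drainRemaining()` and the chunk list are made up purely for illustration:

```php
<?php
// Toy sketch (NOT the ReactPHP API): once the application no longer
// cares about a request body, the server still reads the remaining
// chunks and throws them away, so the connection becomes reusable
// for the next request instead of staying stuck in a paused state.
function drainRemaining(iterable $chunks): int
{
    $discarded = 0;
    foreach ($chunks as $chunk) {
        // Read and discard; only count how many bytes were skipped.
        $discarded += strlen($chunk);
    }
    return $discarded;
}

$pending = ['chunk-1', 'chunk-2', 'tail'];
echo drainRemaining($pending), PHP_EOL; // prints 18
```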

I also saw your conversation with @clue on Gitter and the shared code snippet (down below) looks like it should've worked.

$server = new React\Http\HttpServer(
    new React\Http\Middleware\StreamingRequestMiddleware(),
    function (ServerRequestInterface $request, callable $next) {
        if (($request->getHeader('Content-Encoding')[0] ?? null) === 'gzip') {
            $decompressor = new \Clue\React\Zlib\Decompressor(ZLIB_ENCODING_GZIP);
            /** @var React\Stream\ReadableStreamInterface $body */
            $body = $request->getBody();
            $body->pipe($decompressor);
            $stream = new \React\Http\Io\ReadableBodyStream($decompressor);
            $request = $request->withBody($stream);
        }
        return $next($request);
    },
    function (ServerRequestInterface $request) {

        echo 'Receive data!' . PHP_EOL;
        $body = $request->getBody();

        return new React\Promise\Promise(function ($resolve, $reject) use ($body, ...

Can you share a full example so we can take a further look at this? :)

@skydiablo
Author

Okay, I see... can I force the socket to close on my own?

@skydiablo
Author

Context: I have implemented an InfluxDB line-protocol server to proxy the data into another service, so I'm using the replication API from InfluxDB itself (https://docs.influxdata.com/influxdb/v2/write-data/replication/replicate-data/) to receive new data. The InfluxDB server is flooding me with sockets and data, and it seems the connections are not instantly disconnected on the InfluxDB side. So I have many open connections and hit the hard limit of 1024 sockets in a short time. After some time InfluxDB closes the connections, but they never seem to be reused. So maybe I can hard-close these connections on my server side.

@skydiablo
Author

Hmmm, I've really played around with this issue, but I cannot find a workaround with the original code. Maybe someone can point me in the right direction? I keep running into the stream limit of 1024... The only thing I can do is maybe put a load balancer in front of this and spawn several instances of my service?

@skydiablo
Author

skydiablo commented Jun 7, 2024

Okay, this is really ugly, but it currently works for me. In abstract:

/**
 * a hacky private property value extractor from any object
 * @return Closure function(object $object, string $property) {}
 */
function propertyReader(): Closure
{
    return function &(object $object, string $property) {
        $value = &Closure::bind(function &() use ($property) {
            return $this->$property;
        }, $object, $object)->__invoke();
        return $value;
    };
}
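A minimal, self-contained usage sketch of the `propertyReader()` helper above, showing how `Closure::bind` reaches private state (the `Connection` class and its `$stream` value here are hypothetical, invented only for the demo):

```php
<?php
// Hypothetical stand-in for an object with private internals,
// such as the private `input` properties extracted in the middleware.
class Connection
{
    private $stream = 'tcp://127.0.0.1:8080';
}

// Same technique as propertyReader(): bind a closure to the object
// with the object's own class as scope, so $this->$property resolves
// even though the property is private.
$reader = function &(object $object, string $property) {
    $value = &Closure::bind(function &() use ($property) {
        return $this->$property;
    }, $object, $object)->__invoke();
    return $value;
};

$conn = new Connection();
echo $reader($conn, 'stream'), PHP_EOL; // prints tcp://127.0.0.1:8080
```

Reading by reference (`&`) matters here: it hands back the live property rather than a copy, which is what lets the middleware below walk a chain of nested private `input` properties.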


$server = new React\Http\HttpServer(
    new StreamingRequestMiddleware(),
    function (ServerRequestInterface $request, callable $next) {
        return $next($request->withAttribute('source-body', $request->getBody()));
    },
    new GzipStreamMiddleware(),
    function (ServerRequestInterface $request, callable $next) {
        /** @var \React\Http\Io\HttpBodyStream $sourceBody */
        $sourceBody = $request->getAttribute('source-body');
        /** @var \React\Http\Io\ChunkedDecoder $input */
        $input = $sourceBody->input; // public property
        $reader = propertyReader();
        /** @var \React\Http\Io\CloseProtectionStream $input */
        $closeInput = $input = $reader($input, 'input'); //input is private, we have to extract it...
        /** @var \React\Socket\Connection $input */
        $input = $reader($input, 'input'); //input is private, we have to extract it...
        $stream = $input->stream; // real stream
        $closeInput->on('close', function () use ($stream) {
            // The close handler of `CloseProtectionStream` reattaches the stream
            // to the loop. We do not need this, and it pushes us toward PHP's
            // internal maximum stream count. So we remove the stream right away
            // and everything works fine: ugly, but effective!
            Loop::get()->removeReadStream($stream);
        });
        return $next($request);
    },
    function (ServerRequestInterface $request) {
//          ...

My two cents to work around this personal issue...
