From 80aa37ae19cb2b080c4dc5dfa759cc4ca0989ef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gabriel=20Ostroluck=C3=BD?= Date: Sun, 21 Apr 2019 15:01:21 +0200 Subject: [PATCH] Add --buffer-size option This was introduced to be able to handle out of disk size errors. In such case, buffering stops and STDIN is consumed by Responder directly. Originally I wanted this mechanism to be triggered by write failure, but upstream is making this effort harder - see: https://github.com/amphp/byte-stream/pull/52 https://github.com/amphp/byte-stream/pull/54 --- bin/composer.json | 0 src/Bufferer/AbstractBufferer.php | 35 +++++++++++++ src/Bufferer/BuffererInterface.php | 22 -------- src/Bufferer/PipeBufferer.php | 71 ++++++++++++++----------- src/Bufferer/ResolvedBufferer.php | 13 +---- src/Command.php | 66 ++++++++++++++++++----- src/ProgressBar.php | 7 +-- src/Responder.php | 40 +++++++++----- tests/IntegrationTest.php | 84 ++++++++++++++++++++++++++++++ tests/ResponderTest.php | 4 +- 10 files changed, 245 insertions(+), 97 deletions(-) delete mode 100644 bin/composer.json create mode 100644 src/Bufferer/AbstractBufferer.php delete mode 100644 src/Bufferer/BuffererInterface.php create mode 100644 tests/IntegrationTest.php diff --git a/bin/composer.json b/bin/composer.json deleted file mode 100644 index e69de29..0000000 diff --git a/src/Bufferer/AbstractBufferer.php b/src/Bufferer/AbstractBufferer.php new file mode 100644 index 0000000..76c3cbd --- /dev/null +++ b/src/Bufferer/AbstractBufferer.php @@ -0,0 +1,35 @@ +filePath = $filePath; + } + + abstract public function __invoke(): Promise; + + abstract public function isBuffering(): bool; + + abstract public function waitForWrite(): Promise; + + abstract public function getMimeType(): Promise; + + abstract public function getCurrentProgress(): int; +} diff --git a/src/Bufferer/BuffererInterface.php b/src/Bufferer/BuffererInterface.php deleted file mode 100644 index c972710..0000000 --- a/src/Bufferer/BuffererInterface.php +++ /dev/null @@ -1,22 +0,0 @@ -logger = $logger; - $this->inputStream = new ResourceInputStream($inputStream); - $this->outputStream = new ResourceOutputStream($fOutput = $outputPath ? fopen($outputPath, 'wb') : tmpfile()); + $this->inputStream = $inputStream; + $this->outputStream = $outputStream; + $this->server = $server; $this->mimeType = new Deferred(); - $this->filePath = $outputPath ?: stream_get_meta_data($fOutput)['uri']; $this->progressBar = new ProgressBar($output, 0, 'buffer'); + $this->bufferSize = $bufferSize; + + $filePath = ''; + + if ($outputStream instanceof ResourceOutputStream) { + $filePath = stream_get_meta_data($outputStream->getResource())['uri']; + } + + parent::__construct($filePath); } public function __invoke(): Promise @@ -72,34 +80,39 @@ public function __invoke(): Promise $generator = function (): \Generator { $this->logger->debug("Saving stdin to $this->filePath"); - $bytesDownloaded = 0; while (null !== $chunk = yield $this->inputStream->read()) { yield $this->outputStream->write($chunk); - if ($bytesDownloaded === 0) { + if ($this->progressBar->step === 0) { $mimeType = (new \finfo(FILEINFO_MIME))->buffer($chunk); $this->logger->debug(sprintf('Stdin MIME type detected: "%s"', $mimeType)); $this->mimeType->resolve($mimeType); } - $this->progressBar->setProgress($bytesDownloaded += strlen($chunk)); + $this->progressBar->advance(strlen($chunk)); $this->resolveDeferrer(); + + if ($this->progressBar->step < $this->bufferSize) { + continue; + } + + $this->logger->warning( + 'Max buffer size reached. Disabling buffering and falling back to piping stdin to socket directly. No new client connections will be accepted.' + ); + $this->server->close(); + + break; } $this->buffering = false; $this->progressBar->finish(); - $this->logger->debug("Stdin transfer done, $bytesDownloaded bytes downloaded"); + $this->logger->debug("Buffering to file stopped, {$this->progressBar->step} bytes stored"); $this->resolveDeferrer(); }; return new Coroutine($generator()); } - public function getFilePath(): string - { - return $this->filePath; - } - public function getMimeType(): Promise { return $this->mimeType->promise(); diff --git a/src/Bufferer/ResolvedBufferer.php b/src/Bufferer/ResolvedBufferer.php index 32d9472..1ee4a4d 100644 --- a/src/Bufferer/ResolvedBufferer.php +++ b/src/Bufferer/ResolvedBufferer.php @@ -7,12 +7,8 @@ use Amp\Promise; use Amp\Success; -class ResolvedBufferer implements BuffererInterface +class ResolvedBufferer extends AbstractBufferer { - /** - * @var string - */ - private $filePath; /** * @var int */ @@ -24,7 +20,7 @@ class ResolvedBufferer implements BuffererInterface public function __construct(string $filePath) { - $this->filePath = $filePath; + parent::__construct($filePath); $this->filesize = filesize($filePath); $this->mimeType = (new \finfo(FILEINFO_MIME))->file($filePath); } @@ -34,11 +30,6 @@ public function __invoke(): Promise return new Success(); } - public function getFilePath(): string - { - return $this->filePath; - } - public function isBuffering(): bool { return false; diff --git a/src/Command.php b/src/Command.php index d0341e9..2923fe8 100644 --- a/src/Command.php +++ b/src/Command.php @@ -4,9 +4,14 @@ namespace Ostrolucky\Stdinho; +use Amp\ByteStream\ResourceInputStream; +use Amp\ByteStream\ResourceOutputStream; use Amp\Loop; +use Amp\Socket\Server; +use Ostrolucky\Stdinho\Bufferer\AbstractBufferer; use Ostrolucky\Stdinho\Bufferer\PipeBufferer; use Ostrolucky\Stdinho\Bufferer\ResolvedBufferer; +use Psr\Log\LoggerInterface; use Symfony\Component\Console\Exception\InvalidOptionException; use Symfony\Component\Console\Exception\LogicException; use Symfony\Component\Console\Input\InputArgument; @@ -52,6 +57,12 @@ protected function configure(): void 'Determines after how many client connections should program shut down', INF ) + ->addOption( + 'buffer-size', + 'b', + InputOption::VALUE_REQUIRED, + 'Buffer size in bytes. By default, it is 90% of available disk space' + ) ->setDescription('Turn any STDIN/STDOUT into HTTP server') ; } @@ -96,28 +107,55 @@ protected function initialize(InputInterface $input, OutputInterface $output): v */ protected function execute(InputInterface $input, OutputInterface $output): int { + $addressPort = $input->getArgument('addressPort'); + $bufferSize = $input->getOption('buffer-size'); + $connectionsLimit = (float)$input->getOption('connections-limit'); + $filePath = $input->getOption('file'); + + $server = listen($addressPort); $logger = new ConsoleLogger($firstSection = $output->section()); + $bufferer = $this->createBufferer($output, $logger, $server, $filePath, $bufferSize); - $bufferer = $this->hasStdin ? - new PipeBufferer($logger, STDIN, $input->getOption('file'), $output->section()) : - new ResolvedBufferer($input->getOption('file')) - ; + $firstSection->writeln( + "Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.\n" + ); - $bufferHandler = asyncCoroutine($bufferer); - $clientHandler = asyncCoroutine(new Responder($logger, $bufferer, $output, $this->customHttpHeaders)); + Loop::run(function () use (&$connectionsLimit, $server, $logger, $output, $bufferer) { + asyncCoroutine($bufferer)(); - Loop::run(function () use ($input, $clientHandler, $firstSection, $bufferHandler) { - $bufferHandler(); - $server = listen($input->getArgument('addressPort')); - $firstSection->writeln( - "Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.\n" - ); - $connectionsLimit = $input->getOption('connections-limit'); while ($connectionsLimit-- && ($socket = yield $server->accept())) { - $clientHandler($socket); + $responder = new Responder( + $logger, + $bufferer, + $output, + $this->customHttpHeaders, + new ResourceInputStream(fopen($bufferer->filePath, 'rb')) + ); + asyncCoroutine($responder)($socket); } }); return 0; } + + private function createBufferer( + ConsoleOutput $output, + LoggerInterface $logger, + Server $server, + ?string $filePath, + ?string $bufferSize + ): AbstractBufferer { + if (!$this->hasStdin) { + return new ResolvedBufferer($filePath); + } + + return new PipeBufferer( + $logger, + new ResourceInputStream(STDIN), + new ResourceOutputStream($filePath ? fopen($filePath, 'wb') : tmpfile()), + $output->section(), + $server, + (int)($bufferSize ?? disk_free_space($filePath ?: sys_get_temp_dir()) * .9) + ); + } } diff --git a/src/ProgressBar.php b/src/ProgressBar.php index d6bd815..dcfdaa3 100644 --- a/src/ProgressBar.php +++ b/src/ProgressBar.php @@ -102,12 +102,7 @@ public function __construct(ConsoleSectionOutput $output, int $max, string $form */ public function advance(int $step): void { - $this->setProgress($this->step + $step); - } - - public function setProgress(int $step): void - { - $this->step = $step; + $this->step += $step; $this->percent = $this->max ? (float)$step / $this->max : 0; if (microtime(true) - $this->lastWriteTime < ($this->output->isDecorated() ? .1 : 1)) { diff --git a/src/Responder.php b/src/Responder.php index b5507af..5e5d94e 100644 --- a/src/Responder.php +++ b/src/Responder.php @@ -4,10 +4,10 @@ namespace Ostrolucky\Stdinho; -use Amp\ByteStream\ResourceInputStream; +use Amp\ByteStream\InputStream; use Amp\ByteStream\StreamException; use Amp\Socket\Socket; -use Ostrolucky\Stdinho\Bufferer\BuffererInterface; +use Ostrolucky\Stdinho\Bufferer\AbstractBufferer; use Psr\Log\LoggerInterface; use Symfony\Component\Console\Output\ConsoleOutput; @@ -18,7 +18,7 @@ class Responder */ private $logger; /** - * @var BuffererInterface + * @var AbstractBufferer */ private $bufferer; /** @@ -29,20 +29,26 @@ class Responder * @var string[] */ private $customHttpHeaders = []; + /** + * @var InputStream + */ + private $inputStream; /** * @param string[] $customHttpHeaders */ public function __construct( LoggerInterface $logger, - BuffererInterface $bufferer, + AbstractBufferer $bufferer, ConsoleOutput $consoleOutput, - array $customHttpHeaders + array $customHttpHeaders, + InputStream $inputStream ) { $this->logger = $logger; $this->bufferer = $bufferer; $this->consoleOutput = $consoleOutput; $this->customHttpHeaders = $customHttpHeaders; + $this->inputStream = $inputStream; } public function __invoke(Socket $socket): \Generator @@ -52,7 +58,7 @@ public function __invoke(Socket $socket): \Generator $header = [ 'HTTP/1.1 200 OK', - 'Content-Disposition: inline; filename="'.basename($this->bufferer->getFilePath()).'"', + 'Content-Disposition: inline; filename="'.basename($this->bufferer->filePath).'"', 'Content-Type:'.yield $this->bufferer->getMimeType(), 'Connection: close', ]; @@ -68,8 +74,6 @@ public function __invoke(Socket $socket): \Generator $remoteAddress ); - $handle = new ResourceInputStream(fopen($this->bufferer->getFilePath(), 'rb')); - try { yield $socket->write(implode("\r\n", array_merge($header, $this->customHttpHeaders))."\r\n\r\n"); @@ -91,14 +95,23 @@ public function __invoke(Socket $socket): \Generator continue; } - if (($chunk = yield $handle->read()) === null) { - break; // No more buffering and client caught up to it -> finish download + if (null !== $chunk = yield $this->inputStream->read()) { + yield $socket->write($chunk); + + $progressBar->max = $this->bufferer->getCurrentProgress(); + $progressBar->advance(strlen($chunk)); + + continue; } - yield $socket->write($chunk); + if (!$this->bufferer->inputStream) { + break; // All input sources depleted -> finish download + } - $progressBar->max = $this->bufferer->getCurrentProgress(); - $progressBar->advance(strlen($chunk)); + // Use fallback inputStream - handy when PipeBufferer exits sooner than it consumes its inputStream + $this->inputStream = $this->bufferer->inputStream; + // Prevent other potential Responders to consume same inputStream. This can be solved in future + $this->bufferer->inputStream = null; } $progressBar->finish(); $this->logger->debug("$remoteAddress finished download"); @@ -106,7 +119,6 @@ public function __invoke(Socket $socket): \Generator $this->logger->debug("$remoteAddress aborted download"); } - $handle->close(); $socket->end(); } } diff --git a/tests/IntegrationTest.php b/tests/IntegrationTest.php new file mode 100644 index 0000000..10b61db --- /dev/null +++ b/tests/IntegrationTest.php @@ -0,0 +1,84 @@ +createMock(InputStream::class); + $buffererOutputStream = $this->createMock(OutputStream::class); + $responderInputStream = $this->createMock(InputStream::class); + $consoleOutput = $this->createMock(ConsoleOutput::class); + $sectionOutput = $this->createMock(ConsoleSectionOutput::class); + $server = $this->createMock(Server::class); + $logger = new TestLogger(); + + $bufferer = new PipeBufferer($logger, $buffererInputStream, $buffererOutputStream, $sectionOutput, $server, 3); + $responder = new Responder($logger, $bufferer, $consoleOutput, [], $responderInputStream); + + $socket = $this->getMockBuilder(ClientSocket::class) + ->disableOriginalConstructor() + ->setMethods(['read', 'write', 'getRemoteAddress', 'end']) + ->getMock() + ; + + $consoleOutput->method('section')->willReturn($sectionOutput); + $sectionOutput->method('getFormatter')->willReturn($this->createMock(OutputFormatterInterface::class)); + $socket->method('read')->willReturn(new Success('')); + + $buffererInputStream->method('read')->willReturn( + new Delayed(0, $foo = 'foo'), + new Delayed(0, $bar = 'bar'), + new Delayed(0, $baz = 'baz'), + new Success() + ); + + $buffererOutputStream + ->expects($this->exactly(1)) + ->method('write') + ->willReturn(new Success()) + ; + + $responderInputStream + ->expects($this->exactly(2)) + ->method('read') + ->willReturn(new Success($foo), new Success()) + ; + + $socket + ->expects($this->exactly(4)) + ->method('write') + ->withConsecutive([Assert::stringContains('HTTP/1.1 200 OK')], [$foo], [$bar], [$baz]) + ->willReturn(new Success()) + ; + + $server->expects($this->exactly(1))->method('close'); + + Loop::run(function () use ($socket, $bufferer, $responder): void { + asyncCoroutine($bufferer)(); + asyncCoroutine($responder)($socket); + }); + + self::assertTrue($logger->hasWarningThatContains('Max buffer size reached')); + } +} diff --git a/tests/ResponderTest.php b/tests/ResponderTest.php index e3ed970..3d90a25 100644 --- a/tests/ResponderTest.php +++ b/tests/ResponderTest.php @@ -4,6 +4,7 @@ namespace Ostrolucky\Stdinho\Tests; +use Amp\ByteStream\InputStream; use Amp\Coroutine; use Amp\Socket\ClientSocket; use Amp\Success; @@ -22,7 +23,8 @@ public function testResponderHandlesClientAbruptDisconnect(): void $logger = new TestLogger(), new ResolvedBufferer(__FILE__), $this->createMock(ConsoleOutput::class), - [] + [], + $this->createMock(InputStream::class) ); $socket = $this->getMockBuilder(ClientSocket::class)