Skip to content

Commit

Permalink
Add --buffer-size option
Browse files Browse the repository at this point in the history
This was introduced to be able to handle out of disk size errors.
In such case, buffering stops and STDIN is consumed by Responder directly.

Originally I wanted this mechanism to be triggered by write failure,
but upstream is making this effort harder - see:
amphp/byte-stream#52
amphp/byte-stream#54
  • Loading branch information
ostrolucky committed Apr 22, 2019
1 parent a3832df commit 80aa37a
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 97 deletions.
Empty file removed bin/composer.json
Empty file.
35 changes: 35 additions & 0 deletions src/Bufferer/AbstractBufferer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Ostrolucky\Stdinho\Bufferer;

use Amp\ByteStream\InputStream;
use Amp\Promise;

abstract class AbstractBufferer
{
/**
* @var string
*/
public $filePath;
/**
* @var InputStream|null
*/
public $inputStream;

public function __construct(string $filePath)
{
$this->filePath = $filePath;
}

abstract public function __invoke(): Promise;

abstract public function isBuffering(): bool;

abstract public function waitForWrite(): Promise;

abstract public function getMimeType(): Promise;

abstract public function getCurrentProgress(): int;
}
22 changes: 0 additions & 22 deletions src/Bufferer/BuffererInterface.php

This file was deleted.

71 changes: 42 additions & 29 deletions src/Bufferer/PipeBufferer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,39 @@

namespace Ostrolucky\Stdinho\Bufferer;

use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Promise;
use Amp\Socket\Server;
use Ostrolucky\Stdinho\ProgressBar;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Output\ConsoleSectionOutput;

class PipeBufferer implements BuffererInterface
class PipeBufferer extends AbstractBufferer
{
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var ResourceInputStream
* @var OutputStream
*/
private $inputStream;
private $outputStream;
/**
* @var ResourceOutputStream
* @var Server
*/
private $outputStream;

private $server;
/**
* @var Deferred
*/
private $mimeType;
/**
* @var string
*/
private $filePath;
/**
* @var ProgressBar
*/
private $progressBar;

/**
* @var bool
*/
Expand All @@ -49,57 +45,74 @@ class PipeBufferer implements BuffererInterface
* @var Deferred|null
*/
private $deferred;

/**
* @param resource $inputStream
* @var int
*/
private $bufferSize;

public function __construct(
LoggerInterface $logger,
$inputStream,
?string $outputPath,
ConsoleSectionOutput $output
InputStream $inputStream,
OutputStream $outputStream,
ConsoleSectionOutput $output,
Server $server,
int $bufferSize
) {
$this->logger = $logger;
$this->inputStream = new ResourceInputStream($inputStream);
$this->outputStream = new ResourceOutputStream($fOutput = $outputPath ? fopen($outputPath, 'wb') : tmpfile());
$this->inputStream = $inputStream;
$this->outputStream = $outputStream;
$this->server = $server;
$this->mimeType = new Deferred();
$this->filePath = $outputPath ?: stream_get_meta_data($fOutput)['uri'];
$this->progressBar = new ProgressBar($output, 0, 'buffer');
$this->bufferSize = $bufferSize;

$filePath = '';

if ($outputStream instanceof ResourceOutputStream) {
$filePath = stream_get_meta_data($outputStream->getResource())['uri'];
}

parent::__construct($filePath);
}

public function __invoke(): Promise
{
$generator = function (): \Generator {
$this->logger->debug("Saving stdin to $this->filePath");

$bytesDownloaded = 0;
while (null !== $chunk = yield $this->inputStream->read()) {
yield $this->outputStream->write($chunk);

if ($bytesDownloaded === 0) {
if ($this->progressBar->step === 0) {
$mimeType = (new \finfo(FILEINFO_MIME))->buffer($chunk);
$this->logger->debug(sprintf('Stdin MIME type detected: "%s"', $mimeType));
$this->mimeType->resolve($mimeType);
}

$this->progressBar->setProgress($bytesDownloaded += strlen($chunk));
$this->progressBar->advance(strlen($chunk));
$this->resolveDeferrer();

if ($this->progressBar->step < $this->bufferSize) {
continue;
}

$this->logger->warning(
'Max buffer size reached. Disabling buffering and falling back to piping stdin to socket directly. No new client connections will be accepted.'
);
$this->server->close();

break;
}

$this->buffering = false;
$this->progressBar->finish();
$this->logger->debug("Stdin transfer done, $bytesDownloaded bytes downloaded");
$this->logger->debug("Buffering to file stopped, {$this->progressBar->step} bytes stored");
$this->resolveDeferrer();
};

return new Coroutine($generator());
}

public function getFilePath(): string
{
return $this->filePath;
}

public function getMimeType(): Promise
{
return $this->mimeType->promise();
Expand Down
13 changes: 2 additions & 11 deletions src/Bufferer/ResolvedBufferer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
use Amp\Promise;
use Amp\Success;

class ResolvedBufferer implements BuffererInterface
class ResolvedBufferer extends AbstractBufferer
{
/**
* @var string
*/
private $filePath;
/**
* @var int
*/
Expand All @@ -24,7 +20,7 @@ class ResolvedBufferer implements BuffererInterface

public function __construct(string $filePath)
{
$this->filePath = $filePath;
parent::__construct($filePath);
$this->filesize = filesize($filePath);
$this->mimeType = (new \finfo(FILEINFO_MIME))->file($filePath);
}
Expand All @@ -34,11 +30,6 @@ public function __invoke(): Promise
return new Success();
}

public function getFilePath(): string
{
return $this->filePath;
}

public function isBuffering(): bool
{
return false;
Expand Down
66 changes: 52 additions & 14 deletions src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

namespace Ostrolucky\Stdinho;

use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
use Amp\Socket\Server;
use Ostrolucky\Stdinho\Bufferer\AbstractBufferer;
use Ostrolucky\Stdinho\Bufferer\PipeBufferer;
use Ostrolucky\Stdinho\Bufferer\ResolvedBufferer;
use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Exception\InvalidOptionException;
use Symfony\Component\Console\Exception\LogicException;
use Symfony\Component\Console\Input\InputArgument;
Expand Down Expand Up @@ -52,6 +57,12 @@ protected function configure(): void
'Determines after how many client connections should program shut down',
INF
)
->addOption(
'buffer-size',
'b',
InputOption::VALUE_REQUIRED,
'Buffer size in bytes. By default, it is 90% of available disk space'
)
->setDescription('Turn any STDIN/STDOUT into HTTP server')
;
}
Expand Down Expand Up @@ -96,28 +107,55 @@ protected function initialize(InputInterface $input, OutputInterface $output): v
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$addressPort = $input->getArgument('addressPort');
$bufferSize = $input->getOption('buffer-size');
$connectionsLimit = (float)$input->getOption('connections-limit');
$filePath = $input->getOption('file');

$server = listen($addressPort);
$logger = new ConsoleLogger($firstSection = $output->section());
$bufferer = $this->createBufferer($output, $logger, $server, $filePath, $bufferSize);

$bufferer = $this->hasStdin ?
new PipeBufferer($logger, STDIN, $input->getOption('file'), $output->section()) :
new ResolvedBufferer($input->getOption('file'))
;
$firstSection->writeln(
"<info>Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.</info>\n"
);

$bufferHandler = asyncCoroutine($bufferer);
$clientHandler = asyncCoroutine(new Responder($logger, $bufferer, $output, $this->customHttpHeaders));
Loop::run(function () use (&$connectionsLimit, $server, $logger, $output, $bufferer) {
asyncCoroutine($bufferer)();

Loop::run(function () use ($input, $clientHandler, $firstSection, $bufferHandler) {
$bufferHandler();
$server = listen($input->getArgument('addressPort'));
$firstSection->writeln(
"<info>Connection opened at http://{$server->getAddress()}\nPress CTRL+C to exit.</info>\n"
);
$connectionsLimit = $input->getOption('connections-limit');
while ($connectionsLimit-- && ($socket = yield $server->accept())) {
$clientHandler($socket);
$responder = new Responder(
$logger,
$bufferer,
$output,
$this->customHttpHeaders,
new ResourceInputStream(fopen($bufferer->filePath, 'rb'))
);
asyncCoroutine($responder)($socket);
}
});

return 0;
}

private function createBufferer(
ConsoleOutput $output,
LoggerInterface $logger,
Server $server,
?string $filePath,
?string $bufferSize
): AbstractBufferer {
if (!$this->hasStdin) {
return new ResolvedBufferer($filePath);
}

return new PipeBufferer(
$logger,
new ResourceInputStream(STDIN),
new ResourceOutputStream($filePath ? fopen($filePath, 'wb') : tmpfile()),
$output->section(),
$server,
(int)($bufferSize ?? disk_free_space($filePath ?: sys_get_temp_dir()) * .9)
);
}
}
7 changes: 1 addition & 6 deletions src/ProgressBar.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,7 @@ public function __construct(ConsoleSectionOutput $output, int $max, string $form
*/
public function advance(int $step): void
{
$this->setProgress($this->step + $step);
}

public function setProgress(int $step): void
{
$this->step = $step;
$this->step += $step;
$this->percent = $this->max ? (float)$step / $this->max : 0;

if (microtime(true) - $this->lastWriteTime < ($this->output->isDecorated() ? .1 : 1)) {
Expand Down

0 comments on commit 80aa37a

Please sign in to comment.