Skip to content

Commit

Permalink
[HttpClient] Add portable HTTP/2 implementation based on Amp's HTTP c…
Browse files Browse the repository at this point in the history
…lient
  • Loading branch information
nicolas-grekas committed Jan 5, 2020
1 parent 0c6d64b commit 7de90cd
Show file tree
Hide file tree
Showing 17 changed files with 1,433 additions and 185 deletions.
2 changes: 2 additions & 0 deletions composer.json
Expand Up @@ -99,6 +99,8 @@
"symfony/yaml": "self.version"
},
"require-dev": {
"amphp/http-client": "^4.0",
"amphp/http-tunnel": "^1.0",
"cache/integration-tests": "dev-master",
"doctrine/annotations": "~1.0",
"doctrine/cache": "~1.6",
Expand Down
155 changes: 155 additions & 0 deletions src/Symfony/Component/HttpClient/AmpHttpClient.php
@@ -0,0 +1,155 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\HttpClient;

use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\Request;
use Amp\Http\Tunnel\Http1TunnelConnector;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Component\HttpClient\Internal\AmpClientState;
use Symfony\Component\HttpClient\Response\AmpResponse;
use Symfony\Component\HttpClient\Response\ResponseStream;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
use Symfony\Contracts\HttpClient\ResponseStreamInterface;
use Symfony\Contracts\Service\ResetInterface;

if (!interface_exists(DelegateHttpClient::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the "amphp/http-client" package is not installed. Try running "composer require amphp/http-client".');
}

/**
* A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class AmpHttpClient implements HttpClientInterface, LoggerAwareInterface, ResetInterface
{
use HttpClientTrait;
use LoggerAwareTrait;

private $defaultOptions = self::OPTIONS_DEFAULTS;

/** @var AmpClientState */
private $multi;

/**
* @param array $defaultOptions Default requests' options
* @param int $maxHostConnections The maximum number of connections to open
*
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options
*/
public function __construct(array $defaultOptions = [], callable $clientConfigurator = null, int $maxHostConnections = 6, int $maxPendingPushes = 50)
{
$this->defaultOptions['buffer'] = $this->defaultOptions['buffer'] ?? \Closure::fromCallable([__CLASS__, 'shouldBuffer']);

if ($defaultOptions) {
[, $this->defaultOptions] = self::prepareRequest(null, null, $defaultOptions, $this->defaultOptions);
}

$this->multi = new AmpClientState($clientConfigurator, $maxHostConnections, $maxPendingPushes, $this->logger);
}

/**
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options
*
* {@inheritdoc}
*/
public function request(string $method, string $url, array $options = []): ResponseInterface
{
[$url, $options] = self::prepareRequest($method, $url, $options, $this->defaultOptions);

$options['proxy'] = self::getProxy($options['proxy'], $url, $options['no_proxy']);

if (null !== $options['proxy'] && !class_exists(Http1TunnelConnector::class)) {
throw new \LogicException('You cannot use the "proxy" option as the "amphp/http-tunnel" package is not installed. Try running "composer require amphp/http-tunnel".');
}

if ('' !== $options['body'] && 'POST' === $method && !isset($options['normalized_headers']['content-type'])) {
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
}

if (!isset($options['normalized_headers']['user-agent'])) {
$options['headers'][] = 'User-Agent: Symfony HttpClient/Amp';
}

if (0 < $options['max_duration']) {
$options['timeout'] = min($options['max_duration'], $options['timeout']);
}

if ($options['resolve']) {
$this->multi->dnsCache = $options['resolve'] + $this->multi->dnsCache;
}

if ($options['peer_fingerprint'] && !isset($options['peer_fingerprint']['pin-sha256'])) {
throw new TransportException(__CLASS__.' supports only "pin-sha256" fingerprints.');
}

$request = new Request(implode('', $url), $method);

if ($options['http_version']) {
switch ((float) $options['http_version']) {
case 1.0: $request->setProtocolVersions(['1.0']); break;
case 1.1: $request->setProtocolVersions(['1.1', '1.0']); break;
default: $request->setProtocolVersions(['2', '1.1', '1.0']); break;
}
}

foreach ($options['headers'] as $v) {
$h = explode(': ', $v, 2);
$request->addHeader($h[0], $h[1]);
}

$request->setTcpConnectTimeout(1000 * $options['timeout']);
$request->setTlsHandshakeTimeout(1000 * $options['timeout']);
$request->setTransferTimeout(1000 * $options['max_duration']);

if ('' !== $request->getUri()->getUserInfo() && !$request->hasHeader('authorization')) {
$auth = explode(':', $request->getUri()->getUserInfo(), 2);
$auth = array_map('rawurldecode', $auth) + [1 => ''];
$request->setHeader('Authorization', 'Basic '.base64_encode(implode(':', $auth)));
}

return new AmpResponse($this->multi, $request, $options, $this->logger);
}

/**
* {@inheritdoc}
*/
public function stream($responses, float $timeout = null): ResponseStreamInterface
{
if ($responses instanceof AmpResponse) {
$responses = [$responses];
} elseif (!is_iterable($responses)) {
throw new \TypeError(sprintf('%s() expects parameter 1 to be an iterable of AmpResponse objects, %s given.', __METHOD__, \is_object($responses) ? \get_class($responses) : \gettype($responses)));
}

return new ResponseStream(AmpResponse::stream($responses, $timeout));
}

public function reset()
{
$this->multi->dnsCache = [];

if ($this->logger) {
foreach ($this->multi->pushedResponses as $authority => $pushedResponses) {
foreach ($pushedResponses as [$pushedUrl]) {
$this->logger->debug(sprintf('Unused pushed response: "%s"', $pushedUrl));
}
}
}

$this->multi->pushedResponses = [];
}
}
5 changes: 5 additions & 0 deletions src/Symfony/Component/HttpClient/CHANGELOG.md
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.1.0
-----

* added `AmpHttpClient`, a portable HTTP/2 implementation based on Amp

4.4.0
-----

Expand Down
43 changes: 43 additions & 0 deletions src/Symfony/Component/HttpClient/HttpClientTrait.php
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\HttpClient;

use Symfony\Component\HttpClient\Exception\InvalidArgumentException;
use Symfony\Component\HttpClient\Exception\TransportException;

/**
* Provides the common logic from writing HttpClientInterface implementations.
Expand Down Expand Up @@ -541,6 +542,48 @@ private static function mergeQueryString(?string $queryString, array $queryArray
return implode('&', $replace ? array_replace($query, $queryArray) : ($query + $queryArray));
}

/**
* Loads proxy configuration from the same environment variables as curl when no proxy is explicitly set.
*/
private static function getProxy(?string $proxy, array $url, ?string $noProxy): ?array
{
if (null === $proxy) {
// Ignore HTTP_PROXY except on the CLI to work around httpoxy set of vulnerabilities
$proxy = $_SERVER['http_proxy'] ?? (\in_array(\PHP_SAPI, ['cli', 'phpdbg'], true) ? $_SERVER['HTTP_PROXY'] ?? null : null) ?? $_SERVER['all_proxy'] ?? $_SERVER['ALL_PROXY'] ?? null;

if ('https:' === $url['scheme']) {
$proxy = $_SERVER['https_proxy'] ?? $_SERVER['HTTPS_PROXY'] ?? $proxy;
}
}

if (null === $proxy) {
return null;
}

$proxy = (parse_url($proxy) ?: []) + ['scheme' => 'http'];

if (!isset($proxy['host'])) {
throw new TransportException('Invalid HTTP proxy: host is missing.');
}

if ('http' === $proxy['scheme']) {
$proxyUrl = 'tcp://'.$proxy['host'].':'.($proxy['port'] ?? '80');
} elseif ('https' === $proxy['scheme']) {
$proxyUrl = 'ssl://'.$proxy['host'].':'.($proxy['port'] ?? '443');
} else {
throw new TransportException(sprintf('Unsupported proxy scheme "%s": "http" or "https" expected.', $proxy['scheme']));
}

$noProxy = $noProxy ?? $_SERVER['no_proxy'] ?? $_SERVER['NO_PROXY'] ?? '';
$noProxy = $noProxy ? preg_split('/[\s,]+/', $noProxy) : [];

return [
'url' => $proxyUrl,
'auth' => isset($proxy['user']) ? 'Basic '.base64_encode(rawurldecode($proxy['user']).':'.rawurldecode($proxy['pass'] ?? '')) : null,
'no_proxy' => $noProxy,
];
}

private static function shouldBuffer(array $headers): bool
{
if (null === $contentType = $headers['content-type'][0] ?? null) {
Expand Down
129 changes: 129 additions & 0 deletions src/Symfony/Component/HttpClient/Internal/AmpBody.php
@@ -0,0 +1,129 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\HttpClient\Internal;

use Amp\ByteStream\InputStream;
use Amp\ByteStream\ResourceInputStream;
use Amp\Http\Client\RequestBody;
use Amp\Promise;
use Amp\Success;
use Symfony\Component\HttpClient\Exception\TransportException;

/**
* @author Nicolas Grekas <p@tchwork.com>
*
* @internal
*/
class AmpBody implements RequestBody, InputStream
{
private $body;
private $onProgress;
private $offset = 0;
private $length = -1;
private $uploaded = 0;

public function __construct($body, &$info, \closure $onProgress)
{
$this->body = $body;
$this->info = &$info;
$this->onProgress = $onProgress;

if (\is_resource($body)) {
$this->offset = ftell($body);
$this->length = fstat($body)['size'];
$this->body = new ResourceInputStream($body);
} elseif (\is_string($body)) {
$this->length = \strlen($body);
}
}

public function createBodyStream(): InputStream
{
return $this;
}

public function getHeaders(): Promise
{
return new Success([]);
}

public function getBodyLength(): Promise
{
return new Success($this->length - $this->offset);
}

public function read(): Promise
{
$this->info['size_upload'] += $this->uploaded;
$this->uploaded = 0;
($this->onProgress)();

$chunk = $this->doRead();
$chunk->onResolve(function ($e, $data) {
if (null !== $data) {
$this->uploaded = \strlen($data);
} else {
$this->info['upload_content_length'] = $this->info['size_upload'];
}
});

return $chunk;
}

public static function rewind(RequestBody $body): RequestBody
{
if (!$body instanceof self) {
return $body;
}

if ($body->body instanceof ResourceInputStream) {
fseek($body->body->getResource(), $body->offset);

return new $body($body->body, $body->info, $body->onProgress);
}

if (\is_string($body->body)) {
$body->offset = 0;
}

return $body;
}

private function doRead(): Promise
{
if ($this->body instanceof ResourceInputStream) {
return $this->body->read();
}

if (null === $this->offset || !$this->length) {
return new Success();
}

if (\is_string($this->body)) {
$this->offset = null;

return new Success($this->body);
}

if ('' === $data = ($this->body)(16372)) {
$this->offset = null;

return new Success();
}

if (!\is_string($data)) {
throw new TransportException(sprintf('Return value of the "body" option callback must be string, %s returned.', \gettype($data)));
}

return new Success($data);
}
}

0 comments on commit 7de90cd

Please sign in to comment.