Skip to content

Commit

Permalink
Merge remote-tracking branch 'troelskn/feature-concurrent-multi-curl'
Browse files Browse the repository at this point in the history
  • Loading branch information
kriswallsmith committed Jul 29, 2014
2 parents 972e05f + 9dd2121 commit 2962f50
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -4,7 +4,7 @@ php:
- 5.4
- 5.5

env: TEST_SERVER="http://127.0.0.1:8080/server.php" TEST_PROXY="127.0.0.1:3128"
env: TEST_SERVER="http://127.0.0.1:8080/server.php" TEST_PROXY="127.0.0.1:3128" PHP_FCGI_CHILDREN=10 PHP_FCGI_MAX_REQUESTS=10

before_install:
- echo "" | sudo add-apt-repository ppa:nginx/stable > /dev/null 2>&1
Expand Down
67 changes: 67 additions & 0 deletions lib/Buzz/Client/AbstractCurl.php
Expand Up @@ -13,6 +13,73 @@
*/
abstract class AbstractCurl extends AbstractClient
{
static $curl_error_codes = array(
CURLE_UNSUPPORTED_PROTOCOL => 'UNSUPPORTED_PROTOCOL',
CURLE_FAILED_INIT => 'FAILED_INIT',
CURLE_URL_MALFORMAT => 'URL_MALFORMAT',
CURLE_URL_MALFORMAT_USER => 'URL_MALFORMAT_USER',
CURLE_COULDNT_RESOLVE_PROXY => 'COULDNT_RESOLVE_PROXY',
CURLE_COULDNT_RESOLVE_HOST => 'COULDNT_RESOLVE_HOST',
CURLE_COULDNT_CONNECT => 'COULDNT_CONNECT',
CURLE_FTP_WEIRD_SERVER_REPLY => 'FTP_WEIRD_SERVER_REPLY',
CURLE_FTP_ACCESS_DENIED => 'FTP_ACCESS_DENIED',
CURLE_FTP_USER_PASSWORD_INCORRECT => 'FTP_USER_PASSWORD_INCORRECT',
CURLE_FTP_WEIRD_PASS_REPLY => 'FTP_WEIRD_PASS_REPLY',
CURLE_FTP_WEIRD_USER_REPLY => 'FTP_WEIRD_USER_REPLY',
CURLE_FTP_WEIRD_PASV_REPLY => 'FTP_WEIRD_PASV_REPLY',
CURLE_FTP_WEIRD_227_FORMAT => 'FTP_WEIRD_227_FORMAT',
CURLE_FTP_CANT_GET_HOST => 'FTP_CANT_GET_HOST',
CURLE_FTP_CANT_RECONNECT => 'FTP_CANT_RECONNECT',
CURLE_FTP_COULDNT_SET_BINARY => 'FTP_COULDNT_SET_BINARY',
CURLE_PARTIAL_FILE => 'PARTIAL_FILE',
CURLE_FTP_COULDNT_RETR_FILE => 'FTP_COULDNT_RETR_FILE',
CURLE_FTP_WRITE_ERROR => 'FTP_WRITE_ERROR',
CURLE_FTP_QUOTE_ERROR => 'FTP_QUOTE_ERROR',
CURLE_HTTP_NOT_FOUND => 'HTTP_NOT_FOUND',
CURLE_WRITE_ERROR => 'WRITE_ERROR',
CURLE_MALFORMAT_USER => 'MALFORMAT_USER',
CURLE_FTP_COULDNT_STOR_FILE => 'FTP_COULDNT_STOR_FILE',
CURLE_READ_ERROR => 'READ_ERROR',
CURLE_OUT_OF_MEMORY => 'OUT_OF_MEMORY',
CURLE_OPERATION_TIMEOUTED => 'OPERATION_TIMEOUTED',
CURLE_FTP_COULDNT_SET_ASCII => 'FTP_COULDNT_SET_ASCII',
CURLE_FTP_PORT_FAILED => 'FTP_PORT_FAILED',
CURLE_FTP_COULDNT_USE_REST => 'FTP_COULDNT_USE_REST',
CURLE_FTP_COULDNT_GET_SIZE => 'FTP_COULDNT_GET_SIZE',
CURLE_HTTP_RANGE_ERROR => 'HTTP_RANGE_ERROR',
CURLE_HTTP_POST_ERROR => 'HTTP_POST_ERROR',
CURLE_SSL_CONNECT_ERROR => 'SSL_CONNECT_ERROR',
CURLE_FTP_BAD_DOWNLOAD_RESUME => 'FTP_BAD_DOWNLOAD_RESUME',
CURLE_FILE_COULDNT_READ_FILE => 'FILE_COULDNT_READ_FILE',
CURLE_LDAP_CANNOT_BIND => 'LDAP_CANNOT_BIND',
CURLE_LDAP_SEARCH_FAILED => 'LDAP_SEARCH_FAILED',
CURLE_LIBRARY_NOT_FOUND => 'LIBRARY_NOT_FOUND',
CURLE_FUNCTION_NOT_FOUND => 'FUNCTION_NOT_FOUND',
CURLE_ABORTED_BY_CALLBACK => 'ABORTED_BY_CALLBACK',
CURLE_BAD_FUNCTION_ARGUMENT => 'BAD_FUNCTION_ARGUMENT',
CURLE_BAD_CALLING_ORDER => 'BAD_CALLING_ORDER',
CURLE_HTTP_PORT_FAILED => 'HTTP_PORT_FAILED',
CURLE_BAD_PASSWORD_ENTERED => 'BAD_PASSWORD_ENTERED',
CURLE_TOO_MANY_REDIRECTS => 'TOO_MANY_REDIRECTS',
CURLE_UNKNOWN_TELNET_OPTION => 'UNKNOWN_TELNET_OPTION',
CURLE_TELNET_OPTION_SYNTAX => 'TELNET_OPTION_SYNTAX',
CURLE_OBSOLETE => 'OBSOLETE',
CURLE_SSL_PEER_CERTIFICATE => 'SSL_PEER_CERTIFICATE',
CURLE_GOT_NOTHING => 'GOT_NOTHING',
CURLE_SSL_ENGINE_NOTFOUND => 'SSL_ENGINE_NOTFOUND',
CURLE_SSL_ENGINE_SETFAILED => 'SSL_ENGINE_SETFAILED',
CURLE_SEND_ERROR => 'SEND_ERROR',
CURLE_RECV_ERROR => 'RECV_ERROR',
CURLE_SHARE_IN_USE => 'SHARE_IN_USE',
CURLE_SSL_CERTPROBLEM => 'SSL_CERTPROBLEM',
CURLE_SSL_CIPHER => 'SSL_CIPHER',
CURLE_SSL_CACERT => 'SSL_CACERT',
CURLE_BAD_CONTENT_ENCODING => 'BAD_CONTENT_ENCODING',
CURLE_LDAP_INVALID_URL => 'LDAP_INVALID_URL',
CURLE_FILESIZE_EXCEEDED => 'FILESIZE_EXCEEDED',
CURLE_FTP_SSL_FAILED => 'FTP_SSL_FAILED'
);

protected $options = array();

public function __construct()
Expand Down
22 changes: 22 additions & 0 deletions lib/Buzz/Client/AsyncClientInterface.php
@@ -0,0 +1,22 @@
<?php

namespace Buzz\Client;

interface AsyncClientInterface extends BatchClientInterface
{
/**
* Returns true if all deferred requests have completed
*/
public function isDone();

/**
* Returns the number of requests currently in process.
*/
public function queueSize();

/**
* This will execute a tick, which may be a lot of work or none at all. You should keep calling until isDone()
*/
public function proceed();

}
104 changes: 81 additions & 23 deletions lib/Buzz/Client/MultiCurl.php
Expand Up @@ -6,48 +6,106 @@
use Buzz\Message\RequestInterface;
use Buzz\Exception\ClientException;

class MultiCurl extends AbstractCurl implements BatchClientInterface
class MultiCurl extends AbstractCurl implements AsyncClientInterface
{
private $queue = array();
protected $queue = array();

/**
* Populates the supplied response with the response for the supplied request.
*
* @param RequestInterface $request A request object
* @param MessageInterface $response A response object
* @param Array $options Pass options 'callback' to handle responses as they complete and 'errback' to handle transport errors (timeout etc.)
*/
public function send(RequestInterface $request, MessageInterface $response, array $options = array())
{
$this->queue[] = array($request, $response, $options);
}

public function flush()
public function isDone()
{
return empty($this->queue);
}

public function queueSize()
{
if (false === $curlm = curl_multi_init()) {
throw new ClientException('Unable to create a new cURL multi handle');
return count($this->queue);
}

public function proceed()
{
if ($this->isDone()) {
return;
}

// Setup
if (!isset($this->curlm)) {
if (false === $this->curlm = curl_multi_init()) {
throw new ClientException('Unable to create a new cURL multi handle');
}
}

// prepare a cURL handle for each entry in the queue
foreach ($this->queue as $i => &$queue) {
list($request, $response, $options) = $queue;
$curl = $queue[] = static::createCurlHandle();
$this->prepare($curl, $request, $options);
curl_multi_add_handle($curlm, $curl);
foreach (array_keys($this->queue) as $i) {
if (!isset($this->queue[$i][3])) {
list($request, $response, $options) = $this->queue[$i];
$curl = static::createCurlHandle();
$this->queue[$i][] = $curl;
if (isset($options['callback'])) {
unset($options['callback']);
}
if (isset($options['errback'])) {
unset($options['errback']);
}
$this->prepare($curl, $request, $options);
curl_multi_add_handle($this->curlm, $curl);
}
}

// Process outstanding perform
$active = null;
do {
$mrc = curl_multi_exec($curlm, $active);
} while (CURLM_CALL_MULTI_PERFORM == $mrc);

while ($active && CURLM_OK == $mrc) {
if (-1 != curl_multi_select($curlm)) {
do {
$mrc = curl_multi_exec($curlm, $active);
} while (CURLM_CALL_MULTI_PERFORM == $mrc);
$mrc = curl_multi_exec($this->curlm, $active);
} while ($active && CURLM_CALL_MULTI_PERFORM == $mrc);

// Handle any completed requests
while ($done = curl_multi_info_read($this->curlm)) {
foreach (array_keys($this->queue) as $i) {
list($request, $response, $options, $curl) = $this->queue[$i];
if ($curl === $done['handle']) {
if ($done['result'] === CURLE_OK) {
static::populateResponse($curl, curl_multi_getcontent($curl), $response);
curl_multi_remove_handle($this->curlm, $curl);
curl_close($curl);
unset($this->queue[$i]);
if (isset($options['callback'])) {
call_user_func($options['callback'], $request, $response, $options);
}
} else {
// Transport error
$error = self::$curl_error_codes[$done['result']];
curl_multi_remove_handle($this->curlm, $curl);
curl_close($curl);
unset($this->queue[$i]);
if (isset($options['errback'])) {
call_user_func($options['errback'], $request, $error, $done['result']);
}
}
}
}
}

// populate the responses
while (list($request, $response, $options, $curl) = array_shift($this->queue)) {
static::populateResponse($curl, curl_multi_getcontent($curl), $response);
curl_multi_remove_handle($curlm, $curl);
// Cleanup
if ($this->isDone()) {
curl_multi_close($this->curlm);
$this->curlm = null;
}
}

curl_multi_close($curlm);
public function flush()
{
while (!$this->isDone()) {
$this->proceed();
}
}
}
37 changes: 36 additions & 1 deletion test/Buzz/Test/Client/FunctionalTest.php
Expand Up @@ -13,6 +13,22 @@
use Buzz\Message\RequestInterface;
use Buzz\Message\Response;

class MultiCurlCallbackTarget
{
public $success = array();
public $error = array();

function success()
{
$this->success[] = func_get_args();
}

function error()
{
$this->error[] = func_get_args();
}
}

class FunctionalTest extends \PHPUnit_Framework_TestCase
{
protected function setUp()
Expand Down Expand Up @@ -207,12 +223,31 @@ public function testRedirectedToForbiddenProtocol()
$response = $this->send($client, $request);
}

public function testMultiCurlExecutesRequestsConcurently()
{
$listener = new MultiCurlCallbackTarget();
$client = new MultiCurl();
$client->setTimeout(10);
$t = microtime(true);
for ($i = 3; $i > 0; $i--) {
$request = new Request();
$request->fromUrl($_SERVER['TEST_SERVER'].'?delay='.$i);
$options = array('callback' => array($listener, 'success'), 'errback' => array($listener, 'error'));
$client->send($request, new Response(), $options);
}
$this->assertCount(0, $listener->success);
$client->flush();
$time = microtime(true) - $t;
$this->assertCount(3, $listener->success);
$this->assertLessThan(4, $time); // They should complete in ~3 secs.
}

public function provideClient()
{
return array(
array(new Curl()),
array(new FileGetContents()),
// array(new MultiCurl()),
array(new MultiCurl()),
);
}

Expand Down
4 changes: 4 additions & 0 deletions test/server.php
Expand Up @@ -5,6 +5,10 @@
die;
}

if (isset($_GET['delay'])) {
sleep($_GET['delay']);
}

echo json_encode(array(
'SERVER' => $_SERVER,
'GET' => $_GET,
Expand Down

0 comments on commit 2962f50

Please sign in to comment.