Skip to content

Commit

Permalink
Parallel cURL implementation
Browse files Browse the repository at this point in the history
If a new "num_threads" > 1 option is passed using curl as consumer,  as many parallel cURL requests will be executed in parallel using curl_multi_exec
  • Loading branch information
manuelflara authored and jbwyme committed Jul 8, 2019
1 parent 48ebc9f commit 6f15000
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 24 deletions.
8 changes: 8 additions & 0 deletions lib/ConsumerStrategies/AbstractConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ protected function _handleError($code, $msg) {
}
}

/**
* Number of requests/batches that will be processed in parallel.
* @return int
*/
public function getNumParallelRequests() {
return 1;
}

/**
* Persist a batch of messages in whatever way the implementer sees fit
* @param array $batch an array of messages to consume
Expand Down
82 changes: 59 additions & 23 deletions lib/ConsumerStrategies/CurlConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class ConsumerStrategies_CurlConsumer extends ConsumerStrategies_AbstractConsume
protected $_fork = null;


/**
* @var int number of cURL requests to run in parallel. 1 by default
*/
protected $_num_threads;


/**
* Creates a new CurlConsumer and assigns properties from the $options array
* @param array $options
Expand All @@ -56,6 +62,7 @@ function __construct($options) {
$this->_timeout = array_key_exists('timeout', $options) ? $options['timeout'] : 30;
$this->_protocol = array_key_exists('use_ssl', $options) && $options['use_ssl'] == true ? "https" : "http";
$this->_fork = array_key_exists('fork', $options) ? ($options['fork'] == true) : false;
$this->_num_threads = array_key_exists('num_threads', $options) ? max(1, intval($options['num_threads'])) : 1;

// ensure the environment is workable for the given settings
if ($this->_fork == true) {
Expand Down Expand Up @@ -88,7 +95,7 @@ public function persist($batch) {
if ($this->_fork) {
return $this->_execute_forked($url, $data);
} else {
return $this->_execute($url, $data);
return $this->_execute($url, $batch);
}
} else {
return true;
Expand All @@ -102,35 +109,57 @@ public function persist($batch) {
* @param $data
* @return bool
*/
protected function _execute($url, $data) {
protected function _execute($url, $batch) {
if ($this->_debug()) {
$this->_log("Making blocking cURL call to $url");
}

$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->_connect_timeout);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->_timeout);
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
$response = curl_exec($ch);
if (false === $response) {
$curl_error = curl_error($ch);
$curl_errno = curl_errno($ch);
curl_close($ch);
$this->_handleError($curl_errno, $curl_error);
return false;
} else {
curl_close($ch);
if (trim($response) == "1") {
return true;
} else {
$mh = curl_multi_init();
$chs = array();

$batch_size = ceil(count($batch) / $this->_num_threads);
for ($i=0; $i<$this->_num_threads && !empty($batch); $i++) {
$ch = curl_init();
$chs[] = $ch;
$data = "data=" . $this->_encode(array_splice($batch, 0, $batch_size));
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, $this->_connect_timeout);
curl_setopt($ch, CURLOPT_TIMEOUT, $this->_timeout);
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, $data);
curl_multi_add_handle($mh,$ch);
}

do {
curl_multi_exec($mh, $running);
curl_multi_select($mh);
} while ($running > 0);

$info = curl_multi_info_read($mh);

$error = false;
foreach ($chs as $ch) {
$response = curl_multi_getcontent($ch);
if (false === $response) {
$this->_handleError(curl_errno($ch), curl_error($ch));
$error = true;
}
elseif ("1" != trim($response)) {
$this->_handleError(0, $response);
return false;
$error = true;
}
curl_multi_remove_handle($mh, $ch);
}

if (CURLE_OK != $info['result']) {
$this->_handleError($info['result'], "cURL error with code=".$info['result']);
$error = true;
}

curl_multi_close($mh);
return !$error;
}


Expand Down Expand Up @@ -218,4 +247,11 @@ public function getTimeout()
}


/**
* Number of requests/batches that will be processed in parallel using curl_multi_exec.
* @return int
*/
public function getNumThreads() {
return $this->_num_threads;
}
}
4 changes: 3 additions & 1 deletion lib/Producers/MixpanelBaseProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ public function __destruct() {
public function flush($desired_batch_size = 50) {
$queue_size = count($this->_queue);
$succeeded = true;
$num_threads = $this->_consumer->getNumThreads();

if ($this->_debug()) {
$this->_log("Flush called - queue size: ".$queue_size);
}

while($queue_size > 0 && $succeeded) {
$batch_size = min(array($queue_size, $desired_batch_size, $this->_options['max_batch_size']));
$batch_size = min(array($queue_size, $desired_batch_size*$num_threads, $this->_options['max_batch_size']*$num_threads));
$batch = array_splice($this->_queue, 0, $batch_size);
$succeeded = $this->_persist($batch);

Expand Down
2 changes: 2 additions & 0 deletions test/ConsumerStrategies/CurlConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ function callback() { }
"connect_timeout" => 1,
"use_ssl" => true,
"fork" => false,
"num_threads" => 5,
"error_callback" => 'callback'
));

Expand All @@ -75,6 +76,7 @@ function callback() { }
$this->assertEquals($consumer->getTimeout(), 2);
$this->assertEquals($consumer->getConnectTimeout(), 1);
$this->assertEquals($consumer->getProtocol(), "https");
$this->assertEquals($consumer->getNumThreads(), 5);
}

}
Expand Down

0 comments on commit 6f15000

Please sign in to comment.