Skip to content
Browse files

updated iron_mq and iron_worker libs,fixed post message.

  • Loading branch information...
1 parent 421d8c6 commit 3789bc4ca3c9b5494ecafe54352f2c20a82aa0f1 @rkononov rkononov committed Feb 10, 2013
Showing with 822 additions and 531 deletions.
  1. +1 −1 index.php
  2. +2 −1 iw/queueWorker.php
  3. +343 −0 lib/IronCore.class.php
  4. +244 −226 lib/IronMQ.class.php
  5. +1 −0 lib/IronMQWrapper.php
  6. +229 −302 lib/IronWorker.class.php
  7. +1 −0 lib/IronWorkerWrapper.php
  8. +1 −1 mq/postMessage.php
View
2 index.php
@@ -180,7 +180,7 @@
</section>
<footer>
- <a href="http://iron.io" title="Messaging and Background Processing for Cloud Apps"><img src="http://www.iron.io/assets/resources/ironio-badge-red.png" alt="Iron.io Badge" /></a>
+ <a href="http://iron.io" title="Messaging and Background Processing for Cloud Apps"><img src="https://hud.iron.io/assets/ironio-logo-white.svg" alt="Iron.io Badge" /></a>
</footer>
<script>
View
3 iw/queueWorker.php
@@ -5,7 +5,7 @@
$url = $_REQUEST['url'];
$queue_name = $_REQUEST['queue_name'];
$name = "imageWorker.php";
-$tmpdir = $_SERVER['TMP_DIR'];
+/*$tmpdir = $_SERVER['TMP_DIR'];
if (empty($tmpdir)){
$tmpdir = dirname(__FILE__);
}
@@ -14,6 +14,7 @@
$file = IronWorker::zipDirectory(dirname(__FILE__)."/../workers", $zipName, true);
$res = $iw->postCode('imageWorker.php', $zipName, $name);
print_r($res);
+*/
$payload = array(
'iron_mq' => array(
'token' => $config['iron_mq']['token'],
View
343 lib/IronCore.class.php
@@ -0,0 +1,343 @@
+<?php
+/**
+ * Core functionality for Iron.io products
+ *
+ * @link https://github.com/iron-io/iron_core_php
+ * @link http://www.iron.io/
+ * @link http://dev.iron.io/
+ * @version 0.1.2
+ * @package IronCore
+ * @copyright BSD 2-Clause License. See LICENSE file.
+ */
+
+class IronCore{
+ protected $core_version = '0.1.2';
+
+ // should be overridden by child class
+ protected $client_version = null;
+ protected $client_name = null;
+ protected $product_name = null;
+ protected $default_values = null;
+
+ const HTTP_OK = 200;
+ const HTTP_CREATED = 201;
+ const HTTP_ACCEPTED = 202;
+
+ const POST = 'POST';
+ const PUT = 'PUT';
+ const GET = 'GET';
+ const DELETE = 'DELETE';
+
+ const header_accept = "application/json";
+ const header_accept_encoding = "gzip, deflate";
+
+ protected $url;
+ protected $token;
+ protected $api_version;
+ protected $version;
+ protected $project_id;
+ protected $headers;
+ protected $protocol;
+ protected $host;
+ protected $port;
+ protected $curl = null;
+
+ public $max_retries = 5;
+ public $debug_enabled = false;
+ public $ssl_verifypeer = true;
+ public $connection_timeout = 60;
+
+ public function __destruct() {
+ if ($this->curl != null){
+ curl_close($this->curl);
+ }
+ }
+
+ protected static function dateRfc3339($timestamp = 0) {
+ if ($timestamp instanceof DateTime) {
+ $timestamp = $timestamp->getTimestamp();
+ }
+ if (!$timestamp) {
+ $timestamp = time();
+ }
+ return gmdate('c', $timestamp);
+ }
+
+ protected static function json_decode($response){
+ $data = json_decode($response);
+ if (function_exists('json_last_error')){
+ $json_error = json_last_error();
+ if($json_error != JSON_ERROR_NONE) {
+ throw new JSON_Exception($json_error);
+ }
+ }elseif($data === null){
+ throw new JSON_Exception("Common JSON error");
+ }
+ return $data;
+ }
+
+
+ protected static function homeDir(){
+ if ($home_dir = getenv('HOME')){
+ // *NIX
+ return $home_dir.DIRECTORY_SEPARATOR;
+ }else{
+ // Windows
+ return getenv('HOMEDRIVE').getenv('HOMEPATH').DIRECTORY_SEPARATOR;
+ }
+ }
+
+ protected function debug($var_name, $variable){
+ if ($this->debug_enabled){
+ echo "{$var_name}: ".var_export($variable,true)."\n";
+ }
+ }
+
+ protected function userAgent(){
+ return "{$this->client_name}-{$this->client_version} (iron_core-{$this->core_version})";
+ }
+
+ /**
+ * Load configuration
+ *
+ * @param array|string|null $config_file_or_options
+ * array of options or name of config file
+ * @return array
+ * @throws InvalidArgumentException
+ */
+ protected function getConfigData($config_file_or_options){
+ if(is_string($config_file_or_options)){
+ if (!file_exists($config_file_or_options)){
+ throw new InvalidArgumentException("Config file $config_file_or_options not found");
+ }
+ $this->loadConfigFile($config_file_or_options);
+ }elseif(is_array($config_file_or_options)){
+ $this->loadFromHash($config_file_or_options);
+ }
+
+ $this->loadConfigFile('iron.ini');
+ $this->loadConfigFile('iron.json');
+
+ $this->loadFromEnv(strtoupper($this->product_name));
+ $this->loadFromEnv('IRON');
+
+ $this->loadConfigFile(self::homeDir() . '.iron.ini');
+ $this->loadConfigFile(self::homeDir() . '.iron.json');
+
+ $this->loadFromHash($this->default_values);
+
+ if (empty($this->token) || empty($this->project_id)){
+ throw new InvalidArgumentException("token or project_id not found in any of the available sources");
+ }
+ }
+
+
+ protected function loadFromHash($options){
+ if (empty($options)) return;
+ $this->setVarIfValue('token', $options);
+ $this->setVarIfValue('project_id', $options);
+ $this->setVarIfValue('protocol', $options);
+ $this->setVarIfValue('host', $options);
+ $this->setVarIfValue('port', $options);
+ $this->setVarIfValue('api_version', $options);
+ }
+
+ protected function loadFromEnv($prefix){
+ $this->setVarIfValue('token', getenv($prefix. "_TOKEN"));
+ $this->setVarIfValue('project_id', getenv($prefix. "_PROJECT_ID"));
+ $this->setVarIfValue('protocol', getenv($prefix. "_SCHEME"));
+ $this->setVarIfValue('host', getenv($prefix. "_HOST"));
+ $this->setVarIfValue('port', getenv($prefix. "_PORT"));
+ $this->setVarIfValue('api_version', getenv($prefix. "_API_VERSION"));
+ }
+
+ protected function setVarIfValue($key, $options_or_value){
+ if (!empty($this->$key)) return;
+ if (is_array($options_or_value)){
+ if (!empty($options_or_value[$key])){
+ $this->$key = $options_or_value[$key];
+ }
+ }else{
+ if (!empty($options_or_value)){
+ $this->$key = $options_or_value;
+ }
+ }
+ }
+
+ protected function loadConfigFile($file){
+ if (!file_exists($file)) return;
+ $data = @parse_ini_file($file, true);
+ if ($data === false){
+ $data = json_decode(file_get_contents($file), true);
+ }
+ if (!is_array($data)){
+ throw new InvalidArgumentException("Config file $file not parsed");
+ };
+
+ if (!empty($data[$this->product_name])) $this->loadFromHash($data[$this->product_name]);
+ if (!empty($data['iron'])) $this->loadFromHash($data['iron']);
+ $this->loadFromHash($data);
+ }
+
+ protected function apiCall($type, $url, $params = array(), $raw_post_data = null){
+ $url = "{$this->url}$url";
+
+ if ($this->curl == null) $this->curl = curl_init();
+
+ if (! isset($params['oauth'])) {
+ $params['oauth'] = $this->token;
+ }
+ switch ($type) {
+ case self::DELETE:
+ curl_setopt($this->curl, CURLOPT_URL, $url);
+ curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, self::DELETE);
+ curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
+ break;
+ case self::PUT:
+ curl_setopt($this->curl, CURLOPT_URL, $url);
+ curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, self::PUT);
+ curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
+ break;
+ case self::POST:
+ curl_setopt($this->curl, CURLOPT_URL, $url);
+ curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, self::POST);
+ curl_setopt($this->curl, CURLOPT_POST, true);
+ if ($raw_post_data){
+ curl_setopt($this->curl, CURLOPT_POSTFIELDS, $raw_post_data);
+ }else{
+ curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
+ }
+ break;
+ case self::GET:
+ curl_setopt($this->curl, CURLOPT_POSTFIELDS, null);
+ curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, self::GET);
+ curl_setopt($this->curl, CURLOPT_HTTPGET, true);
+ $url .= '?' . http_build_query($params);
+ curl_setopt($this->curl, CURLOPT_URL, $url);
+ break;
+ }
+ $this->debug("API $type", $url);
+ curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, $this->ssl_verifypeer);
+ curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true);
+ curl_setopt($this->curl, CURLOPT_HTTPHEADER, $this->compiledHeaders());
+ curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, $this->connection_timeout);
+ return $this->callWithRetries();
+ }
+
+ protected function callWithRetries(){
+ for ($retry = 0; $retry < $this->max_retries; $retry++){
+ $_out = curl_exec($this->curl);
+ if($_out === false) {
+ $this->reportHttpError(0, curl_error($this->curl));
+ }
+ $status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE);
+ switch ($status) {
+ case self::HTTP_OK:
+ case self::HTTP_CREATED:
+ case self::HTTP_ACCEPTED:
+ return $_out;
+ case Http_Exception::INTERNAL_ERROR:
+ if (strpos($_out, "EOF") !== false){
+ self::waitRandomInterval($retry);
+ }else{
+ $this->reportHttpError($status, $_out);
+ }
+ break;
+ case Http_Exception::SERVICE_UNAVAILABLE:
+ self::waitRandomInterval($retry);
+ break;
+ default:
+ $this->reportHttpError($status, $_out);
+ }
+ }
+ $this->reportHttpError(503, "Service unavailable");
+ return null;
+ }
+
+ protected function reportHttpError($status, $text){
+ throw new Http_Exception("http error: {$status} | {$text}", $status);
+ }
+
+ /**
+ * Wait for a random time between 0 and (4^currentRetry * 100) milliseconds
+ *
+ * @static
+ * @param int $retry currentRetry number
+ */
+ protected static function waitRandomInterval($retry){
+ $max_delay = pow(4, $retry)*100*1000;
+ usleep(rand(0, $max_delay));
+ }
+
+ protected function compiledHeaders(){
+ # Set default headers if no headers set.
+ if ($this->headers == null){
+ $this->setCommonHeaders();
+ }
+
+ $headers = array();
+ foreach ($this->headers as $k => $v){
+ $headers[] = "$k: $v";
+ }
+ return $headers;
+ }
+
+ protected function setCommonHeaders(){
+ $this->headers = array(
+ 'Authorization' => "OAuth {$this->token}",
+ 'User-Agent' => $this->userAgent(),
+ 'Content-Type' => 'application/json',
+ 'Accept' => self::header_accept,
+ 'Accept-Encoding' => self::header_accept_encoding,
+ 'Connection' => 'Keep-Alive',
+ 'Keep-Alive' => '300'
+ );
+ }
+
+}
+
+/**
+ * The Http_Exception class represents an HTTP response status that is not 200 OK.
+ */
+class Http_Exception extends Exception{
+ const NOT_MODIFIED = 304;
+ const BAD_REQUEST = 400;
+ const NOT_FOUND = 404;
+ const NOT_ALLOWED = 405;
+ const CONFLICT = 409;
+ const PRECONDITION_FAILED = 412;
+ const INTERNAL_ERROR = 500;
+ const SERVICE_UNAVAILABLE = 503;
+}
+
+/**
+ * The JSON_Exception class represents an failures of decoding json strings.
+ */
+class JSON_Exception extends Exception {
+ public $error = null;
+ public $error_code = JSON_ERROR_NONE;
+
+ function __construct($error_code) {
+ $this->error_code = $error_code;
+ switch($error_code) {
+ case JSON_ERROR_DEPTH:
+ $this->error = 'Maximum stack depth exceeded.';
+ break;
+ case JSON_ERROR_CTRL_CHAR:
+ $this->error = "Unexpected control characted found.";
+ break;
+ case JSON_ERROR_SYNTAX:
+ $this->error = "Syntax error, malformed JSON";
+ break;
+ default:
+ $this->error = $error_code;
+ break;
+
+ }
+ parent::__construct();
+ }
+
+ function __toString() {
+ return $this->error;
+ }
+}
View
470 lib/IronMQ.class.php
@@ -5,57 +5,17 @@
*
* @link https://github.com/iron-io/iron_mq_php
* @link http://www.iron.io/products/mq
- * @link http://docs.iron.io/
- * @version 1.0
+ * @link http://dev.iron.io/
+ * @version 1.4.2
* @package IronMQPHP
* @copyright Feel free to copy, steal, take credit for, or whatever you feel like doing with this code. ;)
*/
-/**
- * The Http_Exception class represents an HTTP response status that is not 200 OK.
- */
-class Http_Exception extends Exception{
- const NOT_MODIFIED = 304;
- const BAD_REQUEST = 400;
- const NOT_FOUND = 404;
- const NOT_ALOWED = 405;
- const CONFLICT = 409;
- const PRECONDITION_FAILED = 412;
- const INTERNAL_ERROR = 500;
-}
class IronMQ_Exception extends Exception{
}
-/**
- * The JSON_Exception class represents an failures of decoding json strings.
- */
-class JSON_Exception extends Exception {
- public $error = null;
- public $error_code = JSON_ERROR_NONE;
-
- function __construct($error_code) {
- $this->error_code = $error_code;
- switch($error_code) {
- case JSON_ERROR_DEPTH:
- $this->error = 'Maximum stack depth exceeded.';
- break;
- case JSON_ERROR_CTRL_CHAR:
- $this->error = "Unexpected control characted found.";
- break;
- case JSON_ERROR_SYNTAX:
- $this->error = "Syntax error, malformed JSON";
- break;
- }
- parent::__construct();
- }
-
- function __toString() {
- return $this->error;
- }
-}
-
class IronMQ_Message {
private $body;
@@ -68,30 +28,26 @@ class IronMQ_Message {
/**
* Create a new message.
*
- * @param array|string $message
- * An array of message properties or a string of the message body.
- * Fields in message array:
- * Required:
- * - body: The message data, as a string.
- * Optional:
+ * @param string $message
+ * A message body
+ * @param array $properties
+ * An array of message properties
+ * Fields in $properties array:
* - timeout: Timeout, in seconds. After timeout, item will be placed back on queue. Defaults to 60.
* - delay: The item will not be available on the queue until this many seconds have passed. Defaults to 0.
* - expires_in: How long, in seconds, to keep the item on the queue before it is deleted. Defaults to 604800 (7 days). Maximum is 2592000 (30 days).
*/
- function __construct($message) {
- if(is_string($message)) {
- $this->setBody($message);
- } elseif(is_array($message)) {
- $this->setBody($message['body']);
- if(array_key_exists("timeout", $message)) {
- $this->setTimeout($message['timeout']);
- }
- if(array_key_exists("delay", $message)) {
- $this->setDelay($message['delay']);
- }
- if(array_key_exists("expires_in", $message)) {
- $this->setExpiresIn($message['expires_in']);
- }
+ function __construct($message, $properties = array()) {
+ $this->setBody($message);
+
+ if(array_key_exists("timeout", $properties)) {
+ $this->setTimeout($properties['timeout']);
+ }
+ if(array_key_exists("delay", $properties)) {
+ $this->setDelay($properties['delay']);
+ }
+ if(array_key_exists("expires_in", $properties)) {
+ $this->setExpiresIn($properties['expires_in']);
}
}
@@ -103,7 +59,7 @@ public function setBody($body) {
if(empty($body)) {
throw new InvalidArgumentException("Please specify a body");
} else {
- $this->body = $body;
+ $this->body = (string) $body;
}
}
@@ -159,35 +115,20 @@ public function asArray() {
}
}
-class IronMQ{
+class IronMQ extends IronCore {
- //Header Constants
- const header_user_agent = "IronMQ PHP v0.1";
- const header_accept = "application/json";
- const header_accept_encoding = "gzip, deflate";
- const HTTP_OK = 200;
- const HTTP_CREATED = 201;
- const HTTP_ACEPTED = 202;
-
- const POST = 'POST';
- const GET = 'GET';
- const DELETE = 'DELETE';
-
- public $debug_enabled = false;
-
- private $required_config_fields = array('token','project_id');
- private $default_values = array(
- 'protocol' => 'http',
+ protected $client_version = '1.4.2';
+ protected $client_name = 'iron_mq_php';
+ protected $product_name = 'iron_mq';
+ protected $default_values = array(
+ 'protocol' => 'https',
'host' => 'mq-aws-us-east-1.iron.io',
- 'port' => '80',
+ 'port' => '443',
'api_version' => '1',
);
- private $url;
- private $token;
- private $api_version;
- private $version;
- private $project_id;
+ const LIST_QUEUES_PER_PAGE = 30;
+ const GET_MESSAGE_TIMEOUT = 60;
/**
* @param string|array $config_file_or_options
@@ -203,27 +144,15 @@ class IronMQ{
* - port
* - api_version
*/
- function __construct($config_file_or_options){
- $config = $this->getConfigData($config_file_or_options);
- $token = $config['token'];
- $project_id = $config['project_id'];
-
- $protocol = empty($config['protocol']) ? $this->default_values['protocol'] : $config['protocol'];
- $host = empty($config['host']) ? $this->default_values['host'] : $config['host'];
- $port = empty($config['port']) ? $this->default_values['port'] : $config['port'];
- $api_version = empty($config['api_version'])? $this->default_values['api_version'] : $config['api_version'];
-
- $this->url = "$protocol://$host:$port/$api_version/";
- $this->token = $token;
- $this->api_version = $api_version;
- $this->version = $api_version;
- $this->project_id = $project_id;
+ function __construct($config_file_or_options = null){
+ $this->getConfigData($config_file_or_options);
+ $this->url = "{$this->protocol}://{$this->host}:{$this->port}/{$this->api_version}/";
}
/**
* Switch active project
*
- * string @param $project_id Project ID
+ * @param string $project_id Project ID
* @throws InvalidArgumentException
*/
public function setProjectId($project_id) {
@@ -235,11 +164,22 @@ public function setProjectId($project_id) {
}
}
- public function getQueues($page = 0){
+ /**
+ * Get list of message queues
+ *
+ * @param int $page
+ * Zero-indexed page to view
+ * @param int $per_page
+ * Number of queues per page
+ */
+ public function getQueues($page = 0, $per_page = self::LIST_QUEUES_PER_PAGE) {
$url = "projects/{$this->project_id}/queues";
$params = array();
- if($page > 0) {
- $params['page'] = $page;
+ if($page !== 0) {
+ $params['page'] = (int) $page;
+ }
+ if($per_page !== self::LIST_QUEUES_PER_PAGE) {
+ $params['per_page'] = (int) $per_page;
}
$this->setJsonHeaders();
return self::json_decode($this->apiCall(self::GET, $url, $params));
@@ -253,189 +193,282 @@ public function getQueues($page = 0){
* @return mixed
*/
public function getQueue($queue_name) {
- $url = "projects/{$this->project_id}/queues/{$queue_name}";
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue";
$this->setJsonHeaders();
return self::json_decode($this->apiCall(self::GET, $url));
}
/**
+ * Clear all messages from queue.
+ *
+ * @param string $queue_name
+ * @return mixed
+ */
+ public function clearQueue($queue_name) {
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/clear";
+ $this->setJsonHeaders();
+ return self::json_decode($this->apiCall(self::POST, $url));
+ }
+
+ /**
* Push a message on the queue
*
* Examples:
* <code>
* $ironmq->postMessage("test_queue", "Hello world");
* </code>
* <code>
- * $ironmq->postMessage("test_queue", array(
- * "body" => "Test Message"
- * "timeout" => 120,
+ * $ironmq->postMessage("test_queue", "Test Message", array(
+ * 'timeout' => 120,
* 'delay' => 2,
* 'expires_in' => 2*24*3600 # 2 days
* ));
* </code>
*
* @param string $queue_name Name of the queue.
- * @param array|string $message
+ * @param string $message
+ * @param array $properties
* @return mixed
*/
- public function postMessage($queue_name, $message) {
- $msg = new IronMQ_Message($message);
+ public function postMessage($queue_name, $message, $properties = array()) {
+ $msg = new IronMQ_Message($message, $properties);
$req = array(
"messages" => array($msg->asArray())
);
$this->setCommonHeaders();
- $url = "projects/{$this->project_id}/queues/{$queue_name}/messages";
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages";
$res = $this->apiCall(self::POST, $url, $req);
- return self::json_decode($res);
+ $decoded = self::json_decode($res);
+ $decoded->id = $decoded->ids[0];
+ return $decoded;
}
/**
* Push multiple messages on the queue
*
+ * Example:
+ * <code>
+ * $ironmq->postMessages("test_queue", array("Lorem", "Ipsum"), array(
+ * 'timeout' => 120,
+ * 'delay' => 2,
+ * 'expires_in' => 2*24*3600 # 2 days
+ * ));
+ * </code>
+ *
* @param string $queue_name Name of the queue.
* @param array $messages array of messages, each message same as for postMessage() method
+ * @param array $properties array of message properties, applied to each message in $messages
* @return mixed
*/
- public function postMessages($queue_name, $messages) {
+ public function postMessages($queue_name, $messages, $properties = array()) {
$req = array(
"messages" => array()
);
foreach($messages as $message) {
- $msg = new IronMQ_Message($message);
+ $msg = new IronMQ_Message($message, $properties);
array_push($req['messages'], $msg->asArray());
}
$this->setCommonHeaders();
- $url = "projects/{$this->project_id}/queues/{$queue_name}/messages";
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages";
$res = $this->apiCall(self::POST, $url, $req);
return self::json_decode($res);
}
- public function getMessages($queue_name, $count=1) {
- $url = "projects/{$this->project_id}/queues/{$queue_name}/messages";
+ /**
+ * Get multiplie messages from queue
+ *
+ * @param string $queue_name Queue name
+ * @param int $count
+ * @param int $timeout
+ * @return array|null array of messages or null
+ */
+ public function getMessages($queue_name, $count = 1, $timeout = self::GET_MESSAGE_TIMEOUT) {
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages";
$params = array();
- if($count > 1) {
- $params['count'] = $count;
+ if($count !== 1) {
+ $params['n'] = (int) $count;
+ }
+ if($timeout !== self::GET_MESSAGE_TIMEOUT) {
+ $params['timeout'] = (int) $timeout;
}
$this->setJsonHeaders();
$response = $this->apiCall(self::GET, $url, $params);
$result = self::json_decode($response);
if(count($result->messages) < 1) {
return null;
} else {
- return $result;
+ return $result->messages;
}
}
- public function getMessage($queue_name) {
- return $this->getMessages($queue_name, 1);
+ /**
+ * Get single message from queue
+ *
+ * @param string $queue_name Queue name
+ * @param int $timeout
+ * @return mixed|null single message or null
+ */
+ public function getMessage($queue_name, $timeout = self::GET_MESSAGE_TIMEOUT) {
+ $messages = $this->getMessages($queue_name, 1, $timeout);
+ if ($messages){
+ return $messages[0];
+ }else{
+ return null;
+ }
}
+ /**
+ * Delete a Message from a Queue
+ * This call will delete the message. Be sure you call this after you’re done with a message or it will be placed back on the queue.
+ *
+ * @param $queue_name
+ * @param $message_id
+ * @return mixed
+ */
public function deleteMessage($queue_name, $message_id) {
$this->setCommonHeaders();
- $url = "projects/{$this->project_id}/queues/{$queue_name}/messages/{$message_id}";
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages/{$message_id}";
return $this->apiCall(self::DELETE, $url);
}
- /* PRIVATE FUNCTIONS */
-
- private function compiledHeaders(){
-
- # Set default headers if no headers set.
- if ($this->headers == null){
- $this->setCommonHeaders();
+ /**
+ * Peek Messages on a Queue
+ * Peeking at a queue returns the next messages on the queue, but it does not reserve them.
+ *
+ * @param string $queue_name
+ * @return object|null message or null if queue is empty
+ */
+ public function peekMessage($queue_name) {
+ $messages = $this->peekMessages($queue_name, 1);
+ if ($messages == null) {
+ return null;
+ } else {
+ return $messages[0];
}
+ }
- $headers = array();
- foreach ($this->headers as $k => $v){
- $headers[] = "$k: $v";
+ /**
+ * Peek Messages on a Queue
+ * Peeking at a queue returns the next messages on the queue, but it does not reserve them.
+ *
+ * @param string $queue_name
+ * @param int $count The maximum number of messages to peek. Maximum is 100.
+ * @return array|null array of messages or null if queue is empty
+ */
+ public function peekMessages($queue_name, $count) {
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages/peek";
+ $params = array();
+ if($count !== 1) {
+ $params['n'] = (int) $count;
}
- return $headers;
+ $this->setJsonHeaders();
+ $response = self::json_decode($this->apiCall(self::GET, $url, $params));
+ return $response->messages;
}
- private function apiCall($type, $url, $params = array()){
- $url = "{$this->url}$url";
-
- $s = curl_init();
- if (! isset($params['oauth'])) {
- $params['oauth'] = $this->token;
- }
- switch ($type) {
- case self::DELETE:
- $fullUrl = $url . '?' . http_build_query($params);
- $this->debug('apiCall fullUrl', $fullUrl);
- curl_setopt($s, CURLOPT_URL, $fullUrl);
- curl_setopt($s, CURLOPT_CUSTOMREQUEST, self::DELETE);
- break;
- case self::POST:
- $this->debug('apiCall url', $url);
- curl_setopt($s, CURLOPT_URL, $url);
- curl_setopt($s, CURLOPT_POST, true);
- curl_setopt($s, CURLOPT_POSTFIELDS, json_encode($params));
- break;
- case self::GET:
- $fullUrl = $url . '?' . http_build_query($params);
- $this->debug('apiCall fullUrl', $fullUrl);
- curl_setopt($s, CURLOPT_URL, $fullUrl);
- break;
- }
+ /**
+ * Touch a Message on a Queue
+ * Touching a reserved message extends its timeout by the duration specified when the message was created, which is 60 seconds by default.
+ *
+ * @param string $queue_name
+ * @param string $message_id
+ * @return mixed
+ */
+ public function touchMessage($queue_name, $message_id) {
+ $this->setJsonHeaders();
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages/{$message_id}/touch";
+ return self::json_decode($this->apiCall(self::POST, $url));
+ }
- curl_setopt($s, CURLOPT_RETURNTRANSFER, true);
- curl_setopt($s, CURLOPT_HTTPHEADER, $this->compiledHeaders());
- $_out = curl_exec($s);
- $status = curl_getinfo($s, CURLINFO_HTTP_CODE);
- curl_close($s);
- switch ($status) {
- case self::HTTP_OK:
- case self::HTTP_CREATED:
- case self::HTTP_ACEPTED:
- $out = $_out;
- break;
- default:
- throw new Http_Exception("http error: {$status} | {$_out}", $status);
- }
- return $out;
+ /**
+ * Release a Message on a Queue
+ * Releasing a reserved message unreserves the message and puts it back on the queue as if the message had timed out.
+ *
+ * @param string $queue_name
+ * @param string $message_id
+ * @return mixed
+ */
+ public function releaseMessage($queue_name, $message_id) {
+ $this->setJsonHeaders();
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/messages/{$message_id}/release";
+ return self::json_decode($this->apiCall(self::POST, $url));
}
+ /**
+ * Updates the queue object
+ *
+ * @param string $queue_name
+ * @param array $options Parameters to change. keys:
+ * - "subscribers" url's to subscribe to
+ * - "push_type" multicast (default) or unicast.
+ * - "retries" Number of retries. 3 by default
+ * - "retries_delay" Delay between retries. 60 (seconds) by default
+ */
+ public function updateQueue($queue_name, $options) {
+ $this->setJsonHeaders();
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue";
+ return self::json_decode($this->apiCall(self::POST, $url, $options));
+ }
/**
- * @param array|string $config_file_or_options
- * array of options or name of config file
- * @return array
- * @throws InvalidArgumentException
+ * Add Subscriber to a Queue
+ *
+ * Example:
+ * <code>
+ * $ironmq->addSubscriber("test_queue", array("url" => "http://example.com"));
+ * </code>
+ *
+ * @param string $queue_name
+ * @param array $subscriber_hash Subscriber. keys:
+ * - "url" Subscriber url
+ * @return mixed
*/
- private function getConfigData($config_file_or_options){
- if (is_string($config_file_or_options)){
- $ini = parse_ini_file($config_file_or_options, true);
- if ($ini === false){
- throw new InvalidArgumentException("Config file $config_file_or_options not found");
- }
- if (empty($ini['iron_mq'])){
- throw new InvalidArgumentException("Config file $config_file_or_options has no section 'iron_mq'");
- }
- $config = $ini['iron_mq'];
- }elseif(is_array($config_file_or_options)){
- $config = $config_file_or_options;
- }else{
- throw new InvalidArgumentException("Wrong parameter type");
- }
- foreach ($this->required_config_fields as $field){
- if (empty($config[$field])){
- throw new InvalidArgumentException("Required config key missing: '$field'");
- }
- }
- return $config;
+ public function addSubscriber($queue_name, $subscriber_hash) {
+ $this->setJsonHeaders();
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/subscribers";
+ $options = array(
+ 'subscribers' => array($subscriber_hash)
+ );
+ return self::json_decode($this->apiCall(self::POST, $url, $options));
}
- private function setCommonHeaders(){
- $this->headers = array(
- 'Authorization' => "OAuth {$this->token}",
- 'User-Agent' => self::header_user_agent,
- 'Content-Type' => 'application/json',
- 'Accept' => self::header_accept,
- 'Accept-Encoding' => self::header_accept_encoding
+ /**
+ * Remove Subscriber from a Queue
+ *
+ * Example:
+ * <code>
+ * $ironmq->removeSubscriber("test_queue", array("url" => "http://example.com"));
+ * </code>
+ *
+ * @param string $queue_name
+ * @param array $subscriber_hash Subscriber. keys:
+ * - "url" Subscriber url
+ * @return mixed
+ */
+ public function removeSubscriber($queue_name, $subscriber_hash) {
+ $this->setJsonHeaders();
+ $queue = rawurlencode($queue_name);
+ $url = "projects/{$this->project_id}/queues/$queue/subscribers";
+ $options = array(
+ 'subscribers' => array($subscriber_hash)
);
+ return self::json_decode($this->apiCall(self::DELETE, $url, $options));
}
+
+ /* PRIVATE FUNCTIONS */
+
private function setJsonHeaders(){
$this->setCommonHeaders();
}
@@ -445,19 +478,4 @@ private function setPostHeaders(){
$this->headers['Content-Type'] ='multipart/form-data';
}
- private function debug($var_name, $variable){
- if ($this->debug_enabled){
- echo "{$var_name}: ".var_export($variable,true)."\n";
- }
- }
-
- private static function json_decode($response){
- $data = json_decode($response);
- $json_error = json_last_error();
- if($json_error != JSON_ERROR_NONE) {
- throw new JSON_Exception($json_error);
- }
- return $data;
- }
-
-}
+}
View
1 lib/IronMQWrapper.php
@@ -1,4 +1,5 @@
<?php
+include("IronCore.class.php");
include("IronMQ.class.php");
$ironmq = new IronMQ(__DIR__.'/../config.ini');
?>
View
531 lib/IronWorker.class.php
@@ -5,95 +5,36 @@
*
* @link https://github.com/iron-io/iron_worker_php
* @link http://www.iron.io/
- * @link http://docs.iron.io/
- * @version 1.0
+ * @link http://dev.iron.io/
+ * @version 1.3.7
* @package IronWorkerPHP
* @copyright Feel free to copy, steal, take credit for, or whatever you feel like doing with this code. ;)
*/
/**
- * The Http_Exception class represents an HTTP response status that is not 200 OK.
+ * IronWorker internal exceptions representation
*/
-class Http_Exception extends Exception{
- const NOT_MODIFIED = 304;
- const BAD_REQUEST = 400;
- const NOT_FOUND = 404;
- const NOT_ALOWED = 405;
- const CONFLICT = 409;
- const PRECONDITION_FAILED = 412;
- const INTERNAL_ERROR = 500;
-}
-
-/**
- * The JSON_Exception class represents an failures of decoding json strings.
- */
-class JSON_Exception extends Exception {
- public $error = null;
- public $error_code = JSON_ERROR_NONE;
-
- function __construct($error_code) {
- $this->error_code = $error_code;
- switch($error_code) {
- case JSON_ERROR_DEPTH:
- $this->error = 'Maximum stack depth exceeded.';
- break;
- case JSON_ERROR_CTRL_CHAR:
- $this->error = "Unexpected control characted found.";
- break;
- case JSON_ERROR_SYNTAX:
- $this->error = "Syntax error, malformed JSON";
- break;
- }
- parent::__construct();
- }
-
- function __toString() {
- return $this->error;
- }
-}
-
-
class IronWorker_Exception extends Exception{
}
-
/**
* Class that wraps IronWorker API calls.
*/
-class IronWorker{
-
- //Header Constants
- const header_user_agent = "IronWorker PHP v0.1";
- const header_accept = "application/json";
- const header_accept_encoding = "gzip, deflate";
- const HTTP_OK = 200;
- const HTTP_CREATED = 201;
- const HTTP_ACEPTED = 202;
-
- const POST = 'POST';
- const GET = 'GET';
- const DELETE = 'DELETE';
+class IronWorker extends IronCore{
- public $debug_enabled = false;
-
- private $required_config_fields = array('token','project_id');
- private $default_values = array(
- 'protocol' => 'http',
+ protected $client_version = '1.3.7';
+ protected $client_name = 'iron_worker_php';
+ protected $product_name = 'iron_worker';
+ protected $default_values = array(
+ 'protocol' => 'https',
'host' => 'worker-aws-us-east-1.iron.io',
- 'port' => '80',
+ 'port' => '443',
'api_version' => '2',
);
- private $url;
- private $token;
- private $api_version;
- private $version;
- private $project_id;
- private $headers;
-
/**
- * @param string|array $config_file_or_options
+ * @param string|array|null $config_file_or_options
* Array of options or name of config file.
* Fields in options array or in config:
*
@@ -105,22 +46,50 @@ class IronWorker{
* - host
* - port
* - api_version
+ *
+ * Configuration data will be searched in this locations:
+ * 1. passed to class constructor
+ * 2a. config file iron.ini in current directory
+ * 2b. config file iron.json in current directory
+ * 3a. environment variables IRON_WORKER_TOKEN and others
+ * 3b. environment variables IRON_TOKEN and others
+ * 4a. config file ~/.iron.ini in user home dir
+ * 4b. config file ~/.iron.json in user home dir
+ *
*/
- function __construct($config_file_or_options){
- $config = $this->getConfigData($config_file_or_options);
- $token = $config['token'];
- $project_id = $config['project_id'];
-
- $protocol = empty($config['protocol']) ? $this->default_values['protocol'] : $config['protocol'];
- $host = empty($config['host']) ? $this->default_values['host'] : $config['host'];
- $port = empty($config['port']) ? $this->default_values['port'] : $config['port'];
- $api_version = empty($config['api_version'])? $this->default_values['api_version'] : $config['api_version'];
+ function __construct($config_file_or_options = null){
+ $this->getConfigData($config_file_or_options);
+ $this->url = "{$this->protocol}://{$this->host}:{$this->port}/{$this->api_version}/";
+ }
- $this->url = "$protocol://$host:$port/$api_version/";
- $this->token = $token;
- $this->api_version = $api_version;
- $this->version = $api_version;
- $this->project_id = $project_id;
+ /**
+ * Zips and uploads your code
+ *
+ * Shortcut for zipDirectory() + postCode()
+ *
+ * @param string $directory Directory with worker files
+ * @param string $run_filename This file will be launched as worker
+ * @param string $code_name Referenceable (unique) name for your worker
+ * @param array $options Optional parameters:
+ * - "max_concurrency" The maximum number of tasks that should be run in parallel.
+ * - "retries" The number of auto-retries of failed task.
+ * - "retries_delay" Delay in seconds between retries.
+ * @return bool Result of operation
+ * @throws Exception
+ */
+ public function upload($directory, $run_filename, $code_name, $options = array()){
+ $temp_file = tempnam(sys_get_temp_dir(), 'iron_worker_php');
+ if (!self::zipDirectory($directory, $temp_file, true)){
+ unlink($temp_file);
+ return false;
+ }
+ try{
+ $this->postCode($run_filename, $temp_file, $code_name, $options);
+ }catch(Exception $e){
+ unlink($temp_file);
+ throw $e;
+ }
+ return true;
}
/**
@@ -210,10 +179,26 @@ public function getProjects(){
return $projects->projects;
}
- public function getTasks(){
+ /**
+ * List Tasks
+ *
+ * @param int $page Page. Default is 0, maximum is 100.
+ * @param int $per_page The number of tasks to return per page. Default is 30, maximum is 100.
+ * @param array $options Optional URL Parameters
+ * Filter by Status: the parameters queued, running, complete, error, cancelled, killed, and timeout will all filter by their respective status when given a value of 1. These parameters can be mixed and matched to return tasks that fall into any of the status filters. If no filters are provided, tasks will be displayed across all statuses.
+ * - "from_time" Limit the retrieved tasks to only those that were created after the time specified in the value. Time should be formatted as the number of seconds since the Unix epoch.
+ * - "to_time" Limit the retrieved tasks to only those that were created before the time specified in the value. Time should be formatted as the number of seconds since the Unix epoch.
+ * @return mixed
+ */
+ public function getTasks($page = 0, $per_page = 30, $options = array()){
$url = "projects/{$this->project_id}/tasks";
$this->setJsonHeaders();
- $task = self::json_decode($this->apiCall(self::GET, $url));
+ $params = array(
+ 'page' => $page,
+ 'per_page' => $per_page
+ );
+ $params = array_merge($options, $params);
+ $task = self::json_decode($this->apiCall(self::GET, $url, $params));
return $task->tasks;
}
@@ -223,10 +208,14 @@ public function getProjectDetails(){
return json_decode($this->apiCall(self::GET, $url));
}
- public function getCodes(){
- $this->setJsonHeaders();
+ public function getCodes($page = 0, $per_page = 30){
$url = "projects/{$this->project_id}/codes";
- $codes = self::json_decode($this->apiCall(self::GET, $url));
+ $this->setJsonHeaders();
+ $params = array(
+ 'page' => $page,
+ 'per_page' => $per_page
+ );
+ $codes = self::json_decode($this->apiCall(self::GET, $url, $params));
return $codes->codes;
}
@@ -245,29 +234,34 @@ public function getCodeDetails($code_id){
* @param string $filename This file will be launched as worker
* @param string $zipFilename zip file containing code to execute
* @param string $name referenceable (unique) name for your worker
+ * @param array $options Optional parameters:
+ * - "max_concurrency" The maximum number of tasks that should be run in parallel.
+ * - "retries" The number of auto-retries of failed task.
+ * - "retries_delay" Delay in seconds between retries.
* @return mixed
*/
- public function postCode($filename, $zipFilename, $name){
+ public function postCode($filename, $zipFilename, $name, $options = array()){
// Add IronWorker functions to the uploaded worker
- $this->addHeaderToArchive($zipFilename, $filename);
+ $this->addRunnerToArchive($zipFilename, $filename);
$this->setPostHeaders();
$ts = time();
$runtime_type = $this->runtimeFileType($filename);
$sendingData = array(
- "code_name" => $name,
- "name" => $name,
+ "code_name" => $name,
+ "name" => $name,
"standalone" => True,
- "runtime" => $runtime_type,
- "file_name" => $filename,
- "version" => $this->version,
- "timestamp" => $ts,
- "oauth" => $this->token,
+ "runtime" => $runtime_type,
+ "file_name" => "runner.php",
+ "version" => $this->version,
+ "timestamp" => $ts,
+ "oauth" => $this->token,
"class_name" => $name,
- "options" => array(),
+ "options" => array(),
"access_key" => $name
);
+ $sendingData = array_merge($sendingData, $options);
$url = "projects/{$this->project_id}/codes";
$post = array(
"data" => json_encode($sendingData),
@@ -289,18 +283,24 @@ public function deleteSchedule($schedule_id){
'schedule_id' => $schedule_id
);
- return $this->apiCall(self::POST, $url, $request);
+ return self::json_decode($this->apiCall(self::POST, $url, $request));
}
/**
* Get information about all schedules for project
*
+ * @param int $page
+ * @param int $per_page
* @return mixed
*/
- public function getSchedules(){
- $this->setJsonHeaders();
+ public function getSchedules($page = 0, $per_page = 30){
$url = "projects/{$this->project_id}/schedules";
- $schedules = self::json_decode($this->apiCall(self::GET, $url));
+ $this->setJsonHeaders();
+ $params = array(
+ 'page' => $page,
+ 'per_page' => $per_page
+ );
+ $schedules = self::json_decode($this->apiCall(self::GET, $url, $params));
return $schedules->schedules;
}
@@ -336,13 +336,13 @@ public function postScheduleSimple($name, $payload = array(), $delay = 1){
/**
* Schedules task
*
- * @param string $name Package name
- * @param array $payload Payload for task
- * @param int $start_at Time of first run in unix timestamp format. Example: time()+2*60
- * @param int $run_every Time in seconds between runs. If omitted, task will only run once.
- * @param int $end_at Time tasks will stop being enqueued in unix timestamp format.
- * @param int $run_times Number of times to run task.
- * @param int $priority Priority queue to run the job in (0, 1, 2). p0 is default.
+ * @param string $name Package name
+ * @param array $payload Payload for task
+ * @param int|DateTime $start_at Time of first run in unix timestamp format or as DateTime instance. Example: time()+2*60
+ * @param int $run_every Time in seconds between runs. If omitted, task will only run once.
+ * @param int|DateTime $end_at Time tasks will stop being enqueued in unix timestamp or as DateTime instance format.
+ * @param int $run_times Number of times to run task.
+ * @param int $priority Priority queue to run the job in (0, 1, 2). p0 is default.
* @return string Created Schedule id
*/
public function postScheduleAdvanced($name, $payload = array(), $start_at, $run_every = null, $end_at = null, $run_times = null, $priority = null){
@@ -439,23 +439,34 @@ public function deleteTask($task_id){
return $this->cancelTask($task_id);
}
- public function setTaskProgress($task_id, $percent, $msg = ''){
- if (empty($task_id)){
- throw new InvalidArgumentException("Please set task_id");
- }
- $url = "projects/{$this->project_id}/tasks/$task_id/progress";
- $request = array(
- 'percent' => $percent,
- 'msg' => $msg
- );
+ /**
+ * Wait while the task specified by task_id executes
+ *
+ * @param string $task_id Task ID
+ * @param int $sleep Delay between API invocations in seconds
+ * @param int $max_wait_time Maximum waiting time in seconds, 0 for infinity
+ * @return mixed $details Task details or false
+ */
+ public function waitFor($task_id, $sleep = 5, $max_wait_time = 0){
+ while(1){
+ $details = $this->getTaskDetails($task_id);
- $this->setCommonHeaders();
- return self::json_decode($this->apiCall(self::POST, $url, $request));
+ if ($details->status != 'queued' && $details->status != 'running'){
+ return $details;
+ }
+ if ($max_wait_time > 0){
+ $max_wait_time -= $sleep;
+ if ($max_wait_time <= 0) return false;
+ }
+
+ sleep($sleep);
+ }
+ return false;
}
- /* PRIVATE FUNCTIONS */
/**
+ * Schedule a task
*
* @param string $name
* @param array $options options contain:
@@ -467,7 +478,7 @@ public function setTaskProgress($task_id, $percent, $msg = ''){
* @param array $payload
* @return mixed
*/
- private function postSchedule($name, $options, $payload = array()){
+ public function postSchedule($name, $options, $payload = array()){
$url = "projects/{$this->project_id}/schedules";
$shedule = array(
'name' => $name,
@@ -486,20 +497,77 @@ private function postSchedule($name, $options, $payload = array()){
return $shedules->schedules[0]->id;
}
- private function compiledHeaders(){
-
- # Set default headers if no headers set.
- if ($this->headers == null){
- $this->setCommonHeaders();
+ /**
+ * Set a Task’s Progress
+ *
+ * Example (inside a worker):
+ * <code>
+ * require_once "phar://iron_worker.phar";
+ * $worker = new IronWorker(); # assuming you have iron.json inside a worker
+ * $args = getArgs();
+ * $task_id = $args['task_id'];
+ * $worker->setProgress($task_id, 50, "Task is half-done");
+ * </code>
+ *
+ * @param string $task_id Task ID
+ * @param int $percent An integer, between 0 and 100 inclusive, that describes the completion of the task.
+ * @param string $msg Any message or data describing the completion of the task. Must be a string value, and the 64KB request limit applies.
+ * @return mixed
+ * @throws InvalidArgumentException
+ */
+ public function setProgress($task_id, $percent, $msg = ''){
+ if (empty($task_id)){
+ throw new InvalidArgumentException("Please set task_id");
}
+ $url = "projects/{$this->project_id}/tasks/$task_id/progress";
+ $request = array(
+ 'percent' => $percent,
+ 'msg' => $msg
+ );
+
+ $this->setCommonHeaders();
+ $res = $this->apiCall(self::POST, $url, $request);
+ return self::json_decode($res);
+ }
+
+ /**
+ * Alias for setProgress()
+ *
+ * @param string $task_id Task ID
+ * @param int $percent
+ * @param string $msg
+ * @return mixed
+ */
+ public function setTaskProgress($task_id, $percent, $msg = ''){
+ return $this->setProgress($task_id, $percent, $msg);
+ }
- $headers = array();
- foreach ($this->headers as $k => $v){
- $headers[] = "$k: $v";
+ /**
+ * Set a Task’s Progress. Work only inside a worker
+ *
+ * Example (inside a worker):
+ * <code>
+ * require_once "phar://iron_worker.phar";
+ * $worker = new IronWorker(); # assuming you have iron.json inside a worker
+ * $worker->setCurrentTaskProgress(50, "Task is half-done");
+ * </code>
+ * @param int $percent An integer, between 0 and 100 inclusive, that describes the completion of the task.
+ * @param string $msg Any message or data describing the completion of the task. Must be a string value, and the 64KB request limit applies.
+ * @return mixed
+ * @throws RuntimeException
+ */
+ public function setCurrentTaskProgress($percent, $msg = ''){
+ if (!function_exists('getArgs')){
+ throw new RuntimeException("Method can be used only inside a worker");
}
- return $headers;
+ $args = getArgs();
+ $task_id = $args['task_id'];
+
+ return $this->setProgress($task_id, $percent, $msg);
}
+ /* PRIVATE FUNCTIONS */
+
private function runtimeFileType($name) {
if(empty($name)){
return false;
@@ -519,98 +587,10 @@ private function runtimeFileType($name) {
}
}
- private function apiCall($type, $url, $params = array(), $raw_post_data = null){
- $url = "{$this->url}$url";
-
- $s = curl_init();
- if (! isset($params['oauth'])) {
- $params['oauth'] = $this->token;
- }
- switch ($type) {
- case self::DELETE:
- $fullUrl = $url . '?' . http_build_query($params);
- $this->debug('apiCall fullUrl', $fullUrl);
- curl_setopt($s, CURLOPT_URL, $fullUrl);
- curl_setopt($s, CURLOPT_CUSTOMREQUEST, self::DELETE);
- break;
- case self::POST:
- $this->debug('apiCall url', $url);
- curl_setopt($s, CURLOPT_URL, $url);
- curl_setopt($s, CURLOPT_POST, true);
- if ($raw_post_data){
- curl_setopt($s, CURLOPT_POSTFIELDS, $raw_post_data);
- }else{
- curl_setopt($s, CURLOPT_POSTFIELDS, json_encode($params));
- }
- break;
- case self::GET:
- $fullUrl = $url . '?' . http_build_query($params);
- $this->debug('apiCall fullUrl', $fullUrl);
- curl_setopt($s, CURLOPT_URL, $fullUrl);
- break;
- }
-
- curl_setopt($s, CURLOPT_RETURNTRANSFER, true);
- curl_setopt($s, CURLOPT_HTTPHEADER, $this->compiledHeaders());
- $_out = curl_exec($s);
- $status = curl_getinfo($s, CURLINFO_HTTP_CODE);
- curl_close($s);
- switch ($status) {
- case self::HTTP_OK:
- case self::HTTP_CREATED:
- case self::HTTP_ACEPTED:
- $out = $_out;
- break;
- default:
- throw new Http_Exception("http error: {$status} | {$_out}", $status);
- }
- return $out;
- }
-
-
- /**
- * @param array|string $config_file_or_options
- * array of options or name of config file
- * @return array
- * @throws InvalidArgumentException
- */
- private function getConfigData($config_file_or_options){
- if (is_string($config_file_or_options)){
- $ini = parse_ini_file($config_file_or_options, true);
- if ($ini === false){
- throw new InvalidArgumentException("Config file $config_file_or_options not found");
- }
- if (empty($ini['iron_worker'])){
- throw new InvalidArgumentException("Config file $config_file_or_options has no section 'iron_worker'");
- }
- $config = $ini['iron_worker'];
- }elseif(is_array($config_file_or_options)){
- $config = $config_file_or_options;
- }else{
- throw new InvalidArgumentException("Wrong parameter type");
- }
- foreach ($this->required_config_fields as $field){
- if (empty($config[$field])){
- throw new InvalidArgumentException("Required config key missing: '$field'");
- }
- }
- return $config;
- }
-
private function getFileContent($filename){
return file_get_contents($filename);
}
- private function setCommonHeaders(){
- $this->headers = array(
- 'Authorization' => "OAuth {$this->token}",
- 'User-Agent' => self::header_user_agent,
- 'Content-Type' => 'application/json',
- 'Accept' => self::header_accept,
- 'Accept-Encoding' => self::header_accept_encoding
- );
- }
-
private function setJsonHeaders(){
$this->setCommonHeaders();
}
@@ -620,12 +600,6 @@ private function setPostHeaders(){
$this->headers['Content-Type'] ='multipart/form-data';
}
- private function debug($var_name, $variable){
- if ($this->debug_enabled){
- echo "{$var_name}: ".var_export($variable,true)."\n";
- }
- }
-
private static function fileNamesRecursive($dir, $base_dir = ''){
$dir .= DIRECTORY_SEPARATOR;
$files = scandir($dir);
@@ -645,115 +619,68 @@ private static function fileNamesRecursive($dir, $base_dir = ''){
return $names;
}
- private static function dateRfc3339($timestamp = 0) {
-
- if (!$timestamp) {
- $timestamp = time();
- }
- $date = date('Y-m-d\TH:i:s', $timestamp);
-
- $matches = array();
- if (preg_match('/^([\-+])(\d{2})(\d{2})$/', date('O', $timestamp), $matches)) {
- $date .= $matches[1].$matches[2].':'.$matches[3];
- } else {
- $date .= 'Z';
- }
- return $date;
- }
-
- private static function json_decode($response){
- $data = json_decode($response);
- $json_error = json_last_error();
- if($json_error != JSON_ERROR_NONE) {
- throw new JSON_Exception($json_error);
- }
- return $data;
- }
-
/**
* Contain php code that adds to worker before upload
*
+ * @param string $worker_file_name
* @return string
*/
- private function workerHeader(){
- $header = <<<'EOL'
+ private function workerHeader($worker_file_name){
+ $header = <<<EOL
<?php
/*IRON_WORKER_HEADER*/
function getArgs(){
- global $argv;
- $args = array('task_id' => null, 'dir' => null, 'payload' => array());
- foreach($argv as $k => $v){
- if (empty($argv[$k+1])) continue;
- if ($v == '-id') $args['task_id'] = $argv[$k+1];
- if ($v == '-d') $args['dir'] = $argv[$k+1];
- if ($v == '-payload' && file_exists($argv[$k+1])){
- $args['payload'] = json_decode(file_get_contents($argv[$k+1]));
+ global \$argv;
+ \$args = array('task_id' => null, 'dir' => null, 'payload' => array());
+ foreach(\$argv as \$k => \$v){
+ if (empty(\$argv[\$k+1])) continue;
+ if (\$v == '-id') \$args['task_id'] = \$argv[\$k+1];
+ if (\$v == '-d') \$args['dir'] = \$argv[\$k+1];
+ if (\$v == '-payload' && file_exists(\$argv[\$k+1])){
+ \$args['payload'] = file_get_contents(\$argv[\$k + 1]);
+ \$parsed_payload = json_decode(\$args['payload']);
+ if (\$parsed_payload != null) {
+ \$args['payload'] = \$parsed_payload;
+ }
}
}
- return $args;
+ return \$args;
}
function getPayload(){
- $args = getArgs();
- return $args['payload'];
+ \$args = getArgs();
+ return \$args['payload'];
}
- function setProgress($percent, $msg = ''){
- $args = getArgs();
- $task_id = $args['task_id'];
- $base_url = '[URL]';
- $project_id = '[PROJECT_ID]';
- $headers = [HEADERS];
-
- $url = "{$base_url}projects/$project_id/tasks/$task_id/progress";
- $params = array(
- 'percent' => $percent,
- 'msg' => $msg
- );
-
- $s = curl_init();
- curl_setopt($s, CURLOPT_URL, $url);
- curl_setopt($s, CURLOPT_POST, true);
- curl_setopt($s, CURLOPT_POSTFIELDS, json_encode($params));
- curl_setopt($s, CURLOPT_RETURNTRANSFER, true);
- curl_setopt($s, CURLOPT_HTTPHEADER, $headers);
- $out = curl_exec($s);
- curl_close($s);
- return json_decode($out);
- }
-
- ?>
+ require dirname(__FILE__)."/[SCRIPT]";
EOL;
$header = str_replace(
- array('[PROJECT_ID]','[URL]','[HEADERS]'),
- array($this->project_id, $this->url, var_export($this->compiledHeaders(), true)),
+ array('[PROJECT_ID]','[URL]','[HEADERS]','[SCRIPT]'),
+ array($this->project_id, $this->url, var_export($this->compiledHeaders(), true), $worker_file_name),
$header
);
return trim($header," \n\r");
}
- private function addHeaderToArchive($archive, $worker_file_name){
+ private function addRunnerToArchive($archive, $worker_file_name){
$zip = new ZipArchive;
- if (!$zip->open($archive) === true) {
+ if (!$zip->open($archive, ZIPARCHIVE::CREATE) === true) {
$zip->close();
- throw new IronWorker_Exception("Archive $archive not found!");
+ throw new IronWorker_Exception("Archive $archive was not found!");
}
- if (! $worker_content = $zip->getFromName($worker_file_name)){
+ if ($zip->statName($worker_file_name) === false){
$zip->close();
- throw new IronWorker_Exception("File $worker_file_name in archive $archive not found!");
+ throw new IronWorker_Exception("File $worker_file_name in archive $archive was not found!");
}
- if (strpos($worker_content, '/*IRON_WORKER_HEADER*/') === false){
- // add header
- if (!$zip->addFromString($worker_file_name, $this->workerHeader().$worker_content)){
- throw new IronWorker_Exception("Adding Header to the worker failed");
- }
+ if (!$zip->addFromString('runner.php', $this->workerHeader($worker_file_name))){
+ throw new IronWorker_Exception("Adding Runner to the worker failed");
}
$zip->close();
return true;
}
-}
+}
View
1 lib/IronWorkerWrapper.php
@@ -1,4 +1,5 @@
<?php
+include("IronCore.class.php");
include("IronWorker.class.php");
$iw = new IronWorker(__DIR__.'/../config.ini');
?>
View
2 mq/postMessage.php
@@ -2,6 +2,6 @@
include(__DIR__.'/../lib/IronMQWrapper.php');
$url = $_REQUEST['url'];
$queue_name = $_REQUEST['queue_name'];
-$res = $ironmq->postMessage($queue_name, array("body" => $url));
+$res = $ironmq->postMessage($queue_name, $url);
echo("Message posted");
?>

0 comments on commit 3789bc4

Please sign in to comment.
Something went wrong with that request. Please try again.