Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add priority queue functionality

Use a new 'priority' attribute in each document. Sort by this priority attribute when pulling jobs from the queue. Priority is completely optional
  • Loading branch information...
commit b959ae2b8f08ef5bc4af7a426fc34177f11cb351 1 parent 67b7845
lunaru authored
Showing with 38 additions and 16 deletions.
  1. +7 −3 lib/MongoFunctor.php
  2. +8 −8 lib/MongoJob.php
  3. +23 −5 lib/MongoQueue.php
View
10 lib/MongoFunctor.php
@@ -6,16 +6,20 @@ class MongoFunctor
{
protected $when;
protected $className = null;
-
- public function __construct($className, $when, $batch)
+ protected $batch;
+ protected $priority;
+
+ public function __construct($className, $when, $batch, $priority)
{
$this->className = $className;
$this->when = $when;
$this->batch = $batch;
+ $this->priority = $priority;
}
+
public function __call($method, $args)
{
- MongoQueue::push($this->className, $method, $args, $this->when, $this->batch);
+ MongoQueue::push($this->className, $method, $args, $this->when, $this->batch, $this->priority);
}
}
View
16 lib/MongoJob.php
@@ -5,26 +5,26 @@
abstract class MongoJob
{
- public static function later($delay = 0, $batch = false)
+ public static function later($delay = 0, $batch = false, $priority = null)
{
- return self::at(time() + $delay, $batch);
+ return self::at(time() + $delay, $batch, $priority);
}
- public static function at($time = null, $batch = false)
+ public static function at($time = null, $batch = false, $priority = null)
{
if ($time === null) $time = time();
$className = get_called_class();
- return new MongoFunctor($className, $time, $batch);
+ return new MongoFunctor($className, $time, $batch, $priority);
}
- public static function batchLater($delay = 0)
+ public static function batchLater($delay = 0, $priority = null)
{
- return self::later($delay, true);
+ return self::later($delay, true, $priority);
}
- public static function batchAt($time = null)
+ public static function batchAt($time = null, $priority = null)
{
- return self::at($time, true);
+ return self::at($time, true, $priority);
}
public static function run()
View
28 lib/MongoQueue.php
@@ -10,7 +10,7 @@
protected static $environmentLoaded = false;
- public static function push($className, $methodName, $parameters, $when, $batch = false)
+ public static function push($className, $methodName, $parameters, $when, $batch = false, $priority = null)
{
if (!$batch)
{
@@ -20,6 +20,7 @@ public static function push($className, $methodName, $parameters, $when, $batch
'object_method' => $methodName,
'parameters' => $parameters,
'when' => $when,
+ 'priority' => $priority,
'locked' => null,
'locked_at' => null,
'batch' => 1));
@@ -41,12 +42,24 @@ public static function push($className, $methodName, $parameters, $when, $batch
if ($job['ok'])
{
$job = $job['value'];
+ $touched = false;
+ // take the lower 'when'
if (!isset($job['when']) || $job['when'] > $when)
{
$job['when'] = $when;
- $collection->save($job);
+ $touched = true;
}
+
+ // take the higher 'priority'
+ if (!isset($job['priority']) || ($priority !== null && $job['priority'] < $priority))
+ {
+ $job['priority'] = $priority;
+ $touched = true;
+ }
+
+ if ($touched)
+ $collection->save($job);
}
}
}
@@ -74,7 +87,7 @@ public static function count()
return $collection->count($query);
}
- public static function run($class_name = null, $method_name = null)
+ public static function run($class_name = null, $method_name = null, $prioritize = true)
{
$db = self::getDatabase();
$environment = self::initializeEnvironment();
@@ -86,12 +99,17 @@ public static function run($class_name = null, $method_name = null)
if ($method_name)
$query['object_method'] = $method_name;
-
+
+ $sort = array('when' => 1);
+
+ if ($prioritize)
+ $sort = array('priority' => -1, 'when' => 1);
+
$job = $db->command(
array(
"findandmodify" => self::$collectionName,
"query" => $query,
- "sort" => array('when' => 1),
+ "sort" => $sort,
"update" => array('$set' => array('locked' => true, 'locked_at' => time()))
));
Please sign in to comment.
Something went wrong with that request. Please try again.