Skip to content

Commit

Permalink
Add support for distributed queueing by allowing initialization with …
Browse files Browse the repository at this point in the history
…multiple connections.
  • Loading branch information
lunaru committed Jan 8, 2011
1 parent d2ba6c8 commit 2c72522
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 14 deletions.
12 changes: 11 additions & 1 deletion README.textile
Expand Up @@ -85,7 +85,17 @@ db.mongo_queue.ensureIndex({locked: 1, when: 1});
If you plan to use the batching feature you want an index on the type of jobs already in the queue:

<pre>
db.mongo_queue.ensureIndex({object_class: 1, object_method: 1, parameters: 1, locked: 1});
db.mongo_queue.ensureIndex({object_class: 1, object_method: 1, parameters: 1, locked: 1});
</pre>

h4. Sharding

MongoQueue comes with built-in support for manual sharding. Note that due to the key restriction on updates for auto-sharding, the ability to scale with only auto-sharding is limited.

If you initialize @MongoQueue::$connection@ to an array instead of a single Mongo connection object, MongoQueue will insert or pull jobs at random from the set of connections. This way, you can have dedicated workers per physical machine and insert in a distributed way (or vice versa). For example:

<pre>
MongoQueue::$connection = array($mongo1, $mongo2, $mongo3, $mongo4);
</pre>

h4. Monitoring
Expand Down
49 changes: 36 additions & 13 deletions lib/MongoQueue.php
Expand Up @@ -26,8 +26,8 @@ public static function push($className, $methodName, $parameters, $when, $batch
}
else
{
$db = self::getDatabase();
$collection = self::getCollection();
$db = self::getDatabase(array('object_class' => $className, 'object_method' => $methodName, 'parameters' => $parameters));
$collection = $db->selectCollection(self::$collectionName);

$job = $db->command(
array(
Expand Down Expand Up @@ -106,44 +106,67 @@ public static function run($class_name = null)
}

// remove the job from the queue
self::getCollection()->remove(array('_id' => $jobID));
$db->selectCollection(self::$collectionName)->remove(array('_id' => $jobID));

return true;
}

return false;
}

protected static function getDatabase()
protected static function getConnection($hint = null)
{
if (is_array(self::$connection))
{
$count = count(self::$connection);

if (!$hint)
$hint = md5(rand());

// convert the hint into an index
$hint = abs(crc32(serialize($hint)) % $count);

return self::$connection[$hint];
}
else
{
return self::$connection;
}
}

protected static function getDatabase($hint = null)
{
$collection_name = self::$collectionName;
$connection = self::getConnection($hint);

if (self::$database == null)
throw new Exception("BaseMongoRecord::database must be initialized to a proper database string");

if (self::$connection == null)
if ($connection == null)
throw new Exception("BaseMongoRecord::connection must be initialized to a valid Mongo object");

if (!self::$connection->connected)
self::$connection->connect();
if (!$connection->connected)
$connection->connect();

return self::$connection->selectDB(self::$database);
return $connection->selectDB(self::$database);
}

protected static function getCollection()
protected static function getCollection($hint = null)
{
$collection_name = self::$collectionName;
$connection = self::getConnection($hint);

if (self::$database == null)
throw new Exception("BaseMongoRecord::database must be initialized to a proper database string");

if (self::$connection == null)
if ($connection == null)
throw new Exception("BaseMongoRecord::connection must be initialized to a valid Mongo object");

if (!self::$connection->connected)
self::$connection->connect();

if (!$connection->connected)
$connection->connect();

return self::$connection->selectCollection(self::$database, $collection_name);
return $connection->selectCollection(self::$database, $collection_name);
}

protected static function initializeEnvironment()
Expand Down

0 comments on commit 2c72522

Please sign in to comment.