Permalink
Browse files

Add a dedicated table for queues and stop using file-based queues

  • Loading branch information...
1 parent 5cc4f18 commit 60ee9fd243207c7335d791837431ef1b199b3167 @cdujeu cdujeu committed Oct 3, 2016
Showing with 50 additions and 4 deletions.
  1. +45 −4 core/src/plugins/mq.sql/SqlMessageExchanger.php
  2. +5 −0 core/src/plugins/mq.sql/create.sql
@@ -22,6 +22,8 @@
use Pydio\Core\Model\ContextInterface;
+use Pydio\Core\PluginFramework\SqlTableProvider;
+use Pydio\Core\Utils\DBHelper;
use Pydio\Core\Utils\FileHelper;
use Pydio\Core\Utils\Vars\OptionsHelper;
@@ -37,7 +39,7 @@
* @package AjaXplorer_Plugins
* @subpackage Mq
*/
-class SqlMessageExchanger extends Plugin implements IMessageExchanger
+class SqlMessageExchanger extends Plugin implements IMessageExchanger, SqlTableProvider
{
/**
@@ -77,9 +79,11 @@ public function loadChannel($channelName, $create = false)
if (isSet(self::$channels) && is_array(self::$channels[$channelName])) {
return;
}
- if (is_file($this->getPluginWorkDir()."/queues/channel-$channelName")) {
+ $res = dibi::query('SELECT [content] FROM [ajxp_mq_queues] WHERE [channel_name] = %s', $channelName);
+ if($res->count()){
if(!isset(self::$channels)) self::$channels = array();
- $data = FileHelper::loadSerialFile($this->getPluginWorkDir() . "/queues/channel-$channelName");
+ $single = $res->fetchSingle();
+ $data = unserialize(base64_decode($single));
if (is_array($data)) {
if(!is_array($data["MESSAGES"])) $data["MESSAGES"] = array();
if(!is_array($data["CLIENTS"])) $data["CLIENTS"] = array();
@@ -97,9 +101,36 @@ public function loadChannel($channelName, $create = false)
public function __destruct()
{
if (isSet(self::$channels) && is_array(self::$channels)) {
+ $inserts = [];
+ $deletes = [];
+ $driver = $this->sqlDriver["driver"];
foreach (self::$channels as $channelName => $data) {
if (is_array($data)) {
- FileHelper::saveSerialFile($this->getPluginWorkDir() . "/queues/channel-$channelName", $data);
+ if(isSet($data["CLIENTS"]) && count($data["CLIENTS"])) {
+ $serialized = base64_encode(serialize($data));
+ if($driver === "postgre"){
+ dibi::query("DELETE FROM [ajxp_mq_queues] WHERE [channel_name] = %s", $channelName);
+ dibi::query('INSERT INTO [ajxp_mq_queues]', ["channel_name" => $channelName, "content" => $serialized]);
+ }else{
+ $inserts[] = "('$channelName', '".$serialized."')";
+ }
+ }else{
+ $deletes[] = $channelName;
+ }
+ }
+ }
+ if(count($inserts)){
+ try{
+ dibi::query('REPLACE INTO [ajxp_mq_queues] ([channel_name],[content]) VALUES '.implode(",", $inserts));
+ }catch(\DibiException $dE){
+ $this->logError(__CLASS__, $dE->getMessage());
+ }
+ }
+ if(count($deletes)){
+ try{
+ dibi::query('DELETE FROM [ajxp_mq_queues] WHERE [channel_name] IN %s', $deletes);
+ }catch(\DibiException $dE){
+ $this->logError(__CLASS__, $dE->getMessage());
}
}
}
@@ -337,4 +368,14 @@ public function publishInstantMessage(ContextInterface $ctx, $channel, $message)
self::$channels[$channel]["MESSAGES"] = $clean;
}
+ /**
+ * @param array $param
+ * @return string
+ * @throws \Exception
+ */
+ public function installSQLTables($param)
+ {
+ $p = OptionsHelper::cleanDibiDriverParameters(isSet($param) && isSet($param["SQL_DRIVER"]) ? $param["SQL_DRIVER"] : $this->sqlDriver);
+ return DBHelper::runCreateTablesQuery($p, $this->getBaseDir() . "/create.sql");
+ }
}
@@ -0,0 +1,5 @@
+CREATE TABLE IF NOT EXISTS `ajxp_mq_queues` (
+ `channel_name` varchar(255) NOT NULL,
+ `content` text NOT NULL,
+ PRIMARY KEY (`channel_name`)
+) DEFAULT CHARSET=utf8;

0 comments on commit 60ee9fd

Please sign in to comment.