Permalink
Browse files

Fix core.tasks tables and fix SqlMessageExchanger (close #1234)

  • Loading branch information...
1 parent 96743fd commit e0acdd84b5fd665df9d63ebf86e786f58e03eae3 @cdujeu cdujeu committed Oct 7, 2016
@@ -1,9 +1,7 @@
-DROP TABLE IF EXISTS ajxp_tasks;
-
CREATE TABLE IF NOT EXISTS ajxp_tasks (
- uid VARCHAR(255) NOT NULL ,
- type INTEGER NOT NULL,
- parent_uid VARCHAR(255) DEFAULT NULL,
+ uid VARCHAR(40) NOT NULL,
+ `type` INTEGER NOT NULL,
+ parent_uid VARCHAR(40) DEFAULT NULL,
flags INTEGER NOT NULL,
label VARCHAR(255) NOT NULL,
user_id VARCHAR(255) NOT NULL,
@@ -13,18 +11,31 @@ CREATE TABLE IF NOT EXISTS ajxp_tasks (
progress INTEGER NOT NULL,
schedule INTEGER NOT NULL,
schedule_value VARCHAR(255) DEFAULT NULL,
- action VARCHAR(255) NOT NULL,
- parameters VARCHAR(500) NOT NULL,
- nodes VARCHAR(500) NOT NULL,
+ `action` VARCHAR(255) NOT NULL,
+ parameters MEDIUMBLOB NOT NULL,
creation_date INTEGER NOT NULL DEFAULT 0,
status_update INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (uid)
-);
+) CHARACTER SET utf8 COLLATE utf8_unicode_ci;
CREATE INDEX ajxp_task_usr_idx ON ajxp_tasks (user_id);
CREATE INDEX ajxp_task_status_idx ON ajxp_tasks (status);
CREATE INDEX ajxp_task_type ON ajxp_tasks (type);
CREATE INDEX ajxp_task_schedule ON ajxp_tasks (schedule);
-CREATE INDEX ajxp_task_nodes_idx ON ajxp_tasks (nodes);
+
+
+CREATE TABLE IF NOT EXISTS `ajxp_tasks_nodes` (
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `task_uid` varchar(40) NOT NULL,
+ `node_base_url` varchar(255) NOT NULL,
+ `node_path` varchar(255) NOT NULL,
+
+ PRIMARY KEY (`id`)
+) CHARACTER SET utf8 COLLATE utf8_unicode_ci;
+
+
+CREATE INDEX ajxp_taskn_tuid_idx ON ajxp_tasks_nodes (task_uid);
+CREATE INDEX ajxp_taskn_base_idx ON ajxp_tasks_nodes (node_base_url);
+CREATE INDEX ajxp_taskn_path_idx ON ajxp_tasks_nodes (node_path);
@@ -1,5 +1,3 @@
-DROP TABLE IF EXISTS ajxp_tasks;
-
CREATE TABLE IF NOT EXISTS ajxp_tasks (
uid VARCHAR(255) NOT NULL ,
type INTEGER NOT NULL,
@@ -14,17 +12,26 @@ CREATE TABLE IF NOT EXISTS ajxp_tasks (
schedule INTEGER NOT NULL,
schedule_value VARCHAR(255) DEFAULT NULL,
action VARCHAR(255) NOT NULL,
- parameters VARCHAR(500) NOT NULL,
- nodes VARCHAR(500) NOT NULL,
+ parameters BYTEA NOT NULL,
creation_date INTEGER NOT NULL DEFAULT 0,
status_update INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (uid)
);
-
CREATE INDEX ajxp_task_usr_idx ON ajxp_tasks (user_id);
CREATE INDEX ajxp_task_status_idx ON ajxp_tasks (status);
CREATE INDEX ajxp_task_type ON ajxp_tasks (type);
CREATE INDEX ajxp_task_schedule ON ajxp_tasks (schedule);
-CREATE INDEX ajxp_task_nodes_idx ON ajxp_tasks (nodes);
+
+CREATE TABLE IF NOT EXISTS ajxp_tasks_nodes (
+ id serial PRIMARY KEY,
+ task_uid VARCHAR(40) NOT NULL,
+ node_base_url VARCHAR(255) NOT NULL,
+ node_path VARCHAR(255) NOT NULL
+);
+
+
+CREATE INDEX ajxp_taskn_tuid_idx ON ajxp_tasks_nodes (task_uid);
+CREATE INDEX ajxp_taskn_base_idx ON ajxp_tasks_nodes (node_base_url);
+CREATE INDEX ajxp_taskn_path_idx ON ajxp_tasks_nodes (node_path);
@@ -1,5 +1,3 @@
-DROP TABLE IF EXISTS ajxp_tasks;
-
CREATE TABLE IF NOT EXISTS ajxp_tasks (
uid VARCHAR(255) NOT NULL ,
type INTEGER NOT NULL,
@@ -14,17 +12,25 @@ CREATE TABLE IF NOT EXISTS ajxp_tasks (
schedule INTEGER NOT NULL,
schedule_value VARCHAR(255) DEFAULT NULL,
action VARCHAR(255) NOT NULL,
- parameters VARCHAR(500) NOT NULL,
- nodes VARCHAR(500) NOT NULL,
+ parameters TEXT NOT NULL,
creation_date INTEGER NOT NULL DEFAULT 0,
status_update INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (uid)
);
-
CREATE INDEX ajxp_task_usr_idx ON ajxp_tasks (user_id);
CREATE INDEX ajxp_task_status_idx ON ajxp_tasks (status);
CREATE INDEX ajxp_task_type ON ajxp_tasks (type);
CREATE INDEX ajxp_task_schedule ON ajxp_tasks (schedule);
-CREATE INDEX ajxp_task_nodes_idx ON ajxp_tasks (nodes);
+
+CREATE TABLE IF NOT EXISTS ajxp_tasks_nodes (
+ id integer NOT NULL primary key autoincrement,
+ task_uid TEXT NOT NULL,
+ node_base_url TEXT NOT NULL,
+ node_path TEXT NOT NULL
+);
+
+CREATE INDEX ajxp_taskn_tuid_idx ON ajxp_tasks_nodes (task_uid);
+CREATE INDEX ajxp_taskn_base_idx ON ajxp_tasks_nodes (node_base_url);
+CREATE INDEX ajxp_taskn_path_idx ON ajxp_tasks_nodes (node_path);
@@ -23,9 +23,11 @@
use Pydio\Access\Core\Model\AJXP_Node;
use Pydio\Core\Model\RepositoryInterface;
use Pydio\Core\Model\UserInterface;
+use Pydio\Log\Core\Logger;
use Pydio\Tasks\ITasksProvider;
use Pydio\Tasks\Schedule;
use Pydio\Tasks\Task;
+use \dibi as dibi;
defined('AJXP_EXEC') or die('Access not allowed');
@@ -55,12 +57,8 @@ protected function taskToDBValues(Task $task, $removeId = false){
"schedule" => $task->getSchedule()->getType(),
"schedule_value" => $task->getSchedule()->getValue(),
"action" => $task->getAction(),
- "parameters" => json_encode($task->getParameters()),
- "nodes" => ""
+ "parameters" => gzdeflate(json_encode($task->getParameters()), 9),
];
- if(count($task->nodes)){
- $values["nodes"] = "|||".implode("|||", $task->nodes)."|||";
- }
if(!$removeId){
// This is a creation
$values["creation_date"] = time();
@@ -93,22 +91,51 @@ protected function taskFromDBValues(\DibiRow $values){
$task->setAction($values["action"]);
$task->setCreationDate($values["creation_date"]);
$task->setStatusChangeDate($values["status_update"]);
- $task->setParameters(json_decode($values["parameters"], true));
- $nodes = explode("|||", trim($values["nodes"], "|||"));
- foreach ($nodes as $node) {
- if(!empty($node)) $task->attachToNode($node);
- }
+ $task->setParameters(json_decode(gzinflate($values["parameters"]), true));
+ $this->loadTaskNodes($task);
return $task;
}
/**
* @param Task $task
+ * @param bool $update
+ */
+ protected function insertOrUpdateNodes($task, $update = false){
+ if($update){
+ dibi::query("DELETE FROM [ajxp_tasks] WHERE [task_uid]=%s", $task->getId());
+ }
+ foreach($task->nodes as $nodeUrl){
+ $nodePath = parse_url($nodeUrl, PHP_URL_PATH);
+ if(empty($nodePath)) $nodePath = "/";
+ $nodeBaseUrl = preg_replace('/'. preg_quote($nodePath, '/') . '$/', "", $nodeUrl);
+ $values = [
+ "task_uid" => $task->getId(),
+ "node_base_url" => $nodeBaseUrl,
+ "node_path" => $nodePath
+ ];
+ dibi::query("INSERT INTO [ajxp_tasks_nodes] ", $values);
+ }
+ }
+
+ /**
+ * @param Task $task
+ */
+ protected function loadTaskNodes(&$task){
+ $rows = dibi::query("SELECT [node_base_url],[node_path] FROM [ajxp_tasks_nodes] WHERE [task_uid] = %s", $task->getId())->fetchAll();
+ foreach($rows as $dibiRow){
+ $task->attachToNode($dibiRow['node_base_url'].$dibiRow['node_path']);
+ }
+ }
+
+ /**
+ * @param Task $task
* @param Schedule $when
* @return Task
*/
public function createTask(Task $task, Schedule $when)
{
- \dibi::query("INSERT INTO [ajxp_tasks] ", $this->taskToDBValues($task));
+ dibi::query("INSERT INTO [ajxp_tasks] ", $this->taskToDBValues($task));
+ $this->insertOrUpdateNodes($task);
}
/**
@@ -117,7 +144,7 @@ public function createTask(Task $task, Schedule $when)
*/
public function getTaskById($taskId)
{
- $res = \dibi::query('SELECT * FROM [ajxp_tasks] WHERE [uid]=%s', $taskId);
+ $res = dibi::query('SELECT * FROM [ajxp_tasks] WHERE [uid]=%s', $taskId);
foreach ($res->fetchAll() as $row) {
return $this->taskFromDBValues($row);
}
@@ -131,9 +158,10 @@ public function getTaskById($taskId)
public function updateTask(Task $task)
{
try{
- \dibi::query("UPDATE [ajxp_tasks] SET ", $this->taskToDBValues($task, true), " WHERE [uid] =%s", $task->getId());
+ dibi::query("UPDATE [ajxp_tasks] SET ", $this->taskToDBValues($task, true), " WHERE [uid] =%s", $task->getId());
+ $this->insertOrUpdateNodes($task, true);
}catch (\DibiException $ex){
- $sql = $ex->getSql();
+ Logger::error(__CLASS__, __FUNCTION__, "Error while updating task: ".$ex->getSql());
}
}
@@ -143,7 +171,8 @@ public function updateTask(Task $task)
*/
public function deleteTask($taskId)
{
- \dibi::query("DELETE FROM [ajxp_tasks] WHERE uid=%s", $taskId);
+ dibi::query("DELETE FROM [ajxp_tasks] WHERE [uid]=%s", $taskId);
+ dibi::query("DELETE FROM [ajxp_tasks_nodes] WHERE [task_uid]=%s", $taskId);
}
/**
@@ -169,7 +198,7 @@ public function getChildrenTasks($taskId){
$tasks = [];
$where = [];
$where[] = array("[parent_uid] = %s", $taskId);
- $res = \dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
+ $res = dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
foreach ($res->fetchAll() as $row) {
$tasks[] = $this->taskFromDBValues($row);
}
@@ -193,7 +222,7 @@ public function getCurrentRunningTasks($user = null, $repository = null)
$where[] = array("[ws_id] = %s", $repository->getId());
}
$where[] = array("[status] IN (1,2,8,16)");
- $res = \dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
+ $res = dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
foreach ($res->fetchAll() as $row) {
$tasks[] = $this->taskFromDBValues($row);
}
@@ -208,12 +237,15 @@ public function getActiveTasksForNode(AJXP_Node $node)
{
$tasks = [];
try{
- $res = \dibi::query('SELECT * FROM [ajxp_tasks] WHERE [nodes] LIKE %s AND [status] NOT IN (1,4,8)', "%|||".$node->getUrl()."|||%");
+ $res = dibi::query("SELECT * FROM [ajxp_tasks],[ajxp_tasks_nodes] WHERE
+ [ajxp_tasks_nodes].[node_base_url] = %s
+ AND [ajxp_tasks_nodes].[node_path] = %s
+ AND [status] NOT IN (1,4,8)", rtrim($node->getContext()->getUrlBase(), '/'), $node->getPath());
foreach ($res->fetchAll() as $row) {
$tasks[] = $this->taskFromDBValues($row);
}
}catch(\DibiException $e){
- $sql = $e->getSql();
+ Logger::error(__CLASS__, __FUNCTION__, "Error while retrieving task for node: ".$e->getSql());
}
return $tasks;
}
@@ -253,7 +285,7 @@ public function getTasks($user = null, $repository = null, $status = -1, $schedu
$where[] = array("[parent_uid] = %s", $parentUid);
}
}
- $res = \dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
+ $res = dibi::query('SELECT * FROM [ajxp_tasks] WHERE %and', $where);
foreach ($res->fetchAll() as $row) {
$tasks[] = $this->taskFromDBValues($row);
}
@@ -83,14 +83,16 @@ public function loadChannel($channelName, $create = false)
if(empty($this->sqlDriver)) {
return;
}
- if(!dibi::isConnected()){
+ try {
+ dibi::getConnection();
+ }catch(\DibiException $db){
dibi::connect($this->sqlDriver);
}
$res = dibi::query('SELECT [content] FROM [ajxp_mq_queues] WHERE [channel_name] = %s', $channelName);
if($res->count()){
if(!isset(self::$channels)) self::$channels = array();
$single = $res->fetchSingle();
- $data = unserialize($single);
+ $data = unserialize(gzinflate($single));
if (is_array($data)) {
if(!is_array($data["MESSAGES"])) $data["MESSAGES"] = array();
if(!is_array($data["CLIENTS"])) $data["CLIENTS"] = array();
@@ -108,17 +110,21 @@ public function loadChannel($channelName, $create = false)
public function __destruct()
{
if (isSet(self::$channels) && is_array(self::$channels) && !empty($this->sqlDriver)) {
+ $channels = self::$channels;
+ self::$channels = null;
$inserts = [];
$insertValues = [];
$deletes = [];
$driver = $this->sqlDriver["driver"];
- if(!dibi::isConnected()){
+ try {
+ dibi::getConnection();
+ }catch(\DibiException $db){
dibi::connect($this->sqlDriver);
}
- foreach (self::$channels as $channelName => $data) {
+ foreach ($channels as $channelName => $data) {
if (is_array($data)) {
if(isSet($data["CLIENTS"]) && count($data["CLIENTS"])) {
- $serialized = serialize($data);
+ $serialized = gzdeflate(serialize($data), 9);
if($driver === "postgre"){
dibi::query("DELETE FROM [ajxp_mq_queues] WHERE [channel_name] = %s", $channelName);
dibi::query('INSERT INTO [ajxp_mq_queues]', ["channel_name" => $channelName, "content" => $serialized]);

0 comments on commit e0acdd8

Please sign in to comment.