Permalink
Browse files

Fix nsq calls, do not init a new client each time.

  • Loading branch information...
1 parent d41f455 commit 494a27b9776efb168d71e71c4ee34c7f40d5b80a @cdujeu cdujeu committed Jun 2, 2016
@@ -803,8 +803,25 @@ public function switchAction(ServerRequestInterface &$request, ResponseInterface
if ($selection->isEmpty()) {
throw new PydioException("", 113);
}
+ $size = 0;
+ $nodes = $selection->buildNodes();
+ $bgSizeThreshold = 1*1024*1024;
+ $bgWorkerThreshold = 80*1024*1024;
+ foreach($nodes as $node){
+ $size += $node->getSizeRecursive();
+ }
+ $taskId = $request->getAttribute("pydio-task-id");
+ if($taskId === null && ($size > $bgSizeThreshold)){
+ $task = TaskService::actionAsTask($action, $httpVars);
+ if($size > $bgWorkerThreshold){
+ $task->setSchedule(new Schedule(Schedule::TYPE_ONCE_DEFER));
+ }
+ $response = TaskService::getInstance()->enqueueTask($task, $request, $response);
+ break;
+ }
+
$logMessages = array();
- $errorMessage = $this->delete($selection->getFiles(), $logMessages);
+ $errorMessage = $this->delete($selection->getFiles(), $logMessages, $taskId);
if (count($logMessages)) {
$logMessage = new UserMessage(join("\n", $logMessages));
}
@@ -875,7 +892,7 @@ public function switchAction(ServerRequestInterface &$request, ResponseInterface
throw new PydioException(TextEncoder::toUTF8(join("\n", $error)));
} else {
if (isSet($httpVars["force_copy_delete"])) {
- $errorMessage = $this->delete($selection->getFiles(), $logMessages);
+ $errorMessage = $this->delete($selection->getFiles(), $logMessages, $taskId);
if($errorMessage) {
if(!empty($taskId)) TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_FAILED, "Error while deleting data: ".TextEncoder::toUTF8($errorMessage));
throw new PydioException(TextEncoder::toUTF8($errorMessage));
@@ -2100,7 +2117,7 @@ public function createEmptyFile(AJXP_Node $node, $content = "", $force = false)
}
- public function delete($selectedFiles, &$logMessages)
+ public function delete($selectedFiles, &$logMessages, $taskId = null)
{
$repoData = array(
'base_url' => $this->urlBase,
@@ -2117,14 +2134,20 @@ public function delete($selectedFiles, &$logMessages)
$logMessages[]=$mess[100]." ".TextEncoder::toUTF8($selectedFile);
continue;
}
- $this->deldir($fileToDelete, $repoData);
+ $this->deldir($fileToDelete, $repoData, $taskId);
if (is_dir($fileToDelete)) {
$logMessages[]="$mess[38] ".TextEncoder::toUTF8($selectedFile)." $mess[44].";
} else {
$logMessages[]="$mess[34] ".TextEncoder::toUTF8($selectedFile)." $mess[44].";
}
Controller::applyHook("node.change", array(new AJXP_Node($fileToDelete)));
}
+ if($taskId != null){
+ TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_COMPLETE, "Done");
+ $nodesDiff = new NodesDiff();
+ $nodesDiff->remove($selectedFiles);
+ Controller::applyHook("msg.instant", array($nodesDiff->toXML(), $this->repository->getId()));
+ }
return null;
}
@@ -447,31 +447,35 @@ protected function changeMode($filePath, $repoData)
}
/**
- * @param $location
- * @param $repoData
+ * @param string $location
+ * @param array $repoData
+ * @param string $taskId
* @throws \Exception
*/
- protected function deldir($location, $repoData)
+ protected function deldir($location, $repoData, $taskId = null)
{
if (is_dir($location)) {
Controller::applyHook("node.before_path_change", array(new AJXP_Node($location)));
$all=opendir($location);
while (($file=readdir($all)) !== FALSE) {
if (is_dir("$location/$file") && $file !=".." && $file!=".") {
- $this->deldir("$location/$file", $repoData);
+ $this->deldir("$location/$file", $repoData, $taskId);
if (file_exists("$location/$file")) {
@rmdir("$location/$file");
+ if($taskId != null) TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_RUNNING, "Deleting $file");
}
unset($file);
} elseif (!is_dir("$location/$file")) {
if (file_exists("$location/$file")) {
@unlink("$location/$file");
+ if($taskId != null) TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_RUNNING, "Deleting $file");
}
unset($file);
}
}
closedir($all);
@rmdir($location);
+ if($taskId != null) TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_RUNNING, "Deleting ".Utils::safeBasename($location));
} else {
if (file_exists("$location")) {
Controller::applyHook("node.before_path_change", array(new AJXP_Node($location)));
@@ -59,7 +59,10 @@
class MqManager extends Plugin
{
- private $wsClient;
+ /**
+ * @var nsqphp
+ */
+ private $nsqClient;
/**
* @var AJXP_MessageExchanger;
@@ -194,10 +197,12 @@ public function sendInstantMessage($xmlContent, $repositoryId, $targetUserId = n
$host = $this->getFilteredOption("WS_SERVER_HOST");
$port = $this->getFilteredOption("WS_SERVER_PORT");
if(!empty($host) && !empty($port)){
- // Publish on NSQ
- $nsq = new nsqphp;
- $nsq->publishTo($host, 1);
- $nsq->publish('im', new \nsqphp\Message\Message(json_encode($input)));
+ if(empty($this->nsqClient)){
+ // Publish on NSQ
+ $this->nsqClient = new nsqphp;
+ $this->nsqClient->publishTo($host, 1);
+ }
+ $this->nsqClient->publish('im', new \nsqphp\Message\Message(json_encode($input)));
}
$this->hasPendingMessage = true;
@@ -84,7 +84,7 @@
<hooks>
<serverCallback methodName="publishNodeChange" hookName="node.change" defer="true"/>
<serverCallback methodName="sendInstantMessage" hookName="msg.instant" />
- <serverCallback methodName="sendTaskMessage" hookName="msg.task" defer="true"/>
+ <serverCallback methodName="sendTaskMessage" hookName="msg.task" defer="true" dontBreakOnException="true"/>
<serverCallback methodName="sendToQueue" hookName="msg.queue_notification" />
<serverCallback methodName="appendRefreshInstruction" hookName="response.send" />
</hooks>

0 comments on commit 494a27b

Please sign in to comment.