Permalink
Browse files

Tasks management

  • Loading branch information...
1 parent 8086b00 commit 6d0b791b00ecc7e4311df838b01fdf0bcb0769ce @cdujeu cdujeu committed Jun 2, 2016
@@ -25,9 +25,9 @@
use Pydio\Conf\Core\AbstractAjxpUser;
use Pydio\Core\Services\ConfService;
use Pydio\Core\Controller\Controller;
-use Pydio\Core\Controller\XMLWriter;
use Pydio\Core\PluginFramework\Plugin;
use Pydio\Core\Utils\TextEncoder;
+use Pydio\Tasks\Schedule;
use Pydio\Tasks\Task;
use Pydio\Tasks\TaskService;
@@ -64,9 +64,10 @@ public function applyAction(\Psr\Http\Message\ServerRequestInterface $requestInt
if (isSet($httpVars["verbose"]) && $httpVars["verbose"] == "true") {
$this->verboseIndexation = true;
}
-
- if (ConfService::backgroundActionsSupported() && !ConfService::currentContextIsCommandLine()) {
+ $taskId = $requestInterface->getAttribute("pydio-task-id");
+ if (empty($taskId)) {
$task = TaskService::actionAsTask("index", $httpVars, "", "", [$nodes[0]->getUrl()], Task::FLAG_STOPPABLE | Task::FLAG_RESUMABLE);
+ $task->setSchedule(new Schedule(Schedule::TYPE_ONCE_DEFER));
TaskService::getInstance()->enqueueTask($task, $requestInterface, $responseInterface);
$responseInterface = new \Zend\Diactoros\Response\EmptyResponse();
return null;
@@ -25,10 +25,7 @@
if(!pydio.getUserSelection().isEmpty()){
crtDir = pydio.getUserSelection().getUniqueNode().getPath();
}
- //var bgManager = pydio.getController().getBackgroundTasksManager();
- //bgManager.queueAction("index", {file:crtDir}, MessageHash["index.lucene.8"].replace('%s', crtDir));
- //bgManager.next();
- PydioTasks.Store.enqueueActionTask("Indexation", "index", {file:crtDir}, [crtDir]);
+ PydioApi.getClient().request({"get_action":"index", "file":crtDir}, function(transport){});
]]></clientCallback>
<serverCallback methodName="applyAction" restParams="/file+" developerComment="Trigger re-indexation of a directory, recursively">
<input_param name="file" type="string" description="Folder to index (can be empty for root)"/>
@@ -204,6 +204,28 @@ public function sendInstantMessage($xmlContent, $repositoryId, $targetUserId = n
}
+ public function sendTaskMessage($content){
+
+ $this->logInfo("Core.mq", "Should now publish a message to NSQ :". json_encode($content));
+ $host = $this->getFilteredOption("WS_SERVER_HOST");
+ $port = $this->getFilteredOption("WS_SERVER_PORT");
+ if(!empty($host) && !empty($port)){
+ // Publish on NSQ
+ try{
+ $nsq = new nsqphp;
+ $nsq->publishTo($host, 1);
+ $nsq->publish('task', new \nsqphp\Message\Message(json_encode($content)));
+ $this->logInfo("Core.mq", "Published a message to NSQ :". json_encode($content));
+ }catch (Exception $e){
+ $this->logError("Core.mq", "sendTaskMessage", $e->getMessage());
+ if(ConfService::currentContextIsCommandLine()){
+ print("Error while trying to send a task message ".json_encode($content)." : ".$e->getMessage());
+ }
+ }
+ }
+
+ }
+
public function appendRefreshInstruction(ResponseInterface &$responseInterface){
if(! $this->hasPendingMessage ){
return;
@@ -316,6 +338,39 @@ public function wsAuthenticate(ServerRequestInterface $requestInterface, Respons
}
+ public function switchWorkerOn($params)
+ {
+ $wDir = $this->getPluginWorkDir(true);
+ $pidFile = $wDir.DIRECTORY_SEPARATOR."worker-pid";
+ if (file_exists($pidFile)) {
+ $pId = file_get_contents($pidFile);
+ $unixProcess = new UnixProcess();
+ $unixProcess->setPid($pId);
+ $status = $unixProcess->status();
+ if ($status) {
+ throw new Exception("Worker seems to already be running!");
+ }
+ }
+ $cmd = ConfService::getCoreConf("CLI_PHP")." worker.php";
+ chdir(AJXP_INSTALL_PATH);
+ $process = Controller::runCommandInBackground($cmd, AJXP_CACHE_DIR."/cmd_outputs/worker.log");
+ if ($process != null) {
+ $pId = $process->getPid();
+ $wDir = $this->getPluginWorkDir(true);
+ file_put_contents($wDir.DIRECTORY_SEPARATOR."worker-pid", $pId);
+ return "SUCCESS: Started worker with process ID $pId";
+ }
+ return "SUCCESS: Started worker Server";
+ }
+
+ public function switchWorkerOff($params){
+ return $this->switchWebSocketOff($params, "worker");
+ }
+
+ public function getWorkerStatus($params){
+ return $this->getWebSocketStatus($params, "worker");
+ }
+
public function switchWebSocketOn($params)
{
$wDir = $this->getPluginWorkDir(true);
@@ -344,26 +399,26 @@ public function switchWebSocketOn($params)
return "SUCCESS: Started WebSocket Server";
}
- public function switchWebSocketOff($params)
+ public function switchWebSocketOff($params, $type = "ws")
{
$wDir = $this->getPluginWorkDir(true);
- $pidFile = $wDir.DIRECTORY_SEPARATOR."ws-pid";
+ $pidFile = $wDir.DIRECTORY_SEPARATOR."$type-pid";
if (!file_exists($pidFile)) {
- throw new Exception("No information found about WebSocket server");
+ throw new Exception("No information found about $type server");
} else {
$pId = file_get_contents($pidFile);
$unixProcess = new UnixProcess();
$unixProcess->setPid($pId);
$unixProcess->stop();
unlink($pidFile);
}
- return "SUCCESS: Killed WebSocket Server";
+ return "SUCCESS: Killed $type Server";
}
- public function getWebSocketStatus()
+ public function getWebSocketStatus($params, $type = "ws")
{
$wDir = $this->getPluginWorkDir(true);
- $pidFile = $wDir.DIRECTORY_SEPARATOR."ws-pid";
+ $pidFile = $wDir.DIRECTORY_SEPARATOR."$type-pid";
if (!file_exists($pidFile)) {
return "OFF";
} else {
@@ -23,6 +23,10 @@
<global_param group="CONF_MESSAGE[WebSocket Server]" type="button" name="SWITCH_WS_ON" choices="run_plugin_action:core.mq:switchWebSocketOn" label="CONF_MESSAGE[Run WebSocket Server]" description="CONF_MESSAGE[Switch the WS server ON]" mandatory="false"/>
<global_param group="CONF_MESSAGE[WebSocket Server]" type="button" name="SWITCH_WS_OFF" choices="run_plugin_action:core.mq:switchWebSocketOff" label="CONF_MESSAGE[Stop WebSocket Server]" description="CONF_MESSAGE[Switch the WS server OFF]" mandatory="false"/>
<global_param group="CONF_MESSAGE[WebSocket Server]" type="integer" name="POLLER_FREQUENCY" label="CONF_MESSAGE[Alternative poller frequency]" description="CONF_MESSAGE[If WebSocket server is not running, a polling mechanism will replace it. Fix the frequency of refresh, in seconds.]" mandatory="true" default="15" expose="true"/>
+ <global_param group="CONF_MESSAGE[Workers]" type="boolean" name="MQ_USE_WORKERS" label="CONF_MESSAGE[Use Workers]" description="CONF_MESSAGE[Send commands in background to workers waiting to execute them]" mandatory="false" default="false"/>
+ <global_param group="CONF_MESSAGE[Workers]" type="monitor" name="MQ_WORKER_STATUS" choices="run_plugin_action:core.mq:getWorkerStatus" label="CONF_MESSAGE[Worker Status]" description="CONF_MESSAGE[Try to detect if the worker is responding]" mandatory="false"/>
+ <global_param group="CONF_MESSAGE[Workers]" type="button" name="MQ_WORKER_SWITCH_ON" choices="run_plugin_action:core.mq:switchWorkerOn" label="CONF_MESSAGE[Start Worker]" description="CONF_MESSAGE[Switch a worker ON]" mandatory="false"/>
+ <global_param group="CONF_MESSAGE[Workers]" type="button" name="MQ_WORKER_SWITCH_OFF" choices="run_plugin_action:core.mq:switchWorkerOff" label="CONF_MESSAGE[Stop Worker]" description="CONF_MESSAGE[Switch a worker OFF]" mandatory="false"/>
</server_settings>
<class_definition classname="MqManager" filename="plugins/core.mq/class.MqManager.php"/>
<registry_contributions>
@@ -80,6 +84,7 @@
<hooks>
<serverCallback methodName="publishNodeChange" hookName="node.change" defer="true"/>
<serverCallback methodName="sendInstantMessage" hookName="msg.instant" />
+ <serverCallback methodName="sendTaskMessage" hookName="msg.task" defer="true"/>
<serverCallback methodName="sendToQueue" hookName="msg.queue_notification" />
<serverCallback methodName="appendRefreshInstruction" hookName="response.send" />
</hooks>
@@ -31,6 +31,8 @@
use Pydio\Core\Services\AuthService;
use Pydio\Core\Services\ConfService;
use Pydio\Core\Utils\Utils;
+use Pydio\Log\Core\AJXP_Logger;
+use Zend\Diactoros\ServerRequestFactory;
defined('AJXP_EXEC') or die('Access not allowed');
@@ -62,26 +64,41 @@ public static function getInstance(){
/**
* @param Task $task
- * @param ServerRequestInterface $request
- * @param ResponseInterface $response
+ * @param ServerRequestInterface|null $request
+ * @param ResponseInterface|null $response
* @throws \Pydio\Core\Exception\ActionNotFoundException
* @throws \Pydio\Core\Exception\AuthRequiredException
- * @return ResponseInterface
+ * @return ResponseInterface|null
*/
- public function enqueueTask(Task $task, ServerRequestInterface $request, ResponseInterface $response){
- if(ConfService::backgroundActionsSupported()){
+ public function enqueueTask(Task $task, ServerRequestInterface $request = null, ResponseInterface $response = null){
+
+ $workers = ConfService::getCoreConf("MQ_USE_WORKERS", "mq");
+ if($workers && !$task->getSchedule()->shouldRunNow()){
+ AJXP_Logger::getInstance()->logInfo("TaskService", "Enqueuing Task ".$task->getId());
+ $msg = ["pending_task" => $task->getId()];
+ Controller::applyHook("msg.task", [$msg]);
+ return $response;
+ }
+
+ if(ConfService::backgroundActionsSupported() && !ConfService::currentContextIsCommandLine()) {
+
Controller::applyTaskInBackground($task);
return $response;
+
}else{
$params = $task->getParameters();
$action = $task->getAction();
$id = $task->getId();
+ if(empty($request)){
+ $request = ServerRequestFactory::fromGlobals();
+ }
$request = $request
->withAttribute("action", $action)
->withAttribute("pydio-task-id", $id)
->withParsedBody($params);
return Controller::run($request);
}
+
}
protected function publishTaskUpdate(Task $task){

0 comments on commit 6d0b791

Please sign in to comment.