Permalink
Browse files

Update worker

  • Loading branch information...
1 parent 103ca50 commit 85e75454645ae8e1894203c8ba3f63bb126140c2 @cdujeu cdujeu committed Jun 2, 2016
Showing with 76 additions and 73 deletions.
  1. +76 −73 core/src/worker.php
View
@@ -23,14 +23,12 @@
use Pydio\Core\Services\ConfService;
use Pydio\Core\Controller\Controller;
use Pydio\Core\Controller\ShutdownScheduler;
-use Pydio\Core\Controller\XMLWriter;
use Pydio\Core\PluginFramework\PluginsService;
+use Pydio\Tasks\Schedule;
+use Pydio\Tasks\Task;
include_once("base.conf.php");
-//set_error_handler(array("AJXP_XMLWriter", "catchError"), E_ALL & ~E_NOTICE & ~E_STRICT );
-//set_exception_handler(array("AJXP_XMLWriter", "catchException"));
-
$pServ = PluginsService::getInstance();
ConfService::$useSession = false;
AuthService::$useSession = false;
@@ -42,118 +40,123 @@
ConfService::currentContextIsRestAPI("/api");
PluginsService::getInstance()->initActivePlugins();
-function applyTask($userId, $repoId, $actionName, $parameters){
+/**
+ * @param Task $task
+ * @param nsqphp\Logger\Stderr $logger
+ * @throws Exception
+ */
+function applyTask($task, $logger){
+
+ set_error_handler(function ($errno , $errstr , $errfile , $errline) use ($task){
+ if(error_reporting() == 0) return;
+ \Pydio\Tasks\TaskService::getInstance()->updateTaskStatus($task->getId(), Task::STATUS_FAILED, $errstr);
+ }, E_ALL & ~E_NOTICE & ~E_STRICT);
- print($userId." - ".$repoId." - ".$actionName." - ");
- print("Log User\n");
+ $userId = $task->getUserId();
+ $repoId = $task->getWsId();
+ $actionName = $task->getAction();
+ $parameters = $task->getParameters();
+
+ print($userId." - ".$repoId." - ".$actionName." - \n");
+ $logger->debug("Log User");
AuthService::logUser($userId, "", true);
- print("Find Repo\n");
+ $logger->debug("Find Repo");
if($repoId == 'pydio'){
ConfService::switchRootDir();
$repo = ConfService::getRepository();
}else{
$repo = ConfService::findRepositoryByIdOrAlias($repoId);
if ($repo == null) {
- throw new Exception("Cannot find repository with ID ".$repoId);
+ \Pydio\Tasks\TaskService::getInstance()->updateTaskStatus($task->getId(), Task::STATUS_FAILED, "Cannot find repository");
+ $logger->error("Cannot find repository with ID ".$repoId);
+ return;
}
ConfService::switchRootDir($repo->getId());
}
// DRIVERS BELOW NEED IDENTIFICATION CHECK
- print("Load Driver\n");
+ $logger->debug("Load Driver");
if (!AuthService::usersEnabled() || ConfService::getCoreConf("ALLOW_GUEST_BROWSING", "auth") || AuthService::getLoggedUser()!=null) {
ConfService::getConfStorageImpl();
ConfService::loadDriverForRepository($repo);
}
- print("Init plugins\n");
- PluginsService::getInstance()->initActivePlugins();
-
- $fakeRequest = \Zend\Diactoros\ServerRequestFactory::fromGlobals(array(), array(), $parameters)->withAttribute("action", $actionName);
+ $logger->debug("Init plugins");
+ $pServ = PluginsService::getInstance();
+ $pServ->initActivePlugins();
+
+ $fakeRequest = \Zend\Diactoros\ServerRequestFactory::fromGlobals(array(), array(), $parameters)
+ ->withAttribute("action", $actionName)
+ ->withAttribute("pydio-task-id", $task->getId());
+ ;
try{
$response = Controller::run($fakeRequest);
if($response !== false && ($response->getBody()->getSize() || $response instanceof \Zend\Diactoros\Response\EmptyResponse)) {
echo $response->getBody();
}
}catch (Exception $e){
- echo "ERROR : ".$e->getMessage()."\n";
- echo print_r($e->getTraceAsString())."\n";
+ $logger->error("ERROR : ".$e->getMessage());
+ $logger->error($e->getTraceAsString());
+ \Pydio\Tasks\TaskService::getInstance()->updateTaskStatus($task->getId(), Task::STATUS_FAILED, $e->getMessage());
}
- print("Empty ShutdownScheduler!\n");
+ $logger->debug("Empty ShutdownScheduler!");
ShutdownScheduler::getInstance()->callRegisteredShutdown();
- print("Invalidate\n");
+ $logger->debug("Invalidate");
ConfService::getInstance()->invalidateLoadedRepositories();
- print("Disconnect\n");
+ $logger->debug("Disconnecting");
AuthService::disconnect();
PluginsService::updateXmlRegistry(null, true);
-}
+ restore_error_handler();
-function deQueue(){
- $fName = AJXP_DATA_PATH."/plugins/mq.serial/worker-queue";
- if(file_exists($fName)){
- $data = file_get_contents($fName);
- if(!empty($data)){
- $decoded = json_decode($data, true);
- if(!empty($decoded) && is_array($decoded) && count($decoded)){
- $task = array_pop($decoded);
- file_put_contents($fName, json_encode($decoded));
- return $task;
- }
- }
- }
- return false;
}
-$method = isset($argv[1]) ? $argv[1] : 'nsq';
+function listen(){
-if($method == "nsq"){
-
- include("plugins/core.mq/vendor/autoload.php");
$logger = new nsqphp\Logger\Stderr;
$dedupe = new nsqphp\Dedupe\OppositeOfBloomFilterMemcached;
$lookup = new nsqphp\Lookup\FixedHosts('localhost:4150');
$requeueStrategy = new nsqphp\RequeueStrategy\FixedDelay;
$nsq = new nsqphp\nsqphp($lookup, $dedupe, $requeueStrategy, $logger);
- $channel = isset($argv[1]) ? $argv[1] : 'foo';
- $nsq->subscribe('pydio', $channel, function($msg) {
- echo "READ\t" . $msg->getId() . "\t" . $msg->getPayload() . "\n";
- $payload = json_decode($msg->getPayload(), true);
- $data = $payload["data"];
- applyTask($data["user_id"], $data["repository_id"], $data["action"], $data["parameters"]);
- });
- $nsq->run();
-
-
-}else{
-
- while(true){
- $task = deQueue();
- if($task !== false){
- try{
- print "--------------------------------------\n";
- print "Applying task ".$task["actionName"]."\n";
- print_r($task);
- applyTask($task["userId"], $task["repoId"], $task["actionName"], $task["parameters"]);
- }catch (Exception $e){
- print "Error : ".$e->getMessage()."\n";
+ $channel = 'worker';
+
+ $nsq->subscribe('task', $channel, function(\nsqphp\Message\Message $msg) use ($logger) {
+ $logger->debug("READ\t" . $msg->getId() . "\t" . $msg->getPayload());
+ $data = json_decode($msg->getPayload(), true);
+ if(isSet($data["pending_task"])){
+ $taskId = $data["pending_task"];
+ $task = \Pydio\Tasks\TaskService::getInstance()->getTaskById($taskId);
+ if($task instanceof Task ){
+ if($task->getStatus() == Task::STATUS_PENDING){
+ $logger->info("--------------------------------------");
+ $logger->info("Applying task ".$data["actionName"]);
+ try{
+ applyTask($task, $logger);
+ // ALTERNATIVE : SEND IN BG
+ //$task->setSchedule(new Schedule(Schedule::TYPE_ONCE_NOW));
+ //Controller::applyActionInBackground($task->getWsId(), $task->getAction(), $task->getParameters(), $task->getUserId(), "", $task->getId());
+ }catch (Exception $e){
+ $logger->error("Error : ".$e->getMessage());
+ $logger->error("Error : ".$e->getTraceAsString());
+ }
+ }else{
+ $logger->debug("Skipping Task, status is not pending ". $task->getStatus());
+ }
+ }else{
+ $logger->debug("Skipping, cannot find task for id ". $taskId);
}
- flush();
- }else{
- print "--------- nothing to do \n";
}
- print("5\r");
- sleep(1);
- print("4\r");
- sleep(1);
- print("3\r");
- sleep(1);
- print("2\r");
- sleep(1);
- print("1\r");
- sleep(1);
+ });
+
+ try{
+ $nsq->run();
+ }catch (Exception $e){
+ $logger->error("Socket Error ".$e->getMessage());
+ $logger->error("Restarting listener");
+ listen();
}
}
+listen();

0 comments on commit 85e7545

Please sign in to comment.