Permalink
Browse files

Use TaskService for FTP Background Upload, Zip Delegation, Indexation,

  • Loading branch information...
1 parent dabc448 commit b2ca40054ba85ad66bb7eeab227e03345635d8ad @cdujeu cdujeu committed May 21, 2016
@@ -37,6 +37,8 @@
use Pydio\Core\Utils\Utils;
use Pydio\Core\Controller\XMLWriter;
use Pydio\Core\Utils\TextEncoder;
+use Pydio\Tasks\Task;
+use Pydio\Tasks\TaskService;
defined('AJXP_EXEC') or die( 'Access not allowed');
@@ -94,16 +96,13 @@ public function uploadActions(ServerRequestInterface &$request, ResponseInterfac
{
switch ($request->getAttribute("action")) {
- case "trigger_remote_copy":
- if(!$this->hasFilesToCopy()) break;
- $toCopy = $this->getFileNameToCopy();
- $x = new SerializableResponseStream();
- $response = $response->withBody($x);
- $x->addChunk(new BgActionTrigger("next_to_remote", array(), "Copying file ".$toCopy." to ftp server"));
- break;
-
case "next_to_remote":
- if(!$this->hasFilesToCopy()) break;
+ $taskId = $request->getAttribute("pydio-task-id");
+ if(!$this->hasFilesToCopy()) {
+ TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_COMPLETE, "");
+ break;
+ }
+
$fData = $this->getNextFileToCopy();
$nextFile = '';
if ($this->hasFilesToCopy()) {
@@ -116,6 +115,7 @@ public function uploadActions(ServerRequestInterface &$request, ResponseInterfac
$destPath = TextEncoder::fromPostedFileName($destPath);
$node = new AJXP_Node($destPath);
$this->logDebug("Copying file to server", array("from"=>$fData["tmp_name"], "to"=>$destPath, "name"=>$fData["name"]));
+ TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_RUNNING, "Uploading file ".$fData["name"]);
try {
Controller::applyHook("node.before_create", array(&$node));
$fp = fopen($destPath, "w");
@@ -133,15 +133,19 @@ public function uploadActions(ServerRequestInterface &$request, ResponseInterfac
Controller::applyHook("node.change", array(null, &$node));
} catch (\Exception $e) {
+ TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_FAILED, "");
$this->logDebug("Error during ftp copy", array($e->getMessage(), $e->getTrace()));
}
$this->logDebug("FTP Upload : shoud trigger next or reload nextFile=$nextFile");
$x = new SerializableResponseStream();
$response = $response->withBody($x);
if ($nextFile!='') {
$x->addChunk(new BgActionTrigger("next_to_remote", array(), "Copying file ".TextEncoder::toUTF8($nextFile)." to remote server"));
+ $newTask = TaskService::actionAsTask("next_to_remote", []);
+ TaskService::getInstance()->enqueueTask($newTask);
} else {
$x->addChunk(new BgActionTrigger("reload_node", array(), "Upload done, reloading client."));
+ TaskService::getInstance()->updateTaskStatus($taskId, Task::STATUS_COMPLETE, "");
}
break;
@@ -236,6 +240,9 @@ public function uploadActions(ServerRequestInterface &$request, ResponseInterfac
}*/
$this->writeUploadSuccess($request, ["PREVENT_NOTIF" => true]);
+ $task = TaskService::actionAsTask("next_to_remote", []);
+ TaskService::getInstance()->enqueueTask($task);
+
} catch (\Exception $e) {
$errorCode = $e->getCode();
if(empty($errorCode)) $errorCode = 411;
@@ -302,10 +309,12 @@ public function storeFileToCopy($fileData)
$this->logDebug("Saving user temporary data", array($fileData));
$files[] = $fileData;
$user->saveTemporaryData("tmp_upload", $files);
+ /*
if(Utils::userAgentIsNativePydioApp()){
$this->logInfo("Up from",$_SERVER["HTTP_USER_AGENT"]." - direct triger of next to remote");
$this->uploadActions("next_to_remote", array(), array());
}
+ */
}
public function getFileNameToCopy()
@@ -30,6 +30,8 @@
use Pydio\Core\PluginFramework\Plugin;
use Pydio\Core\Utils\TextEncoder;
use Pydio\Access\Core\Model\NodesDiff;
+use Pydio\Tasks\Task;
+use Pydio\Tasks\TaskService;
defined('AJXP_EXEC') or die('Access not allowed');
@@ -41,10 +43,12 @@
class PluginCompression extends Plugin
{
/**
- * @param String $action
- * @param array $httpVars
- * @param array $fileVars
+ * @param \Psr\Http\Message\ServerRequestInterface $requestInterface
+ * @param \Psr\Http\Message\ResponseInterface $responseInterface
* @throws Exception
+ * @throws PydioException
+ * @throws \Pydio\Core\Exception\ActionNotFoundException
+ * @throws \Pydio\Core\Exception\AuthRequiredException
*/
public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$requestInterface, \Psr\Http\Message\ResponseInterface &$responseInterface)
{
@@ -90,25 +94,24 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
}
}
if (!$acceptedExtension) {
- file_put_contents($progressCompressionFileName, "Error : " . $messages["compression.16"]);
throw new PydioException($messages["compression.16"]);
}
$typeArchive = $httpVars["type_archive"];
-
+ $taskId = $requestInterface->getAttribute("pydio-task-id");
// LAUNCH IN BACKGROUND AND EXIT
- if (ConfService::backgroundActionsSupported() && !ConfService::currentContextIsCommandLine()) {
- $archivePath = $currentDirPath.$archiveName;
+ if(empty($taskId)){
+ $task = TaskService::actionAsTask("compression", $httpVars, $repository->getId(), "", [], Task::FLAG_STOPPABLE | Task::FLAG_HAS_PROGRESS);
+ $task->setLabel($messages["compression.5"]);
file_put_contents($progressCompressionFileName, $messages["compression.5"]);
- Controller::applyActionInBackground($repository->getId(), "compression", $httpVars);
- $serializableStream->addChunk(new BgActionTrigger("check_compression_status", array(
- "repository_id" => $repository->getId(),
- "compression_id" => $compressionId,
- "archive_path" => TextEncoder::toUTF8($archivePath)
- ), $messages["compression.5"], 2));
-
- return null;
+ TaskService::getInstance()->enqueueTask($task);
+ return;
}
+ $task = TaskService::getInstance()->getTaskById($taskId);
+ $postMessageStatus = function($message, $taskStatus, $progress = null) use($progressCompressionFileName, $task){
+ $this->operationStatus($progressCompressionFileName, $task, $message, $taskStatus, $progress);
+ };
+
$maxAuthorizedSize = 4294967296;
$currentDirUrlLength = strlen($currentDirUrl);
$tabFolders = array();
@@ -137,14 +140,14 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
}
//WE STOP IF IT'S JUST AN EMPTY FOLDER OR NO FILES
if (empty($tabFilesNames)) {
- file_put_contents($progressCompressionFileName, "Error : " . $messages["compression.17"]);
+ $postMessageStatus($messages["compression.17"], Task::STATUS_FAILED);
throw new PydioException($messages["compression.17"]);
}
try {
$tmpArchiveName = tempnam(Utils::getAjxpTmpDir(), "tar-compression") . ".tar";
$archive = new PharData($tmpArchiveName);
} catch (Exception $e) {
- file_put_contents($progressCompressionFileName, "Error : " . $e->getMessage());
+ $postMessageStatus($e->getMessage(), Task::STATUS_FAILED);
throw $e;
}
$counterCompression = 0;
@@ -154,17 +157,18 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
try {
$archive->addFile(AJXP_MetaStreamWrapper::getRealFSReference($fullPath), $fileName);
$counterCompression++;
- file_put_contents($progressCompressionFileName, sprintf($messages["compression.6"], round(($counterCompression / count($tabAllFiles)) * 100, 0, PHP_ROUND_HALF_DOWN) . " %"));
+ $percent = round(($counterCompression / count($tabAllFiles)) * 100, 0, PHP_ROUND_HALF_DOWN);
+ $postMessageStatus(sprintf($messages["compression.6"], $percent . " %"), Task::STATUS_RUNNING, $percent);
} catch (Exception $e) {
unlink($tmpArchiveName);
- file_put_contents($progressCompressionFileName, "Error : " . $e->getMessage());
+ $postMessageStatus($e->getMessage(), Task::STATUS_FAILED);
throw $e;
}
}
$finalArchive = $tmpArchiveName;
if ($typeArchive != ".tar") {
$archiveTypeCompress = substr(strrchr($typeArchive, "."), 1);
- file_put_contents($progressCompressionFileName, sprintf($messages["compression.7"], strtoupper($archiveTypeCompress)));
+ $postMessageStatus(sprintf($messages["compression.7"], strtoupper($archiveTypeCompress)), Task::STATUS_RUNNING);
if ($archiveTypeCompress == "gz") {
$archive->compress(Phar::GZ);
} elseif ($archiveTypeCompress == "bz2") {
@@ -181,7 +185,10 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
}
$newNode = new AJXP_Node($currentDirUrl . $archiveName);
Controller::applyHook("node.change", array(null, $newNode, false));
- file_put_contents($progressCompressionFileName, "SUCCESS");
+ $nodesDiff = new NodesDiff();
+ $nodesDiff->add($newNode);
+ Controller::applyHook("msg.instant", array($nodesDiff->toXML(), $repository->getId()));
+ $postMessageStatus("Finished", Task::STATUS_COMPLETE);
break;
@@ -236,7 +243,6 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
}
}
if ($acceptedArchive == false) {
- file_put_contents($progressExtractFileName, "Error : " . $messages["compression.15"]);
throw new PydioException($messages["compression.15"]);
}
$onlyFileName = substr($fileArchive, 0, -$extensionLength);
@@ -253,19 +259,19 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
}
// LAUNCHME IN BACKGROUND
- if (ConfService::backgroundActionsSupported() && !ConfService::currentContextIsCommandLine()) {
-
+ $taskId = $requestInterface->getAttribute("pydio-task-id");
+ // LAUNCH IN BACKGROUND AND EXIT
+ if(empty($taskId)){
+ $task = TaskService::actionAsTask("extraction", $httpVars, $repository->getId(), "", [], Task::FLAG_STOPPABLE | Task::FLAG_HAS_PROGRESS);
+ $task->setLabel($messages["compression.12"]);
file_put_contents($progressExtractFileName, $messages["compression.12"]);
- Controller::applyActionInBackground($repository->getId(), "extraction", $httpVars);
- $serializableStream->addChunk(new BgActionTrigger("check_extraction_status", array(
- "repository_id" => $repository->getId(),
- "extraction_id" => $extractId,
- "currentDirUrl" => $currentDirUrl,
- "onlyFileName" => $onlyFileName
- ), $messages["compression.12"], 2));
- return null;
-
+ TaskService::getInstance()->enqueueTask($task);
+ return;
}
+ $task = TaskService::getInstance()->getTaskById($taskId);
+ $postMessageStatus = function($message, $taskStatus, $progress = null) use($progressExtractFileName, $task){
+ $this->operationStatus($progressExtractFileName, $task, $message, $taskStatus, $progress);
+ };
mkdir($currentDirUrl . $onlyFileName, 0777, true);
chmod(AJXP_MetaStreamWrapper::getRealFSReference($currentDirUrl . $onlyFileName), 0777);
@@ -281,19 +287,22 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
try {
$archive->extractTo(AJXP_MetaStreamWrapper::getRealFSReference($currentDirUrl . $onlyFileName), $fileNameInArchive, false);
} catch (Exception $e) {
- file_put_contents($progressExtractFileName, "Error : " . $e->getMessage());
+ $postMessageStatus($e->getMessage(), Task::STATUS_FAILED);
throw new PydioException($e);
}
$counterExtract++;
- file_put_contents($progressExtractFileName, sprintf($messages["compression.13"], round(($counterExtract / $archive->count()) * 100, 0, PHP_ROUND_HALF_DOWN) . " %"));
+ $progress = round(($counterExtract / $archive->count()) * 100, 0, PHP_ROUND_HALF_DOWN);
+ $postMessageStatus(sprintf($messages["compression.13"], $progress."%"), Task::STATUS_RUNNING, $progress);
}
} catch (Exception $e) {
- file_put_contents($progressExtractFileName, "Error : " . $e->getMessage());
+ $postMessageStatus($e->getMessage(), Task::STATUS_FAILED);
throw new PydioException($e);
}
- file_put_contents($progressExtractFileName, "SUCCESS");
+ $postMessageStatus("Done", Task::STATUS_COMPLETE, 100);
$newNode = new AJXP_Node($currentDirUrl . $onlyFileName);
-
+ $nodesDiff = new NodesDiff();
+ $nodesDiff->add($newNode);
+ Controller::applyHook("msg.instant", array($nodesDiff->toXML(), $repository->getId()));
$indexRequest = \Zend\Diactoros\ServerRequestFactory::fromGlobals()
->withParsedBody(["file" => $newNode->getPath()])
->withAttribute("action", "index");
@@ -343,4 +352,28 @@ public function receiveAction(\Psr\Http\Message\ServerRequestInterface &$request
}
}
+
+ /**
+ * @param string $progressCompressionFileName
+ * @param Task $task
+ * @param string $message
+ * @param integer $taskStatus
+ * @param null|integer $progress
+ */
+ private function operationStatus($progressCompressionFileName, $task, $message, $taskStatus, $progress = null)
+ {
+ $fileMessage = $message;
+ if ($taskStatus == Task::STATUS_FAILED) {
+ $fileMessage = "Error : " . $message;
+ } else if ($taskStatus == Task::STATUS_COMPLETE) {
+ $fileMessage = "SUCCESS";
+ }
+ file_put_contents($progressCompressionFileName, $fileMessage);
+ $task->setStatusMessage($message);
+ $task->setStatus($taskStatus);
+ if ($progress != null) {
+ $task->setProgress($progress);
+ }
+ TaskService::getInstance()->updateTask($task);
+ }
}
Oops, something went wrong.

0 comments on commit b2ca400

Please sign in to comment.