Serve exported files from datastore (#5942)
* Serve exported files from datastore

* Use access request

* Fetch job export properties from wk

* Add token to link in frontend

* Changelog

* Remove now-unused route
fm3 committed Jan 11, 2022
1 parent ac1cb3b commit db4d937
Showing 11 changed files with 136 additions and 30 deletions.
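Taken together, these changes move tiff-export downloads from webknossos itself to the datastore: webknossos stores a result link pointing at the datastore's new exports route, the frontend appends the user's datastore token, and the datastore validates that token against webknossos before streaming the file. A rough sketch of how the final download URL is assembled (all values below are placeholders, not taken from this diff):

```scala
// Placeholder values, for illustration only
val dataStorePublicUrl = "https://datastore.example.org"
val jobId = "61dcbeb0010000790000a1b2" // the job's ObjectId
val userToken = "someUserToken"        // obtained via doWithToken in the frontend

// Link stored with the job (see Job.resultLink below) ...
val resultLink = s"$dataStorePublicUrl/data/exports/$jobId/download"
// ... and the URL the browser actually requests (token appended in job_list_view.js)
val downloadUrl = s"$resultLink?token=$userToken"
```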
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
@@ -12,6 +12,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released

### Added
- Added the possibility to add additional volume layers to an existing annotation via the left sidebar. [#5881](https://github.com/scalableminds/webknossos/pull/5881)
- Tiff exports are now served from the datastore module to prepare for remote datastores with webknossos-worker. [#5942](https://github.com/scalableminds/webknossos/pull/5942)

### Changed

13 changes: 0 additions & 13 deletions app/controllers/JobsController.scala
@@ -1,6 +1,5 @@
package controllers

import java.nio.file.{Files, Paths}
import java.util.Date

import com.mohiva.play.silhouette.api.Silhouette
@@ -179,16 +178,4 @@ class JobsController @Inject()(jobDAO: JobDAO,
}
}

def downloadExport(jobId: String, exportFileName: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
jobIdValidated <- ObjectId.parse(jobId)
job <- jobDAO.findOne(jobIdValidated)
latestRunId <- job.latestRunId.toFox
organization <- organizationDAO.findOne(request.identity._organization)
filePath = Paths.get("binaryData", organization.name, ".export", latestRunId, exportFileName)
_ <- bool2Fox(Files.exists(filePath)) ?~> "job.export.fileNotFound"
} yield Ok.sendPath(filePath, inline = false)
}

}
20 changes: 19 additions & 1 deletion app/controllers/UserTokenController.scala
@@ -16,12 +16,13 @@ import io.swagger.annotations._
import javax.inject.Inject
import models.annotation._
import models.binary.{DataSetDAO, DataSetService, DataStoreService}
import models.job.JobDAO
import models.user.{User, UserService}
import net.liftweb.common.{Box, Full}
import oxalis.security._
import play.api.libs.json.Json
import play.api.mvc.{Action, AnyContent, PlayBodyParsers, Result}
import utils.WkConf
import utils.{ObjectId, WkConf}

import scala.concurrent.ExecutionContext

@@ -44,6 +45,7 @@ class UserTokenController @Inject()(dataSetDAO: DataSetDAO,
annotationInformationProvider: AnnotationInformationProvider,
dataStoreService: DataStoreService,
tracingStoreService: TracingStoreService,
jobDAO: JobDAO,
wkSilhouetteEnvironment: WkSilhouetteEnvironment,
conf: WkConf,
sil: Silhouette[WkEnv])(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
@@ -98,6 +100,8 @@ class UserTokenController @Inject()(dataSetDAO: DataSetDAO,
handleDataSourceAccess(accessRequest.resourceId, accessRequest.mode, userBox)(sharingTokenAccessCtx)
case AccessResourceType.tracing =>
handleTracingAccess(accessRequest.resourceId.name, accessRequest.mode, userBox)
case AccessResourceType.jobExport =>
handleJobExportAccess(accessRequest.resourceId.name, accessRequest.mode, userBox)
case _ =>
Fox.successful(UserAccessAnswer(granted = false, Some("Invalid access token.")))
}
@@ -188,4 +192,18 @@ class UserTokenController @Inject()(dataSetDAO: DataSetDAO,
else UserAccessAnswer(granted = false, Some(s"No ${mode.toString} access to tracing"))
}
}

private def handleJobExportAccess(jobId: String, mode: AccessMode, userBox: Box[User]): Fox[UserAccessAnswer] =
if (mode != AccessMode.read)
      Fox.successful(UserAccessAnswer(granted = false, Some(s"Unsupported access mode for job exports: $mode")))
else {
for {
jobIdValidated <- ObjectId.parse(jobId)
jobBox <- jobDAO.findOne(jobIdValidated)(DBAccessContext(userBox)).futureBox
answer = jobBox match {
case Full(_) => UserAccessAnswer(granted = true)
case _ => UserAccessAnswer(granted = false, Some(s"No ${mode} access to job export"))
}
} yield answer
}
}
24 changes: 22 additions & 2 deletions app/controllers/WKRemoteDataStoreController.scala
@@ -2,6 +2,7 @@ package controllers

import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, GlobalAccessContext}
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.controllers.JobExportProperties
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.datasource.inbox.{InboxDataSourceLike => InboxDataSource}
import com.scalableminds.webknossos.datastore.services.{
@@ -13,14 +14,16 @@ import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject
import models.analytics.{AnalyticsService, UploadDatasetEvent}
import models.binary._
import models.job.JobDAO
import models.organization.OrganizationDAO
import models.user.User
import models.user.{User, UserDAO}
import net.liftweb.common.Full
import oxalis.mail.{MailchimpClient, MailchimpTag}
import oxalis.security.{WebknossosBearerTokenAuthenticatorService, WkSilhouetteEnvironment}
import play.api.i18n.{Messages, MessagesProvider}
import play.api.libs.json.{JsError, JsSuccess, JsValue}
import play.api.libs.json.{JsError, JsSuccess, JsValue, Json}
import play.api.mvc.{Action, AnyContent, PlayBodyParsers}
import utils.ObjectId

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -31,6 +34,8 @@ class WKRemoteDataStoreController @Inject()(
analyticsService: AnalyticsService,
organizationDAO: OrganizationDAO,
dataSetDAO: DataSetDAO,
userDAO: UserDAO,
jobDAO: JobDAO,
mailchimpClient: MailchimpClient,
wkSilhouetteEnvironment: WkSilhouetteEnvironment)(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
extends Controller
@@ -153,6 +158,21 @@ class WKRemoteDataStoreController @Inject()(
}
} yield Ok
}
}

def jobExportProperties(name: String, key: String, jobId: String): Action[AnyContent] = Action.async {
implicit request =>
dataStoreService.validateAccess(name, key) { _ =>
for {
jobIdValidated <- ObjectId.parse(jobId)
job <- jobDAO.findOne(jobIdValidated)(GlobalAccessContext)
jobOwner <- userDAO.findOne(job._owner)(GlobalAccessContext)
organization <- organizationDAO.findOne(jobOwner._organization)(GlobalAccessContext)
latestRunId <- job.latestRunId.toFox ?~> "job.notRun"
exportFileName <- job.exportFileName.toFox ?~> "job.noExportFileName"
jobExportProperties = JobExportProperties(jobId, latestRunId, organization.name, exportFileName)
} yield Ok(Json.toJson(jobExportProperties))
}
}

}
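The new `jobExportProperties` action above is what the datastore calls (authenticated with its datastore key) to learn where an export file lives. It serializes a `JobExportProperties`, defined next to the new `ExportsController` further down in this diff. A sketch of the resulting payload, with made-up field values:

```scala
import play.api.libs.json.Json

// Hypothetical field values — the real ones come from the job's database row
val props = JobExportProperties(
  jobId = "61dcbeb0010000790000a1b2",
  runId = "20220111_123456",
  organizationName = "sample_organization",
  exportFileName = "export.tiff")

Json.toJson(props)
// {"jobId":"61dcbeb0010000790000a1b2","runId":"20220111_123456",
//  "organizationName":"sample_organization","exportFileName":"export.tiff"}
```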
34 changes: 24 additions & 10 deletions app/models/job/Job.scala
@@ -11,6 +11,7 @@ import com.scalableminds.webknossos.schema.Tables._
import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject
import models.analytics.{AnalyticsService, FailedJobEvent, RunJobEvent}
import models.binary.DataStoreDAO
import models.job.JobState.JobState
import models.organization.OrganizationDAO
import models.user.{MultiUserDAO, User, UserDAO}
@@ -24,7 +25,6 @@ import utils.{ObjectId, SQLClient, SQLDAO, WkConf}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import scala.concurrent.duration._

case class Job(
@@ -56,18 +56,22 @@ case class Job(

def effectiveState: JobState = manualState.getOrElse(state)

def resultLink(organizationName: String): Option[String] =
def exportFileName: Option[String] = argAsStringOpt("export_file_name")

def datasetName: Option[String] = argAsStringOpt("dataset_name")

private def argAsStringOpt(key: String) = (commandArgs \ key).toOption.flatMap(_.asOpt[String])

def resultLink(organizationName: String, dataStorePublicUrl: String): Option[String] =
if (effectiveState != JobState.SUCCESS) None
else {
command match {
case "convert_to_wkw" =>
(commandArgs \ "dataset_name").toOption.flatMap(_.asOpt[String]).map { dataSetName =>
s"/datasets/$organizationName/$dataSetName/view"
datasetName.map { dsName =>
s"/datasets/$organizationName/$dsName/view"
}
case "export_tiff" =>
(commandArgs \ "export_file_name").toOption.flatMap(_.asOpt[String]).map { exportFileName =>
s"/api/jobs/${_id.id}/downloadExport/$exportFileName"
}
Some(s"$dataStorePublicUrl/data/exports/${_id.id}/download")
case "infer_nuclei" =>
returnValue.map { resultDatasetName =>
s"/datasets/$organizationName/$resultDatasetName/view"
@@ -92,7 +96,7 @@ class JobDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext)
Job(
ObjectId(r._Id),
ObjectId(r._Owner),
r._Datastore,
r._Datastore.trim,
r.command,
Json.parse(r.commandargs).as[JsObject],
state,
@@ -117,6 +121,13 @@ class JobDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext)
parsed <- parseAll(r)
} yield parsed

override def findOne(jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[Job] =
for {
accessQuery <- readAccessQuery
r <- run(sql"select #$columns from #$existingCollectionName where #$accessQuery and _id = $jobId".as[JobsRow])
parsed <- parseFirst(r, jobId)
} yield parsed

def countUnassignedPendingForDataStore(_dataStore: String): Fox[Int] =
for {
r <- run(sql"""select count(_id) from #$existingCollectionName
@@ -221,6 +232,7 @@ class JobService @Inject()(wkConf: WkConf,
userDAO: UserDAO,
multiUserDAO: MultiUserDAO,
jobDAO: JobDAO,
dataStoreDAO: DataStoreDAO,
organizationDAO: OrganizationDAO,
analyticsService: AnalyticsService,
slackNotificationService: SlackNotificationService,
@@ -258,7 +270,8 @@ class JobService @Inject()(wkConf: WkConf,
for {
user <- userDAO.findOne(jobBeforeChange._owner)(GlobalAccessContext)
organization <- organizationDAO.findOne(user._organization)(GlobalAccessContext)
resultLink = jobAfterChange.resultLink(organization.name)
dataStore <- dataStoreDAO.findOneByName(jobBeforeChange._dataStore)(GlobalAccessContext)
resultLink = jobAfterChange.resultLink(organization.name, dataStore.publicUrl)
resultLinkMrkdwn = resultLink.map(l => s" <${wkConf.Http.uri}$l|Result>").getOrElse("")
multiUser <- multiUserDAO.findOne(user._multiUser)(GlobalAccessContext)
superUserLabel = if (multiUser.isSuperUser) " (for superuser)" else ""
@@ -277,7 +290,8 @@ class JobService @Inject()(wkConf: WkConf,
for {
owner <- userDAO.findOne(job._owner) ?~> "user.notFound"
organization <- organizationDAO.findOne(owner._organization) ?~> "organization.notFound"
resultLink = job.resultLink(organization.name)
dataStore <- dataStoreDAO.findOneByName(job._dataStore) ?~> "dataStore.notFound"
resultLink = job.resultLink(organization.name, dataStore.publicUrl)
} yield {
Json.obj(
"id" -> job._id.id,
2 changes: 1 addition & 1 deletion conf/webknossos.latest.routes
@@ -88,6 +88,7 @@ PATCH /datastores/:name/status c
POST /datastores/:name/verifyUpload controllers.WKRemoteDataStoreController.validateDataSetUpload(name: String, key: String, token: String)
POST /datastores/:name/reportDatasetUpload controllers.WKRemoteDataStoreController.reportDatasetUpload(name: String, key: String, token: String, dataSetName: String, dataSetSizeBytes: Long)
POST /datastores/:name/deleteErroneous controllers.WKRemoteDataStoreController.deleteErroneous(name: String, key: String)
GET /datastores/:name/jobExportProperties controllers.WKRemoteDataStoreController.jobExportProperties(name: String, key: String, jobId: String)
POST /datastores/:name/validateUserAccess controllers.UserTokenController.validateAccessViaDatastore(name: String, key: String, token: Option[String])
POST /datastores controllers.DataStoreController.create
DELETE /datastores/:name controllers.DataStoreController.delete(name: String)
@@ -206,5 +207,4 @@ GET /jobs/run/computeMeshFile/:organizationName/:dataSetName c
GET /jobs/run/exportTiff/:organizationName/:dataSetName controllers.JobsController.runExportTiffJob(organizationName: String, dataSetName: String, bbox: String, layerName: Option[String], tracingId: Option[String], tracingVersion: Option[String], annotationId: Option[String], annotationType: Option[String], hideUnmappedIds: Option[Boolean], mappingName: Option[String], mappingType: Option[String])
GET /jobs/run/inferNuclei/:organizationName/:dataSetName controllers.JobsController.runInferNucleiJob(organizationName: String, dataSetName: String, layerName: Option[String])
GET /jobs/:id controllers.JobsController.get(id: String)
GET /jobs/:id/downloadExport/:exportFileName controllers.JobsController.downloadExport(id: String, exportFileName: String)
POST /jobs/:id/status controllers.WKRemoteWorkerController.updateJobStatus(key: String, id: String)
8 changes: 6 additions & 2 deletions frontend/javascripts/admin/job/job_list_view.js
@@ -17,7 +17,7 @@ import {
import * as React from "react";

import type { APIJob } from "types/api_flow_types";
import { getJobs } from "admin/admin_rest_api";
import { getJobs, doWithToken } from "admin/admin_rest_api";
import Persistence from "libs/persistence";
import * as Utils from "libs/utils";
import FormattedDate from "components/formatted_date";
@@ -64,6 +64,7 @@ type State = {
isLoading: boolean,
jobs: Array<APIJob>,
searchQuery: string,
token: string,
};

const persistence: Persistence<State> = new Persistence(
@@ -78,6 +79,7 @@ class JobListView extends React.PureComponent<Props, State> {
isLoading: true,
jobs: [],
searchQuery: "",
token: "",
};

componentWillMount() {
@@ -100,10 +102,12 @@

async fetchData(): Promise<void> {
const jobs = await getJobs();
const token = await doWithToken(async t => t);
this.setState(
{
isLoading: false,
jobs,
token,
},
// refresh jobs according to the refresh interval
() => {
@@ -182,7 +186,7 @@
return (
<span>
{job.resultLink && (
<a href={job.resultLink} title="Download">
<a href={`${job.resultLink}?token=${this.state.token}`} title="Download">
<DownOutlined />
Download
</a>
@@ -0,0 +1,49 @@
package com.scalableminds.webknossos.datastore.controllers

import java.nio.file.{Files, Path, Paths}

import com.google.inject.Inject
import com.scalableminds.util.tools.FoxImplicits
import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.scalableminds.webknossos.datastore.services.{
DSRemoteWebKnossosClient,
DataStoreAccessTokenService,
UserAccessRequest
}
import play.api.libs.json.{Json, OFormat}
import play.api.mvc.{Action, AnyContent}

import scala.concurrent.ExecutionContext

case class JobExportProperties(jobId: String, runId: String, organizationName: String, exportFileName: String) {

def fullPathIn(baseDir: Path): Path =
baseDir.resolve(organizationName).resolve(".export").resolve(runId).resolve(exportFileName)
}

object JobExportProperties {
implicit val jsonFormat: OFormat[JobExportProperties] = Json.format[JobExportProperties]
}

class ExportsController @Inject()(webKnossosClient: DSRemoteWebKnossosClient,
accessTokenService: DataStoreAccessTokenService,
config: DataStoreConfig)(implicit ec: ExecutionContext)
extends Controller
with FoxImplicits {

private val dataBaseDir: Path = Paths.get(config.Datastore.baseFolder)

def download(token: Option[String], jobId: String): Action[AnyContent] = Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.downloadJobExport(jobId), token) {
AllowRemoteOrigin {
for {
exportProperties <- webKnossosClient.getJobExportProperties(jobId)
fullPath = exportProperties.fullPathIn(dataBaseDir)
_ <- bool2Fox(Files.exists(fullPath)) ?~> "job.export.fileNotFound"
} yield Ok.sendPath(fullPath, inline = false)
}
}

}

}
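The datastore resolves the export file relative to its own base folder, which matches the `binaryData/<organization>/.export/<runId>/<fileName>` location that the removed `JobsController.downloadExport` used. A minimal sketch, assuming the base folder is the usual `binaryData` directory and using placeholder values:

```scala
import java.nio.file.Paths

val baseDir = Paths.get("binaryData") // stands in for config.Datastore.baseFolder

// Hypothetical properties, as fetched from webknossos via getJobExportProperties
val props = JobExportProperties(
  jobId = "61dcbeb0010000790000a1b2",
  runId = "20220111_123456",
  organizationName = "sample_organization",
  exportFileName = "export.tiff")

props.fullPathIn(baseDir)
// -> binaryData/sample_organization/.export/20220111_123456/export.tiff
```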
@@ -19,7 +19,7 @@ object AccessMode extends ExtendedEnumeration {

object AccessResourceType extends ExtendedEnumeration {
type AccessResourceType = Value
val datasource, tracing, webknossos = Value
val datasource, tracing, webknossos, jobExport = Value
}

case class UserAccessRequest(resourceId: DataSourceId, resourceType: AccessResourceType.Value, mode: AccessMode.Value) {
@@ -48,6 +48,9 @@ object UserAccessRequest {
def writeTracing(tracingId: String): UserAccessRequest =
UserAccessRequest(DataSourceId(tracingId, ""), AccessResourceType.tracing, AccessMode.write)

def downloadJobExport(jobId: String): UserAccessRequest =
UserAccessRequest(DataSourceId(jobId, ""), AccessResourceType.jobExport, AccessMode.read)

def webknossos: UserAccessRequest =
UserAccessRequest(DataSourceId("webknossos", ""), AccessResourceType.webknossos, AccessMode.administrate)
}
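To tie the pieces together: before streaming anything, the `ExportsController` shown above asks webknossos whether the supplied user token may read this job's export. A sketch of that round trip, assuming a valid datastore key and user token (job id is a placeholder):

```scala
// 1. The datastore wraps the job id in an access request ...
val accessRequest = UserAccessRequest.downloadJobExport("61dcbeb0010000790000a1b2")
// ... which is equivalent to
//     UserAccessRequest(DataSourceId("61dcbeb0010000790000a1b2", ""),
//                       AccessResourceType.jobExport, AccessMode.read)

// 2. DSRemoteWebKnossosClient.requestUserAccess POSTs it to
//    /api/datastores/:name/validateUserAccess, where handleJobExportAccess answers
//    UserAccessAnswer(granted = true) if the job is visible to the token's user,
//    and UserAccessAnswer(granted = false, Some("No read access to job export")) otherwise.
```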
@@ -5,6 +5,7 @@ import com.google.inject.Inject
import com.google.inject.name.Named
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.scalableminds.webknossos.datastore.controllers.JobExportProperties
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.datasource.inbox.InboxDataSourceLike
@@ -91,6 +92,12 @@ class DSRemoteWebKnossosClient @Inject()(
def deleteErroneousDataSource(id: DataSourceId): Fox[_] =
rpc(s"$webKnossosUri/api/datastores/$dataStoreName/deleteErroneous").addQueryString("key" -> dataStoreKey).post(id)

def getJobExportProperties(jobId: String): Fox[JobExportProperties] =
rpc(s"$webKnossosUri/api/datastores/$dataStoreName/jobExportProperties")
.addQueryString("jobId" -> jobId)
.addQueryString("key" -> dataStoreKey)
.getWithJsonResponse[JobExportProperties]

override def requestUserAccess(token: Option[String], accessRequest: UserAccessRequest): Fox[UserAccessAnswer] =
rpc(s"$webKnossosUri/api/datastores/$dataStoreName/validateUserAccess")
.addQueryString("key" -> dataStoreKey)
@@ -54,3 +54,6 @@ GET /triggers/checkInbox
GET /triggers/checkInboxBlocking @com.scalableminds.webknossos.datastore.controllers.DataSourceController.triggerInboxCheckBlocking(token: Option[String])
GET /triggers/newOrganizationFolder @com.scalableminds.webknossos.datastore.controllers.DataSourceController.createOrganizationDirectory(token: Option[String], organizationName: String)
GET /triggers/reload/:organizationName/:dataSetName @com.scalableminds.webknossos.datastore.controllers.DataSourceController.reload(token: Option[String], organizationName: String, dataSetName: String, layerName: Option[String])

# Exports
GET /exports/:jobId/download @com.scalableminds.webknossos.datastore.controllers.ExportsController.download(token: Option[String], jobId: String)
