Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve exported files from datastore #5942

Merged
merged 8 commits into from Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Expand Up @@ -12,6 +12,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released

### Added
- Added the possibility to add additional volume layers to an existing annotation via the left sidebar. [#5881](https://github.com/scalableminds/webknossos/pull/5881)
- Tiff exports are now served from the datastore module to prepare for remote datastores with webknossos-worker. [#5942](https://github.com/scalableminds/webknossos/pull/5942)

### Changed

Expand Down
13 changes: 0 additions & 13 deletions app/controllers/JobsController.scala
@@ -1,6 +1,5 @@
package controllers

import java.nio.file.{Files, Paths}
import java.util.Date

import com.mohiva.play.silhouette.api.Silhouette
Expand Down Expand Up @@ -179,16 +178,4 @@ class JobsController @Inject()(jobDAO: JobDAO,
}
}

def downloadExport(jobId: String, exportFileName: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
jobIdValidated <- ObjectId.parse(jobId)
job <- jobDAO.findOne(jobIdValidated)
latestRunId <- job.latestRunId.toFox
organization <- organizationDAO.findOne(request.identity._organization)
filePath = Paths.get("binaryData", organization.name, ".export", latestRunId, exportFileName)
_ <- bool2Fox(Files.exists(filePath)) ?~> "job.export.fileNotFound"
} yield Ok.sendPath(filePath, inline = false)
}

}
20 changes: 19 additions & 1 deletion app/controllers/UserTokenController.scala
Expand Up @@ -16,12 +16,13 @@ import io.swagger.annotations._
import javax.inject.Inject
import models.annotation._
import models.binary.{DataSetDAO, DataSetService, DataStoreService}
import models.job.JobDAO
import models.user.{User, UserService}
import net.liftweb.common.{Box, Full}
import oxalis.security._
import play.api.libs.json.Json
import play.api.mvc.{Action, AnyContent, PlayBodyParsers, Result}
import utils.WkConf
import utils.{ObjectId, WkConf}

import scala.concurrent.ExecutionContext

Expand All @@ -44,6 +45,7 @@ class UserTokenController @Inject()(dataSetDAO: DataSetDAO,
annotationInformationProvider: AnnotationInformationProvider,
dataStoreService: DataStoreService,
tracingStoreService: TracingStoreService,
jobDAO: JobDAO,
wkSilhouetteEnvironment: WkSilhouetteEnvironment,
conf: WkConf,
sil: Silhouette[WkEnv])(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
Expand Down Expand Up @@ -98,6 +100,8 @@ class UserTokenController @Inject()(dataSetDAO: DataSetDAO,
handleDataSourceAccess(accessRequest.resourceId, accessRequest.mode, userBox)(sharingTokenAccessCtx)
case AccessResourceType.tracing =>
handleTracingAccess(accessRequest.resourceId.name, accessRequest.mode, userBox)
case AccessResourceType.jobExport =>
handleJobExportAccess(accessRequest.resourceId.name, accessRequest.mode, userBox)
case _ =>
Fox.successful(UserAccessAnswer(granted = false, Some("Invalid access token.")))
}
Expand Down Expand Up @@ -188,4 +192,18 @@ class UserTokenController @Inject()(dataSetDAO: DataSetDAO,
else UserAccessAnswer(granted = false, Some(s"No ${mode.toString} access to tracing"))
}
}

private def handleJobExportAccess(jobId: String, mode: AccessMode, userBox: Box[User]): Fox[UserAccessAnswer] =
if (mode != AccessMode.read)
Fox.successful(UserAccessAnswer(granted = false, Some(s"Unsupported acces mode for job exports: $mode")))
else {
for {
jobIdValidated <- ObjectId.parse(jobId)
jobBox <- jobDAO.findOne(jobIdValidated)(DBAccessContext(userBox)).futureBox
answer = jobBox match {
case Full(_) => UserAccessAnswer(granted = true)
case _ => UserAccessAnswer(granted = false, Some(s"No ${mode} access to job export"))
}
} yield answer
}
jstriebel marked this conversation as resolved.
Show resolved Hide resolved
}
24 changes: 22 additions & 2 deletions app/controllers/WKRemoteDataStoreController.scala
Expand Up @@ -2,6 +2,7 @@ package controllers

import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, GlobalAccessContext}
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.controllers.JobExportProperties
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.datasource.inbox.{InboxDataSourceLike => InboxDataSource}
import com.scalableminds.webknossos.datastore.services.{
Expand All @@ -13,14 +14,16 @@ import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject
import models.analytics.{AnalyticsService, UploadDatasetEvent}
import models.binary._
import models.job.JobDAO
import models.organization.OrganizationDAO
import models.user.User
import models.user.{User, UserDAO}
import net.liftweb.common.Full
import oxalis.mail.{MailchimpClient, MailchimpTag}
import oxalis.security.{WebknossosBearerTokenAuthenticatorService, WkSilhouetteEnvironment}
import play.api.i18n.{Messages, MessagesProvider}
import play.api.libs.json.{JsError, JsSuccess, JsValue}
import play.api.libs.json.{JsError, JsSuccess, JsValue, Json}
import play.api.mvc.{Action, AnyContent, PlayBodyParsers}
import utils.ObjectId

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -31,6 +34,8 @@ class WKRemoteDataStoreController @Inject()(
analyticsService: AnalyticsService,
organizationDAO: OrganizationDAO,
dataSetDAO: DataSetDAO,
userDAO: UserDAO,
jobDAO: JobDAO,
mailchimpClient: MailchimpClient,
wkSilhouetteEnvironment: WkSilhouetteEnvironment)(implicit ec: ExecutionContext, bodyParsers: PlayBodyParsers)
extends Controller
Expand Down Expand Up @@ -153,6 +158,21 @@ class WKRemoteDataStoreController @Inject()(
}
} yield Ok
}
}

def jobExportProperties(name: String, key: String, jobId: String): Action[AnyContent] = Action.async {
implicit request =>
dataStoreService.validateAccess(name, key) { _ =>
for {
jobIdValidated <- ObjectId.parse(jobId)
job <- jobDAO.findOne(jobIdValidated)(GlobalAccessContext)
jobOwner <- userDAO.findOne(job._owner)(GlobalAccessContext)
organization <- organizationDAO.findOne(jobOwner._organization)(GlobalAccessContext)
latestRunId <- job.latestRunId.toFox ?~> "job.notRun"
exportFileName <- job.exportFileName.toFox ?~> "job.noExportFileName"
jobExportProperties = JobExportProperties(jobId, latestRunId, organization.name, exportFileName)
} yield Ok(Json.toJson(jobExportProperties))
}
}

}
34 changes: 24 additions & 10 deletions app/models/job/Job.scala
Expand Up @@ -11,6 +11,7 @@ import com.scalableminds.webknossos.schema.Tables._
import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject
import models.analytics.{AnalyticsService, FailedJobEvent, RunJobEvent}
import models.binary.DataStoreDAO
import models.job.JobState.JobState
import models.organization.OrganizationDAO
import models.user.{MultiUserDAO, User, UserDAO}
Expand All @@ -24,7 +25,6 @@ import utils.{ObjectId, SQLClient, SQLDAO, WkConf}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

import scala.concurrent.duration._

case class Job(
Expand Down Expand Up @@ -56,18 +56,22 @@ case class Job(

def effectiveState: JobState = manualState.getOrElse(state)

def resultLink(organizationName: String): Option[String] =
def exportFileName: Option[String] = argAsStringOpt("export_file_name")

def datasetName: Option[String] = argAsStringOpt("dataset_name")

private def argAsStringOpt(key: String) = (commandArgs \ key).toOption.flatMap(_.asOpt[String])

def resultLink(organizationName: String, dataStorePublicUrl: String): Option[String] =
if (effectiveState != JobState.SUCCESS) None
else {
command match {
case "convert_to_wkw" =>
(commandArgs \ "dataset_name").toOption.flatMap(_.asOpt[String]).map { dataSetName =>
s"/datasets/$organizationName/$dataSetName/view"
datasetName.map { dsName =>
s"/datasets/$organizationName/$dsName/view"
}
case "export_tiff" =>
(commandArgs \ "export_file_name").toOption.flatMap(_.asOpt[String]).map { exportFileName =>
s"/api/jobs/${_id.id}/downloadExport/$exportFileName"
}
Some(s"$dataStorePublicUrl/data/exports/${_id.id}/download")
case "infer_nuclei" =>
returnValue.map { resultDatasetName =>
s"/datasets/$organizationName/$resultDatasetName/view"
Expand All @@ -92,7 +96,7 @@ class JobDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext)
Job(
ObjectId(r._Id),
ObjectId(r._Owner),
r._Datastore,
r._Datastore.trim,
jstriebel marked this conversation as resolved.
Show resolved Hide resolved
r.command,
Json.parse(r.commandargs).as[JsObject],
state,
Expand All @@ -117,6 +121,13 @@ class JobDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext)
parsed <- parseAll(r)
} yield parsed

override def findOne(jobId: ObjectId)(implicit ctx: DBAccessContext): Fox[Job] =
for {
accessQuery <- readAccessQuery
r <- run(sql"select #$columns from #$existingCollectionName where #$accessQuery and _id = $jobId".as[JobsRow])
parsed <- parseFirst(r, jobId)
} yield parsed

def countUnassignedPendingForDataStore(_dataStore: String): Fox[Int] =
for {
r <- run(sql"""select count(_id) from #$existingCollectionName
Expand Down Expand Up @@ -221,6 +232,7 @@ class JobService @Inject()(wkConf: WkConf,
userDAO: UserDAO,
multiUserDAO: MultiUserDAO,
jobDAO: JobDAO,
dataStoreDAO: DataStoreDAO,
organizationDAO: OrganizationDAO,
analyticsService: AnalyticsService,
slackNotificationService: SlackNotificationService,
Expand Down Expand Up @@ -258,7 +270,8 @@ class JobService @Inject()(wkConf: WkConf,
for {
user <- userDAO.findOne(jobBeforeChange._owner)(GlobalAccessContext)
organization <- organizationDAO.findOne(user._organization)(GlobalAccessContext)
resultLink = jobAfterChange.resultLink(organization.name)
dataStore <- dataStoreDAO.findOneByName(jobBeforeChange._dataStore)(GlobalAccessContext)
resultLink = jobAfterChange.resultLink(organization.name, dataStore.publicUrl)
resultLinkMrkdwn = resultLink.map(l => s" <${wkConf.Http.uri}$l|Result>").getOrElse("")
multiUser <- multiUserDAO.findOne(user._multiUser)(GlobalAccessContext)
superUserLabel = if (multiUser.isSuperUser) " (for superuser)" else ""
Expand All @@ -277,7 +290,8 @@ class JobService @Inject()(wkConf: WkConf,
for {
owner <- userDAO.findOne(job._owner) ?~> "user.notFound"
organization <- organizationDAO.findOne(owner._organization) ?~> "organization.notFound"
resultLink = job.resultLink(organization.name)
dataStore <- dataStoreDAO.findOneByName(job._dataStore) ?~> "dataStore.notFound"
resultLink = job.resultLink(organization.name, dataStore.publicUrl)
} yield {
Json.obj(
"id" -> job._id.id,
Expand Down
2 changes: 1 addition & 1 deletion conf/webknossos.latest.routes
Expand Up @@ -88,6 +88,7 @@ PATCH /datastores/:name/status c
POST /datastores/:name/verifyUpload controllers.WKRemoteDataStoreController.validateDataSetUpload(name: String, key: String, token: String)
POST /datastores/:name/reportDatasetUpload controllers.WKRemoteDataStoreController.reportDatasetUpload(name: String, key: String, token: String, dataSetName: String, dataSetSizeBytes: Long)
POST /datastores/:name/deleteErroneous controllers.WKRemoteDataStoreController.deleteErroneous(name: String, key: String)
GET /datastores/:name/jobExportProperties controllers.WKRemoteDataStoreController.jobExportProperties(name: String, key: String, jobId: String)
fm3 marked this conversation as resolved.
Show resolved Hide resolved
POST /datastores/:name/validateUserAccess controllers.UserTokenController.validateAccessViaDatastore(name: String, key: String, token: Option[String])
POST /datastores controllers.DataStoreController.create
DELETE /datastores/:name controllers.DataStoreController.delete(name: String)
Expand Down Expand Up @@ -206,5 +207,4 @@ GET /jobs/run/computeMeshFile/:organizationName/:dataSetName c
GET /jobs/run/exportTiff/:organizationName/:dataSetName controllers.JobsController.runExportTiffJob(organizationName: String, dataSetName: String, bbox: String, layerName: Option[String], tracingId: Option[String], tracingVersion: Option[String], annotationId: Option[String], annotationType: Option[String], hideUnmappedIds: Option[Boolean], mappingName: Option[String], mappingType: Option[String])
GET /jobs/run/inferNuclei/:organizationName/:dataSetName controllers.JobsController.runInferNucleiJob(organizationName: String, dataSetName: String, layerName: Option[String])
GET /jobs/:id controllers.JobsController.get(id: String)
GET /jobs/:id/downloadExport/:exportFileName controllers.JobsController.downloadExport(id: String, exportFileName: String)
POST /jobs/:id/status controllers.WKRemoteWorkerController.updateJobStatus(key: String, id: String)
8 changes: 6 additions & 2 deletions frontend/javascripts/admin/job/job_list_view.js
Expand Up @@ -17,7 +17,7 @@ import {
import * as React from "react";

import type { APIJob } from "types/api_flow_types";
import { getJobs } from "admin/admin_rest_api";
import { getJobs, doWithToken } from "admin/admin_rest_api";
import Persistence from "libs/persistence";
import * as Utils from "libs/utils";
import FormattedDate from "components/formatted_date";
Expand Down Expand Up @@ -64,6 +64,7 @@ type State = {
isLoading: boolean,
jobs: Array<APIJob>,
searchQuery: string,
token: string,
};

const persistence: Persistence<State> = new Persistence(
Expand All @@ -78,6 +79,7 @@ class JobListView extends React.PureComponent<Props, State> {
isLoading: true,
jobs: [],
searchQuery: "",
token: "",
};

componentWillMount() {
Expand All @@ -100,10 +102,12 @@ class JobListView extends React.PureComponent<Props, State> {

async fetchData(): Promise<void> {
const jobs = await getJobs();
const token = await doWithToken(async t => t);
this.setState(
{
isLoading: false,
jobs,
token,
},
// refresh jobs according to the refresh interval
() => {
Expand Down Expand Up @@ -182,7 +186,7 @@ class JobListView extends React.PureComponent<Props, State> {
return (
<span>
{job.resultLink && (
<a href={job.resultLink} title="Download">
<a href={`${job.resultLink}?token=${this.state.token}`} title="Download">
<DownOutlined />
Download
</a>
Expand Down
@@ -0,0 +1,49 @@
package com.scalableminds.webknossos.datastore.controllers

import java.nio.file.{Files, Path, Paths}

import com.google.inject.Inject
import com.scalableminds.util.tools.FoxImplicits
import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.scalableminds.webknossos.datastore.services.{
DSRemoteWebKnossosClient,
DataStoreAccessTokenService,
UserAccessRequest
}
import play.api.libs.json.{Json, OFormat}
import play.api.mvc.{Action, AnyContent}

import scala.concurrent.ExecutionContext

case class JobExportProperties(jobId: String, runId: String, organizationName: String, exportFileName: String) {

def fullPathIn(baseDir: Path): Path =
baseDir.resolve(organizationName).resolve(".export").resolve(runId).resolve(exportFileName)
}

object JobExportProperties {
implicit val jsonFormat: OFormat[JobExportProperties] = Json.format[JobExportProperties]
}

class ExportsController @Inject()(webKnossosClient: DSRemoteWebKnossosClient,
accessTokenService: DataStoreAccessTokenService,
config: DataStoreConfig)(implicit ec: ExecutionContext)
extends Controller
with FoxImplicits {

private val dataBaseDir: Path = Paths.get(config.Datastore.baseFolder)

def download(token: Option[String], jobId: String): Action[AnyContent] = Action.async { implicit request =>
accessTokenService.validateAccess(UserAccessRequest.downloadJobExport(jobId), token) {
AllowRemoteOrigin {
for {
exportProperties <- webKnossosClient.getJobExportProperties(jobId)
fullPath = exportProperties.fullPathIn(dataBaseDir)
_ <- bool2Fox(Files.exists(fullPath)) ?~> "job.export.fileNotFound"
} yield Ok.sendPath(fullPath, inline = false)
}
}

}

}
Expand Up @@ -19,7 +19,7 @@ object AccessMode extends ExtendedEnumeration {

object AccessResourceType extends ExtendedEnumeration {
type AccessResourceType = Value
val datasource, tracing, webknossos = Value
val datasource, tracing, webknossos, jobExport = Value
}

case class UserAccessRequest(resourceId: DataSourceId, resourceType: AccessResourceType.Value, mode: AccessMode.Value) {
Expand Down Expand Up @@ -48,6 +48,9 @@ object UserAccessRequest {
def writeTracing(tracingId: String): UserAccessRequest =
UserAccessRequest(DataSourceId(tracingId, ""), AccessResourceType.tracing, AccessMode.write)

def downloadJobExport(jobId: String): UserAccessRequest =
UserAccessRequest(DataSourceId(jobId, ""), AccessResourceType.jobExport, AccessMode.read)

def webknossos: UserAccessRequest =
UserAccessRequest(DataSourceId("webknossos", ""), AccessResourceType.webknossos, AccessMode.administrate)
}
Expand Down
Expand Up @@ -5,6 +5,7 @@ import com.google.inject.Inject
import com.google.inject.name.Named
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.scalableminds.webknossos.datastore.controllers.JobExportProperties
import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.datasource.inbox.InboxDataSourceLike
Expand Down Expand Up @@ -91,6 +92,12 @@ class DSRemoteWebKnossosClient @Inject()(
def deleteErroneousDataSource(id: DataSourceId): Fox[_] =
rpc(s"$webKnossosUri/api/datastores/$dataStoreName/deleteErroneous").addQueryString("key" -> dataStoreKey).post(id)

def getJobExportProperties(jobId: String): Fox[JobExportProperties] =
rpc(s"$webKnossosUri/api/datastores/$dataStoreName/jobExportProperties")
.addQueryString("jobId" -> jobId)
.addQueryString("key" -> dataStoreKey)
.getWithJsonResponse[JobExportProperties]

override def requestUserAccess(token: Option[String], accessRequest: UserAccessRequest): Fox[UserAccessAnswer] =
rpc(s"$webKnossosUri/api/datastores/$dataStoreName/validateUserAccess")
.addQueryString("key" -> dataStoreKey)
Expand Down
Expand Up @@ -54,3 +54,6 @@ GET /triggers/checkInbox
GET /triggers/checkInboxBlocking @com.scalableminds.webknossos.datastore.controllers.DataSourceController.triggerInboxCheckBlocking(token: Option[String])
GET /triggers/newOrganizationFolder @com.scalableminds.webknossos.datastore.controllers.DataSourceController.createOrganizationDirectory(token: Option[String], organizationName: String)
GET /triggers/reload/:organizationName/:dataSetName @com.scalableminds.webknossos.datastore.controllers.DataSourceController.reload(token: Option[String], organizationName: String, dataSetName: String, layerName: Option[String])

# Exports
GET /exports/:jobId/download @com.scalableminds.webknossos.datastore.controllers.ExportsController.download(token: Option[String], jobId: String)