Skip to content

Commit

Permalink
Internalize Sequence combinator into the controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioana-blue authored and psuter committed Nov 18, 2016
1 parent f0ef11d commit ca15c68
Show file tree
Hide file tree
Showing 21 changed files with 965 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ case class ActivationMessage(
val value = (content getOrElse JsObject()).compactPrint
s"$action?message=$value"
}

def causedBySequence: Boolean = cause.isDefined
}

object ActivationMessage extends DefaultJsonProtocol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
})
}

/**
* Returns an ActivationResponse that is used as a placeholder for payload
* Used as a feed for starting a sequence.
* NOTE: the code is application error (since this response could be used as a response for the sequence
* if the payload contains an error)
*/
protected[core] def payloadPlaceholder(payload: Option[JsObject]) = ActivationResponse(ApplicationError, payload)

/**
* Interprets response from container after initialization. This method is only called when the initialization failed.
*
Expand Down
29 changes: 29 additions & 0 deletions common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,35 @@ object WhiskAction
wp map { resolvedPkg => FullyQualifiedEntityName(resolvedPkg.namespace.addpath(resolvedPkg.name), actionName) }
}
}

/**
* Resolves an action name if it is contained in a package.
* Look up the package to determine if it is a binding or the actual package.
* If it's a binding, rewrite the fully qualified name of the action using the actual package path name.
* If it's the actual package, use its name directly as the package path name.
* While traversing the package bindings, merge the parameters.
*/
def resolveActionAndMergeParameters(entityStore: EntityStore, fullyQualifiedName: FullyQualifiedEntityName)(
implicit ec: ExecutionContext, transid: TransactionId): Future[WhiskAction] = {
// first check that there is a package to be resolved
val entityPath = fullyQualifiedName.path
if (entityPath.defaultPackage) {
// this is the default package, nothing to resolve
WhiskAction.get(entityStore, fullyQualifiedName.toDocId)
} else {
// there is a package to be resolved
val pkgDocid = fullyQualifiedName.pathToDocId
val actionName = fullyQualifiedName.name
val wp = WhiskPackage.resolveBinding(entityStore, pkgDocid, mergeParameters = true)
wp flatMap { resolvedPkg =>
// fully resolved name for the action
val fqenAction = FullyQualifiedEntityName(resolvedPkg.namespace.addpath(resolvedPkg.name), actionName)
// get the whisk action associate with it and inherit the parameters from the package/binding
WhiskAction.get(entityStore, fqenAction.toDocId) map { _.inherit(resolvedPkg.parameters) }
}
}
}

}

object ActionLimitsOption extends DefaultJsonProtocol {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import whisk.core.database.DocumentFactory
* @param version the semantic version (usually matches the activated entity)
* @param publish true to share the activation or false otherwise
* @param annotation the set of annotations to attribute to the activation
* @param duration of the activation in milliseconds
* @throws IllegalArgumentException if any required argument is undefined
*/
@throws[IllegalArgumentException]
Expand All @@ -59,7 +60,8 @@ case class WhiskActivation(
logs: ActivationLogs = ActivationLogs(),
version: SemVer = SemVer(),
publish: Boolean = false,
annotations: Parameters = Parameters())
annotations: Parameters = Parameters(),
duration: Option[Long] = None)
extends WhiskEntity(EntityName(activationId())) {

require(cause != null, "cause undefined")
Expand All @@ -80,8 +82,8 @@ case class WhiskActivation(
val JsObject(baseFields) = WhiskActivation.serdes.write(this).asJsObject
val newFields = (baseFields - "response") + ("response" -> response.toExtendedJson)
if (end != Instant.EPOCH) {
val duration = (end.toEpochMilli - start.toEpochMilli).toJson
JsObject(newFields + ("duration" -> duration))
val durationValue = (duration getOrElse (end.toEpochMilli - start.toEpochMilli)).toJson
JsObject(newFields + ("duration" -> durationValue))
} else {
JsObject(newFields - "end")
}
Expand All @@ -99,7 +101,8 @@ case class WhiskActivation(
logs = ActivationLogs(),
version = version,
publish = publish,
annotations = annotations)
annotations = annotations,
duration = duration)

def withLogs(logs: ActivationLogs) = WhiskActivation(
namespace = namespace,
Expand All @@ -113,7 +116,8 @@ case class WhiskActivation(
logs = logs,
version = version,
publish = publish,
annotations = annotations)
annotations = annotations,
duration = duration)
}

object WhiskActivation
Expand All @@ -134,7 +138,7 @@ object WhiskActivation
}

override val collectionName = "activations"
override implicit val serdes = jsonFormat12(WhiskActivation.apply)
override implicit val serdes = jsonFormat13(WhiskActivation.apply)

override val cacheEnabled = true
override def cacheKeyForUpdate(w: WhiskActivation) = w.docid.asDocInfo
Expand Down
22 changes: 19 additions & 3 deletions common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ case class WhiskPackage(
WhiskPackage(namespace, name, binding, p ++ parameters, version, publish, annotations)
}

/**
* Merges parameters into existing set of parameters for package.
* The parameters from p supersede parameters from this.
*/
def mergeParameters(p: Parameters) = {
WhiskPackage(namespace, name, binding, parameters ++ p, version, publish, annotations)
}

/**
* Gets binding for package iff this is not already a package reference.
*/
Expand Down Expand Up @@ -140,17 +148,25 @@ object WhiskPackage
override val collectionName = "packages"

/**
* Traverses a binding recursively to find the root package.
* Traverses a binding recursively to find the root package and
* merges parameters along the way if mergeParameters flag is set.
*
* @param db the entity store containing packages
* @param pkg the package document id to start resolving
* @param mergeParameters flag that indicates whether parameters should be merged during package resolution
* @return the same package if there is no binding, or the actual reference package otherwise
*/
def resolveBinding(db: EntityStore, pkg: DocId)(
def resolveBinding(db: EntityStore, pkg: DocId, mergeParameters: Boolean = false)(
implicit ec: ExecutionContext, transid: TransactionId): Future[WhiskPackage] = {
WhiskPackage.get(db, pkg) flatMap { wp =>
// if there is a binding resolve it
val resolved = wp.binding map { binding => resolveBinding(db, binding.docid) }
val resolved = wp.binding map { binding =>
if (mergeParameters) {
resolveBinding(db, binding.docid, true) map {
resolvedPackage => resolvedPackage.mergeParameters(wp.parameters)
}
} else resolveBinding(db, binding.docid)
}
resolved getOrElse Future.successful(wp)
}
}
Expand Down
4 changes: 4 additions & 0 deletions common/scala/src/main/scala/whisk/http/ErrorResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ object Messages {
val bindingCannotReferenceBinding = "Cannot bind to another package binding."
val requestedBindingIsNotValid = "Cannot bind to a resource that is not a package."
val notAllowedOnBinding = "Operation not permitted on package binding."

/** Error messages for sequence activations. */
val sequenceRetrieveActivationTimeout = "Timeout reached when retrieving activation for sequence component."
val sequenceActivationFailure = "Sequence failed."
}

/** Replaces rejections with Json object containing cause and transaction id. */
Expand Down

0 comments on commit ca15c68

Please sign in to comment.