Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Commit

Permalink
Merge pull request #45 from uc-cdis/feat/expression-tools
Browse files Browse the repository at this point in the history
(PXP-7446): Feat/expression tools
  • Loading branch information
cterrazas2 committed Feb 23, 2021
2 parents d2c19d7 + 6c93799 commit 1cb8dcf
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 233 deletions.
51 changes: 15 additions & 36 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/robertkrimen/otto"
cwl "github.com/uc-cdis/cwl.go"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -388,26 +387,22 @@ func (engine *K8sEngine) setupTool(tool *Tool) (err error) {
return nil
}

// RunTool runs the tool
// If ExpressionTool, passes to appropriate handler to eval the expression
// If CommandLineTool, passes to appropriate handler to create k8s job
// runTool runs the tool from the engine and passes it to the appropriate handler to create a k8s job.
func (engine *K8sEngine) runTool(tool *Tool) (err error) {
engine.infof("begin run tool: %v", tool.Task.Root.ID)
switch class := tool.Task.Root.Class; class {
case "ExpressionTool":
if err = engine.runExpressionTool(tool); err != nil {
return engine.errorf("failed to run ExpressionTool: %v; error: %v", tool.Task.Root.ID, err)
}
if err = engine.listenForDone(tool); err != nil {
return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err)
}
case "CommandLineTool":
if err = engine.runCommandLineTool(tool); err != nil {
return engine.errorf("failed to run CommandLineTool: %v; error: %v", tool.Task.Root.ID, err)
}

// collect resource metrics via k8s api
// NOTE: at present, metrics are NOT collected for expressionTools
// this should be fixed
go engine.collectResourceMetrics(tool)

if err = engine.listenForDone(tool); err != nil {
return engine.errorf("failed to listen for task to finish: %v; error: %v", tool.Task.Root.ID, err)
}
Expand Down Expand Up @@ -453,32 +448,16 @@ func (engine *K8sEngine) listenForDone(tool *Tool) (err error) {
return nil
}

// #no-fuse - this has to change!
// currently, regrettably, expressiontools run "in the engine", not in separate containers
// need to revisit this in detail
// figure out if expression tools should be dispatched as jobs
// or if it's okay that they run "in the engine"
// probably no actual computation of any kind should run "in the engine"
// so I think the expressiontool should run as a job, just like commandlinetools
// runExpressionTool uses the engine to dispatch a task job for a given tool to evaluate an expression.
func (engine *K8sEngine) runExpressionTool(tool *Tool) (err error) {
engine.infof("begin run ExpressionTool: %v", tool.Task.Root.ID)
// note: context has already been loaded
if err = os.Chdir(tool.WorkingDir); err != nil {
return engine.errorf("failed to move to tool working dir: %v; error: %v", tool.Task.Root.ID, err)
}
result, err := evalExpression(tool.Task.Root.Expression, tool.InputsVM)
err = tool.evaluateExpression()
if err != nil {
return engine.errorf("failed to eval expression for ExpressionTool: %v; error: %v", tool.Task.Root.ID, err)
return engine.errorf("failed to evaluate expression for tool: %v; error: %v", tool.Task.Root.ID, err)
}
os.Chdir("/") // move back (?) to root after tool finishes execution -> or, where should the default directory position be?

// expression must return a JSON object where the keys are the IDs of the ExpressionTool outputs
// see description of `expression` field here:
// https://www.commonwl.org/v1.0/Workflow.html#ExpressionTool
var ok bool
tool.ExpressionResult, ok = result.(map[string]interface{})
if !ok {
return engine.errorf("ExpressionTool expression did not return a JSON object: %v", tool.Task.Root.ID)
err = engine.dispatchTaskJob(tool)
if err != nil {
return engine.errorf("failed to dispatch task job: %v; error: %v", tool.Task.Root.ID, err)
}
engine.infof("end run ExpressionTool: %v", tool.Task.Root.ID)
return nil
Expand Down
25 changes: 4 additions & 21 deletions mariner/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,27 +168,12 @@ func (engine *K8sEngine) s3KeyToLocalPath(key string) string {
return strings.Replace(key, engine.UserID, engineWorkspaceVolumeName, 1)
}

// loads contents of file into the File.Contents field
// #no-fuse - read from s3, not locally
func (engine *K8sEngine) loadContents(f *File) (err error) {

// loadContents downloads contents for a file from the engine's S3 file manager to populate the file contents field.
func (engine *K8sEngine) loadContents(file *File) (err error) {
sess := engine.S3FileManager.newS3Session()
downloader := s3manager.NewDownloader(sess)

// Location field stores full path, no need to handle prefix here
s3Key := strings.TrimPrefix(engine.localPathToS3Key(f.Location), "/")

// Create a buffer to write the S3 Object contents to.
// see: https://stackoverflow.com/questions/41645377/golang-s3-download-to-buffer-using-s3manager-downloader
s3Key := engine.localPathToS3Key(file.Location)
buf := &aws.WriteAtBuffer{}

// read up to 64 KiB from file, as specified in CWL docs
// 1 KiB is 1024 bytes -> 64 KiB is 65536 bytes
//
// S3 sdk supports specifying byte ranges
// in this way: https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35

// Write the contents of S3 Object to the buffer
s3Obj := &s3.GetObjectInput{
Bucket: aws.String(engine.S3FileManager.S3BucketName),
Key: aws.String(s3Key),
Expand All @@ -198,9 +183,7 @@ func (engine *K8sEngine) loadContents(f *File) (err error) {
if err != nil {
return fmt.Errorf("failed to download file, %v", err)
}

// populate File.Contents field with contents
f.Contents = string(buf.Bytes())
file.Contents = string(buf.Bytes())
return nil
}

Expand Down
114 changes: 7 additions & 107 deletions mariner/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,42 +219,15 @@ func (tool *Tool) processFileList(l interface{}) ([]*File, error) {
return out, nil
}

// if err and input is not optional, it is a fatal error and the run should fail out
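// transformInput resolves the value of a single tool input: it applies any step-level valueFrom,
// loads the input value, processes File objects and their secondaryFiles,
// and finally applies the input binding's valueFrom, if one is set.
// It returns (nil, nil) for an optional input that has neither a value nor a default.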
func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out interface{}, err error) {
tool.Task.infof("begin transform input: %v", input.ID)

/*
NOTE: presently only context loaded into js vm's here is `self`
Will certainly need to add more context to handle all cases
Definitely, definitely need a generalized method for loading appropriate context at appropriate places
In particular, the `inputs` context is probably going to be needed most commonly
OTHERNOTE: `self` (in js vm) takes on different values in different places, according to cwl docs
see: https://www.commonwl.org/v1.0/Workflow.html#Parameter_references
---
Steps:
1. handle ValueFrom case at stepInput level
- if no ValueFrom specified, assign parameter value to `out` to processed in next step
2. handle ValueFrom case at toolInput level
- initial value is `out` from step 1
*/
localID := lastInPath(input.ID)

// stepInput ValueFrom case
if tool.StepInputMap[localID] != nil {
// no processing needs to happen if the valueFrom field is empty
if tool.StepInputMap[localID].ValueFrom != "" {
// here the valueFrom field is not empty, so we need to handle valueFrom
valueFrom := tool.StepInputMap[localID].ValueFrom
if strings.HasPrefix(valueFrom, "$") {
// valueFrom is an expression that needs to be eval'd

// get a js vm
vm := tool.JSVM.Copy() // #js-runtime

// preprocess struct/array so that fields can be accessed in vm
// Question: how to handle non-array/struct data types?
// --------- no preprocessing should have to happen in this case.
vm := tool.JSVM.Copy()
self, err := tool.loadInputValue(input)
if err != nil {
return nil, tool.Task.errorf("failed to load value: %v", err)
Expand All @@ -263,75 +236,29 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
if err != nil {
return nil, tool.Task.errorf("failed to preprocess context: %v", err)
}

// set `self` variable in vm
if err = vm.Set("self", self); err != nil {
return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err)
}

/*
// Troubleshooting js
// note: when accessing object fields using keys must use otto.Run("obj.key"), NOT otto.Get("obj.key")
fmt.Println("self in js:")
jsSelf, err := vm.Get("self")
jsSelfVal, err := jsSelf.Export()
PrintJSON(jsSelfVal)
fmt.Println("Expression:")
PrintJSON(valueFrom)
fmt.Println("Object.keys(self)")
keys, err := vm.Run("Object.keys(self)")
if err != nil {
fmt.Printf("Error evaluating Object.keys(self): %v\n", err)
}
keysVal, err := keys.Export()
PrintJSON(keysVal)
*/

// eval the expression in the vm, capture result in `out`
if out, err = evalExpression(valueFrom, vm); err != nil {
return nil, tool.Task.errorf("failed to eval js expression: %v; error: %v", valueFrom, err)
}
} else {
// valueFrom is not an expression - take raw string/val as value
out = valueFrom
}
}
}

// if this tool is not a step of a parent workflow
// OR
// if this tool is a step of a parent workflow but the valueFrom is empty
if out == nil {
out, err = tool.loadInputValue(input)
if err != nil {
return nil, tool.Task.errorf("failed to load input value: %v", err)
}
if out == nil {
// implies an optional parameter with no value provided and no default value specified
// this input parameter is not used by the tool
tool.Task.infof("optional input with no value or default provided - skipping: %v", input.ID)
return nil, nil
}
}

// if file, need to ensure that all file attributes get populated (e.g., basename)
/*
fixme: handle array of files
Q: what about directories (?)
do this:
switch statement:
case file
case []file
note: check types in the param type list?
vs. checking types of actual values
*/

switch {
case isFile(out):
if out, err = tool.processFile(out); err != nil {
Expand All @@ -342,14 +269,9 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
return nil, tool.Task.errorf("failed to process file list: %v; error: %v", out, err)
}
default:
// fmt.Println("is not a file object")
tool.Task.infof("input is not a file object: %v", input.ID)
}

// ######### Load Secondary Files ############
// ######### ought to encapsulate this to a function
// ######### and call it for inputs and outputs
// ######### since it's basically the same process both times

if len(input.SecondaryFiles) > 0 {
var fileArray []*File
switch {
Expand All @@ -358,48 +280,33 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
case isArrayOfFile(out):
fileArray = out.([]*File)
default:
// this is a fatal error - engine should fail out here
// but don't panic - actually handle this err and fail out gracefully
panic("invalid input: secondary files specified for a non-file input")
return nil, tool.Task.errorf("invalid input: secondary files specified for a non-file input.")
}

for _, entry := range input.SecondaryFiles {
val := entry.Entry
if strings.HasPrefix(val, "$") {
vm := tool.JSVM.Copy()
for _, fileObj := range fileArray {
// preprocess output file object
self, err := preProcessContext(fileObj)
if err != nil {
return nil, tool.Task.errorf("%v", err)
}
// set `self` variable name
// assuming it is okay to use one vm for all evaluations and just reset the `self` variable before each eval
vm.Set("self", self)

// eval js
jsResult, err := evalExpression(val, vm)
if err != nil {
return nil, tool.Task.errorf("%v", err)
}

// retrieve secondaryFile's path (type interface{} with underlying type string)
sFilePath, ok := jsResult.(string)
if !ok {
return nil, tool.Task.errorf("secondaryFile expression did not return string")
}

if exist, _ := engine.fileExists(sFilePath); !exist {
// fatal error
panic("secondary file doesn't exist")
return nil, tool.Task.errorf("secondary file doesn't exist")
}

// get file object for secondaryFile and append it to the input file's SecondaryFiles field
sFileObj := fileObject(sFilePath)
fileObj.SecondaryFiles = append(fileObj.SecondaryFiles, sFileObj)
}
} else {
// follow those two steps indicated at the bottom of the secondaryFiles field description
suffix, carats := trimLeading(val, "^")
for _, fileObj := range fileArray {
engine.loadSFilesFromPattern(tool, fileObj, suffix, carats)
Expand All @@ -415,15 +322,11 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
}
}

// at this point, variable `out` is the transformed input thus far (even if no transformation actually occurred)
// so `out` will be what we work with in this next block as an initial value
// tool inputBinding ValueFrom case
if input.Binding != nil && input.Binding.ValueFrom != nil {
valueFrom := input.Binding.ValueFrom.String
if strings.HasPrefix(valueFrom, "$") {
vm := tool.JSVM.Copy() // #js-runtime
vm := tool.JSVM.Copy()
var context interface{}
// fixme: handle array of files
switch out.(type) {
case *File, []*File:
context, err = preProcessContext(out)
Expand All @@ -433,17 +336,14 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
default:
context = out
}

vm.Set("self", context) // NOTE: again, will more than likely need additional context here to cover other cases
vm.Set("self", context)
if out, err = evalExpression(valueFrom, vm); err != nil {
return nil, tool.Task.errorf("failed to eval expression: %v; error: %v", valueFrom, err)
}
} else {
// not an expression, so no eval necessary - take raw value
out = valueFrom
}
}

tool.Task.infof("end transform input: %v", input.ID)
return out, nil
}
Expand Down
Loading

0 comments on commit 1cb8dcf

Please sign in to comment.