Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Commit

Permalink
Merge pull request #59 from uc-cdis/feat/pre-genesis-wf
Browse files Browse the repository at this point in the history
Feat/pre genesis wf
  • Loading branch information
UchicagoZchen138 committed Oct 12, 2021
2 parents 858a386 + f945835 commit 19e1d37
Show file tree
Hide file tree
Showing 14 changed files with 629 additions and 40 deletions.
13 changes: 13 additions & 0 deletions mariner/TECHDEBT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Tech Debt

### resolveExpression ${} parsing

Created: 9/30/2021

Issue: When parsing ${} expressions, we assume it is only 1 ${} and do not ensure that there are multiple js expressions

Why this occured: This was a stopgap to get the pre genesis workflow working, previosuly mariner did not support evaluation of ${} expressions

Imapct: We won't be able to correctly parse and evaluate ${} ${} .... ${}

Planned Resolution: As part of PXP-8786, we will refactor how js expressions are evaluated and will fix this issue.
1 change: 1 addition & 0 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Tool struct {
ExpressionResult map[string]interface{}
Task *Task
S3Input *ToolS3Input
initWorkDirFiles []string

// dev'ing
// need to load this with runtime context as per CWL spec
Expand Down
2 changes: 2 additions & 0 deletions mariner/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
)

// this file contains code for handling/processing file objects
Expand Down Expand Up @@ -174,6 +175,7 @@ func (engine *K8sEngine) loadContents(file *File) (err error) {
downloader := s3manager.NewDownloader(sess)
s3Key := engine.localPathToS3Key(file.Location)
buf := &aws.WriteAtBuffer{}
log.Debugf("here is the s3 file that we are downloading %s", s3Key)
s3Obj := &s3.GetObjectInput{
Bucket: aws.String(engine.S3FileManager.S3BucketName),
Key: aws.String(s3Key),
Expand Down
19 changes: 17 additions & 2 deletions mariner/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sort"
"strings"

log "github.com/sirupsen/logrus"

cwl "github.com/uc-cdis/cwl.go"
)

Expand Down Expand Up @@ -75,6 +77,7 @@ func (engine *K8sEngine) loadInput(tool *Tool, input *cwl.Input) (err error) {
required := true
if provided, err := engine.transformInput(tool, input); err == nil {
if provided == nil {
tool.Task.infof("found optional input: %v", input.ID)
// optional input with no value or default provided
// this is an unused input parameter
// and so does not show up on the command line
Expand Down Expand Up @@ -107,7 +110,7 @@ func (engine *K8sEngine) loadInput(tool *Tool, input *cwl.Input) (err error) {

// wrapper around processFile() - collects path of input file and all secondary files
func (tool *Tool) processFile(f interface{}) (*File, error) {
obj, err := processFile(f)
obj, err := processFile(tool, f)
if err != nil {
return nil, err
}
Expand All @@ -127,7 +130,7 @@ func (tool *Tool) processFile(f interface{}) (*File, error) {

// called in transformInput() routine
// handles path prefix issue
func processFile(f interface{}) (*File, error) {
func processFile(tool *Tool, f interface{}) (*File, error) {

// if it's already of type File or *File, it requires no processing
if obj, ok := f.(File); ok {
Expand Down Expand Up @@ -173,6 +176,7 @@ func processFile(f interface{}) (*File, error) {
*/
GUID := strings.TrimPrefix(path, commonsPrefix)
path = strings.Join([]string{pathToCommonsData, GUID}, "")
log.Debugf("here is the uid path %s", path)
case strings.HasPrefix(path, userPrefix):
/*
~ Path representations/handling for user-data ~
Expand All @@ -188,6 +192,7 @@ func processFile(f interface{}) (*File, error) {
*/
trimmedPath := strings.TrimPrefix(path, userPrefix)
path = strings.Join([]string{"/", engineWorkspaceVolumeName, "/", trimmedPath}, "")

case strings.HasPrefix(path, conformancePrefix):
trimmedPath := strings.TrimPrefix(path, conformancePrefix)
path = strings.Join([]string{"/", conformanceVolumeName, "/", trimmedPath}, "")
Expand Down Expand Up @@ -224,25 +229,33 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
tool.Task.infof("begin transform input: %v", input.ID)
localID := lastInPath(input.ID)
if tool.StepInputMap[localID] != nil {
tool.Task.infof("found StepInputMap record for: %v", localID)
tool.Task.infof("ValueFrom record %v; for: %v", tool.StepInputMap[localID].ValueFrom, input.ID)
if tool.StepInputMap[localID].ValueFrom != "" {
valueFrom := tool.StepInputMap[localID].ValueFrom
if strings.HasPrefix(valueFrom, "$") {
tool.Task.infof("found JS ValueFrom for input: %v", input.ID)
vm := tool.JSVM.Copy()
self, err := tool.loadInputValue(input)
if err != nil {
return nil, tool.Task.errorf("failed to load value: %v", err)
}
tool.Task.infof("loaded input value: %v; from input: %v", self, input.ID)
self, err = preProcessContext(self)
if err != nil {
return nil, tool.Task.errorf("failed to preprocess context: %v", err)
}
tool.Task.infof("preprocess input value: %v; from input: %v", self, input.ID)
if err = vm.Set("self", self); err != nil {
return nil, tool.Task.errorf("failed to set 'self' value in js vm: %v", err)
}
tool.Task.infof("for input: %v; evaluating expression: %v", input.ID, valueFrom)
if out, err = evalExpression(valueFrom, vm); err != nil {
return nil, tool.Task.errorf("failed to eval js expression: %v; error: %v", valueFrom, err)
}
tool.Task.infof("for input: %v; expression returned: %v", input.ID, out)
} else {
tool.Task.infof("no JS in ValueFrom for input: %v; assigning: %v", input.ID, valueFrom)
out = valueFrom
}
}
Expand All @@ -253,6 +266,7 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
if err != nil {
return nil, tool.Task.errorf("failed to load input value: %v", err)
}
tool.Task.infof("for input: %v; out is nil, so loaded: %v", input.ID, out)
if out == nil {
tool.Task.infof("optional input with no value or default provided - skipping: %v", input.ID)
return nil, nil
Expand Down Expand Up @@ -430,6 +444,7 @@ func (tool *Tool) inputsToVM() (err error) {
}
} else {
// valueFrom specified in inputBinding - resulting value stored in input.Provided.Raw
tool.Task.infof("input: %v; input provided raw: %v", input.ID, input.Provided.Raw)
switch input.Provided.Raw.(type) {
case string:
f = fileObject(input.Provided.Raw.(string))
Expand Down
51 changes: 31 additions & 20 deletions mariner/js.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/robertkrimen/otto"
log "github.com/sirupsen/logrus"
)

// this file contains code for evaluating JS expressions encountered in the CWL
Expand Down Expand Up @@ -54,6 +55,11 @@ func js(s string) (js string, fn bool, err error) {
fn = strings.HasPrefix(s, "${")
s = strings.TrimLeft(s, "$(\n")
s = strings.TrimRight(s, ")\n")

if strings.Count(s, "(") != strings.Count(s, ")") {
log.Debugf("expression %s has an unequal amount of left and right parenthesis and will not be evaluated correctly", s)
s = s + ")"
}
return s, fn, nil
}

Expand Down Expand Up @@ -86,7 +92,7 @@ func evalExpression(exp string, vm *otto.Otto) (result interface{}, err error) {
} else {
output, err = vm.Run(js)
if err != nil {
return nil, fmt.Errorf("failed to evaluate js expression: %v", err)
return nil, fmt.Errorf("failed to evaluate js expression: %v, here is the js expression %s", err, js)
}
}
result, _ = output.Export()
Expand All @@ -103,17 +109,31 @@ func (tool *Tool) evalExpression(exp string) (result interface{}, err error) {
return val, nil
}

// resolveExpressions processes a text field which may or may not be
// - one expression
// - a string literal
// - a string which contains one or more separate JS expressions, each wrapped like $(...)
// presently writing simple case to return a string only for use in the argument valueFrom case
// can easily extend in the future to be used for any field, to return any kind of value
// NOTE: should work - needs to be tested more
// NOTE: successful output is one of (text, nil, nil) or ("", *f, nil)
// algorithm works in goplayground: https://play.golang.org/p/YOv-K-qdL18
// resolveExpressions intakes a string which can be an expression, a string literal, or JS expression like $(...) or ${...} and resolves it. The resolved result is put into a file and the file pointer is returned
func (tool *Tool) resolveExpressions(inText string) (outText string, outFile *File, err error) {
tool.Task.infof("begin resolve expression: %v", inText)
// TODO: assert that the full inText is only a single JS expression. and refactor logic in the per-rune parser.
if len(inText) > 1 {
if inText[0] == '$' && inText[1] == '{' {
tool.Task.infof("Interpreting as single JS expression: %v", inText)
result, err := evalExpression(inText, tool.InputsVM)
if err != nil {
return "", nil, tool.Task.errorf("%v", err)
}

switch result.(type) {
case string:
outText = result.(string)
case File:
f := result.(File)
return "", &f, nil
}

tool.Task.infof("end resolve expression. resolved text: %v", outText)
return outText, nil, nil
}
}

r := bufio.NewReader(strings.NewReader(inText))
var c0, c1, c2 string
var done bool
Expand All @@ -127,48 +147,39 @@ func (tool *Tool) resolveExpressions(inText string) (outText string, outFile *Fi
return "", nil, tool.Task.errorf("%v", err)
}
}

c0, c1, c2 = c1, c2, string(nextRune)
if c1 == "$" && c2 == "(" && c0 != "\\" {
// indicates beginning of expression block

// read through to the end of this expression block
expression, err := r.ReadString(')')
if err != nil {
return "", nil, tool.Task.errorf("%v", err)
}

// get full $(...) expression
expression = c1 + c2 + expression

// eval that thing
result, err := evalExpression(expression, tool.InputsVM)
if err != nil {
return "", outFile, tool.Task.errorf("%v", err)
}

// result ought to be a string (edit: OR a file)
switch result.(type) {
case string:
val := result.(string)

// cut off trailing "$" that had already been collected
image = image[:len(image)-1]

// collect resulting string
image = append(image, val)
case File:
f := result.(File)
return "", &f, nil
}
} else {
if !done {
// checking done so as to not collect null value
image = append(image, string(c2))
}
}
}

// get resolved string value
outText = strings.Join(image, "")
tool.Task.infof("end resolve expression. resolved text: %v", outText)
return outText, nil, nil
Expand Down
5 changes: 5 additions & 0 deletions mariner/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,13 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) {

// for marinerTask job
func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) {
initWorkDirFiles := strings.Join(tool.initWorkDirFiles, ",")
engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID)
env = []k8sv1.EnvVar{
{
Name: "InitWorkDirFiles",
Value: initWorkDirFiles,
},
{
Name: "IsInitWorkDir",
Value: engine.IsInitWorkDir,
Expand Down
5 changes: 5 additions & 0 deletions mariner/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ func (engine *K8sEngine) infof(f string, v ...interface{}) {
engine.writeLogToS3()
}

func (engine *K8sEngine) debugf(f string, v ...interface{}) {
engine.Log.Main.Event.infof(f, v...)
engine.writeLogToS3()
}

func (task *Task) errorf(f string, v ...interface{}) error {
return task.Log.Event.errorf(f, v...)
}
Expand Down
7 changes: 4 additions & 3 deletions mariner/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
log "github.com/sirupsen/logrus"
cwl "github.com/uc-cdis/cwl.go"
)

Expand Down Expand Up @@ -69,9 +70,9 @@ func (engine *K8sEngine) handleCLTOutput(tool *Tool) (err error) {
if err = tool.outputEval(&output, results); err != nil {
return tool.Task.errorf("%v", err)
}
// if outputEval, then the resulting value from the expression eval is assigned to the output parameter
// hence the function HandleCLTOutput() returns here
return nil

log.Debugf("here is the eval %s", output.Binding.Eval.Raw)
continue
}

// 4. SecondaryFiles - currently only supporting simplest case when handling expressions here
Expand Down
7 changes: 3 additions & 4 deletions mariner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,6 @@ func (server *Server) authZ(r *http.Request) bool {

// fetchRefreshToken is invoked from the server to check if a refresh token is expired and fetches a new one if it is.
func (server *Server) fetchRefreshToken() bool {
logrus.Info("we got to refresh token")
wtsPath := "http://workspace-token-service/oauth2/"
connectedUrl := wtsPath + "connected"
res, err := http.Get(connectedUrl)
Expand All @@ -524,15 +523,15 @@ func (server *Server) fetchRefreshToken() bool {
return false
}
if res.StatusCode != 200 {
logrus.Info("refresh token expired or user not logged in, fetching new refresh token")
logrus.Debug("refresh token expired or user not logged in, fetching new refresh token")
authUrl := wtsPath + "authorization_url?redirect=/"
res, err := http.Get(authUrl)
if err != nil {
fmt.Println("error fetching refresh token from wts")
logrus.Error("error fetching refresh token from wts")
return false
}
if res.StatusCode == 400 {
fmt.Println("wts refresh token bad request, user error")
logrus.Error("wts refresh token bad request, user error")
return false
}
res.Body.Close()
Expand Down
Loading

0 comments on commit 19e1d37

Please sign in to comment.