Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Commit

Permalink
Merge pull request #37 from uc-cdis/feat/gwas-pilot
Browse files Browse the repository at this point in the history
Feat/gwas pilot
  • Loading branch information
mattgarvin1 committed Oct 1, 2020
2 parents 962b9a5 + cbb5b73 commit dc4d5bf
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 97 deletions.
45 changes: 27 additions & 18 deletions mariner/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ func (tool *Tool) cmdElts() (cmdElts CommandElements, err error) {
}

// fixme: handle optional inputs
// UPDATE: optional inputs which are not provided should input.Binding as nil
func (tool *Tool) inputElts() (cmdElts CommandElements, err error) {
tool.Task.infof("begin handle command input elements")

cmdElts = make([]*CommandElement, 0)
var inputType string
for _, input := range tool.Task.Root.Inputs {
Expand Down Expand Up @@ -193,6 +195,9 @@ func inputValue(input *cwl.Input, rawInput interface{}, inputType string, bindin
5. handle shellQuote -> need to test (feature not yet supported on array inputs)
*/

// NOTICE: shellQuote default value is true - everything gets shellQuote'd unless `shellQuote: false` is specified
// see: https://www.commonwl.org/v1.0/CommandLineTool.html#ShellCommandRequirement

var s string
switch inputType {
case "object": // TODO - presently bindings on 'object' inputs not supported - have yet to find an example to work with
Expand Down Expand Up @@ -272,16 +277,19 @@ func inputValue(input *cwl.Input, rawInput interface{}, inputType string, bindin
}
// "if true, add 'prefix' to the commandline. If false, add nothing."
if boolVal {
// need to test shellQuote feature
if binding.ShellQuote {
val = append(val, "\""+binding.Prefix+"\"")
} else {
val = append(val, binding.Prefix)
}
/*
// need to test shellQuote feature - default value is true
if binding.ShellQuote {
val = append(val, "\""+binding.Prefix+"\"")
} else {
val = append(val, binding.Prefix)
}
*/
val = append(val, binding.Prefix)
}
return val, nil

case "string", "number":
case "string", "number", "int", "long", "float", "double":
s, err = valFromRaw(rawInput)
case CWLFileType, CWLDirectoryType:
s, err = pathFromRaw(rawInput)
Expand All @@ -298,10 +306,12 @@ func inputValue(input *cwl.Input, rawInput interface{}, inputType string, bindin
if !binding.Separate {
val = []string{strings.Join(val, "")}
}
// need to test ShellQuote feature
if binding.ShellQuote {
val = []string{"\"" + strings.Join(val, " ") + "\""}
}
/*
// need to test ShellQuote feature - default value is true
if binding.ShellQuote {
val = []string{"\"" + strings.Join(val, " ") + "\""}
}
*/
return val, nil
}

Expand Down Expand Up @@ -421,11 +431,9 @@ func (tool *Tool) argVal(arg cwl.Argument) (val []string, err error) {
val = make([]string, 0)
if arg.Value != "" {
// implies string literal or expression to eval - see NOTE at typeSwitch
// fmt.Println("string literal or expression to eval..")
// NOTE: *might* need to check "$(" or "${" instead of just "$"
if strings.HasPrefix(arg.Value, "$") {
// expression to eval - here `self` is null - no additional context to load - just need to eval in inputsVM
// fmt.Println("expression to eval..")
result, err := evalExpression(arg.Value, tool.Task.Root.InputsVM)
if err != nil {
return nil, tool.Task.errorf("failed to evaluate expression: %v; err: %v", arg.Value, err)
Expand All @@ -444,17 +452,18 @@ func (tool *Tool) argVal(arg cwl.Argument) (val []string, err error) {
val = append(val, arg.Value)
}
} else {
// fmt.Println("resolving valueFrom..")
// get value from `valueFrom` field which may itself be a string literal, an expression, or a string which contains one or more expressions
resolvedText, _, err := tool.resolveExpressions(arg.Binding.ValueFrom.String)
if err != nil {
return nil, tool.Task.errorf("%v", err)
}

// handle shellQuote - default value is true
if arg.Binding.ShellQuote {
resolvedText = "\"" + resolvedText + "\""
}
/*
// handle shellQuote - default value is true
if arg.Binding.ShellQuote {
resolvedText = "\"" + resolvedText + "\""
}
*/

// capture result
val = append(val, resolvedText)
Expand Down
39 changes: 32 additions & 7 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"strings"
"sync"
"time"

cwl "github.com/uc-cdis/cwl.go"
Expand All @@ -22,6 +23,7 @@ import (
// NOTE: engine object code store all the logs/event-monitoring/statistics for the workflow run
// ----- create some field, define a sensible data structure to easily collect/store/retreive logs
type K8sEngine struct {
sync.RWMutex `json:"-"`
TaskSequence []string // for testing purposes
UnfinishedProcs map[string]bool // engine's stack of CLT's that are running; (task.Root.ID, Process) pairs
FinishedProcs map[string]bool // engine's stack of completed processes; (task.Root.ID, Process) pairs
Expand Down Expand Up @@ -132,9 +134,13 @@ func done(runID string) error {
}

// DispatchTask does some setup for and dispatches workflow Tools
func (engine K8sEngine) dispatchTask(task *Task) (err error) {
func (engine *K8sEngine) dispatchTask(task *Task) (err error) {
engine.infof("begin dispatch task: %v", task.Root.ID)
tool := task.tool(engine.RunID)

engine.Lock()
tool := task.tool(engine.RunID) // #race #ok
engine.Unlock()

err = tool.setupTool()
if err != nil {
return engine.errorf("failed to setup tool: %v; error: %v", task.Root.ID, err)
Expand All @@ -156,16 +162,25 @@ func (engine K8sEngine) dispatchTask(task *Task) (err error) {

// move proc from unfinished to finished stack
func (engine *K8sEngine) finishTask(task *Task) {
engine.Lock()
defer engine.Unlock()

delete(engine.UnfinishedProcs, task.Root.ID)
engine.FinishedProcs[task.Root.ID] = true
engine.Log.finish(task)
task.Done = &trueVal

// task.Lock()
task.Done = &trueVal // #race #ok
// task.Unlock()
}

// push newly started process onto the engine's stack of running processes
// initialize log
func (engine *K8sEngine) startTask(task *Task) {
engine.UnfinishedProcs[task.Root.ID] = true
engine.Lock()
engine.UnfinishedProcs[task.Root.ID] = true // #race #ok
engine.Unlock()

engine.Log.start(task)
}

Expand All @@ -181,8 +196,8 @@ func (engine *K8sEngine) collectOutput(tool *Tool) error {
// The Tool represents a workflow Tool and so is either a CommandLineTool or an ExpressionTool
func (task *Task) tool(runID string) *Tool {
task.infof("begin make tool object")
task.Outputs = make(map[string]interface{})
task.Log.Output = task.Outputs
task.Outputs = make(map[string]interface{}) // #race #ok
task.Log.Output = task.Outputs // #race #ok
tool := &Tool{
Task: task,
WorkingDir: task.workingDir(runID),
Expand All @@ -198,7 +213,17 @@ func (task *Task) tool(runID string) *Tool {
// ----- could come up with a better/more uniform naming scheme
func (task *Task) workingDir(runID string) string {
task.infof("begin make task working dir")

safeID := strings.ReplaceAll(task.Root.ID, "#", "")

// task.Root.ID is not unique among tool runs
// so, adding this random 4 char suffix
// this is not really a perfect solution
// because errors can still happen, though with very low probability
// "error" here meaning by chance creating a `safeID` that's already been used
// --- by a previous run of this same tool/task object
safeID = fmt.Sprintf("%v-%v", safeID, getRandString(4))

dir := fmt.Sprintf(pathToWorkingDirf, runID, safeID)
if task.ScatterIndex > 0 {
dir = fmt.Sprintf("%v-scatter-%v", dir, task.ScatterIndex)
Expand Down Expand Up @@ -275,7 +300,7 @@ func (engine *K8sEngine) runTool(tool *Tool) (err error) {
// runCommandLineTool..
// 1. generates the command to execute
// 2. makes call to RunK8sJob to dispatch job to run the commandline tool
func (engine K8sEngine) runCommandLineTool(tool *Tool) (err error) {
func (engine *K8sEngine) runCommandLineTool(tool *Tool) (err error) {
engine.infof("begin run CommandLineTool: %v", tool.Task.Root.ID)
err = tool.generateCommand()
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions mariner/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type File struct {
Basename string `json:"basename"` // last element of location path
NameRoot string `json:"nameroot"` // basename without file extension
NameExt string `json:"nameext"` // file extension of basename
DirName string `json:"dirname"` // name of directory containing the file
Contents string `json:"contents"` // first 64 KiB of file as a string, if loadContents is true
SecondaryFiles []*File `json:"secondaryFiles"` // array of secondaryFiles
}
Expand All @@ -44,22 +45,23 @@ type File struct {
// right now they hold the exact same path
// prefixissue - don't need to handle here - the 'path' argument is the full path, with working dir and all
func fileObject(path string) (fileObj *File) {
base, root, ext := fileFields(path)
base, root, ext, dirname := fileFields(path)
fileObj = &File{
Class: CWLFileType,
Location: path, // stores the full path
Path: path,
Basename: base,
NameRoot: root,
NameExt: ext,
DirName: dirname,
}
return fileObj
}

// pedantic splitting regarding leading periods in the basename
// see: https://www.commonwl.org/v1.0/Workflow.html#File
// the description of nameroot and nameext
func fileFields(path string) (base string, root string, ext string) {
func fileFields(path string) (base string, root string, ext string, dirname string) {
base = lastInPath(path)
baseNoLeadingPeriods, nPeriods := trimLeading(base, ".")
tmp := strings.Split(baseNoLeadingPeriods, ".")
Expand All @@ -73,7 +75,8 @@ func fileFields(path string) (base string, root string, ext string) {
}
// add back any leading periods that were trimmed from base
root = strings.Repeat(".", nPeriods) + root
return base, root, ext
dirname = strings.TrimSuffix(path, fmt.Sprintf("/%v", base))
return base, root, ext, dirname
}

// given a string s and a character char
Expand Down
Loading

0 comments on commit dc4d5bf

Please sign in to comment.