Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
UchicagoZchen138 committed Nov 2, 2021
1 parent 72e94b5 commit b5827bf
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 54 deletions.
2 changes: 2 additions & 0 deletions mariner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
commonsPrefix = "COMMONS/"
userPrefix = "USER/"
conformancePrefix = "CONFORMANCE/"
workspacePrefix = "/" + engineWorkspaceVolumeName
gatewayPrefix = "/" + commonsDataVolumeName

notStarted = "not-started" // 3
running = "running" // 2
Expand Down
10 changes: 4 additions & 6 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type K8sEngine struct {
Manifest *Manifest // to pass the manifest to the gen3fuse container of each task pod
Log *MainLog //
KeepFiles map[string]bool // all the paths to not delete during basic file cleanup
IsInitWorkDir string
}

// Tool represents a leaf in the graph of a workflow
Expand All @@ -63,7 +62,6 @@ type Tool struct {
ExpressionResult map[string]interface{}
Task *Task
S3Input []*ToolS3Input
initWorkDirFiles []string

// dev'ing
// need to load this with runtime context as per CWL spec
Expand All @@ -84,9 +82,9 @@ type TaskRuntimeJSContext struct {

// ToolS3Input ..
type ToolS3Input struct {
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
}

// Engine runs an instance of the mariner engine job
Expand Down Expand Up @@ -283,7 +281,7 @@ func (task *Task) tool(runID string) *Tool {
tool := &Tool{
Task: task,
WorkingDir: task.workingDir(runID),
S3Input: []*ToolS3Input{},
S3Input: []*ToolS3Input{},
}
tool.JSVM = tool.newJSVM()
task.infof("end make tool object")
Expand Down
9 changes: 0 additions & 9 deletions mariner/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,8 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) {

// for marinerTask job
func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) {
initWorkDirFiles := strings.Join(tool.initWorkDirFiles, ",")
engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID)
env = []k8sv1.EnvVar{
{
Name: "InitWorkDirFiles",
Value: initWorkDirFiles,
},
{
Name: "IsInitWorkDir",
Value: engine.IsInitWorkDir,
},
{
Name: "AWSCREDS",
ValueFrom: envVarAWSUserCreds,
Expand Down
66 changes: 27 additions & 39 deletions mariner/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,54 @@ import (
"bytes"
"encoding/json"
"fmt"
"path/filepath"
pathLib "path"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
)

func isValidPath(path string) bool {
validPrefixes := [...]string{userPrefix, commonsPrefix, workspacePrefix, gatewayPrefix}
for _, prefix := range validPrefixes {
if strings.HasPrefix(path, prefix) {
return true
}
}
return false
}

// this file contains some methods/functions for setting up and working with Tools (i.e., commandlinetools and expressiontools)

func pathHelper(path string, tool *Tool) (err error) {
if !isValidPath(path) {
log.Errorf("unsupported initwkdir path: %v", path)
return tool.Task.errorf("unsupported initwkdir path: %v", path)
}
url := ""
if strings.HasPrefix(path, userPrefix) {
trimmedPath := strings.TrimPrefix(path, userPrefix)
path = strings.Join([]string{"/", engineWorkspaceVolumeName, "/", trimmedPath}, "")
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
} else if strings.HasPrefix(path, "/"+engineWorkspaceVolumeName) {
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
tool.Task.infof("*File - Path: %v", path)
} else if strings.HasPrefix(path, commonsPrefix) {
guid := pathLib.Base(path)
indexFile, err := getIndexedFileInfo(guid)
if err != nil {
return tool.Task.errorf("Unable to get indexed record: %v; error: %v", guid, err)
}
path = pathLib.Join(pathToCommonsData, indexFile.Filename)
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: indexFile.URLs[0],
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
} else if strings.HasPrefix(path, "/"+commonsDataVolumeName) {
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
tool.Task.infof("*File - Path: %v", path)
} else {
log.Errorf("unsupported initwkdir path: %v", path)
return tool.Task.errorf("unsupported initwkdir path: %v", path)
url = indexFile.URLs[0]
}

tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: url,
Path: path,
InitWorkDir: true,
})
tool.Task.infof("*File - Path: %v", path)

return nil
}

Expand Down Expand Up @@ -146,9 +138,7 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
return tool.Task.errorf("unsupported initwkdir type: %T; value: %v", output, output)
}
}
engine.IsInitWorkDir = "true"
tool.Task.infof("s3input paths: %v", tool.S3Input)
tool.Task.infof("initWorkDirFiles: %v", tool.initWorkDirFiles)
continue
}

Expand Down Expand Up @@ -204,7 +194,6 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
tool.Task.infof("raw key: %v", key)
tool.Task.infof("tool workdir: %v", tool.WorkingDir)


var b []byte
switch contents.(type) {
case string:
Expand Down Expand Up @@ -232,11 +221,10 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
}
log.Infof("init working directory request recieved")
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: "s3://" + filepath.Join(engine.S3FileManager.S3BucketName, key),
Path: filepath.Join(tool.WorkingDir, entryName),
URL: "s3://" + filepath.Join(engine.S3FileManager.S3BucketName, key),
Path: filepath.Join(tool.WorkingDir, entryName),
InitWorkDir: true,
})
engine.IsInitWorkDir = "true"
}
}
}
Expand Down

0 comments on commit b5827bf

Please sign in to comment.