Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Commit

Permalink
feat(commons): support commons accessing commons files in aws gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
kmhernan authored and UchicagoZchen138 committed Nov 2, 2021
1 parent dce4939 commit 48fa3ee
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 72 deletions.
4 changes: 3 additions & 1 deletion mariner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
commonsPrefix = "COMMONS/"
userPrefix = "USER/"
conformancePrefix = "CONFORMANCE/"
workspacePrefix = "/" + engineWorkspaceVolumeName
gatewayPrefix = "/" + commonsDataVolumeName

notStarted = "not-started" // 3
running = "running" // 2
Expand Down Expand Up @@ -109,7 +111,7 @@ const (
pathToUserRunsf = "%v/workflowRuns/" // fill with userID
pathToUserRunLogf = pathToUserRunsf + "%v/" + logFile // fill with runID

commonsDataPersistentVolumeClaimName = "mariner-nfs-pvc"
commonsDataPersistentVolumeClaimName = "nfs-commons-data-test-pvc"
)

var (
Expand Down
10 changes: 4 additions & 6 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type K8sEngine struct {
Manifest *Manifest // to pass the manifest to the gen3fuse container of each task pod
Log *MainLog //
KeepFiles map[string]bool // all the paths to not delete during basic file cleanup
IsInitWorkDir string
}

// Tool represents a leaf in the graph of a workflow
Expand All @@ -63,7 +62,6 @@ type Tool struct {
ExpressionResult map[string]interface{}
Task *Task
S3Input []*ToolS3Input
initWorkDirFiles []string

// dev'ing
// need to load this with runtime context as per CWL spec
Expand All @@ -84,9 +82,9 @@ type TaskRuntimeJSContext struct {

// ToolS3Input is one entry of Tool.S3Input: the S3 URL of a file, the local
// path it is downloaded to, and a flag marking InitWorkDir entries.
//
// NOTE(review): every field is listed twice below. That looks like the
// before/after lines of a formatting-only diff rendered together rather than
// a real declaration (duplicate field names do not compile) - confirm against
// the actual file.
type ToolS3Input struct {
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
}

// Engine runs an instance of the mariner engine job
Expand Down Expand Up @@ -283,7 +281,7 @@ func (task *Task) tool(runID string) *Tool {
tool := &Tool{
Task: task,
WorkingDir: task.workingDir(runID),
S3Input: []*ToolS3Input{},
S3Input: []*ToolS3Input{},
}
tool.JSVM = tool.newJSVM()
task.infof("end make tool object")
Expand Down
32 changes: 16 additions & 16 deletions mariner/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ func (tool *Tool) processFile(f interface{}) (file *File, err error) {
if err != nil {
return nil, err
}
if strings.HasPrefix(obj.Path, pathToCommonsData) {
if strings.HasPrefix(obj.Path, commonsPrefix) {

err = appendCommonsFileInfo(obj.Path, tool)
if err != nil {
return nil, err
}
} else if !strings.HasPrefix(obj.Path, pathToCommonsData) {

} else {
// user files
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: obj.Path,
Expand All @@ -151,17 +151,17 @@ func (tool *Tool) processFile(f interface{}) (file *File, err error) {
// note: I don't think any input to this process will have secondary files loaded
// into this field at this point in the process
for _, sf := range obj.SecondaryFiles {
if !strings.HasPrefix(sf.Path, pathToCommonsData) {
if strings.HasPrefix(sf.Path, commonsPrefix) {
err = appendCommonsFileInfo(obj.Path, tool)
if err != nil {
return nil, err
}

} else if !strings.HasPrefix(sf.Path, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Path,
InitWorkDir: false,
})
} else {
// commons file
err = appendCommonsFileInfo(sf.Path, tool)
if err != nil {
return nil, err
}
}
}
return obj, nil
Expand Down Expand Up @@ -368,17 +368,17 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
}
for _, fileObj := range fileArray {
for _, sf := range fileObj.SecondaryFiles {
if !strings.HasPrefix(sf.Location, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Location,
InitWorkDir: false,
})
} else {
if strings.HasPrefix(sf.Location, commonsPrefix) {
// commons file
err = appendCommonsFileInfo(sf.Path, tool)
err = appendCommonsFileInfo(sf.Location, tool)
if err != nil {
return nil, err
}
} else if !strings.HasPrefix(sf.Location, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Location,
InitWorkDir: false,
})
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions mariner/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,8 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) {

// for marinerTask job
func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) {
initWorkDirFiles := strings.Join(tool.initWorkDirFiles, ",")
engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID)
env = []k8sv1.EnvVar{
{
Name: "InitWorkDirFiles",
Value: initWorkDirFiles,
},
{
Name: "IsInitWorkDir",
Value: engine.IsInitWorkDir,
},
{
Name: "AWSCREDS",
ValueFrom: envVarAWSUserCreds,
Expand Down
58 changes: 27 additions & 31 deletions mariner/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,54 @@ import (
"bytes"
"encoding/json"
"fmt"
"path/filepath"
pathLib "path"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
)

// isValidPath reports whether path starts with one of the accepted prefixes:
// userPrefix, commonsPrefix, workspacePrefix or gatewayPrefix. pathHelper uses
// it to reject unsupported initwkdir paths before doing any further work.
func isValidPath(path string) bool {
	switch {
	case strings.HasPrefix(path, userPrefix),
		strings.HasPrefix(path, commonsPrefix),
		strings.HasPrefix(path, workspacePrefix),
		strings.HasPrefix(path, gatewayPrefix):
		return true
	default:
		return false
	}
}

// This file contains methods and functions for setting up and working with Tools (i.e., CommandLineTools and ExpressionTools).

// pathHelper records path on tool.S3Input as an InitWorkDir entry.
// A path rejected by isValidPath is logged and returned as a task error.
//
// NOTE(review): this body reads like a rendered diff with removed and added
// lines interleaved. S3Input is appended inside each branch and then again
// after the if/else chain, and the final else branch assigns url from
// indexFile, which is declared only inside the commons branch. Reconcile
// against the real file before relying on the exact statement order here.
func pathHelper(path string, tool *Tool) (err error) {
// Reject anything that does not start with a known prefix.
if !isValidPath(path) {
log.Errorf("unsupported initwkdir path: %v", path)
return tool.Task.errorf("unsupported initwkdir path: %v", path)
}
// url stays empty unless a later assignment fills it in.
url := ""
if strings.HasPrefix(path, userPrefix) {
// Rewrite "USER/<rest>" to "/<engineWorkspaceVolumeName>/<rest>".
trimmedPath := strings.TrimPrefix(path, userPrefix)
path = strings.Join([]string{"/", engineWorkspaceVolumeName, "/", trimmedPath}, "")
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
} else if strings.HasPrefix(path, "/"+engineWorkspaceVolumeName) {
// Path already sits under the engine workspace volume; used unchanged.
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
tool.Task.infof("*File - Path: %v", path)
} else if strings.HasPrefix(path, commonsPrefix) {
// The last path element is treated as a GUID and looked up via
// getIndexedFileInfo; the local path becomes pathToCommonsData/<Filename>.
guid := pathLib.Base(path)
indexFile, err := getIndexedFileInfo(guid)
if err != nil {
return tool.Task.errorf("Unable to get indexed record: %v; error: %v", guid, err)
}
path = pathLib.Join(pathToCommonsData, indexFile.Filename)
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
// NOTE(review): indexes URLs[0] without a length check - confirm that
// getIndexedFileInfo never returns a record with no URLs.
URL: indexFile.URLs[0],
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
} else {
log.Errorf("unsupported initwkdir path: %v", path)
return tool.Task.errorf("unsupported initwkdir path: %v", path)
// NOTE(review): follows a return and references indexFile outside the
// branch that declares it; presumably belongs in the commons branch.
url = indexFile.URLs[0]
}

// Record the (possibly rewritten) path, with url, as an InitWorkDir input.
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: url,
Path: path,
InitWorkDir: true,
})
tool.Task.infof("*File - Path: %v", path)

return nil
}

Expand Down Expand Up @@ -138,9 +138,7 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
return tool.Task.errorf("unsupported initwkdir type: %T; value: %v", output, output)
}
}
engine.IsInitWorkDir = "true"
tool.Task.infof("s3input paths: %v", tool.S3Input)
tool.Task.infof("initWorkDirFiles: %v", tool.initWorkDirFiles)
continue
}

Expand Down Expand Up @@ -196,7 +194,6 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
tool.Task.infof("raw key: %v", key)
tool.Task.infof("tool workdir: %v", tool.WorkingDir)


var b []byte
switch contents.(type) {
case string:
Expand Down Expand Up @@ -224,11 +221,10 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
}
log.Infof("init working directory request recieved")
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: "s3://" + filepath.Join(engine.S3FileManager.S3BucketName, key),
Path: filepath.Join(tool.WorkingDir, entryName),
URL: "s3://" + filepath.Join(engine.S3FileManager.S3BucketName, key),
Path: filepath.Join(tool.WorkingDir, entryName),
InitWorkDir: true,
})
engine.IsInitWorkDir = "true"
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (fm *S3FileManager) fetchTaskS3InputList() ([]*TaskS3Input, error) {
}

// isLocalPath reports whether url is empty and the directory part of path
// contains localDataPath.
//
// NOTE(review): the second return below is unreachable. It looks like the
// replacement line from a diff (exact directory match instead of a substring
// match) shown next to the line it replaced - confirm which form is intended.
func isLocalPath(path string, url string) bool {
return url == "" && strings.Contains(filepath.Dir(path), localDataPath)
return url == "" && filepath.Dir(path) == localDataPath
}

func getS3KeyAndBucket(fileUrl string, path string, fm *S3FileManager) (key string, bucket string, err error) {
Expand All @@ -112,7 +112,7 @@ func getS3KeyAndBucket(fileUrl string, path string, fm *S3FileManager) (key stri
return key, parsed.Host, nil

} else {
log.Infof("trying to download obj with key: %s", fm.s3Key(path))
log.Infof("trying to download obj with key: %v", fm.s3Key(path))

return strings.TrimPrefix(fm.s3Key(path), "/"), fm.S3BucketName, nil
}
Expand Down Expand Up @@ -164,6 +164,7 @@ func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err err
}

// create/open file for writing
log.Infof("local path: %v", localPath)
f, err := os.Create(localPath)
if err != nil {
log.Errorf("failed to open file: %s", err)
Expand All @@ -173,13 +174,16 @@ func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err err
s3Key, s3Bucket, err := getS3KeyAndBucket(taskInput.URL, taskInput.Path, fm)

if err != nil {
_, err = downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(s3Bucket),
Key: aws.String(s3Key),
})
if err != nil {
log.Errorf("failed to download file with url %s and path %s with error %s: ", taskInput.URL, taskInput.Path, err)
}
log.Errorf("failed to get s3 key and bucket from %v; error - %v", taskInput.URL, err)
}

log.Infof("downloading; bucket - %v; key - %v", s3Bucket, s3Key)
_, err = downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(s3Bucket),
Key: aws.String(s3Key),
})
if err != nil {
log.Errorf("failed to download file with url %s and path %s with error %s: ", taskInput.URL, taskInput.Path, err)
}
}

Expand Down

0 comments on commit 48fa3ee

Please sign in to comment.