Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Feat - get metadata for commons files #66

Merged
merged 1 commit into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion mariner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
commonsPrefix = "COMMONS/"
userPrefix = "USER/"
conformancePrefix = "CONFORMANCE/"
workspacePrefix = "/" + engineWorkspaceVolumeName
gatewayPrefix = "/" + commonsDataVolumeName

notStarted = "not-started" // 3
running = "running" // 2
Expand Down Expand Up @@ -109,7 +111,7 @@ const (
pathToUserRunsf = "%v/workflowRuns/" // fill with userID
pathToUserRunLogf = pathToUserRunsf + "%v/" + logFile // fill with runID

commonsDataPersistentVolumeClaimName = "mariner-nfs-pvc"
commonsDataPersistentVolumeClaimName = "nfs-commons-data-test-pvc"
)

var (
Expand Down
10 changes: 4 additions & 6 deletions mariner/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type K8sEngine struct {
Manifest *Manifest // to pass the manifest to the gen3fuse container of each task pod
Log *MainLog //
KeepFiles map[string]bool // all the paths to not delete during basic file cleanup
IsInitWorkDir string
}

// Tool represents a leaf in the graph of a workflow
Expand All @@ -63,7 +62,6 @@ type Tool struct {
ExpressionResult map[string]interface{}
Task *Task
S3Input []*ToolS3Input
initWorkDirFiles []string

// dev'ing
// need to load this with runtime context as per CWL spec
Expand All @@ -84,9 +82,9 @@ type TaskRuntimeJSContext struct {

// ToolS3Input ..
type ToolS3Input struct {
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
}

// Engine runs an instance of the mariner engine job
Expand Down Expand Up @@ -283,7 +281,7 @@ func (task *Task) tool(runID string) *Tool {
tool := &Tool{
Task: task,
WorkingDir: task.workingDir(runID),
S3Input: []*ToolS3Input{},
S3Input: []*ToolS3Input{},
}
tool.JSVM = tool.newJSVM()
task.infof("end make tool object")
Expand Down
32 changes: 16 additions & 16 deletions mariner/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ func (tool *Tool) processFile(f interface{}) (file *File, err error) {
if err != nil {
return nil, err
}
if strings.HasPrefix(obj.Path, pathToCommonsData) {
if strings.HasPrefix(obj.Path, commonsPrefix) {

err = appendCommonsFileInfo(obj.Path, tool)
if err != nil {
return nil, err
}
} else if !strings.HasPrefix(obj.Path, pathToCommonsData) {

} else {
// user files
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: obj.Path,
Expand All @@ -151,17 +151,17 @@ func (tool *Tool) processFile(f interface{}) (file *File, err error) {
// note: I don't think any input to this process will have secondary files loaded
// into this field at this point in the process
for _, sf := range obj.SecondaryFiles {
if !strings.HasPrefix(sf.Path, pathToCommonsData) {
if strings.HasPrefix(sf.Path, commonsPrefix) {
err = appendCommonsFileInfo(obj.Path, tool)
if err != nil {
return nil, err
}

} else if !strings.HasPrefix(sf.Path, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Path,
InitWorkDir: false,
})
} else {
// commons file
err = appendCommonsFileInfo(sf.Path, tool)
if err != nil {
return nil, err
}
}
}
return obj, nil
Expand Down Expand Up @@ -368,17 +368,17 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
}
for _, fileObj := range fileArray {
for _, sf := range fileObj.SecondaryFiles {
if !strings.HasPrefix(sf.Location, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Location,
InitWorkDir: false,
})
} else {
if strings.HasPrefix(sf.Location, commonsPrefix) {
// commons file
err = appendCommonsFileInfo(sf.Path, tool)
err = appendCommonsFileInfo(sf.Location, tool)
if err != nil {
return nil, err
}
} else if !strings.HasPrefix(sf.Location, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Location,
InitWorkDir: false,
})
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions mariner/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,8 @@ func (tool *Tool) env() (env []k8sv1.EnvVar, err error) {

// for marinerTask job
func (engine *K8sEngine) s3SidecarEnv(tool *Tool) (env []k8sv1.EnvVar) {
initWorkDirFiles := strings.Join(tool.initWorkDirFiles, ",")
engine.infof("load s3 sidecar env for task: %v", tool.Task.Root.ID)
env = []k8sv1.EnvVar{
{
Name: "InitWorkDirFiles",
Value: initWorkDirFiles,
},
{
Name: "IsInitWorkDir",
Value: engine.IsInitWorkDir,
},
{
Name: "AWSCREDS",
ValueFrom: envVarAWSUserCreds,
Expand Down
58 changes: 27 additions & 31 deletions mariner/tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,54 @@ import (
"bytes"
"encoding/json"
"fmt"
"path/filepath"
pathLib "path"
"path/filepath"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
log "github.com/sirupsen/logrus"
)

func isValidPath(path string) bool {
validPrefixes := [...]string{userPrefix, commonsPrefix, workspacePrefix, gatewayPrefix}
for _, prefix := range validPrefixes {
if strings.HasPrefix(path, prefix) {
return true
}
}
return false
}

// this file contains some methods/functions for setting up and working with Tools (i.e., commandlinetools and expressiontools)

func pathHelper(path string, tool *Tool) (err error) {
if !isValidPath(path) {
log.Errorf("unsupported initwkdir path: %v", path)
return tool.Task.errorf("unsupported initwkdir path: %v", path)
}
url := ""
if strings.HasPrefix(path, userPrefix) {
trimmedPath := strings.TrimPrefix(path, userPrefix)
path = strings.Join([]string{"/", engineWorkspaceVolumeName, "/", trimmedPath}, "")
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
} else if strings.HasPrefix(path, "/"+engineWorkspaceVolumeName) {
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
tool.Task.infof("*File - Path: %v", path)
} else if strings.HasPrefix(path, commonsPrefix) {
guid := pathLib.Base(path)
indexFile, err := getIndexedFileInfo(guid)
if err != nil {
return tool.Task.errorf("Unable to get indexed record: %v; error: %v", guid, err)
}
path = pathLib.Join(pathToCommonsData, indexFile.Filename)
tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: indexFile.URLs[0],
Path: path,
InitWorkDir: true,
})
tool.initWorkDirFiles = append(tool.initWorkDirFiles, path)
} else {
log.Errorf("unsupported initwkdir path: %v", path)
return tool.Task.errorf("unsupported initwkdir path: %v", path)
url = indexFile.URLs[0]
}

tool.Task.infof("adding initwkdir path: %v", path)
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: url,
Path: path,
InitWorkDir: true,
})
tool.Task.infof("*File - Path: %v", path)

return nil
}

Expand Down Expand Up @@ -138,9 +138,7 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
return tool.Task.errorf("unsupported initwkdir type: %T; value: %v", output, output)
}
}
engine.IsInitWorkDir = "true"
tool.Task.infof("s3input paths: %v", tool.S3Input)
tool.Task.infof("initWorkDirFiles: %v", tool.initWorkDirFiles)
continue
}

Expand Down Expand Up @@ -196,7 +194,6 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
tool.Task.infof("raw key: %v", key)
tool.Task.infof("tool workdir: %v", tool.WorkingDir)


var b []byte
switch contents.(type) {
case string:
Expand Down Expand Up @@ -224,11 +221,10 @@ func (engine *K8sEngine) initWorkDirReq(tool *Tool) (err error) {
}
log.Infof("init working directory request recieved")
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: "s3://" + filepath.Join(engine.S3FileManager.S3BucketName, key),
Path: filepath.Join(tool.WorkingDir, entryName),
URL: "s3://" + filepath.Join(engine.S3FileManager.S3BucketName, key),
Path: filepath.Join(tool.WorkingDir, entryName),
InitWorkDir: true,
})
engine.IsInitWorkDir = "true"
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (fm *S3FileManager) fetchTaskS3InputList() ([]*TaskS3Input, error) {
}

func isLocalPath(path string, url string) bool {
return url == "" && strings.Contains(filepath.Dir(path), localDataPath)
return url == "" && filepath.Dir(path) == localDataPath
}

func getS3KeyAndBucket(fileUrl string, path string, fm *S3FileManager) (key string, bucket string, err error) {
Expand All @@ -112,7 +112,7 @@ func getS3KeyAndBucket(fileUrl string, path string, fm *S3FileManager) (key stri
return key, parsed.Host, nil

} else {
log.Infof("trying to download obj with key: %s", fm.s3Key(path))
log.Infof("trying to download obj with key: %v", fm.s3Key(path))

return strings.TrimPrefix(fm.s3Key(path), "/"), fm.S3BucketName, nil
}
Expand Down Expand Up @@ -164,6 +164,7 @@ func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err err
}

// create/open file for writing
log.Infof("local path: %v", localPath)
f, err := os.Create(localPath)
if err != nil {
log.Errorf("failed to open file: %s", err)
Expand All @@ -173,13 +174,16 @@ func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err err
s3Key, s3Bucket, err := getS3KeyAndBucket(taskInput.URL, taskInput.Path, fm)

if err != nil {
_, err = downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(s3Bucket),
Key: aws.String(s3Key),
})
if err != nil {
log.Errorf("failed to download file with url %s and path %s with error %s: ", taskInput.URL, taskInput.Path, err)
}
log.Errorf("failed to get s3 key and bucket from %v; error - %v", taskInput.URL, err)
}

log.Infof("downloading; bucket - %v; key - %v", s3Bucket, s3Key)
_, err = downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(s3Bucket),
Key: aws.String(s3Key),
})
if err != nil {
log.Errorf("failed to download file with url %s and path %s with error %s: ", taskInput.URL, taskInput.Path, err)
}
}

Expand Down