Skip to content
This repository has been archived by the owner on Jul 14, 2022. It is now read-only.

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
UchicagoZchen138 committed Oct 27, 2021
1 parent 8a0c011 commit e4917cc
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 107 deletions.
2 changes: 1 addition & 1 deletion conformancelib/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *InputsCollector) inspectInputs(inputs map[string]interface{}) error {

// determines whether a map i represents a CWL file object
// lifted from the mariner package
// NOTE: need to make changes in mariner code
// TODO: need to make changes in mariner code
func isClass(i interface{}, class string) (f bool) {
if i == nil {
return false
Expand Down
2 changes: 1 addition & 1 deletion mariner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const (
pathToUserRunsf = "%v/workflowRuns/" // fill with userID
pathToUserRunLogf = pathToUserRunsf + "%v/" + logFile // fill with runID

commonsDataPersistentVolumeClaimName = "nfs-commons-data-test-pvc"
commonsDataPersistentVolumeClaimName = "mariner-nfs-pvc"
)

var (
Expand Down
70 changes: 33 additions & 37 deletions mariner/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,55 +109,59 @@ func (engine *K8sEngine) loadInput(tool *Tool, input *cwl.Input) (err error) {
return nil
}

func appendCommonsFileInfo(filePath string, tool *Tool) (err error) {
guid := pathLib.Base(filePath)
indexFile, err := getIndexedFileInfo(guid)
if err != nil {
return tool.Task.errorf("Unable to get indexed record: %v", err)
}
tool.Task.infof("Found indexed metadata: %v", indexFile)
// NOTE: we need a smarter way to know which url to get when there are multiple!
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: indexFile.URLs[0],
Path: pathLib.Join(pathToCommonsData, indexFile.Filename),
InitWorkDir: false,
})

return nil
}

// wrapper around processFile() - collects path of input file and all secondary files
func (tool *Tool) processFile(f interface{}) (*File, error) {
func (tool *Tool) processFile(f interface{}) (file *File, err error) {
obj, err := processFile(tool, f)
if err != nil {
return nil, err
}
if !strings.HasPrefix(obj.Path, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: obj.Path,
InitWorkDir: false,
})
} else {
// commons file
guid := pathLib.Base(obj.Path)
indexFile, err := getIndexedFileInfo(guid)
if strings.HasPrefix(obj.Path, pathToCommonsData) {

err = appendCommonsFileInfo(obj.Path, tool)
if err != nil {
return nil, tool.Task.errorf("Unable to get indexed record: %v", err)
return nil, err
}
tool.Task.infof("Found indexed metadata: %v", indexFile)
// NOTE: we need a smarter way to know which url to get when there are multiple!

} else {
// user files
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: indexFile.URLs[0],
Path: pathLib.Join(pathToCommonsData, indexFile.Filename),
Path: obj.Path,
InitWorkDir: false,
})

}

// note: I don't think any input to this process will have secondary files loaded
// into this field at this point in the process
for _, sf := range obj.SecondaryFiles {
if !strings.HasPrefix(sf.Path, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Path,
Path: sf.Path,
InitWorkDir: false,
})
} else {
// commons file
guid := pathLib.Base(sf.Path)
indexFile, err := getIndexedFileInfo(guid)
err = appendCommonsFileInfo(sf.Path, tool)
if err != nil {
return nil, tool.Task.errorf("Unable to get indexed record: %v", err)
return nil, err
}
tool.Task.infof("Found indexed metadata: %v", indexFile)
// NOTE: we need a smarter way to know which url to get when there are multiple!
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: indexFile.URLs[0],
Path: pathLib.Join(pathToCommonsData, indexFile.Filename),
InitWorkDir: false,
})
}
}
return obj, nil
Expand Down Expand Up @@ -366,23 +370,15 @@ func (engine *K8sEngine) transformInput(tool *Tool, input *cwl.Input) (out inter
for _, sf := range fileObj.SecondaryFiles {
if !strings.HasPrefix(sf.Location, pathToCommonsData) {
tool.S3Input = append(tool.S3Input, &ToolS3Input{
Path: sf.Location,
Path: sf.Location,
InitWorkDir: false,
})
} else {
// commons file
guid := pathLib.Base(sf.Path)
indexFile, err := getIndexedFileInfo(guid)
err = appendCommonsFileInfo(sf.Path, tool)
if err != nil {
return nil, tool.Task.errorf("Unable to get indexed record: %v", err)
return nil, err
}
tool.Task.infof("Found indexed metadata: %v", indexFile)
// NOTE: we need a smarter way to know which url to get when there are multiple!
tool.S3Input = append(tool.S3Input, &ToolS3Input{
URL: indexFile.URLs[0],
Path: pathLib.Join(pathToCommonsData, indexFile.Filename),
InitWorkDir: false,
})
}
}
}
Expand Down
42 changes: 21 additions & 21 deletions mariner/utility.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,32 @@ func getRandString(n int) string {

// Indexd File struct
type IndexFileInfo struct {
Filename string `json:"file_name"`
Filesize uint64 `json:"size"`
DID string `json"did"`
URLs []string `json:"urls"`
Filename string `json:"file_name"`
Filesize uint64 `json:"size"`
DID string `json"did"`
URLs []string `json:"urls"`
}

// Gets basic indexd info
// NOTE: We certainly need to add a check for the user ACL and access to this file!
// TODO: add a check for the user ACL and access to this file!
func getIndexedFileInfo(guid string) (finfo *IndexFileInfo, err error) {
indexdPath := "http://indexd-service/index/"
indexdUrl := indexdPath + guid
res, err := http.Get(indexdUrl)
if err != nil {
return nil, err
}
indexdPath := "http://indexd-service/index/"
indexdUrl := indexdPath + guid
res, err := http.Get(indexdUrl)
if err != nil {
return nil, err
}

defer res.Body.Close()
defer res.Body.Close()

if res.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("Found status code %v for GUID %v", res.StatusCode, guid))
}
if res.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("Found status code %v for GUID %v", res.StatusCode, guid))
}

b, err := ioutil.ReadAll(res.Body)
err = json.Unmarshal(b, &finfo)
if err != nil {
return nil, err
}
return finfo, err
b, err := ioutil.ReadAll(res.Body)
err = json.Unmarshal(b, &finfo)
if err != nil {
return nil, err
}
return finfo, err
}
6 changes: 6 additions & 0 deletions sidecar/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package main

const (
commonsDataPath = "/commons-data"
localDataPath = "/engine-workspace"
)
96 changes: 49 additions & 47 deletions sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package main
import (
"encoding/json"
"fmt"
"os"
"net/url"
"os"
pathLib "path"
"path/filepath"
"strings"
Expand All @@ -19,9 +19,9 @@ import (

// TaskS3Input ..
type TaskS3Input struct {
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
URL string `json:"url"` // S3 URL
Path string `json:"path"` // Local path for dl
InitWorkDir bool `json:"init_work_dir"` // is this an initwkdir requirement?
}

func main() {
Expand All @@ -32,31 +32,31 @@ func main() {
// 1. read in the target s3 paths
taskS3Input, err := fm.fetchTaskS3InputList()
if err != nil {
log.Errorf("readMarinerS3Paths failed:", err)
log.Errorf("readMarinerS3Paths failed: %s", err)
}

// 2. download those files to the shared volume
err = fm.downloadInputFiles(taskS3Input)
if err != nil {
log.Errorf("downloadFiles failed:", err)
log.Errorf("downloadFiles failed: %s", err)
}

// 3. signal main container to run
err = fm.signalTaskToRun()
if err != nil {
log.Errorf("signalTaskToRun failed:", err)
log.Errorf("signalTaskToRun failed: %s", err)
}

// 4. wait for main container to finish
err = fm.waitForTaskToFinish()
if err != nil {
log.Errorf("waitForTaskToFinish failed:", err)
log.Errorf("waitForTaskToFinish failed: %s", err)
}

// 5. upload output files to s3
err = fm.uploadOutputFiles()
if err != nil {
log.Errorf("uploadOutputFiles failed:", err)
log.Errorf("uploadOutputFiles failed: %s", err)
}

}
Expand Down Expand Up @@ -94,6 +94,30 @@ func (fm *S3FileManager) fetchTaskS3InputList() ([]*TaskS3Input, error) {
return taskS3Input, nil
}

func isLocalPath(path string, url string) bool {
return url == "" && strings.Contains(filepath.Dir(path), localDataPath)
}

func getS3KeyAndBucket(fileUrl string, path string, fm *S3FileManager) (key string, bucket string, err error) {
if fileUrl != "" {
parsed, err := url.Parse(fileUrl)
if err != nil {
log.Errorf("failed parsing URI: %v; error: %v\n", fileUrl, err)
return "", "", fmt.Errorf("failed parsing URI: %v; error: %v\n", fileUrl, err)
}
key := strings.TrimPrefix(parsed.Path, "/")

log.Infof("trying to download obj with key: %v", key)

return key, parsed.Host, nil

} else {
log.Infof("trying to download obj with key: %s", fm.s3Key(path))

return strings.TrimPrefix(fm.s3Key(path), "/"), fm.S3BucketName, nil
}
}

// 2. download this task's input files from s3
func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err error) {

Expand Down Expand Up @@ -122,8 +146,7 @@ func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err err

var skipFile = false


if filepath.Dir(taskInput.Path) == "/commons-data" {
if strings.Contains(filepath.Dir(taskInput.Path), commonsDataPath) {
// test if it exists
log.Infof("commons file: %v", taskInput.Path)
_, err = os.Stat(taskInput.Path)
Expand All @@ -135,54 +158,33 @@ func (fm *S3FileManager) downloadInputFiles(taskS3Input []*TaskS3Input) (err err
log.Errorf("failed to make dirs: %v\n", err)
}
} else {
var lpath string
if taskInput.URL == "" && filepath.Dir(taskInput.Path) == "/engine-workspace" {
localPath := taskInput.Path
if isLocalPath(taskInput.Path, taskInput.URL) {
skipFile = true
lpath = filepath.Join(fm.TaskWorkingDir, pathLib.Base(taskInput.Path))
} else {
lpath = taskInput.Path
localPath = filepath.Join(fm.TaskWorkingDir, pathLib.Base(taskInput.Path))
}

// create necessary dirs
if err = os.MkdirAll(filepath.Dir(lpath), os.ModeDir); err != nil {
if err = os.MkdirAll(filepath.Dir(localPath), os.ModeDir); err != nil {
log.Errorf("failed to make dirs: %v\n", err)
}

// create/open file for writing
f, err := os.Create(lpath)
f, err := os.Create(localPath)
if err != nil {
log.Errorf("failed to open file:", err)
log.Errorf("failed to open file: %s", err)
}
defer f.Close()

if taskInput.URL != "" {
parsed, err := url.Parse(taskInput.URL)
if err != nil {
log.Errorf("failed parsing URI: %v; error: %v\n", taskInput.URL, err)
}
key := strings.TrimPrefix(parsed.Path, "/")

log.Infof("trying to download obj with key: %v", key)

// write s3 object content into file
_, err = downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(parsed.Host),
Key: aws.String(key),
})
if err != nil {
log.Errorf("failed to download file:", taskInput.URL, err)
}
} else {
path := taskInput.Path
log.Infof("trying to download obj with key:", fm.s3Key(path))
s3Key, s3Bucket, err := getS3KeyAndBucket(taskInput.URL, taskInput.Path, fm)

// write s3 object content into file
if err != nil {
_, err = downloader.Download(f, &s3.GetObjectInput{
Bucket: aws.String(fm.S3BucketName),
Key: aws.String(strings.TrimPrefix(fm.s3Key(path), "/")),
Bucket: aws.String(s3Bucket),
Key: aws.String(s3Key),
})
if err != nil {
log.Errorf("failed to download file:", path, err)
log.Errorf("failed to download file with url %s and path %s with error %s: ", taskInput.URL, taskInput.Path, err)
}
}
}
Expand Down Expand Up @@ -263,7 +265,7 @@ func (fm *S3FileManager) waitForTaskToFinish() error {
// 'done' file doesn't exist
default:
// unexpected error
log.Errorf("unexpected error checking for doneFlag:", err)
log.Errorf("unexpected error checking for doneFlag: %s", err)
}
time.Sleep(5 * time.Second)
}
Expand Down Expand Up @@ -293,7 +295,7 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) {
defer wg.Done()
f, err := os.Open(path)
if err != nil {
log.Errorf("failed to open file:", path, err)
log.Errorf("failed to open file %s with error %s:", path, err)
return
}

Expand All @@ -303,12 +305,12 @@ func (fm *S3FileManager) uploadOutputFiles() (err error) {
Body: f,
})
if err != nil {
log.Errorf("failed to upload file:", path, err)
log.Errorf("failed to upload file %s with error %s:", path, err)
return
}
fmt.Println("file uploaded to location:", result.Location)
if err = f.Close(); err != nil {
log.Errorf("failed to close file:", err)
log.Errorf("failed to close file: %s", err)
}
<-guard
}(p)
Expand Down

0 comments on commit e4917cc

Please sign in to comment.