Skip to content

Commit

Permalink
Fix #146: Handle all errors
Browse files Browse the repository at this point in the history
  • Loading branch information
samuell committed Oct 14, 2021
1 parent f7dcccc commit 5069057
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 38 deletions.
6 changes: 4 additions & 2 deletions misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ func cleanFiles(fileNames ...string) {
for _, fileName := range fileNames {
auditFileName := fileName + ".audit.json"
if _, err := os.Stat(fileName); err == nil {
os.Remove(fileName)
errRem := os.Remove(fileName)
Check(errRem)
Debug.Println("Successfully removed file", fileName)
}
if _, err := os.Stat(auditFileName); err == nil {
os.Remove(auditFileName)
errRem := os.Remove(auditFileName)
Check(errRem)
Debug.Println("Successfully removed audit.json file", auditFileName)
}
}
Expand Down
5 changes: 4 additions & 1 deletion process.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ func (p *Process) Run() {
}
// Remove any FIFO file
if oip.doStream && oip.FifoFileExists() {
os.Remove(oip.FifoPath())
err := os.Remove(oip.FifoPath())
if err != nil {
p.Failf("Could not remove Fifo path %s", oip.FifoPath())
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ func TestGetBufsize(t *testing.T) {
initTestLogs()

wantBufSize := 1234
os.Setenv("SCIPIPE_BUFSIZE", "1234")
err := os.Setenv("SCIPIPE_BUFSIZE", "1234")
Check(err)
haveBufSize := getBufsize()

if haveBufSize != wantBufSize {
Expand Down
45 changes: 27 additions & 18 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewTask(workflow *Workflow, process *Process, name string, cmdPat string, i
for oname, outPathFunc := range outPathFuncs {
oip, err := NewFileIP(outPathFunc(t))
if err != nil {
process.Fail(err.Error())
process.Failf("Could not create out IP %s: %v", oname, err)
}
if ptInfo, ok := portInfos[oname]; ok {
if ptInfo.doStream {
Expand Down Expand Up @@ -266,7 +266,7 @@ func (t *Task) Execute() {
t.workflow.IncConcurrentTasks(t.cores) // Will block if max concurrent tasks is reached
err := t.createDirs() // Create output directories needed for any outputs
if err != nil {
t.Fail(err)
t.Failf("Could not create directories: %v", err)
}
startTime := time.Now()
if t.CustomExecute != nil {
Expand Down Expand Up @@ -325,7 +325,10 @@ func (t *Task) anyOutputsExist() (anyFileExists bool) {

// createDirs creates directories for out-IPs of the task
func (t *Task) createDirs() error {
os.MkdirAll(t.TempDir(), 0777)
err := os.MkdirAll(t.TempDir(), 0777)
if err != nil {
t.Failf("Could not create tempdir %s: %v", t.TempDir(), err)
}

for _, oip := range t.OutIPs {
oipDir := oip.TempDir() // This will create all out dirs, including the temp dir
Expand All @@ -336,7 +339,7 @@ func (t *Task) createDirs() error {
}
err := os.MkdirAll(oipDir, 0777)
if err != nil {
return errors.New(fmt.Sprintf("Could not create directory: %s: %s", oipDir, err))
return errors.New(fmt.Sprintf("Could not create directory %s: %v", oipDir, err))
}
}

Expand Down Expand Up @@ -426,32 +429,38 @@ func FinalizePaths(tempExecDir string, ips ...*FileIP) error {
// Move paths for ports, to final destinations
if !oip.doStream {
tempPath := tempExecDir + "/" + oip.TempPath()
newPath := oip.Path()
Debug.Println("Moving OutIP path: ", tempPath, " -> ", newPath)
renameErr := os.Rename(tempPath, newPath)
finPath := oip.Path()
Debug.Println("Moving OutIP path: ", tempPath, " -> ", finPath)
renameErr := os.Rename(tempPath, finPath)
if renameErr != nil {
return errors.New(fmt.Sprintf("Could not rename out-IP file %s to %s: %s", tempPath, newPath, renameErr))
return errors.New(fmt.Sprintf("Could not rename out-IP file %s to %s: %s", tempPath, finPath, renameErr))
}
}
}
// For remaining paths in temporary execution dir, just move out of it
filepath.Walk(tempExecDir, func(tempPath string, fileInfo os.FileInfo, err error) error {
err := filepath.Walk(tempExecDir, func(tempPath string, fileInfo os.FileInfo, err error) error {
if !fileInfo.IsDir() {
newPath := strings.Replace(tempPath, tempExecDir+"/", "", 1)
newPath = strings.Replace(newPath, FSRootPlaceHolder+"/", "/", 1)
newPath = replacePlaceholdersWithParentDirs(newPath)
newPathDir := filepath.Dir(newPath)
if _, err := os.Stat(newPathDir); os.IsNotExist(err) {
os.MkdirAll(newPathDir, 0777)
finPath := strings.Replace(tempPath, tempExecDir+"/", "", 1)
finPath = strings.Replace(finPath, FSRootPlaceHolder+"/", "/", 1)
finPath = replacePlaceholdersWithParentDirs(finPath)
finPathDir := filepath.Dir(finPath)
if _, err := os.Stat(finPathDir); os.IsNotExist(err) {
errMkdir := os.MkdirAll(finPathDir, 0777)
if errMkdir != nil {
Error.Printf("Failed to create directory for final path: %s\n", finPathDir)
}
}
Debug.Println("Moving remaining file path: ", tempPath, " -> ", newPath)
renameErr := os.Rename(tempPath, newPath)
Debug.Println("Moving remaining file path: ", tempPath, " -> ", finPath)
renameErr := os.Rename(tempPath, finPath)
if renameErr != nil {
return errors.New(fmt.Sprintf("Could not rename remaining file %s to %s: %s", tempPath, newPath, renameErr))
return errors.New(fmt.Sprintf("Could not rename remaining file %s to %s: %s", tempPath, finPath, renameErr))
}
}
return err
})
if err != nil {
Error.Printf("Failed walking temporary execution directory %s: %v\n", tempExecDir, err)
}
// Remove temporary execution dir (but not for absolute paths, or current dir)
if tempExecDir != "" && tempExecDir != "." && tempExecDir[0] != '/' {
remErr := os.RemoveAll(tempExecDir)
Expand Down
16 changes: 10 additions & 6 deletions task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,15 @@ func TestExtraFilesFinalizePaths(t *testing.T) {
tsk := NewTask(nil, nil, "test_task", "echo foo", map[string]*FileIP{}, nil, nil, map[string]string{}, nil, "", nil, 4)
// Create extra file
tmpDir := tsk.TempDir()
os.MkdirAll(tmpDir, 0777)
err := os.MkdirAll(tmpDir, 0777)
Check(err)
fName := filepath.Join(tmpDir, "letterfile_a.txt")
_, err := os.Create(fName)
if err != nil {
_, errCreate := os.Create(fName)
if errCreate != nil {
t.Fatalf("File could not be created: %s\n", fName)
}
tsk.finalizePaths()
errFin := tsk.finalizePaths()
Check(errFin)
filePath := filepath.Join(".", "letterfile_a.txt")
if _, err := os.Stat(filePath); os.IsNotExist(err) {
t.Error("File did not exist: " + filePath)
Expand All @@ -178,13 +180,15 @@ func TestExtraFilesFinalizePathsAbsolute(t *testing.T) {
}

absDir := filepath.Join(tmpDir, FSRootPlaceHolder, tmpDir)
os.MkdirAll(absDir, 0777)
errMkdir := os.MkdirAll(absDir, 0777)
Check(errMkdir)
fName := filepath.Join(absDir, "letterfile_a.txt")
_, err = os.Create(fName)
if err != nil {
t.Fatalf("File could not be created: %s\n", fName)
}
FinalizePaths(tmpDir)
errFin := FinalizePaths(tmpDir)
Check(errFin)
filePath := filepath.Join(tmpDir, "letterfile_a.txt")
if _, err := os.Stat(filePath); os.IsNotExist(err) {
t.Error("File did not exist: " + filePath)
Expand Down
5 changes: 4 additions & 1 deletion workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,10 @@ func (wf *Workflow) PlotGraph(filePath string) {
createDirs(filePath)
dotFile, err := os.Create(filePath)
CheckWithMsg(err, "Could not create dot file "+filePath)
dotFile.WriteString(dot)
_, errDot := dotFile.WriteString(dot)
if errDot != nil {
wf.Failf("Could not write to DOT-file %s: %s", dotFile.Name(), errDot)
}
}

// PlotGraphPDF writes the workflow structure to a dot file, and also runs the
Expand Down
21 changes: 12 additions & 9 deletions workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,12 @@ func TestStreaming(t *testing.T) {
func TestSubStreamJoinInPlaceHolder(t *testing.T) {
initTestLogs()

exec.Command("bash", "-c", "echo 1 > .tmp/file1.txt").CombinedOutput()
exec.Command("bash", "-c", "echo 2 > .tmp/file2.txt").CombinedOutput()
exec.Command("bash", "-c", "echo 3 > .tmp/file3.txt").CombinedOutput()
_, err1 := exec.Command("bash", "-c", "echo 1 > .tmp/file1.txt").CombinedOutput()
Check(err1)
_, err2 := exec.Command("bash", "-c", "echo 2 > .tmp/file2.txt").CombinedOutput()
Check(err2)
_, err3 := exec.Command("bash", "-c", "echo 3 > .tmp/file3.txt").CombinedOutput()
Check(err3)

wf := NewWorkflow("TestSubStreamJoinInPlaceHolderWf", 16)

Expand All @@ -384,14 +387,14 @@ func TestSubStreamJoinInPlaceHolder(t *testing.T) {

wf.Run()

_, err1 := os.Stat(".tmp/file1.txt")
assertNil(t, err1, "File missing!")
_, err1b := os.Stat(".tmp/file1.txt")
assertNil(t, err1b, "File missing!")

_, err2 := os.Stat(".tmp/file2.txt")
assertNil(t, err2, "File missing!")
_, err2b := os.Stat(".tmp/file2.txt")
assertNil(t, err2b, "File missing!")

_, err3 := os.Stat(".tmp/file3.txt")
assertNil(t, err3, "File missing!")
_, err3b := os.Stat(".tmp/file3.txt")
assertNil(t, err3b, "File missing!")

_, err4 := os.Stat(".tmp/substream_merged.txt")
assertNil(t, err4, "File missing!")
Expand Down

0 comments on commit 5069057

Please sign in to comment.