Skip to content
Permalink
Browse files

add flush / wait for files referenced using file(...)

this is to improve reliability when accessing files that might suffer
latnency in synchronizing between nodes
  • Loading branch information...
ssadedin committed Jun 26, 2019
1 parent b10685f commit 8eea42bbdb665ec918e1a97f90bdfdeb97293a5b
@@ -1690,6 +1690,10 @@ public class Pipeline implements ResourceRequestor {
File f = new File(fileName)
if(!f.parentFile)
f = new File(new File("."),fileName).canonicalFile

if(!f.exists()) {
new PipelineFile(f.path, StorageLayer.defaultStorage).flushMetadataAndCheckIfMissing(1000)
}
return f
}

@@ -129,16 +129,22 @@ class PipelineFile implements Serializable {
}
}

boolean flushMetadataAndCheckIfMissing() {
boolean flushMetadataAndCheckIfMissing(long timeout=0) {
log.info "File does not appear to exist: listing directory to flush file system: " + this
Path p = this.toPath()
Path parent = p.toAbsolutePath().parent
try { Files.list(parent) } catch(Exception e) { log.warning("Failed to list files of parent directory of " + this + " ($parent)"); }
if(!this.exists()) {
if(this.exists()) {
log.info("File " + this + " revealed by listing directory to flush metadata")
return true
}
else {
while(timeout>0) {
Thread.sleep(200)
if(this.flushMetadataAndCheckIfMissing(0))
return true
timeout -= 200
}
return false
}
}
@@ -0,0 +1 @@
rm -f foo.txt
@@ -0,0 +1,7 @@
source ../testsupport.sh

run

grep -q 'Pipeline Succeeded' test.out || err "Pipeline failed when accessing delayed file"


@@ -0,0 +1,13 @@
hello = {
new Thread({
Thread.sleep(420)
println "Create file foo.txt"
new File('foo.txt').text = 'hello'
}).start()

assert file('foo.txt').text == 'hello'
}

run {
hello
}

0 comments on commit 8eea42b

Please sign in to comment.
You can’t perform that action at this time.