Skip to content

Commit

Permalink
Refactor getRequiredFiles to not add files in executor (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
rzhang10 committed Apr 21, 2022
1 parent 7ea2674 commit 3690b7d
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ abstract class StdUdfWrapper(_expressions: Seq[Expression]) extends Expression
_stdUdf = _sparkTypeInference.getStdUdf
_nullableArguments = _stdUdf.getAndCheckNullableArguments
_stdUdf.init(_stdFactory)
// Only call this on driver
if (TaskContext.get == null) {
getRequiredFiles()
}
getRequiredFiles()
_requiredFilesProcessed = false
_initialized = true
}
Expand Down Expand Up @@ -97,18 +94,21 @@ abstract class StdUdfWrapper(_expressions: Seq[Expression]) extends Expression
throw new UnsupportedOperationException("getRequiredFiles not yet supported for StdUDF" + _expressions.length)
}

lazy val sparkContext = SparkSession.builder().getOrCreate().sparkContext
_distributedCacheFiles = requiredFiles.map(file => {
try {
val resolvedFile = FileSystemUtils.resolveLatest(file)
// TODO: Currently does not support adding of files with same file name. E.g dirA/file.txt dirB/file.txt
sparkContext.addFile(resolvedFile)
resolvedFile
FileSystemUtils.resolveLatest(file)
} catch {
case e: IOException =>
throw new RuntimeException("Failed to resolve path: [" + file + "].", e)
}
})

// Only add files on the driver
if (TaskContext.get == null) {
val sparkContext = SparkSession.builder().getOrCreate().sparkContext
// TODO: Currently does not support adding of files with same file name. E.g dirA/file.txt dirB/file.txt
_distributedCacheFiles.foreach(sparkContext.addFile)
}
}
}
} // scalastyle:on magic.number
Expand Down

0 comments on commit 3690b7d

Please sign in to comment.