diff --git a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala index 377df6e2cda38..ce38176a66fdc 100644 --- a/core/src/main/scala/org/apache/spark/input/RawFileInput.scala +++ b/core/src/main/scala/org/apache/spark/input/RawFileInput.scala @@ -61,16 +61,22 @@ abstract class StreamFileInputFormat[T] */ class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer) extends Serializable { - private var path = "" + private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream] private var isOpen = false + /** + * Calculate the path name independently of opening the file + */ + private lazy val path = { + val pathp = split.getPath(index) + path = pathp.toString + } /** * create a new DataInputStream from the split and context */ def open(): FSDataInputStream = { val pathp = split.getPath(index) - path = pathp.toString val fs = pathp.getFileSystem(context.getConfiguration) fileIn = fs.open(pathp) isOpen=true @@ -126,9 +132,9 @@ abstract class StreamBasedRecordReader[T]( override def nextKeyValue = { if (!processed) { val fileIn = new PortableDataStream(split,context,index) - key = fileIn.getPath value = parseStream(fileIn) fileIn.close() // if it has not been open yet, close does nothing + key = fileIn.getPath processed = true true } else {