Skip to content

Commit

Permalink
Update RawFileInput.scala
Browse files: browse the repository at this point in the history
trying to fix bug where name appears blank
  • Loading branch information
kmader committed Sep 16, 2014
1 parent 441f79a commit a01c9cf
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/input/RawFileInput.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,22 @@ abstract class StreamFileInputFormat[T]
*/
class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
extends Serializable {
private var path = ""

private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
private var isOpen = false
/**
 * The path of the file referenced by this stream, rendered as a string.
 *
 * It is derived only from `split` and `index`, so it can be read without
 * opening the underlying file. Being a `lazy val`, it is computed on first access.
 */
// The initializer must evaluate to the string itself: assigning to `path` inside
// its own initializer is not a legal operation on a val, and an assignment
// expression has type Unit rather than String.
private lazy val path: String = split.getPath(index).toString

/**
* create a new DataInputStream from the split and context
*/
def open(): FSDataInputStream = {
val pathp = split.getPath(index)
path = pathp.toString
val fs = pathp.getFileSystem(context.getConfiguration)
fileIn = fs.open(pathp)
isOpen=true
Expand Down Expand Up @@ -126,9 +132,9 @@ abstract class StreamBasedRecordReader[T](
override def nextKeyValue = {
if (!processed) {
val fileIn = new PortableDataStream(split,context,index)
key = fileIn.getPath
value = parseStream(fileIn)
fileIn.close() // if it has not been open yet, close does nothing
key = fileIn.getPath
processed = true
true
} else {
Expand Down

0 comments on commit a01c9cf

Please sign in to comment.