Revert FileSource support for empty directories
isnotinvain committed Nov 17, 2016
1 parent c77b4de commit 462746e
Showing 3 changed files with 78 additions and 108 deletions.
125 changes: 58 additions & 67 deletions scalding-core/src/main/scala/com/twitter/scalding/FileSource.scala
@@ -26,7 +26,7 @@ import cascading.tap.hadoop.Hfs
import cascading.tap.local.FileTap
import cascading.tuple.Fields
import com.etsy.cascading.tap.local.LocalTap
import com.twitter.algebird.{ MapAlgebra, Semigroup }
import com.twitter.algebird.{ MapAlgebra, OrVal }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileStatus, Path, PathFilter }
import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader }
@@ -168,57 +168,46 @@ object FileSource {
}

/**
* Returns true if, for every file matched by globPath, there is a _SUCCESS file present
* in its parent directory.
* @return whether globPath contains a _SUCCESS file
*/
def globHasSuccessFile(globPath: String, conf: Configuration): Boolean = {
def globHasSuccessFile(globPath: String, conf: Configuration): Boolean = allGlobFilesWithSuccess(globPath, conf, hiddenFilter = false)

val allFiles = glob(globPath, conf, AcceptAllPathFilter)

val dirs = allFiles
.iterator
.filter { fileStatus =>
// ignore hidden *directories*
val isHiddenDir = fileStatus.isDirectory && !HiddenFileFilter.accept(fileStatus.getPath)
!isHiddenDir
}.map { fileStatus: FileStatus =>
/**
* Determines whether each file in the glob has a _SUCCESS sibling file in the same directory
* @param globPath path to check
* @param conf Hadoop Configuration to create FileSystem
* @param hiddenFilter if true, only non-hidden files are checked
* @return true if every file matched by globPath (after filtering) has a _SUCCESS file in its directory
*/
def allGlobFilesWithSuccess(globPath: String, conf: Configuration, hiddenFilter: Boolean): Boolean = {
// Produce tuples (dirName, hasSuccess, hasNonHidden) keyed by dir
//
val usedDirs = glob(globPath, conf, AcceptAllPathFilter)
.map { fileStatus: FileStatus =>
// stringify Path for Semigroup
val dir =
if (fileStatus.isDirectory)
fileStatus.getPath.toString
else
fileStatus.getPath.getParent.toString

val fileIsSuccessFile = SuccessFileFilter.accept(fileStatus.getPath) && fileStatus.isFile

// create a table of dir, containsSuccessFile
// to be summed later
dir -> fileIsSuccessFile
// HiddenFileFilter should better be called non-hidden but it borrows its name from the
// private field name in hadoop FileInputFormat
//
dir -> (dir,
OrVal(SuccessFileFilter.accept(fileStatus.getPath) && fileStatus.isFile),
OrVal(HiddenFileFilter.accept(fileStatus.getPath)))
}

// sumByKey using OR
// important not to use Algebird's OrVal which is a monoid, and treats 'false'
// as zero. Combined with MapAlgebra.sumByKey, that results in any keys mapped to
// false being dropped from the output (MapAlgebra tries to be sparse, but we don't
// want that here). Using a Semigroup (no zero) instead of a Monoid fixes that.
val dirStatuses = MapAlgebra.sumByKey(dirs)(Semigroup.from((x, y) => x || y))
// OR by key
val uniqueUsedDirs = MapAlgebra.sumByKey(usedDirs)
.filter { case (_, (_, _, hasNonHidden)) => (!hiddenFilter || hasNonHidden.get) }

val invalid = dirStatuses.isEmpty || dirStatuses.exists { case (dir, containsSuccessFile) => !containsSuccessFile }

ifVerboseLog(conf) {
val dirStatusesStr = dirStatuses.mkString("\n")
val allFilesStr = allFiles.mkString("\n")
s"""
|globHasSuccessFile:
|globPath: $globPath
|directory has success file?:
|$dirStatusesStr
|all files matching globPath:
|$allFilesStr
""".stripMargin
// there is at least one valid path, and all paths have success
//
uniqueUsedDirs.nonEmpty && uniqueUsedDirs.forall {
case (_, (_, hasSuccess, _)) => hasSuccess.get
}

!invalid
}
}
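
For context, the restored check reduces a per-directory boolean (did this directory contain a _SUCCESS file?) across all files matched by the glob, then requires the result to be non-empty and all-true. A minimal sketch of that OR-by-directory reduction, not part of this commit, written with plain Scala collections over an invented file listing:

// Hypothetical glob results for illustration only: (path, isSuccessFile) pairs.
val files = Seq(
  ("hdfs://nn/data/2013/04/part-00000", false),
  ("hdfs://nn/data/2013/04/_SUCCESS", true),
  ("hdfs://nn/data/2013/05/part-00000", false))

// Group by parent directory and OR the "saw a _SUCCESS file" flag within each directory.
val dirHasSuccess: Map[String, Boolean] =
  files
    .groupBy { case (path, _) => path.substring(0, path.lastIndexOf('/')) }
    .map { case (dir, dirFiles) => dir -> dirFiles.exists { case (_, isSuccess) => isSuccess } }

// The glob is good only if it matched something and every directory saw a _SUCCESS file.
val allGood = dirHasSuccess.nonEmpty && dirHasSuccess.values.forall(identity)
// allGood == false here, because .../2013/05 has no _SUCCESS file.

The actual implementation uses Algebird's MapAlgebra.sumByKey with OrVal values to the same effect, and additionally tracks a "has non-hidden file" flag per directory so hidden-only directories can be filtered out when hiddenFilter is true.
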

@@ -331,16 +320,19 @@ abstract class FileSource extends SchemedSource with LocalSourceOverride with Hf
}

protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = {
val taps =
val taps: List[Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]]] =
goodHdfsPaths(hdfsMode)
.iterator
.map { path => CastHfsTap(createHfsTap(hdfsScheme, path, sinkMode)) }
.toList

taps match {
case Nil => new IterableSource[Any](Nil, hdfsScheme.getSourceFields).createTap(Read)(hdfsMode).asInstanceOf[Tap[JobConf, _, _]]
case one :: Nil => one
case many => new ScaldingMultiSourceTap(many)
.toList.map { path => CastHfsTap(createHfsTap(hdfsScheme, path, sinkMode)) }
taps.size match {
case 0 => {
// This case is going to result in an error, but we don't want to throw until
// validateTaps. Return an InvalidSourceTap here so the Job constructor does not fail.
// In the worst case, if the flow plan is misconfigured,
// openForRead on mappers should fail when using this tap.
new InvalidSourceTap(hdfsPaths)
}
case 1 => taps.head
case _ => new ScaldingMultiSourceTap(taps)
}
}
}
@@ -410,31 +402,30 @@ trait SequenceFileScheme extends SchemedSource {
}

/**
* Uses _SUCCESS files instead of the presence of non-hidden files to determine if a path is good.
* Ensures that a _SUCCESS file is present in every directory included by a glob,
* as well as the requirements of [[FileSource.pathIsGood]]. The set of directories to check for
* _SUCCESS is determined by examining the list of all paths returned by globPaths and adding parent
* directories of the non-hidden files encountered.
* pathIsGood should still be considered just a best-effort test. As an illustration, the following
* layout with an in-flight job is accepted for the glob dir*/*:
* <pre>
* dir1/_temporary
* dir2/file1
* dir2/_SUCCESS
* </pre>
*
* Requires that:
* 1) Every matched, non-hidden directory contains a _SUCCESS file
* 2) Every matched, non-hidden file's parent directory contains a _SUCCESS file
* Similarly, if dir1 is physically empty, pathIsGood is still true for dir*&#47;* above
*
* pathIsGood should still be considered just a best-effort test. There are still cases where this is
* not a sufficient test for correctness. See https://github.com/twitter/scalding/issues/1602
* On the other hand, it will reject an empty output directory of a finished job:
* <pre>
* dir1/_SUCCESS
* </pre>
*
* This does accept empty directories that contain a _SUCCESS file, which signals the directory is both
* valid, and there is no data for that directory (you'll get an empty pipe).
*/
trait SuccessFileSource extends FileSource {
override protected def pathIsGood(p: String, conf: Configuration) =
FileSource.globHasSuccessFile(p, conf)

// we need to do some filtering on goodHdfsPaths to remove
// empty dirs that we consider "good" but don't want to ask hadoop's FileInputFormat to read.
override protected def goodHdfsPaths(hdfsMode: Hdfs): Iterable[String] = {
super
.goodHdfsPaths(hdfsMode)
// some paths deemed "good" may actually be empty, and hadoop's FileInputFormat
// doesn't like that. So we filter them away here.
.filter { p => FileSource.globHasNonHiddenPaths(p, hdfsMode.conf) }
}
FileSource.allGlobFilesWithSuccess(p, conf, true)
}
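
For reference, a concrete source opts into this behavior simply by mixing the trait into a FileSource. A minimal sketch, not part of this commit, with a hypothetical class name and path:

import com.twitter.scalding.{ FixedPathSource, SuccessFileSource, TextLineScheme }

// Hypothetical daily source: pathIsGood only when every directory matched by the
// glob contains a _SUCCESS marker, per the SuccessFileSource check above.
case class DailyLogs(date: String)
  extends FixedPathSource(s"/logs/$date/*")
  with TextLineScheme
  with SuccessFileSource
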

/**
@@ -204,8 +204,8 @@ class FileSourceTest extends WordSpec with Matchers {
pathIsGood("test_data/2013/05/") shouldBe false
}

"accept a single directory glob with only _SUCCESS and ignored files" in {
pathIsGood("test_data/2013/05/*") shouldBe true
"reject a single directory glob with only _SUCCESS and ignored files" in {
pathIsGood("test_data/2013/05/*") shouldBe false
}

"accept a directory with data and _SUCCESS in it when specified as a glob" in {
@@ -216,8 +216,8 @@
pathIsGood("test_data/2013/04/") shouldBe false
}

"accept a directory with only _SUCCESS when specified as a glob" in {
pathIsGood("test_data/2013/06/*") shouldBe true
"reject a directory with only _SUCCESS when specified as a glob" in {
pathIsGood("test_data/2013/06/*") shouldBe false
}

"reject a directory with only _SUCCESS when specified without a glob" in {
@@ -232,8 +232,9 @@
pathIsGood("test_data/2013/{04,08}/*") shouldBe true
}

"accept a multi-dir glob if all matched non-hidden directories have _SUCCESS files, even when some are empty" in {
pathIsGood("test_data/2013/{04,05,06}/*") shouldBe true
"accept a multi-dir glob if all dirs with non-hidden files have _SUCCESS while dirs with " +
"hidden ones don't" in {
pathIsGood("test_data/2013/{04,05}/*") shouldBe true
}

// NOTE: this is an undesirable limitation of SuccessFileSource, and is encoded here
@@ -15,19 +15,19 @@ limitations under the License.
*/
package com.twitter.scalding.platform

import java.util.{Iterator => JIterator}
import java.util.{ Iterator => JIterator }

import cascading.flow.FlowException
import cascading.pipe.joiner.{InnerJoin, JoinerClosure}
import cascading.pipe.joiner.{ InnerJoin, JoinerClosure }
import cascading.scheme.Scheme
import cascading.scheme.hadoop.{TextLine => CHTextLine}
import cascading.scheme.hadoop.{ TextLine => CHTextLine }
import cascading.tap.Tap
import cascading.tuple.{Fields, Tuple}
import cascading.tuple.{ Fields, Tuple }
import com.twitter.scalding._
import com.twitter.scalding.serialization.OrderedSerialization
import com.twitter.scalding.source.{FixedTypedText, NullSink, TypedText}
import org.scalacheck.{Arbitrary, Gen}
import org.scalatest.{Matchers, WordSpec}
import com.twitter.scalding.source.{ FixedTypedText, NullSink, TypedText }
import org.scalacheck.{ Arbitrary, Gen }
import org.scalatest.{ Matchers, WordSpec }

import scala.collection.JavaConverters._
import scala.language.experimental.macros
@@ -416,12 +416,6 @@ class TestTypedEmptySource extends FileSource with TextSourceScheme with Mappabl
TupleConverter.asSuperConverter[(Long, String), U](implicitly[TupleConverter[(Long, String)]])
}

class TestFieldsEmptySource(val fields: Fields = new Fields("customNamedOffset", "customNamedLine")) extends FileSource with SuccessFileSource {
override def hdfsPaths: Iterable[String] = Iterable.empty
override def localPaths: Iterable[String] = Iterable.empty
override def hdfsScheme = HadoopSchemeInstance(new CHTextLine(fields, CHTextLine.DEFAULT_CHARSET).asInstanceOf[Scheme[_, _, _, _, _]])
}

// Tests the scenario where you have no data present in the directory pointed to by a source, typically
// due to the directory being empty (but for a _SUCCESS file).
// We test that this shouldn't result in a Cascading planner error during {@link Job.buildFlow}
@@ -431,19 +425,6 @@ class EmptyDataJob(args: Args) extends Job(args) {
.write(TypedTsv[String]("output"))
}

class FieldsEmptyDataJob(args: Args) extends Job(args) {
val x = new TestFieldsEmptySource(new Fields("offset1", "line1")).read
val y = new TestFieldsEmptySource(new Fields("offset2", "line2")).read

// Empty sources can return an empty MemoryTap
// Here we are testing that this MemoryTap has the right Fields setup in it.
// joinWithSmaller triggers the issue we are testing, specifically that 'line1 and 'line2
// are available in the MemoryTaps according to the planner. Previously we had used Fields.All
// and we get the error that field 'line1 and field 'line2 cannot be found in UNKNOWN fields
x.joinWithSmaller('line1 -> 'line2, y)
.write(Tsv("output"))
}

// Keeping all of the specifications in the same tests puts the result output all together at the end.
// This is useful given that the Hadoop MiniMRCluster and MiniDFSCluster spew a ton of logging.
class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest {
@@ -708,17 +689,14 @@ class PlatformTest extends WordSpec with Matchers with HadoopSharedPlatformTest
}
}

// If we support empty sources again in the future, update this test
"An EmptyData source" should {
"read from empty source and write to output without errors" in {
HadoopPlatformJobTest(new EmptyDataJob(_), cluster)
.run()
}
}

"A FieldsEmptyData source" should {
"read from empty source and write to output without errors" in {
HadoopPlatformJobTest(new FieldsEmptyDataJob(_), cluster)
.run()
val e = intercept[FlowException] {
HadoopPlatformJobTest(new EmptyDataJob(_), cluster)
.run()
}
assert(e.getCause.getClass === classOf[InvalidSourceException])
}
}
