Skip to content

Commit

Permalink
Merge pull request #553 from adamilardi/verifytypes
Browse files Browse the repository at this point in the history
Verify coerced types
  • Loading branch information
johnynek committed Aug 11, 2013
2 parents 6b20587 + 84e16b9 commit 9f9224c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Expand Up @@ -315,6 +315,14 @@ class RichPipe(val pipe : Pipe) extends java.io.Serializable with JoinAlgorithms
conv.assertArityMatches(f)
new Each(pipe, f, new FilterFunction(fn, conv))
}

/**
 * Applies an always-true filter over the given fields, typed as `A`.
 *
 * Text files can contain corrupted data. When this is combined with a
 * cascading trap, records whose fields cannot be handled as `A` can be
 * diverted out of the pipe instead of failing the job.
 *
 * @tparam A the type the selected fields are expected to convert to
 * @param f the fields to check
 * @param conv the converter used to read the selected fields as `A`
 * @return the pipe with the accept-everything filter over `f` attached
 */
def verifyTypes[A](f: Fields)(implicit conv: TupleConverter[A]): Pipe = {
  // The predicate itself never rejects anything; presumably the useful work is
  // the conversion to A that happens on the way into it -- TODO confirm in filter.
  pipe.filter[A](f) { _ => true }
}

/**
* Given a function, partitions the pipe into several groups based on the
Expand Down
28 changes: 28 additions & 0 deletions scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala
Expand Up @@ -1602,3 +1602,31 @@ class SampleWithReplacementTest extends Specification {
}
}

/**
 * Job used by VerifyTypesJobTest: reads (age, weight) rows from the "input"
 * TSV, attaches the "trap" TSV as a trap, runs verifyTypes over the columns,
 * and writes the result to the "output" TSV.
 */
class VerifyTypesJob(args: Args) extends Job(args) {
  // Source pipe with the trap registered before any type checks are applied.
  private val trapped = Tsv("input", new Fields("age", "weight")).addTrap(Tsv("trap"))

  // Check both columns together as a pair of Ints, then weight alone as an Int.
  private val verified = trapped
    .verifyTypes[(Int, Int)]('age -> 'weight)
    .verifyTypes[Int]('weight)

  verified.write(Tsv("output"))
}

/**
 * Runs VerifyTypesJob over a small fixture and checks how many rows reach the
 * "output" sink versus the "trap" sink.
 */
class VerifyTypesJobTest extends Specification {
  "Verify types operation" should {
    "put bad records in a trap" in {
      // Four rows carry a String weight. The expectations below assume the two
      // numeric-looking ones ("123", "34") pass and only "aaa" and "break" are trapped.
      val rows = List(
        (3, "aaa"), (23, 154), (15, "123"), (53, 143), (7, 85), (19, 195),
        (42, 187), (35, 165), (68, 121), (13, "34"), (17, 173), (2, 13),
        (2, "break"))
      val trappedCount = 2

      JobTest(new com.twitter.scalding.VerifyTypesJob(_))
        .source(Tsv("input", new Fields("age", "weight")), rows)
        .sink[(Int, Int)](Tsv("output")) { kept =>
          kept.toList.size must_== rows.size - trappedCount
        }
        .sink[(Any, Any)](Tsv("trap")) { rejected =>
          rejected.toList.size must_== trappedCount
        }
        .run
        .finish
    }
  }
}

0 comments on commit 9f9224c

Please sign in to comment.