Skip to content

Commit

Permalink
Scala Hadoop Shred: now only writes atomic-events if JSONs shred succ…
Browse files Browse the repository at this point in the history
…essfully (fixes #2245)
  • Loading branch information
alexanderdean committed Dec 29, 2015
1 parent 5b08f2a commit 57e5fe2
Showing 1 changed file with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ class ShredJob(args : Args) extends Job(args) {

// Aliases for our job
val input = MultipleTextLineFiles(shredConfig.inFolder).read
val goodOutput = PartitionedTsv(shredConfig.outFolder, ShredJob.ShreddedPartition, false, ('json), SinkMode.REPLACE)
val goodJsonsOutput = PartitionedTsv(shredConfig.outFolder, ShredJob.ShreddedPartition, false, ('json), SinkMode.REPLACE)
val badOutput = Tsv(shredConfig.badFolder) // Technically JSONs but use Tsv for custom JSON creation
implicit val resolver = shredConfig.igluResolver
val alteredOutput = MultipleTextLineFiles(ShredJob.getAlteredEnrichedOutputPath(shredConfig.outFolder))
val goodEventsOutput = MultipleTextLineFiles(ShredJob.getAlteredEnrichedOutputPath(shredConfig.outFolder))

// Do we add a failure trap?
val trappableInput = shredConfig.exceptionsFolder match {
Expand All @@ -201,18 +201,23 @@ class ShredJob(args : Args) extends Job(args) {

// Handle good rows
val good = common
.flatMapTo('output -> 'good) { o: ValidatedNel[JsonSchemaPairs] =>
.flatMap('output -> 'good) { o: ValidatedNel[JsonSchemaPairs] =>
ShredJob.projectGoods(o)
}

// Write atomic-events
val events = good
.mapTo('line -> 'altered) { s: String =>
ShredJob.alterEnrichedEvent(s)
}
.write(goodEventsOutput)

// Write JSONs
val jsons = good
.flatMapTo('good -> ('schema, 'json)) { pairs: List[JsonSchemaPair] =>
pairs.map { pair =>
(pair._1.toSchemaUri, pair._2.toString)
}
}
.write(goodOutput)

val alteredEnriched = input.mapTo('line -> 'altered) { s: String =>
ShredJob.alterEnrichedEvent(s)
}
.write(alteredOutput)
.write(goodJsonsOutput)
}

0 comments on commit 57e5fe2

Please sign in to comment.