Skip to content

Commit

Permalink
Support unused accumulators (#306)
Browse files Browse the repository at this point in the history
Otherwise code might fail with:

INFO:luigi-interface:Caused by: java.util.NoSuchElementException: key not found: foo
scala.collection.MapLike$class.default(MapLike.scala:228)
scala.collection.AbstractMap.default(Map.scala:59)
scala.collection.MapLike$class.apply(MapLike.scala:141)
scala.collection.AbstractMap.apply(Map.scala:59)
com.spotify.scio.ScioResult.getAggregatorValues(ScioResult.scala:98)
com.spotify.scio.ScioResult.accumulatorTotalValue(ScioResult.scala:56)
com.spotify.scio.ScioResult$$anonfun$2.apply(ScioResult.scala:82)
com.spotify.scio.ScioResult$$anonfun$2.apply(ScioResult.scala:82)
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:1120)
scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:1120)
scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:1109)
scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:1109)
scala.collection.immutable.StreamIterator.hasNext(Stream.scala:1114)
scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:155)z
  • Loading branch information
ravwojdyla authored and nevillelyh committed Oct 31, 2016
1 parent 93b32b4 commit e310f62
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
3 changes: 2 additions & 1 deletion scio-core/src/main/scala/com/spotify/scio/ScioResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class ScioResult private[scio] (val internal: PipelineResult,
}

private def getAggregatorValues[T](acc: Accumulator[T]): Iterable[AggregatorValues[T]] =
aggregators(acc.name).map(a => internal.getAggregatorValues(a.asInstanceOf[Aggregator[_, T]]))
aggregators.getOrElse(acc.name, Nil)
.map(a => internal.getAggregatorValues(a.asInstanceOf[Aggregator[_, T]]))

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,12 @@ class AccumulatorTest extends PipelineSpec {
metrics.accumulators.steps should contain theSameElementsAs expectedSteps
}

it should "support support unused accumulators" in {
val sc = ScioContext.forTest()
val maxI = sc.maxAccumulator[Int]("maxI")
val r = sc.close()
r.accumulatorTotalValue(maxI) should be(Integer.MIN_VALUE)
r.accumulatorValuesAtSteps(maxI) shouldBe empty
}

}

0 comments on commit e310f62

Please sign in to comment.