Add some Execution support in scalding #674

Merged
merged 4 commits into from Jul 20, 2016

Projects

None yet

4 participants

@johnynek
Collaborator

This allows you to convert a Producer[Scalding, T] to a Execution[TypedPipe[T]] given a DateRange and optional options.

This does not convert summingbird to using Execution internally, but it maybe we can side-step that if we can safely use it from Execution (for the time being).

@oscar-stripe oscar-stripe Add some Execution support in scalding
a4ec7b2
@johnynek
Collaborator

/cc @benpence can you take a look. I know @isnotinvain said you were thinking of related things.

@johnynek johnynek commented on an outdated diff Jul 18, 2016
...a/com/twitter/summingbird/scalding/ScaldingLaws.scala
@@ -131,11 +162,13 @@ class ScaldingLaws extends WordSpec {
val summer = TestGraphs.singleStepJob[Scalding, (Long, Int), Int, Int](source, testStore)(t =>
fn(t._2))
- val scald = Scalding("scalaCheckJob")
- val ws = new LoopState(intr)
+ val ex = Scalding.toExecutionExact(
@johnynek
johnynek Jul 18, 2016 Collaborator

looks I screwed this up...

oscar-stripe added some commits Jul 19, 2016
@oscar-stripe oscar-stripe Add more debugging
4ee87b0
@oscar-stripe oscar-stripe Make it work with 0.16.1-RC3
9e84103
@johnynek
Collaborator

seems to pass tests for me now.

@rubanm rubanm commented on an outdated diff Jul 20, 2016
...m/twitter/summingbird/scalding/ScaldingPlatform.scala
@@ -564,6 +566,44 @@ object Scalding {
}
/**
+ * like toPipeExact, but using Execution
+ */
+ def toExecutionExact[T](
+ dr: DateRange,
+ prod: Producer[Scalding, T],
+ options: Map[String, Options] = Map.empty): Execution[TypedPipe[(Timestamp, T)]] =
+
+ Execution.getMode.flatMap { mode =>
+ val fd = new FlowDef
+ toPipeExact(dr, prod, options)(fd, mode) match {
+ case Right(pipe) =>
@rubanm
rubanm Jul 20, 2016 Contributor

Had to double check if this was unused, so Right(_)?

@rubanm
Contributor
rubanm commented Jul 20, 2016

👍

@oscar-stripe oscar-stripe don't capture an unused variable
6a49a4e
@johnynek johnynek merged commit 39e3850 into develop Jul 20, 2016

1 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build is in progress
Details
continuous-integration/travis-ci/push The Travis CI build passed
Details
@johnynek johnynek deleted the oscar/scalding-execution branch Jul 20, 2016
@benpence benpence commented on the diff Jul 20, 2016
...m/twitter/summingbird/scalding/ScaldingPlatform.scala
+
+ Execution.getMode.flatMap { mode =>
+ val fd = new FlowDef
+ toPipeExact(dr, prod, options)(fd, mode) match {
+ case Right(_) =>
+ /**
+ * We plan twice. The first time we plan to see what new
+ * writes we do, the second time we only read, we do not
+ * do any new writes.
+ */
+ Execution.fromFn { (_, _) => fd }
+ .flatMap { _ =>
+ // Now plan again and use summingbird's built in support
+ // to minimize work by reading existing on-disk data:
+ val reads = new FlowDef
+ toPipeExact(dr, prod, options)(reads, mode) match {
@benpence
benpence Jul 20, 2016

Forgive me, I'm very unfamiliar with the SB codebase. Don't we have the expectation that Config options provided to an Execution could affect the behavior of how the Scalding Job runs? Or is that supposed to be represented in options?

@johnynek
johnynek Jul 20, 2016 Collaborator

Config is passed to Execution. Options are summingbird specific and are passed to toPipeExact.

We don't have the expectation that scalding options can control summingbird planning.

@benpence
benpence Jul 20, 2016

OK sounds good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment