From 8c928eedb95222c8722719910e693f982201d117 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 26 Nov 2013 16:16:11 -0800 Subject: [PATCH 1/3] Add local or shuffle option --- .../com/twitter/summingbird/storm/Constants.scala | 1 + .../twitter/summingbird/storm/StormPlatform.scala | 12 +++++++++--- .../summingbird/storm/option/FlatMapOptions.scala | 4 +++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala index 7dbca4740..d0d17a186 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala @@ -44,4 +44,5 @@ object Constants { val DEFAULT_MONOID_IS_COMMUTATIVE = MonoidIsCommutative.default val DEFAULT_MAX_WAITING_FUTURES = MaxWaitingFutures(10) val DEFAULT_MAX_FUTURE_WAIT_TIME = MaxFutureWaitTime(Duration.fromSeconds(60)) + val DEFAULT_FM_LOCAL_OR_SHUFFLE = LocalOrShuffle(false) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index fbd9b81d8..b82da50d8 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -187,6 +187,9 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val maxWaitTime = getOrElse(stormDag, node, DEFAULT_MAX_FUTURE_WAIT_TIME) logger.info("[{}] maxWaiting: {}", nodeName, maxWaiting.get) + val useLocalOrShuffle = getOrElse(stormDag, node, DEFAULT_FM_LOCAL_OR_SHUFFLE) + logger.info("[{}] useLocalOrShuffle: {}", nodeName, useLocalOrShuffle.get) + val summerOpt:Option[SummerNode[Storm]] = stormDag.dependantsOf(node).collect{case s: SummerNode[Storm] => s}.headOption val bolt = summerOpt match { @@ -214,9 +217,12 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } - // TODO: https://github.com/twitter/summingbird/issues/366 - // test localOrShuffleGrouping here. may give big wins for serialization heavy jobs. - dependenciesNames.foreach { declarer.shuffleGrouping(_) } + if (useLocalOrShuffle.get) { + dependenciesNames.foreach { declarer.localOrShuffleGrouping(_) } + } else { + dependenciesNames.foreach { declarer.shuffleGrouping(_) } + } + } private def scheduleSpout[K](stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = { diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala index 038298b6f..e0cf75483 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala @@ -56,4 +56,6 @@ object SpoutStormMetrics { class SpoutStormMetrics(val metrics: () => TraversableOnce[StormMetric[IMetric]]) { def toSpoutMetrics: () => TraversableOnce[Metric[IMetric]] = {() => metrics().map{ x: StormMetric[IMetric] => Metric(x.name, x.metric, x.interval.inSeconds)}} -} \ No newline at end of file +} + +case class LocalOrShuffle(get: Boolean) \ No newline at end of file From bf939ee469b54a5ba92074fe1a11e878240e57f7 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 26 Nov 2013 17:38:21 -0800 Subject: [PATCH 2/3] Rename away from storm specific naming --- .../scala/com/twitter/summingbird/storm/Constants.scala | 2 +- .../scala/com/twitter/summingbird/storm/StormPlatform.scala | 6 +++--- .../twitter/summingbird/storm/option/FlatMapOptions.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala index d0d17a186..fb8ad274e 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala @@ -44,5 +44,5 @@ object Constants { val DEFAULT_MONOID_IS_COMMUTATIVE = MonoidIsCommutative.default val DEFAULT_MAX_WAITING_FUTURES = MaxWaitingFutures(10) val DEFAULT_MAX_FUTURE_WAIT_TIME = MaxFutureWaitTime(Duration.fromSeconds(60)) - val DEFAULT_FM_LOCAL_OR_SHUFFLE = LocalOrShuffle(false) + val DEFAULT_FM_PREFER_LOCAL_DEPENDANT = PreferLocalDependant(false) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index b82da50d8..b5d6df696 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -187,8 +187,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val maxWaitTime = getOrElse(stormDag, node, DEFAULT_MAX_FUTURE_WAIT_TIME) logger.info("[{}] maxWaiting: {}", nodeName, maxWaiting.get) - val useLocalOrShuffle = getOrElse(stormDag, node, DEFAULT_FM_LOCAL_OR_SHUFFLE) - logger.info("[{}] useLocalOrShuffle: {}", nodeName, useLocalOrShuffle.get) + val usePreferLocalDependant = getOrElse(stormDag, node, DEFAULT_FM_PREFER_LOCAL_DEPENDANT) + logger.info("[{}] usePreferLocalDependant: {}", nodeName, usePreferLocalDependant.get) val summerOpt:Option[SummerNode[Storm]] = stormDag.dependantsOf(node).collect{case s: SummerNode[Storm] => s}.headOption @@ -217,7 +217,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } - if (useLocalOrShuffle.get) { + if (usePreferLocalDependant.get) { dependenciesNames.foreach { declarer.localOrShuffleGrouping(_) } } else { dependenciesNames.foreach { declarer.shuffleGrouping(_) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala index e0cf75483..151f9e289 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala @@ -58,4 +58,4 @@ class SpoutStormMetrics(val metrics: () => TraversableOnce[StormMetric[IMetric]] {() => metrics().map{ x: StormMetric[IMetric] => Metric(x.name, x.metric, x.interval.inSeconds)}} } -case class LocalOrShuffle(get: Boolean) \ No newline at end of file +case class PreferLocalDependant(get: Boolean) \ No newline at end of file From 4fa45f945d6076cbe1a3aa2f28ceef02ad3743d7 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 26 Nov 2013 17:42:15 -0800 Subject: [PATCH 3/3] Dependency instead of dependant --- .../scala/com/twitter/summingbird/storm/Constants.scala | 2 +- .../scala/com/twitter/summingbird/storm/StormPlatform.scala | 6 +++--- .../twitter/summingbird/storm/option/FlatMapOptions.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala index fb8ad274e..8ba63ef6a 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/Constants.scala @@ -44,5 +44,5 @@ object Constants { val DEFAULT_MONOID_IS_COMMUTATIVE = MonoidIsCommutative.default val DEFAULT_MAX_WAITING_FUTURES = MaxWaitingFutures(10) val DEFAULT_MAX_FUTURE_WAIT_TIME = MaxFutureWaitTime(Duration.fromSeconds(60)) - val DEFAULT_FM_PREFER_LOCAL_DEPENDANT = PreferLocalDependant(false) + val DEFAULT_FM_PREFER_LOCAL_DEPENDENCY = PreferLocalDependency(false) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index b5d6df696..4bc956f9f 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -187,8 +187,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val maxWaitTime = getOrElse(stormDag, node, DEFAULT_MAX_FUTURE_WAIT_TIME) logger.info("[{}] maxWaiting: {}", nodeName, maxWaiting.get) - val usePreferLocalDependant = getOrElse(stormDag, node, DEFAULT_FM_PREFER_LOCAL_DEPENDANT) - logger.info("[{}] usePreferLocalDependant: {}", nodeName, usePreferLocalDependant.get) + val usePreferLocalDependency = getOrElse(stormDag, node, DEFAULT_FM_PREFER_LOCAL_DEPENDENCY) + logger.info("[{}] usePreferLocalDependency: {}", nodeName, usePreferLocalDependency.get) val summerOpt:Option[SummerNode[Storm]] = stormDag.dependantsOf(node).collect{case s: SummerNode[Storm] => s}.headOption @@ -217,7 +217,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird val dependenciesNames = stormDag.dependenciesOf(node).collect { case x: StormNode => stormDag.getNodeName(x) } - if (usePreferLocalDependant.get) { + if (usePreferLocalDependency.get) { dependenciesNames.foreach { declarer.localOrShuffleGrouping(_) } } else { dependenciesNames.foreach { declarer.shuffleGrouping(_) } diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala index 151f9e289..52c382509 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/option/FlatMapOptions.scala @@ -58,4 +58,4 @@ class SpoutStormMetrics(val metrics: () => TraversableOnce[StormMetric[IMetric]] {() => metrics().map{ x: StormMetric[IMetric] => Metric(x.name, x.metric, x.interval.inSeconds)}} } -case class PreferLocalDependant(get: Boolean) \ No newline at end of file +case class PreferLocalDependency(get: Boolean) \ No newline at end of file