hRaven Reducer Estimator #996

Merged
merged 25 commits into from
Aug 11, 2014

Conversation

@bholt (Contributor) commented Aug 4, 2014

Adds a new "scalding-hraven" module with an estimator that looks up job history in hRaven to determine the actual number of bytes reaching reducers. The HRavenHistory trait can be mixed into a ReducerEstimator to give it the ability to look up job history. RatioBasedReducerEstimator does this: it scales the estimate from InputSizeReducerEstimator by the ratio of reducer input size to mapper input size observed in past runs.
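
As a rough illustration of the ratio-based scaling described above, here is a minimal sketch. It is not the code from this PR: the `History` class, the `scaledEstimate` helper, and the choice of averaging the per-run ratios are all assumptions made for the example.

```scala
object RatioEstimateSketch {
  // Hypothetical stand-in for one past run of a flow step.
  final case class History(mapperBytes: Long, reducerBytes: Long)

  /**
   * Scale an input-size-based estimate by the mean reducer/mapper
   * byte ratio of past runs. Averaging the ratios is an assumption.
   */
  def scaledEstimate(baseEstimate: Int, history: Seq[History]): Option[Int] = {
    // Skip runs with no mapper input to avoid dividing by zero.
    val ratios = history.collect {
      case History(m, r) if m > 0 => r.toDouble / m
    }
    if (ratios.isEmpty) None // no usable history: let another estimator decide
    else {
      val meanRatio = ratios.sum / ratios.size
      // Never suggest fewer than one reducer.
      Some(math.max(1, math.ceil(baseEstimate * meanRatio).toInt))
    }
  }
}
```

Returning `None` when there is no usable history leaves room for a fallback estimator to supply the answer.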

This PR also includes a couple other minor changes:

  • Using a case class as the argument to estimateReducers to hold the common reducer estimator info (flow, step, predecessorSteps)
  • Adding an optional fallbackEstimator, which lets estimators be chained
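
The two changes listed above can be sketched together as follows. This is only an illustration under assumptions: `StepInfo`, `Estimator`, and the `orElse` combinator are hypothetical names, and plain strings stand in for the Cascading flow and step types.

```scala
object EstimatorChainSketch {
  // Hypothetical bundle of the common arguments (flow, step, predecessorSteps).
  final case class StepInfo(flowName: String, stepName: String, predecessorSteps: Seq[String])

  trait Estimator { self =>
    def estimateReducers(info: StepInfo): Option[Int]

    /** Try this estimator first; consult `fallback` only if it returns None. */
    def orElse(fallback: Estimator): Estimator = new Estimator {
      def estimateReducers(info: StepInfo): Option[Int] =
        self.estimateReducers(info).orElse(fallback.estimateReducers(info))
    }
  }

  // An estimator that never has an answer (for example, no history found).
  val noHistory: Estimator = new Estimator {
    def estimateReducers(info: StepInfo): Option[Int] = None
  }

  // An estimator that always returns the same number.
  def constant(n: Int): Estimator = new Estimator {
    def estimateReducers(info: StepInfo): Option[Int] = Some(n)
  }
}
```

Bundling the arguments in one case class means a new field can be added later without changing the signature of every estimator.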

So far I don't have a story for unit testing hRaven. Any ideas are welcome.

@bholt changed the title from "hRaven Reducer Estimator and other minor estimator fixes" to "hRaven Reducer Estimator" Aug 4, 2014
@@ -256,6 +257,18 @@ object ScaldingBuild extends Build {
"org.scala-tools.testing" %% "specs" % "1.6.9" % "test"
)
).dependsOn(scaldingCore)

lazy val scaldingHRaven = module("hraven").settings(

Collaborator

You probably need to update .travis.yml now, since the tests run in separate test jobs (they got too large for Travis to handle in one). Chat with @ianoc.

protected def totalSize(taps: Iterator[Tap[_, _, _]], conf: JobConf): Option[Long] =
taps.foldLeft(Option(0L)) {
// recursive case
case (Some(total), multi: MultiSourceTap[Tap[_, _, _], _, _]) =>

Collaborator

good catch. I guess we forgot this case.

Contributor Author

Yeah, this worried me: the first analytics batch job I ran exposed this. Are there any other cases you can think of that don't implement Hfs and might come up?

Collaborator

None that I know of.


Oscar Boykin :: @posco :: http://twitter.com/posco
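
The recursive MultiSourceTap case discussed in this thread can be sketched in isolation. The `Tap` hierarchy below is a hypothetical stand-in for Cascading's tap types, chosen so the example runs without Hadoop; only the shape of the fold mirrors the diff.

```scala
object TotalSizeSketch {
  // Hypothetical stand-ins for Cascading taps.
  sealed trait Tap
  final case class FileTap(bytes: Long) extends Tap          // size is known
  final case class MultiTap(children: List[Tap]) extends Tap // wraps other taps
  case object UnknownTap extends Tap                         // size cannot be determined

  /** Sum the sizes of all taps, or None if any size is unknown. */
  def totalSize(taps: Iterator[Tap]): Option[Long] =
    taps.foldLeft(Option(0L)) {
      // recursive case: descend into the wrapped taps
      case (Some(total), MultiTap(children)) =>
        totalSize(children.iterator).map(total + _)
      case (Some(total), FileTap(bytes)) =>
        Some(total + bytes)
      // an unknown tap, or an earlier failure, makes the whole total unknown
      case _ => None
    }
}
```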

import HRavenHistory.jobConfToRichConfig

private final val apiHostnameKey = "hraven.api.hostname"
private final val hRavenClientConnectTimeout = 30000

Collaborator

These could be defaults that can also be overridden in the conf, right?
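
One way to read this suggestion is sketched below. The key name `hraven.client.connect.timeout` is hypothetical and not taken from the PR, and a plain `Map` stands in for the job configuration so the example is self-contained.

```scala
object HRavenSettingsSketch {
  // Hypothetical configuration key; the PR does not define one.
  val ConnectTimeoutKey = "hraven.client.connect.timeout"
  // Default taken from the constant in the diff above.
  val DefaultConnectTimeoutMs = 30000

  /** Use the configured timeout if present and numeric, else the default. */
  def connectTimeout(conf: Map[String, String]): Int =
    conf.get(ConnectTimeoutKey)
      .flatMap(v => scala.util.Try(v.trim.toInt).toOption)
      .getOrElse(DefaultConnectTimeoutMs)
}
```

Falling back to the default on a malformed value is a design choice for the sketch; failing fast would be equally defensible.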

val numReducers = estimateReducers(flow, predecessorSteps, flowStep).getOrElse(0)
val estimators = Option(conf.get(Config.ReducerEstimators))
.map(_.split(",")).flatten
.map(Class.forName(_).newInstance.asInstanceOf[ReducerEstimator])

Collaborator

Can you use forName with the current thread's context class loader? That seems to be the correct thing to do (or at least it is what Cascading and Hadoop do in several places).
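
A minimal sketch of what this might look like, assuming the comment refers to the three-argument `Class.forName` together with `Thread.getContextClassLoader`. The helper names `instantiate` and `loadAll` are made up for the example.

```scala
object EstimatorLoaderSketch {
  /** Load a class through the current thread's context class loader and instantiate it. */
  def instantiate[T](className: String): T = {
    val loader = Thread.currentThread().getContextClassLoader
    Class.forName(className, true, loader)
      .getDeclaredConstructor() // requires a no-argument constructor
      .newInstance()
      .asInstanceOf[T]          // unchecked: the caller must supply a compatible T
  }

  /** Instantiate every class named in an optional comma-separated list. */
  def loadAll[T](commaSeparated: Option[String]): Seq[T] =
    commaSeparated.toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(name => instantiate[T](name))
}
```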

* @param mapperBytes Input to mappers (in bytes)
* @param reducerBytes Input to reducers (in bytes)
*/
case class FlowStepHistory(mapperBytes: Long, reducerBytes: Long)

Collaborator

Should this also have a RichDate for when that job ran?

One idea would be to use a sealed trait here so we can add more to it later without breaking the code. We can still have some apply methods that create these so FlowStepHistory(10L, 1L) would still work.

Contributor Author

Cool, I didn't know what sealed traits did. Seems like a plan.

So far, I haven't added RichDate. Is it a big deal for someone to add it whenever they need it?
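
The sealed-trait idea from this thread could look roughly like the sketch below. It is an assumption about the shape, not the merged code: the private `Basic` class is a hypothetical name, and the wrapper object exists only to keep the example self-contained.

```scala
object FlowStepHistorySketch {
  sealed trait FlowStepHistory {
    /** Input to mappers (in bytes) */
    def mapperBytes: Long
    /** Input to reducers (in bytes) */
    def reducerBytes: Long
  }

  object FlowStepHistory {
    // Hidden implementation: fields can be added later without breaking callers.
    private final case class Basic(mapperBytes: Long, reducerBytes: Long)
      extends FlowStepHistory

    // Keeps FlowStepHistory(10L, 1L) working as before.
    def apply(mapperBytes: Long, reducerBytes: Long): FlowStepHistory =
      Basic(mapperBytes, reducerBytes)
  }
}
```

If a run date is needed later, it could be added as another member of the trait with a new `apply` overload, leaving existing call sites untouched.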

@johnynek (Collaborator) commented Aug 8, 2014

looks good except for one minor point.

@bholt (Contributor Author) commented Aug 8, 2014

I'm currently testing this in Science, and it looks like I'll have an additional change or two, so don't merge yet.

* Prepend an estimator so it will be tried first. If it returns None,
* the previously-set estimators will be tried in order.
*/
def addReducerEstimator[T](clsName: String): Config =

Collaborator

just noticed this method does not really depend on T. Scalac should probably warn or error on unused type parameters.

Contributor Author

good catch
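
For reference, the prepend behaviour described in the doc comment above, written without the unused type parameter, might look like this. It is a sketch under assumptions: a plain `Map` stands in for Scalding's `Config`, and the key string is assumed rather than taken from the diff.

```scala
object EstimatorConfigSketch {
  // Assumed key name; the diff only refers to Config.ReducerEstimators.
  val ReducerEstimators = "scalding.reducer.estimator.classes"

  /**
   * Prepend an estimator class name so it is tried first; previously
   * set estimators keep their relative order.
   */
  def addReducerEstimator(conf: Map[String, String], clsName: String): Map[String, String] = {
    val existing = conf.get(ReducerEstimators).toSeq
      .flatMap(_.split(",").toSeq)
      .filter(_.nonEmpty)
    conf.updated(ReducerEstimators, (clsName +: existing).mkString(","))
  }
}
```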

@johnynek (Collaborator)

If this is ready, it looks good to me. Brandon, if you are ready to pull the trigger, go for it.

I think we may be getting close to scalding 0.12.0 here.

@bholt added a commit that referenced this pull request Aug 11, 2014: "hRaven Reducer Estimator and a bunch of other reducer_estimation refactoring"
@bholt merged commit 6b64e3e into develop Aug 11, 2014
@bholt deleted the bholt/hraven-reducer-estimator branch August 11, 2014 18:29
@bholt (Contributor Author) commented Aug 11, 2014

Thanks for all the feedback and suggestions, @johnynek.
