Add `withDescription` for naming MR steps #1283
Conversation
This is pretty great. Thanks for doing this. I'll review it carefully this week.
will change. Any other issues? @johnynek?
@@ -31,6 +36,25 @@ trait ExecutionContext {

```scala
  def flowDef: FlowDef
  def mode: Mode

  def getIdentifierOpt(descriptions: Seq[String]): Option[String] = {
    if (descriptions.nonEmpty) {
      Some(descriptions.distinct.mkString(", "))
```
can we sort these to make sure it is stable? Not really sure we should sort on second thought. It would be good if the order is roughly the order that the descriptions came in.
let's make this a private method on object ExecutionContext.
Agreed to make it private. I wanted the ordering to stay as the descriptions came in, and did the distinct just to be safe.
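For illustration, the behavior being discussed can be sketched as a standalone function (a hypothetical copy of the method above, not the shipped code): `distinct` keeps the first occurrence of each description, so the original ordering survives deduplication without any sorting.

```scala
// Hypothetical standalone sketch of getIdentifierOpt:
// distinct preserves first-occurrence order, so descriptions stay roughly
// in the order they were added while duplicates are dropped.
def getIdentifierOpt(descriptions: Seq[String]): Option[String] =
  if (descriptions.nonEmpty) Some(descriptions.distinct.mkString(", "))
  else None
```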
Have you tried this on your cluster at all? It's hard to be sure we are hitting all the cases (hashJoin, joins, groupBys, forceToDisk are all different code paths). Could we beef up the tests at the very least to try each of those cases?
Yes, tried it on a bunch of our jobs which used all those cases (except for …)
Making a macro to get the current line and file would be useful with this:
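As a rough sketch of that idea without macros (purely hypothetical; a real version would use a macro, or a compile-time library such as sourcecode, so the file and line are captured at the call site with no runtime cost), one can read the caller's position from the stack:

```scala
// Hypothetical runtime stand-in for the macro idea: derive a "File:Line"
// description from the caller's stack frame. A macro would capture this
// at compile time; this is only an illustration of the desired output.
def callSiteDescription(): String = {
  val frame = new Exception().getStackTrace()(1) // index 1 = the caller
  s"${frame.getFileName}:${frame.getLineNumber}"
}
```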
```scala
  }

  def setPipeDescriptions(p: Pipe, descriptions: Seq[String]): Pipe = {
    p.getStepConfigDef().setProperty(
```
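The config-property mechanism can be sketched in isolation (a hypothetical illustration: the `ConfigDef` stand-in and the key name `"scalding.step.descriptions"` are assumptions, not the real Cascading API). New descriptions are merged into a comma-separated property and deduplicated:

```scala
import scala.collection.mutable

// Minimal stand-in for Cascading's step ConfigDef: a mutable key-value map.
class ConfigDef(val props: mutable.Map[String, String] = mutable.Map.empty) {
  def setProperty(k: String, v: String): Unit = props(k) = v
  def getProperty(k: String): Option[String] = props.get(k)
}

// Append descriptions into a comma-separated property, dropping duplicates.
// (Key name "scalding.step.descriptions" is hypothetical.)
def appendDescriptions(conf: ConfigDef, descriptions: Seq[String]): Unit = {
  val existing = conf.getProperty("scalding.step.descriptions").toSeq
  val merged = (existing ++ descriptions)
    .flatMap(_.split(","))
    .map(_.trim)
    .filter(_.nonEmpty)
    .distinct
  conf.setProperty("scalding.step.descriptions", merged.mkString(", "))
}
```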
This will mostly work, but... our OrderedSerialization issues have shown that these won't always stay together :/ We tried to use this to configure serializers for each step: do an op that does some transform, then add the config to that pipe to make it work. But then the two would be moved apart by cascading. Not a blocker for this, but it unfortunately means the naming ends up being more best-effort than would be ideal, I think.
I see your point, and I wonder: if we removed `withDescription` from `TypedPipe` and only allowed it after you `.group`, would that better guarantee it is attached to the step where it occurs? But then it is also less powerful, and it is nice to annotate descriptions everywhere.
Funnily enough, after a groupBy is exactly where we've been seeing it fail :/ I'm not sure it's worth altering the plan here. This is a big step up in info/usability. We might just want to include a little note about YMMV in a README, I guess.
Looks like this has legitimate failures against latest develop now; it doesn't compile.
@@ -1036,6 +1040,19 @@ class WithOnComplete[T](typedPipe: TypedPipe[T], fn: () => Unit) extends TypedPipe[T] {

```scala
    forceToDiskExecution.flatMap(_.toIterableExecution)
  }

class WithDescriptionTypedPipe[T](typedPipe: TypedPipe[T], description: String) extends TypedPipe[T] {
  override def toPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) = {
```
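The wrapper pattern behind `WithDescriptionTypedPipe` can be sketched with simplified stand-in types (the `Node`/`Leaf` names are hypothetical; the real class delegates all `TypedPipe` operations to the wrapped pipe): each wrapper node carries one description, and walking the chain collects them in the order they were added.

```scala
// Simplified sketch of the wrapper idea: a node that carries a description
// and otherwise delegates to the wrapped value. Not the real TypedPipe.
trait Node[T] { def descriptions: Seq[String] }

case class Leaf[T](value: T) extends Node[T] {
  def descriptions = Nil
}

case class WithDescription[T](inner: Node[T], d: String) extends Node[T] {
  // Collect descriptions outside-in so they read in the order they were added.
  def descriptions = inner.descriptions :+ d
}
```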
this should be asPipe now.
@aposwolsky we are aiming to freeze for a release on Wednesday. It would be awesome to get this in for that if we can.
@aposwolsky we are looking to freeze for our next release at 5pm PST today; are you going to have a chance to look at this before then?
I was OOO the past few days, but I rebased and pushed the change from `toPipe` to `asPipe`.
```scala
val conf = step.getConfig
getIdentifierOpt(getDesc(step)).foreach(id => {
  val newJobName = "%s %s".format(conf.getJobName, id)
  println("Added descriptions to job name: %s".format(newJobName))
```
can we get a log.info or debug here instead of a println?
also, this looks like it is changing how all jobs are named. That is somewhat orthogonal to descriptions, right?
We have a lot of tooling that assumes the job names are not changed. Could we instead just add a key to the jobConf?
Hmmm, we actually like it showing in the Hadoop JobTracker. What about making it configurable, so we keep it setting the name on our side while the default does something else?
Also, the "job name" always had "(X/Y)" appended to it; all this is doing is appending extra stuff after that, not changing the prefix.
We have some analytics that run on our jobs and they match on the job name. For instance, we have a reducer estimator that runs and looks at the names to get the history of the particular step. Others might have other systems running.
I think changing how the jobs are named could be disruptive for people.
Can we find a way to opt-in to this behavior and you can put those options as default in your clusters? Something like a configuration key like "scalding.description.addtoname" -> "true"
or something? When this is not present, we just add an entry to the jobConf?
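The opt-in behavior suggested here might look roughly like this (a sketch only: the key name is the reviewer's proposal, and `maybeRename` is a hypothetical helper, not code from the PR). The job name is only changed when the flag is explicitly set:

```scala
// Hypothetical helper sketching the reviewer's opt-in proposal: only append
// descriptions to the job name when "scalding.description.addtoname" is set.
def maybeRename(jobName: String,
                descriptions: Seq[String],
                conf: Map[String, String]): String = {
  val optIn = conf.get("scalding.description.addtoname").contains("true")
  if (optIn && descriptions.nonEmpty)
    s"$jobName ${descriptions.mkString(", ")}"
  else
    jobName // default: leave the name alone so existing tooling keeps working
}
```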
I already pushed a change that puts the descriptions in a different config and does not update mapred.name. Since we want it appended to the name, I will add a step listener on our side that adds the value of that config to the job name... a win-win for everyone :)
Made suggested changes
How do we re-run the tests? It seems to have failed on spurious errors.
looking
```scala
flowSteps.foreach(step => {
  val baseFlowStep: BaseFlowStep[JobConf] = step.asInstanceOf[BaseFlowStep[JobConf]]
  val descriptions = getDesc(baseFlowStep)
  updateFlowStepName(baseFlowStep, descriptions)
```
Looks like you are still updating the job name. Like I said, I don't want to change that without more thought and buy-in. Our existing job monitoring will be impacted, and probably other systems will be as well. I know we should have a test for anything we care about; we can add an issue for that here (if indeed we want some contract on the name, rather than another way to track job identity).
The mapred.name is already set by the time it gets here, so the job.xml file for Hadoop will not have any mention of the descriptions. However, the stepName needs to be updated for the .dot file generation to include the descriptions.
@johnynek: ok, I changed it so it only changes the flow stepName in the .dot file generation; otherwise it just sets a different config. This works for our setup since (1) .dot files have descriptions and (2) I added a FlowStepStrategy that adds the descriptions to the job name, so it looks good on our side. Does this solution work for you as well? I would like to see this pull request go through because I don't want to have to start maintaining a patch.
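The step-listener idea can be sketched with stand-in types (the `Step` class and config key here are hypothetical; the real hook would be Cascading's `FlowStepStrategy`): at submit time, read the descriptions config on each step and append it to the step name.

```scala
// Simplified sketch of the flow-step-strategy idea from the discussion.
// Step is a stand-in for a Cascading flow step; only the name and config
// matter for this illustration.
case class Step(var name: String, config: Map[String, String])

// Append each step's stored descriptions (if any) to its display name.
def applyDescriptions(steps: Seq[Step]): Unit =
  steps.foreach { s =>
    s.config.get("scalding.step.descriptions").foreach { d =>
      s.name = s"${s.name}: $d"
    }
  }
```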
ping? |
Looking now. Sorry. |
```scala
  if (descriptions.nonEmpty) Some(descriptions.distinct.mkString(", ")) else None
}

private def updateStepConfigWithDescriptions(step: BaseFlowStep[JobConf], descriptions: Seq[String]): Unit = {
```
`descriptions` is unused, right?
Good catch! Thanks, will fix.
One minor issue, then it looks good to merge (remove an unused parameter). Thanks for pushing this through to the end.
no problem! made changes
Thanks! 👍 Will merge when it's green.
:( can we re-run the CI build?
restarted
success!
Add `withDescription` for naming MR steps
Thanks for this nice new feature!
anytime :)
@aposwolsky, thanks for adding this feature. It is really great!
Adds the ability to have more meaningful names in MR steps instead of just "(X/Y)".