Groups tasks that supersede each other together in the scheduler #570

Closed
daveFNbuck wants to merge 2 commits

Conversation

daveFNbuck
Contributor

Many tasks supersede older versions of the same job. For example, if I do an
hourly sqoop job to copy data from a production db to a hive table, completing
the 8am copy means I don't need to run the 7am copy. If both the 7am copy and
the 8am copy are pending and ready, I should run the 8am copy and consider them
both complete.

This commit achieves these goals by allowing jobs to define two optional
properties. The first is supersedes_bucket. Tasks with the same supersedes
bucket (if not None) are considered by the scheduler to supersede each other.
The scheduler will mark all earlier versions of the task DONE when one of them
completes. It is up to the user to ensure that earlier versions of the task
become complete when a later version is run.

Earlier tasks are identified by having a lower supersedes_priority. The
scheduler will prefer to schedule tasks with higher supersedes_priority.
Supersedes priority can be any type, but most be comparable between tasks in
the same bucket.
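
As a rough sketch of what the proposed interface could look like on a task (the class and parameter here are invented for illustration; only supersedes_bucket and supersedes_priority come from this patch, and the property form is one of the options discussed in the comments below):

    import luigi

    class HourlyUserCopy(luigi.Task):
        # Hypothetical hourly copy task; a newer hour supersedes older pending hours.
        hour = luigi.DateHourParameter()

        # All instances of this job share a bucket, so they supersede each other.
        supersedes_bucket = 'hourly_user_copy'

        @property
        def supersedes_priority(self):
            # Later hours sort higher; ISO-formatted timestamps compare as strings.
            return self.hour.isoformat()

        def run(self):
            pass  # overwrite the target table with the latest production data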

@Tarrasch
Contributor

s/most/must in commit message :)

# so is everything else in the bucket with lower priority. bucket_priority
# should have a consistent type within a bucket.
supersedes_bucket = None
supersedes_priority = None
Contributor

Not really sure I like the idea of having these not as methods. For example

def supersedes_priority(self):
    return self.my_data_param.tostr()

would work. But how do you specify that if it's a variable instantiated at class-time? Do you need to set it in __init__ then?

Contributor Author

You can just add an @property decorator to that function and it'll work. I think you're right that they should be functions though, as I mostly do it that way. I'll update that in the morning.
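
For illustration, the two styles under discussion would look roughly like this (my_data_param is a hypothetical parameter, following the example above):

    import luigi

    class CopyTask(luigi.Task):
        # Hypothetical parameter standing in for my_data_param from the comment above.
        my_data_param = luigi.Parameter()

        # As a plain class attribute, the value is fixed at class-definition time.
        supersedes_bucket = 'copy_task'

        # As a property, the value can be computed from instance parameters.
        @property
        def supersedes_priority(self):
            return str(self.my_data_param)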

@Tarrasch
Contributor

LGTM

@erikbern
Contributor

Just quickly looked at it, but my gut feeling is to avoid adding more methods on the task class when possible. The reason is (it seems like) we get exponentially many more scheduling cases to think about.

Is it possible that this can be achieved through dynamic dependencies? Alternatively can we achieve this by using just priorities? (you would just increase the priority by epsilon for each dump)

@daveFNbuck
Contributor Author

I don't think dynamic dependencies work very well here. It's possible that I could solve it with a combination of priority and resources to make sure that the most recent one runs first and no two run at the same time. The priority would be a bit tricky: how much of an epsilon should I add to the 2014-12-12T06 run vs the 2014-12-12T07 run? It's much easier if these can be compared as strings. Perhaps if we allowed priority to optionally be a pair of int and string, this would be a smaller code change.

I'd still lose something, in that get_work would give me the 2014-12-12T07 run and then the 2014-12-12T06 run after that's done. Now I have to check whether each job is complete in the run function, and I spend a bunch of time just grabbing complete jobs from the scheduler. This might not be too big a deal though.

@daveFNbuck
Contributor Author

One more thing we'd miss out on just using priority: Suppose we have two Tasks a1 and a2 where a1 supersedes a2 and they both have priority 0. If we also have a task b with priority 10 that depends on a2, now a2 has priority 10 and will run before a1.

@daveFNbuck
Contributor Author

On further thought, the above example demonstrates the need for this far more generally. Most of my supersedes tasks will have priority 0, and get priority from tasks that depend on them. So if I give them all priority n*epsilon, the scheduler will end up increasing all of their priorities to that of their dependents, say 10. I don't see how to get around this without adding a parameter or preserving the fractional part of the priority, which is just another way of adding a parameter.

Many tasks supersede older versions of the same job. For example, if I do an
hourly sqoop job to copy data from a production db to a hive table, completing
the 8am copy means I don't need to run the 7am copy. If both the 7am copy and
the 8am copy are pending and ready, I should run the 8am copy and consider them
both complete.

This commit achieves these goals by allowing jobs to define two optional
properties. The first is supersedes_bucket. Tasks with the same supersedes
bucket (if not None) are considered by the scheduler to supersede each other.
The scheduler will mark all earlier versions of the task DONE when one of them
completes. It is up to the user to ensure that earlier versions of the task
become complete when a later version is run.

Earlier tasks are identified by having a lower supersedes_priority. The
scheduler will prefer to schedule tasks with higher supersedes_priority.
Supersedes priority can be any type, but must be comparable between tasks in
the same bucket.

The original implementation of supersedes buckets involved calling the linear
function get_supersedes_bucket_tasks for each item with a supersedes bucket
during get_work. This could lead to quadratic runtime if enough items are in
supersedes buckets. I saw get_work running for 6 seconds after a bug led to
~10k pending items piling up. With multiple calls coming in simultaneously, this
was enough to exceed the timeout and prevent any worker from running jobs.

This commit solves the complexity issue by pre-computing all the priorities in
a linear scan. Each supersedes bucket is sorted by supersedes_priority and the
priorities are updated so that no task has a lower priority than a preceding one.
This brings the runtimes back down to milliseconds.

To verify the fix, I added 10,000 tasks in the same bucket. On my laptop, I
killed this after about half a minute pre-fix. Post-fix it takes about 90ms.
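
A standalone sketch of the per-bucket pass described above (this is not the patch itself, just the idea; the task objects here are simplified stand-ins for the scheduler's internal state):

    from collections import defaultdict

    def adjust_bucket_priorities(tasks):
        # Group pending tasks by supersedes bucket in one pass.
        buckets = defaultdict(list)
        for task in tasks:
            if task.supersedes_bucket is not None:
                buckets[task.supersedes_bucket].append(task)

        # Within each bucket, sort by supersedes_priority and carry a running
        # maximum forward so nothing has a smaller scheduling priority than a
        # task it supersedes. The sorts make this O(n log n) overall instead of
        # the quadratic per-task scan.
        for bucket_tasks in buckets.values():
            bucket_tasks.sort(key=lambda t: t.supersedes_priority)
            running_max = None
            for task in bucket_tasks:
                if running_max is not None and task.priority < running_max:
                    task.priority = running_max
                running_max = task.priority
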
@erikbern
Contributor

I'm still kind of confused about what implications this would have. My concern is we're adding functionality which could make it a lot harder later to change aspects of the scheduler. It seems like the benefit of this functionality is pretty limited and I'm not entirely convinced it outweighs the costs of having to maintain this through future refactorings.

The only case that I can think of when this functionality would be useful is when you have a dependency of the form of "use the latest data" – can you think of other cases?

Another thing I'm struggling to understand is if your sqoop job has downstream dependencies. Could that lead to an issue where the downstream dependencies end up consuming data from the "wrong" output? Could it lead to an issue where downstream dependencies aren't run because you end up rescheduling the database dump all the time?

One thing that I've been doing for cases like that is to write to disk what the latest version of the data set is. Then dependencies can read the timestamp off disk and use that to figure out what they should depend on. If you do that you can even include the full timestamp as the parameter to the database dump. I've implemented that by having a static method on the class that returns the "latest" version of a task (by looking up the timestamp from disk and setting it as the parameter). Would something like that work for your case?

@daveFNbuck
Contributor Author

For these jobs, there is no wrong version of the data. Anything from the current time or newer is considered ok. For example, we need to make regular copies of our users database so that we can join user ids to log data for more context about user actions. Having users that signed up after the current actions in the db doesn't hurt the query.

We're not having issues with versioning the data, as we completely overwrite it every time and just store the version number in a mysql table. The annoying part is that the scheduler might give me the tasks in chronological order. So if I'm behind by 24 hours on one of these hourly tasks and each one takes 5 minutes, I can either just do the most recent one and be done in 5 minutes, or do it in scheduler order and take up to 2 hours to catch up. To get around this, I had to implement logic in the run for the task to determine if the next version of itself is ready and recursively run that one instead. This was fairly problematic in a lot of ways and duplicated a lot of work that had already been done during scheduling to perform a task that the scheduler was already basically doing.
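
Roughly, that workaround looks something like this (entirely illustrative; the task, parameter and copy_from_production are hypothetical, and the "is the next version ready" check is simplified to its dependencies being complete):

    import datetime
    import luigi

    class HourlyCopy(luigi.Task):
        hour = luigi.DateHourParameter()

        def run(self):
            # If the next hourly version of this task could run, run that one
            # instead; since it overwrites the same table, completing it also
            # makes this task complete. This recurses forward to the newest run.
            newer = self.clone(hour=self.hour + datetime.timedelta(hours=1))
            if all(dep.complete() for dep in newer.deps()):
                newer.run()
            else:
                self.copy_from_production()  # hypothetical actual work

        def copy_from_production(self):
            pass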

Would it be ok if we just allowed priorities to be either a single number or a pair? That should be enough to get most of what I want, given the right priority update rules. Running an older one after the newer one is ok, because I can quickly check the db for completeness and skip all of the later runs.

@Tarrasch
Contributor

I totally understand @erikbern's concerns here. I would much rather see patches simplifying the internal scheduling logic than expanding the functionality. But I haven't dug into this (I just started my vacation :)), so never mind me. :)

@erikbern
Contributor

Yeah let me think about it a bit more. I see your point but I want to think about it from a more general level. It would be great to look at 2-3 related problems at the same time rather than solving 1 problem (in which case I think there's a risk of overfitting). But I think the idea of "run the latest version" or "chop off everything in a queue and process it" is an interesting problem to solve.

@daveFNbuck can you expand on your idea about priority as a pair?

@daveFNbuck
Contributor Author

Complete agreement here; I'd much rather see this be part of something more generally useful. I'm fine maintaining it in my fork until then. My idea with priority as a pair is that we could specify priority for a task either as a single value or a pair (primary_priority, secondary_priority). The scheduler would treat a single value like the pair (value, 0). The scheduler would then apply the priority graph update logic to the primary_priority so that everything has at least as high a primary_priority as its dependents. The secondary_priority would stay the same though. This would allow me to get much of the effect of the superseding in the scheduler via priority without high-priority dependents completely wiping out the information. It wouldn't work if older jobs had higher-priority dependents, but it would probably be close enough that I wouldn't care too much about the difference.

I think the above may be a bit overfitting though. Perhaps a more natural solution is to just have the scheduler remember the original priorities as well as the graph-updated priorities and break ties with those. That way if we have a job with priority 20 and another with priority 30, and a priority 50 job depends on them, we'll preferentially run the priority 30 job first. This would work like the above solution, but treating priority as both primary and secondary priority for all jobs. I've considered doing something like that independently of this feature in the past, perhaps even allowing priority to include information about the entire dependency chain so that downward dependencies of the priority 30 job would run preferentially to downward dependencies of the priority 20 job, but this full generalization would probably add way too much complexity.
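
As a minimal sketch of the pair/tie-breaking idea (purely illustrative; this is not how the scheduler is implemented):

    def normalize_priority(priority):
        # Treat a bare number as (value, 0) so scalars and pairs compare uniformly.
        return priority if isinstance(priority, tuple) else (priority, 0)

    def effective_priority(own_priority, dependent_priorities):
        # Propagate only the primary component from dependents; keep the task's
        # own secondary component as a tie-breaker.
        primary, secondary = normalize_priority(own_priority)
        for dep in dependent_priorities:
            dep_primary, _ = normalize_priority(dep)
            primary = max(primary, dep_primary)
        return (primary, secondary)

    # Two hourly dumps with secondary priorities 6 and 7 and a shared dependent of
    # priority 10 both end up with primary 10, but the later dump still wins the
    # tie-break: (10, 7) > (10, 6).
    assert effective_priority((0, 7), [10]) > effective_priority((0, 6), [10])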

@erikbern
Contributor

Sorry for the long delay on this one. Let me think about it a bit more and get back to you.

@erikbern closed this Apr 11, 2015
@daveFNbuck deleted the supersedes_tasks branch July 13, 2015 20:22
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Feb 9, 2016
Sometimes it's more efficient to run a group of tasks all at once rather than
one at a time. With luigi, it's difficult to take advantage of this because your
batch size will also be the minimum granularity you're able to compute. So if
you have jobs that run hourly, you can't combine their computation when many
of them get backlogged; and if you instead make the task daily, you can't get
hourly runs.

In order to gain efficiency when many jobs are queued up, this change allows
workers to provide details of how jobs can be batched to the scheduler. If you
have several hourly jobs of the same type in the scheduler, it can combine them
into a single job for the worker. We allow parameters to be combined in three
ways: we can combine all the arguments in a csv, take the min and max to form
a range, or just provide the min or max. The csv gives the most specificity,
but range and min/max are available for when that's all you need. In particular,
the max function provides an implementation of spotify#570, allowing jobs that
overwrite each other to be grouped by just running the largest one.

In order to implement this, the scheduler will create a new task based on the
information sent by the worker. It's possible (as in the max/min case) that the
new task already exists, but if it doesn't it will be cleaned up at the end of
the run. While this new task is running, any other tasks will be marked as
BATCH_RUNNING. When the head task becomes DONE or FAILED, the BATCH_RUNNING
tasks will also be updated accordingly. They'll also have their tracking urls
updated to match the batch task.

This is a fairly big change to how the scheduler works, so there are a few
issues with it in the initial implementation:
  - newly created batch tasks don't show up in dependency graphs
  - the run summary doesn't know what happened to the batched tasks
  - we can't limit how big batches can be (how should we handle ranges?)
  - batching takes quadratic time for simplicity of implementation
  - I'm not sure what would happen if there was a yield in a batch run function

On the worker side, batching is accomplished by setting a batch_class,
batcher_args and batcher_aggregate_args. The batch class is the Python class
that runs the batched version of the job. This can be set equal to the current
class by overriding the class method get_batch_class.

The batcher_args are the arguments passed from the current class to the batch
class. These come in pairs. So if the original class has parameters machine and
filename that need to go to host and files in the batcher, you'll use
  [('machine', 'host'), ('filename', 'files')]
for batcher_args.

The final value is batcher_aggregate_args, which explains which arguments are to
be aggregated and how. So using the machine, filename example, we might want to
batch multiple files together for the same machine. For that, we could do
something like
  {'filename': 'csv'}
to combine them all as comma-separated values. Now if we have multiple machine,
filename pairs such as ('m1', 'f1'), ('m1', 'f2'), ('m2', 'f3'), ('m2', 'f4'),
we'd end up with batch jobs with host, files pairs of ('m1', 'f1,f2') and
('m2', 'f3,f4').

The worker will send the batch class, batcher args and batcher aggregate args to
the scheduler once per class, which is why these are class methods. It doesn't make
sense to have different ways to batch per individual task, so that's not
allowed.
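
Taken literally, the interface described in this commit message might be declared something like the following (the task and parameter names are invented; whether these are class attributes or class methods is a guess based on the wording above):

    import luigi

    class CopyFile(luigi.Task):
        # Hypothetical per-file copy task whose runs can be batched per machine.
        machine = luigi.Parameter()
        filename = luigi.Parameter()

        @classmethod
        def get_batch_class(cls):
            # The class that runs the batched version of the job.
            return CopyFiles

        @classmethod
        def batcher_args(cls):
            # Pairs mapping this class's parameters to the batch class's parameters.
            return [('machine', 'host'), ('filename', 'files')]

        @classmethod
        def batcher_aggregate_args(cls):
            # Combine all filenames for one machine as comma-separated values.
            return {'filename': 'csv'}

    class CopyFiles(luigi.Task):
        # Hypothetical batch task; files arrives as e.g. 'f1,f2' after batching.
        host = luigi.Parameter()
        files = luigi.Parameter()
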
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Feb 9, 2016
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Feb 25, 2016
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Mar 3, 2016
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Mar 4, 2016
daveFNbuck added a commit to Houzz/luigi that referenced this pull request May 31, 2016
Sometimes it's more efficient to run a group of tasks all at once rather than
one at a time. With luigi, it's difficult to take advantage of this because your
batch size will also be the minimum granularity you're able to compute. So if
you have jobs that run hourly, you can't combine their computation when many
of them get backlogged; and if you instead make the task daily, you can't get
hourly runs.

In order to gain efficiency when many jobs are queued up, this change allows
workers to provide details of how jobs can be batched to the scheduler. If you
have several hourly jobs of the same type in the scheduler, it can combine them
into a single job for the worker. We allow parameters to be combined in three
ways: we can combine all the arguments in a csv, take the min and max to form
a range, or just provide the min or max. The csv gives the most specificity,
but range and min/max are available for when that's all you need. In particular,
the max function provides an implementation of spotify#570, allowing jobs that
overwrite each other to be grouped by just running the largest one.

In order to implement this, the scheduler will create a new task based on the
information sent by the worker. It's possible (as in the max/min case) that the
new task already exists, but if it doesn't it will be cleaned up at the end of
the run. While this new task is running, any other tasks will be marked as
BATCH_RUNNING. When the head task becomes DONE or FAILED, the BATCH_RUNNING
tasks will also be updated accordingly. They'll also have their tracking urls
updated to match the batch task.

This is a fairly big change to how the scheduler works, so there are a few
issues with it in the initial implementation:
  - newly created batch tasks don't show up in dependency graphs
  - the run summary doesn't know what happened to the batched tasks
  - batching takes quadratic time for simplicity of implementation
  - I'm not sure what would happen if there was a yield in a batch run function

For the user, batching is accomplished by setting batch_method in the parameters
that you wish to batch. You can limit the number of tasks allowed to run
simultaneously in a single batch by setting the class variable batch_size. If
you want to exempt a specific task from being part of a batch, simply set its
is_batchable property to False.
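
Going by the description in this commit message (batch_method, batch_size and is_batchable are the names it gives; the task and parameter here are invented, and the API that eventually shipped in luigi may differ):

    import luigi

    class HourlyLogDump(luigi.Task):
        # Batch by taking the max hour: running the latest pending hour covers
        # the older ones, matching the superseding use case from this PR.
        hour = luigi.DateHourParameter(batch_method=max)

        # Per the commit message: cap how many tasks can be combined per batch.
        batch_size = 12

        # Per the commit message: set to False on a task to exempt it from batching.
        is_batchable = True

        def run(self):
            pass  # overwrite the target with data as of self.hour
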
daveFNbuck added a commit to Houzz/luigi that referenced this pull request May 31, 2016
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Jun 1, 2016
daveFNbuck added a commit to Houzz/luigi that referenced this pull request Jun 1, 2016