
Fault tolerance for long running queries #455

Closed
voycey opened this issue Mar 12, 2019 · 13 comments
@voycey

voycey commented Mar 12, 2019

We raised the following issue on prestodb (prestodb/presto#11241); it was closed in favour of prestodb/presto#9855, which @martint suggested.

We run a lot of long-running queries on Presto, and whilst we try to split them up (we use Airflow for workflow management), it is still a pain if several fail at once.

The ideal end goal is to be able to run Presto on GCE Dataproc with pre-emptible instances (90% cheaper than standard GCE); this could only happen if some degree of fault tolerance is provided.

Even the possibility of having the instance "tell" Presto "Hey- I'm going to shut down in like 30 seconds" and things 'pausing' while it does it and gets replaced would be a huge step forward for us :)

@dain dain changed the title Fault Tolerance is PrestoSQL Fault tolerance for long running queries Mar 12, 2019
@sopel39
Member

sopel39 commented Mar 12, 2019

Even the possibility of having the instance "tell" Presto "Hey- I'm going to shut down in like 30 seconds" and things 'pausing' while it does it and gets replaced would be a huge step forward for us :)

What do you mean by the instance being replaced? Are all processes moved to the new instance (e.g. like saving/restoring instance state)?

@voycey
Author

voycey commented Mar 12, 2019 via email

@kokes

kokes commented Apr 22, 2019

Not sure how Google Cloud announces terminations, but on AWS, this can be detected using an API call from within the instance. It's recommended to poll this every five seconds and you then get a two-minute window to clean up whatever you're working on.

https://aws.amazon.com/blogs/aws/new-ec2-spot-instance-termination-notices/
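
For illustration, a minimal polling sketch against the classic EC2 spot termination-time metadata endpoint described in that post (newer IMDSv2 setups also require a session token, which is omitted here):

```python
import time
import urllib.error
import urllib.request

# Classic (IMDSv1) spot termination-notice endpoint from the AWS post above.
TERMINATION_URL = "http://169.254.169.254/latest/meta-data/spot/termination-time"

def termination_time():
    """Return the scheduled termination timestamp, or None if none is scheduled."""
    try:
        with urllib.request.urlopen(TERMINATION_URL, timeout=2) as resp:
            return resp.read().decode()  # e.g. "2019-04-22T12:34:56Z"
    except urllib.error.HTTPError:
        return None  # the endpoint returns 404 until a termination notice is issued

while True:
    when = termination_time()
    if when:
        print(f"Spot termination scheduled for {when}; start draining work")
        break
    time.sleep(5)  # the post recommends polling every five seconds
```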

@voycey
Author

voycey commented Apr 22, 2019 via email

@kokes

kokes commented Apr 23, 2019

Short of cancelling the query and restarting it again after the instance is replaced, I don't see the benefit of this?

  1. You could starve the instance of any new splits, just let it finish whatever it's working on, send the remainder of the queue elsewhere, and don't send any new work its way (if possible, I'm not terribly familiar with Presto's internals)
  2. I'd rather the current state was sent to a different node and this "old" node was completely forgotten. The idea of waiting for a node replacement is, I think, not covering all usual workflows - e.g. you could be downsizing a cluster, so this node is going away and is not to be replaced.
  3. Or, instead of the two points above, there could be a full-blown failure recovery mechanism, which would allow for task repeats, lineage tracking etc., but I thought that this was intentionally not implemented to avoid its overhead (and it's one of the architectural differences from Spark).

@dain
Member

dain commented Apr 23, 2019

Short of cancelling the query and restarting it again after the instance is replaced, I don't see the benefit of this?

  1. You could starve the instance of any new splits, just let it finish whatever it's working on, send the remainder of the queue elsewhere, and don't send any new work its way (if possible, I'm not terribly familiar with Presto's internals)

This is already supported. Looking at the code, you send an HTTP PUT containing the JSON string 'SHUTTING_DOWN' to /v1/info/state/

  2. I'd rather the current state was sent to a different node and this "old" node was completely forgotten. The idea of waiting for a node replacement is, I think, not covering all usual workflows - e.g. you could be downsizing a cluster, so this node is going away and is not to be replaced.

I think this is the best long term approach, but it is much more complex due to the adaptive streaming nature of Presto.

  3. Or, instead of the two points above, there could be a full-blown failure recovery mechanism, which would allow for task repeats, lineage tracking etc., but I thought that this was intentionally not implemented to avoid its overhead (and it's one of the architectural differences from Spark).

I think this is an orthogonal feature. In some cases it might be nice to have materialization points like Hadoop uses, but in a much more targeted manner (i.e., don't materialize between every single stage).
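
For reference, a minimal sketch of the /v1/info/state graceful-shutdown PUT mentioned above, assuming a worker reachable at a hypothetical address with no extra authentication:

```python
import json
import urllib.request

WORKER = "http://worker-host:8080"  # hypothetical worker address for this example

# PUT the JSON string "SHUTTING_DOWN" to the worker's state endpoint.
req = urllib.request.Request(
    url=f"{WORKER}/v1/info/state",
    data=json.dumps("SHUTTING_DOWN").encode(),
    headers={"Content-Type": "application/json"},
    method="PUT",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status)  # the worker then stops taking new splits, drains, and exits
```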

@voycey
Author

voycey commented Apr 24, 2019

I believe the way it is handled in things like Hive is that the dead / dying nodes are just 'excluded' from the available node pool - any splits they were handling are returned to the queue (I don't know the correct terms, excuse me) and are then picked up by the next node.
As Dataproc (and AWS spot instances) continually cycle nodes in and out, this can happen fairly regularly.
I'm sure most people who run these kinds of workloads would be happy with less than 100% efficiency on Presto, considering it is orders of magnitude faster than the next best thing (which is Hive) - we certainly would :)

@voycey
Author

voycey commented Apr 24, 2019

  1. You could starve the instance of any new splits, just let it finish whatever it's working on, send the remainder of the queue elsewhere, and don't send any new work its way (if possible, I'm not terribly familiar with Presto's internals)

This is already supported. Looking at the code, you send an HTTP PUT containing the JSON string 'SHUTTING_DOWN' to /v1/info/state/

The time limit is the factor here - on GCP the instance gets a 30-second warning, which can trigger a script (and send the PUT request you mention), but if the instance is force-terminated within that time, does the whole query still fail?
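
For what it's worth, a minimal sketch of catching that 30-second warning via the GCE metadata server and then triggering the shutdown PUT above (the one-second polling interval is an assumption, not a GCP recommendation):

```python
import time
import urllib.request

# GCE metadata key that flips to "TRUE" when the instance is being preempted.
PREEMPT_URL = "http://metadata.google.internal/computeMetadata/v1/instance/preempted"

def preempted():
    req = urllib.request.Request(PREEMPT_URL, headers={"Metadata-Flavor": "Google"})
    with urllib.request.urlopen(req, timeout=2) as resp:
        return resp.read().decode().strip() == "TRUE"

while not preempted():
    time.sleep(1)

print("Preemption notice received; send the graceful-shutdown PUT now")
```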

@electrum
Member

Hive on MapReduce executes by breaking the query into a series of MapReduce jobs, each of which has a set of Map and Reduce tasks. If any of the tasks fail, they must be retried. The final output of each reduce task is written to HDFS. The input to each mapper is one or more files on HDFS, from either the original input table, or the output of a previous MapReduce job.

Presto is a streaming query execution engine, with no intermediate data written to disk (aside from spilling which is local to a task on a worker). This is one of the reasons why Presto can be much more efficient than Hive. It also means that losing a machine will fail all queries running on that machine.

This is a trade-off that assumes that providing fault tolerance for every query is more expensive than re-running the rare queries that fail. Spot instances (and unreliable hardware in general) violate this assumption. Or more generally, you can think of this as a curve with MTTF and query length being two of the variables.

@electrum
Member

Support for spot instances (or their equivalent) has recently become a highly requested feature due to the substantial cost savings. I plan to work on this in the near future. The idea is to support evacuation of a node, when requested, by moving all of its state to different nodes.

We will get the framework in place by starting with the simplest possible task shape, likely the final stage of an aggregation, then expand to additional shapes incrementally, based on complexity and user demand. We have nearly 50 different query operators (components of a task), and support will need to be added for each one (where possible). This is similar to the spilling work, where support was added incrementally.

Note that this does not address unexpected node failures, as that is a very different problem, and likely requires checkpointing or a similar mechanism.

@voycey
Author

voycey commented Apr 24, 2019

Hive on MapReduce executes by breaking the query into a series of MapReduce jobs, each of which has a set of Map and Reduce tasks. If any of the tasks fail, they must be retried. The final output of each reduce task is written to HDFS. The input to each mapper is one or more files on HDFS, from either the original input table, or the output of a previous MapReduce job.

Presto is a streaming query execution engine, with no intermediate data written to disk (aside from spilling which is local to a task on a worker). This is one of the reasons why Presto can be much more efficient than Hive. It also means that losing a machine will fail all queries running on that machine.

This is a trade-off that assumes that providing fault tolerance for every query is more expensive than re-running the rare queries that fail. Spot instances (and unreliable hardware in general) violate this assumption. Or more generally, you can think of this as a curve with MTTF and query length being two of the variables.

We use Airflow to retry the queries that we can - unfortunately, even broken down to their smallest components, some queries take a few hours to run, and running these on normal nodes isn't really feasible due to the costs involved. There was some talk of async checkpoints when I brought this up on the previous repo - not sure how feasible it was?

@voycey
Author

voycey commented Apr 24, 2019

We will get the framework in place by starting with the simplest possible task shape, likely the final stage of an aggregation, then expand to additional shapes incrementally, based on complexity and user demand. We have nearly 50 different query operators (components of a task), and support will need to be added for each one (where possible). This is similar to the spilling work, where support was added incrementally.

This is great news - as long as I know that this is somewhat on the roadmap, I am happy. We are currently offloading many of these long-running queries to BigQuery and I would prefer to keep it all in one place!

@hashhar
Member

hashhar commented Mar 4, 2022

Superseded by #9101

@hashhar hashhar closed this as completed Mar 4, 2022