Add support for custom query prerequisite logic #16073
beginQueued(now);
queuedTime.compareAndSet(null, nanosSince(beginQueuedNanos, now));
It seems these two statements make queuedTime almost 0. Is that intentional?
This is how this class works in general. Here is another function:
private void beginDispatching(long now)
{
    beginWaitingForResources(now);
    resourceWaitingTime.compareAndSet(null, nanosSince(beginResourceWaitingNanos, now));
    beginDispatchingNanos.compareAndSet(null, now);
}
The logic is as follows:
- `beginQueued(now)` -> `beginQueued` has `compareAndSet`
  - This is there for situations where the query goes from `WAITING_FOR_PREREQUISITES` to `FAILED`.
  - In that case, this chaining of functions ensures that all the states are set. If the query had already crossed the queuing state, `compareAndSet` makes this function a no-op.
- `queuedTime.compareAndSet(null, nanosSince(beginQueuedNanos, now));`
  - The start of WaitingForResources indicates the end of queuing, hence we set the queuing time now.
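To make the chaining concrete, here is a minimal sketch of the idiom. The class `QueryStateTimerSketch` and its fields are simplified stand-ins invented for illustration, not the actual Presto timer. It shows that `queuedNanos` is only 0 when the queued state was skipped entirely.

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for the timer discussed above; not the actual Presto class.
class QueryStateTimerSketch
{
    private final long createNanos;
    final AtomicReference<Long> beginQueuedNanos = new AtomicReference<>();
    final AtomicReference<Long> waitingForPrerequisitesNanos = new AtomicReference<>();
    final AtomicReference<Long> queuedNanos = new AtomicReference<>();
    final AtomicReference<Long> beginResourceWaitingNanos = new AtomicReference<>();

    QueryStateTimerSketch(long createNanos)
    {
        this.createNanos = createNanos;
    }

    void beginQueued(long now)
    {
        // Each compareAndSet(null, ...) only takes effect the first time it is called.
        waitingForPrerequisitesNanos.compareAndSet(null, now - createNanos);
        beginQueuedNanos.compareAndSet(null, now);
    }

    void beginWaitingForResources(long now)
    {
        // No-op if the query already passed through the queued state.
        beginQueued(now);
        // Entering resource waiting marks the end of queuing.
        queuedNanos.compareAndSet(null, now - beginQueuedNanos.get());
        beginResourceWaitingNanos.compareAndSet(null, now);
    }

    public static void main(String[] args)
    {
        QueryStateTimerSketch normal = new QueryStateTimerSketch(0);
        normal.beginQueued(100);
        normal.beginWaitingForResources(250);
        System.out.println(normal.queuedNanos.get()); // 150

        QueryStateTimerSketch skipped = new QueryStateTimerSketch(0);
        skipped.beginWaitingForResources(40); // queued state was skipped
        System.out.println(skipped.queuedNanos.get()); // 0
    }
}
```

In the normal path the earlier `beginQueued` call has already set `beginQueuedNanos`, so the chained call changes nothing and the recorded queued time is the real interval.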
presto-spi/src/main/java/com/facebook/presto/spi/dispatcher/QueryPrerequisitesFactory.java
presto-main/src/main/java/com/facebook/presto/dispatcher/DefaultQueryPrerequisites.java
    }
});

prerequisitesFuture.whenCompleteAsync((result, throwable) -> {
It doesn't appear necessary to use the async version?
My understanding is that if I don't use the async function, the timing of the future's completion decides where the callback executes:
1. If the future finishes before the callback is set, the callback executes in the thread where it is registered (which is fine for us).
2. If the future finishes after the callback is set, the callback executes in the thread that completes the future, which is not desirable in my opinion: we should not load the threads that compute the prerequisites with actual query registration work.
Given (2), I went with the async option.
@Test
public void testFutures()
        throws Exception
{
    // Already-completed future: the callback runs on the registering thread.
    CompletableFuture<?> a = CompletableFuture.completedFuture(null);
    System.out.println(1);
    a.whenComplete((x, y) -> System.out.println(Thread.currentThread().getName()));
    System.out.println(2);
    Thread foo = new Thread(() -> a.whenComplete((x, y) -> System.out.println(Thread.currentThread().getName())), "test-thread");
    foo.start();
    foo.join();

    // Pending future: the callback runs on the thread that completes it.
    CompletableFuture<?> b = new CompletableFuture<>();
    System.out.println(3);
    b.whenComplete((x, y) -> System.out.println(Thread.currentThread().getName()));
    System.out.println(4);
    Thread bar = new Thread(() -> b.complete(null), "test-thread2");
    bar.start();
    bar.join();
}
Output:
1
main
2
test-thread
3
4
test-thread2
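For contrast, a small sketch of the async variant with an explicit executor. The class name, the thread names, and the `callbackThread` helper are hypothetical. With `whenCompleteAsync(action, executor)` the callback should land on the executor's thread in both orderings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCallbackSketch
{
    // Returns the name of the thread that ran the callback.
    static String callbackThread(boolean completeBeforeRegistering)
    {
        // Hypothetical single-threaded executor standing in for the query executor.
        ExecutorService queryExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "query-executor"));
        try {
            CompletableFuture<Void> prerequisites = new CompletableFuture<>();
            if (completeBeforeRegistering) {
                prerequisites.complete(null);
            }
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            prerequisites.whenCompleteAsync(
                    (result, throwable) -> ranOn.complete(Thread.currentThread().getName()),
                    queryExecutor);
            if (!completeBeforeRegistering) {
                // Complete the future from a separate "plugin" thread.
                new Thread(() -> prerequisites.complete(null), "prerequisite-thread").start();
            }
            // Blocks until the callback has run.
            return ranOn.join();
        }
        finally {
            queryExecutor.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(callbackThread(true));  // query-executor
        System.out.println(callbackThread(false)); // query-executor
    }
}
```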
presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java
presto-main/src/main/java/com/facebook/presto/dispatcher/QueryPrerequisitesManager.java
presto-main/src/main/java/com/facebook/presto/dispatcher/DefaultQueryPrerequisites.java
default void queryFinished(QueryId queryId)
{
}
This doesn't feel like the right place to add logic that can be executed after the query has finished running, since this class is just for query prerequisites.
This is to inform the QueryPrerequisites that a query has finished so it can clean up state if needed. This is useful since the query may fail before the prerequisites are satisfied (for example, a user may kill their query).
Can you clarify your concerns here?
The method signature reads like it is for actions to be done after the query has finished executing. Since this class is for processing prerequisites before the query is run, my concern was mixing pre and post actions in the same class.
I somehow don't have the same concern, and hence I am unable to think of a solution. Do you have suggestions on how what I mentioned can be achieved?
One option is a separate class called PostQueryActions with the queryFinished methods added there. Since this is a high-priority PR, don't get blocked on this. Keep it in the backlog and see if it is possible.
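As an illustration of why a cleanup hook can be useful, here is a rough sketch of an implementation that tracks pending work per query. The class `TrackingPrerequisitesSketch`, its `String` query ids, and the `prerequisitesSatisfied` helper are all hypothetical; the real SPI types may look different.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical implementation that keeps per-query state until told the query is done.
class TrackingPrerequisitesSketch
{
    private final Map<String, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    CompletableFuture<Void> waitForPrerequisites(String queryId)
    {
        return pending.computeIfAbsent(queryId, id -> new CompletableFuture<>());
    }

    // Called by whatever background work prepares the prerequisites.
    void prerequisitesSatisfied(String queryId)
    {
        CompletableFuture<Void> future = pending.get(queryId);
        if (future != null) {
            future.complete(null);
        }
    }

    // Cleanup hook: also covers queries that fail or are killed while still waiting.
    void queryFinished(String queryId)
    {
        CompletableFuture<Void> future = pending.remove(queryId);
        if (future != null) {
            future.cancel(true); // has no effect if the future already completed
        }
    }

    int pendingCount()
    {
        return pending.size();
    }
}
```

Without the `queryFinished` call, an entry for a query killed during the prerequisites stage would stay in the map indefinitely.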
@@ -97,6 +103,12 @@ public String getState()
    return state;
}

@JsonProperty
public boolean isWaitingForPrerequisites()
Why do we need this if we already have state?
I am maintaining symmetry in this class. We already have `isQueued` despite the state being there, hence I added this. One reason this might have been done is that state is a String and not an enum, so this gives people an easy way to check whether the query has started without figuring out the strings.
private final Duration queuedTime;

public static DispatchInfo queued(Duration elapsedTime, Duration queuedTime)
public static DispatchInfo pendingDispatch(Duration elapsedTime, Duration waitingForPrerequisitesTime, Duration queuedTime)
Perhaps we should consider combining these timings into a wrapper class? Might make it easier to add more timings like this in the future without having to refactor DispatchInfo, BasicQueryStats, LocalDispatchQuery, etc.
Adding these states is not common, so a wrapper class here might add more overhead than benefit.
public boolean transitionToQueued()
{
    queryStateTimer.beginQueued();
    return queryState.setIf(QUEUED, currentState -> currentState.ordinal() < QUEUED.ordinal());
I guess this idiom predates your change, but it feels brittle to compare enums by ordinal this way. We should consider writing a comparator, or adding a method in QueryStateMachine, that we can write unit tests against.
We do have unit tests for these states. Is there a specific case that you feel should be tested and is not?
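One possible shape for the reviewer's suggestion is a named predicate on the enum, so the ordering assumption lives in one place and can be tested directly. The enum `QueryStateSketch`, its shortened list of states, and the `isBefore` method are hypothetical and not part of the PR.

```java
// Hypothetical, shortened state list; the declaration order encodes the lifecycle order.
enum QueryStateSketch
{
    WAITING_FOR_PREREQUISITES,
    QUEUED,
    WAITING_FOR_RESOURCES,
    RUNNING,
    FINISHED,
    FAILED;

    // Single, unit-testable home for the "earlier in the lifecycle" comparison.
    boolean isBefore(QueryStateSketch other)
    {
        return compareTo(other) < 0;
    }

    // Mirrors the setIf predicate above: only earlier states may move to QUEUED.
    static boolean canTransitionToQueued(QueryStateSketch current)
    {
        return current.isBefore(QUEUED);
    }
}
```

This still depends on declaration order, as `ordinal()` does, but call sites no longer compare ordinals themselves.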
{
-    QueryState state = queryError.map(error -> FAILED).orElse(QUEUED);
+    QueryState state = queryError.map(error -> FAILED).orElse(WAITING_FOR_PREREQUISITES);
    return new QueryResults(
We should also modify the tests in TestQueryResource to account for this change
presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java
    }
}

public void loadQueryPrerequisites()
This seems to be a lot of boilerplate that is copy-pasted from InternalResourceGroupManager. Perhaps we can consider a refactor here to avoid code duplication.
Almost every `Manager` is written this way; search for the managers that are used in PluginManager. I did consider a refactor, but every manager has something different, which makes consolidating them nuanced.
private final Map<String, String> systemProperties;
private final Map<ConnectorId, Map<String, String>> connectorProperties;

public QueryPrerequisitesContext(
Aren't all of these already in Session? Maybe add a convenience constructor that takes Session object so that it's easier to use this class.
`Session` is not part of the SPI, so I cannot use it. I too would have preferred it over creating a new class.
    }
});

prerequisitesFuture.whenCompleteAsync((result, throwable) -> {
I think we should add a timeout here to be defensive against a bad implementation that takes too long to complete. We could cancel the thread from here, rather than relying on implementations to properly enforce their own timeouts and cancel themselves.
We already have the session property `query_max_run_time`, which controls how long a query can stay in the system. If that limit is reached, the query fails, which in turn cancels the future. What extra value would an additional timeout add here? Implementations can always handle this themselves by adding a timeout on their end if they want a specific one.
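For reference, a defensive timeout of the kind the reviewer describes could be sketched with `CompletableFuture.orTimeout`, which assumes Java 9 or later. The class and the `timesOut` helper are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PrerequisiteTimeoutSketch
{
    // Returns true if the future was failed by the timeout rather than completing on its own.
    static boolean timesOut(CompletableFuture<?> prerequisites, long timeoutMillis)
    {
        try {
            // orTimeout completes the same future exceptionally with a TimeoutException
            // if it has not completed by the deadline.
            prerequisites.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
            return false;
        }
        catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args)
    {
        System.out.println(timesOut(new CompletableFuture<Void>(), 50));        // true
        System.out.println(timesOut(CompletableFuture.completedFuture(null), 50)); // false
    }
}
```

Note that `orTimeout` fails the plugin's own future, so a misbehaving implementation that ignores completion of its future would keep running in the background either way.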
This new query state will support adding custom logic to wait for admin-defined prerequisites to finish before a query can start. This commit only introduces the new state; the actual custom prerequisite functionality will be added in a subsequent commit.
Implement a mechanism that allows custom logic to be plugged in so that queries wait until all of their prerequisites have been satisfied. Admins can plug in custom prerequisite logic by implementing the `QueryPrerequisites` interface and injecting it by implementing the factory interface `QueryPrerequisitesFactory` and exposing it through a `Plugin`. If this is not needed, no action is required since a default no-op implementation will be loaded.
        queueQuery();
    }, queryExecutor);
}
catch (Throwable t) {
What will throw here?
`waitForPrerequisites` is implemented by a plugin that any customer can write, so it can throw.
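A rough sketch of the concern: plugin code may throw synchronously instead of returning a failed future, so the call site has to guard against both. The helper `callPluginSafely` below is hypothetical and takes a different approach from the `catch` block in the diff, folding a synchronous throw into a failed future so that a single callback handles both cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class PluginCallGuardSketch
{
    // Invokes plugin code and converts a synchronous throw into a failed future.
    static CompletableFuture<?> callPluginSafely(Supplier<CompletableFuture<?>> pluginCall)
    {
        try {
            return pluginCall.get();
        }
        catch (Throwable t) {
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(t);
            return failed;
        }
    }

    public static void main(String[] args)
    {
        CompletableFuture<?> result = callPluginSafely(() -> {
            throw new IllegalStateException("plugin bug");
        });
        System.out.println(result.isCompletedExceptionally()); // true
    }
}
```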
This PR adds a mechanism by which admins can require certain prerequisites to be satisfied before a query is queued in the Presto queues. One example is at Facebook: given our multi-region warehouse setup, we want to ensure that data is present in the region where the query is being executed before the query starts.
The high-level design is:
- `QueryPrerequisites`, which allows custom logic to be implemented and injected using the `Plugin` interface. By doing so, admins can add custom logic before query execution and ensure that any required prerequisites are met.
- `WAITING_FOR_PREREQUISITES` as the first stage when a query starts; once the `QueryPrerequisites` is complete, the query moves to `QUEUED`. This draws a clean line between a query that is ready to run and queued, and one for which the admin needs to do some preparation before it can be executed.

To install a custom `QueryPrerequisites` implementation, a factory `QueryPrerequisitesFactory` needs to be implemented and exposed as a `Plugin` to the coordinator. The coordinator then loads the correct factory by reading the configuration file `etc/query-prerequisites.properties` and the property `query-prerequisites.factory`. If nothing is specified, a default implementation that finishes immediately is loaded, so it is not necessary to install a custom `QueryPrerequisites` instance.

Test plan - Added tests to `TestLocalQueryDispatcher`, which contains the logic for invoking and dealing with the prerequisites stage. In addition, on a local cluster at Facebook, we developed our custom `QueryPrerequisites` logic, built and injected it using the `Plugin` mechanism, and ensured that the system worked.
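The factory and default no-op arrangement described above might look roughly like the following. The interfaces here are simplified, hypothetical stand-ins (the `Sketch` suffix, the `String` query id, and the `"default"` name are all assumptions); the actual SPI signatures in the PR may differ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the prerequisites interface.
interface QueryPrerequisitesSketch
{
    // The query moves on to QUEUED once the returned future completes.
    CompletableFuture<?> waitForPrerequisites(String queryId);

    default void queryFinished(String queryId)
    {
    }
}

// Hypothetical stand-in for the factory that a plugin would expose.
interface QueryPrerequisitesFactorySketch
{
    String getName();

    QueryPrerequisitesSketch create(Map<String, String> config);
}

// Default behaviour: prerequisites are satisfied immediately.
class NoOpPrerequisitesFactorySketch
        implements QueryPrerequisitesFactorySketch
{
    @Override
    public String getName()
    {
        return "default"; // assumed name, for illustration only
    }

    @Override
    public QueryPrerequisitesSketch create(Map<String, String> config)
    {
        return queryId -> CompletableFuture.completedFuture(null);
    }
}
```

A custom factory would follow the same shape, returning a future that completes only once its own checks (for example, data availability in the target region) have passed.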