
Add support for post datastream create,update,delete and state change action #915

Merged
merged 20 commits into linkedin:master on Nov 14, 2022

Conversation

hshukla
Collaborator

@hshukla hshukla commented Oct 31, 2022

  • Connector::postDatastreamStateChangeAction(Datastream stream): added to expose a hook for post datastream create/update/delete and state change actions. The default has no implementation; any interested connector needs to add support by implementing this method.

  • In DatastreamResource, call this newly added method in the Coordinator after create, update, delete, and state change (Pause, Resume, Stop).

  • Add unit tests covering the new method's functionality.
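The hook described above can be sketched as follows. This is a minimal illustration, not the PR's actual code: `Datastream` is a simplified stand-in for the real Brooklin type, and the `Connector` interface here carries only the new default method.

```java
// Sketch of the new hook, with simplified stand-ins for the real Brooklin types.
class Datastream {
    private final String name;
    Datastream(String name) { this.name = name; }
    String getName() { return name; }
}

interface Connector {
    // Default no-op: connectors interested in post create/update/delete or
    // state-change actions override this method.
    default void postDatastreamStateChangeAction(Datastream stream) throws Exception {
    }
}

public class HookSketch {
    public static void main(String[] args) throws Exception {
        Connector noOp = new Connector() { };        // uses the default no-op
        noOp.postDatastreamStateChangeAction(new Datastream("ds1"));

        final boolean[] invoked = {false};
        Connector interested = new Connector() {
            @Override
            public void postDatastreamStateChangeAction(Datastream stream) {
                invoked[0] = true;                   // connector-specific follow-up work
            }
        };
        interested.postDatastreamStateChangeAction(new Datastream("ds2"));
        System.out.println("invoked=" + invoked[0]);
    }
}
```

Because the default is a no-op, existing connectors compile unchanged; only connectors that care about these lifecycle events pay any cost.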

@hshukla hshukla requested a review from vmaheshw October 31, 2022 21:05
Collaborator

@vmaheshw vmaheshw left a comment

In TestCoordinator, I only checked the update sequence and stopped upon seeing multiple events published for one update. I did not evaluate the other tests.

I have completed the review of the other source code.

@@ -438,6 +438,7 @@ public void onDatastreamUpdate() {
datastreamGroups.stream().filter(x -> x.getTaskPrefix().equals(task.getTaskPrefix())).findFirst();
if (dg.isPresent()) {
((DatastreamTaskImpl) task).setDatastreams(dg.get().getDatastreams());
dg.get().getDatastreams().forEach(this::invokePostDataStreamStateChangeAction);
Collaborator

I don't think this is the right place to capture all the updates, as this will get called by all the hosts and will result in duplicate invocations.

This listener is active on all the nodes listening for assignment changes or any change in a datastream associated with the assignment.

Collaborator Author

Yes, you are right. For the update, we do not have a dedicated listener that runs only on the leader. I am thinking of making the post datastream create, update, delete, or state change action not specific to DH publish. If we want to allow any further follow-up actions that require task-level knowledge, keeping the logic here would be helpful, and since we run the post-invocation logic as an async call, I thought it would be ok.

Comment on lines 1099 to 1100
Executors.newSingleThreadExecutor().submit(() -> {
try {
Collaborator

a) Lifecycle management is required for all executors to ensure they are correctly shut down.
b) Naming the thread pool improves debuggability, especially with thread dumps.
c) Since you are not reusing the executor, it will create N threads for N operations rather than just a single thread. This can be fatal if there is a bug in the code or a thread operation blocks at some step; every datastream update will then leave behind a thread that never dies.

Example:

```java
// An executor to spawn threads to close the producer.
private final ExecutorService _producerCloseExecutorService = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setNameFormat("KafkaProducerWrapperClose-%d").build());
```
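Points a) through c) can be combined into one shared, named, single-thread executor with an explicit shutdown. The sketch below uses only the JDK (a plain `ThreadFactory` lambda instead of Guava's `ThreadFactoryBuilder`); the class and thread names are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch (names assumed): one shared, named, single-thread executor instead of
// a fresh executor per call, plus an explicit shutdown for lifecycle management.
public class PostActionExecutorSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "postDatastreamStateChangeAction-0"); // named for thread dumps
        t.setDaemon(true);
        return t;
    });

    void submit(Runnable action) {
        executor.submit(action);
    }

    void shutdown() throws InterruptedException {
        executor.shutdown();                        // stop accepting new work
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();                 // force-stop stuck tasks
        }
    }

    public static void main(String[] args) throws Exception {
        PostActionExecutorSketch pool = new PostActionExecutorSketch();
        pool.submit(() -> System.out.println("thread=" + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Reusing one executor bounds the thread count at one regardless of how many datastream operations occur, and the name makes a blocked hook easy to spot in a thread dump.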

Collaborator

You should also check the lifecycle of an executor in the current file to see how the lifecycle management is done during OnSessionExpiry.

final ConnectorInfo connectorInfo = _connectors.get(connectorName);
connectorInfo.getConnector().postDatastreamStateChangeAction(ds);
} catch (Exception e) {
// no need to re-throw the exception as we do not want to kill the leader thread
Collaborator

This is specific to your use case, where the postDatastreamInitialization step is not critical. It would be good practice to handle the exceptions in the function definition rather than here. That gives leverage in the future: if there is a critical error whose exception should be propagated, it is not swallowed in the middle.

In another place, you have a possible call in the non-leader code path; the comment does not align with that code path.
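The advice above — handle non-critical failures inside the hook implementation, not at the call site — could look like the sketch below. The types and the `MetadataPublishException` are hypothetical, introduced only to illustrate the pattern.

```java
// Sketch: handle non-critical failures inside the hook implementation itself,
// so the call site does not have to swallow every exception.
class MetadataPublishException extends Exception {
    MetadataPublishException(String msg) { super(msg); }
}

interface Connector {
    default void postDatastreamStateChangeAction(String datastreamName) throws Exception { }
}

public class ExceptionHandlingSketch implements Connector {
    @Override
    public void postDatastreamStateChangeAction(String datastreamName) throws Exception {
        try {
            publishMetadata(datastreamName);
        } catch (MetadataPublishException e) {
            // Non-critical for this use case: log and continue instead of
            // propagating to (and potentially killing) the leader thread.
            System.out.println("publish failed for " + datastreamName + ": " + e.getMessage());
        }
        // Critical errors would still propagate to the caller from here.
    }

    private void publishMetadata(String name) throws MetadataPublishException {
        throw new MetadataPublishException("broker unavailable"); // simulated failure
    }

    public static void main(String[] args) throws Exception {
        new ExceptionHandlingSketch().postDatastreamStateChangeAction("ds1");
        System.out.println("caller survived");
    }
}
```

The connector alone knows which of its failures are benign, so placing the catch there keeps the coordinator's call site free to propagate anything it does receive.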

}

private void invokePostDataStreamStateChangeAction(final Datastream ds) {
Executors.newSingleThreadExecutor().submit(() -> {
Collaborator

Given that all this logic is async, there is no exception propagation path here. But it will be useful to track the health of this executor/thread so that it is evident whether the thread is blocked or in use, for easy debugging. Similarly, "ENTRY" and "EXIT" logging will be very helpful, given this is not a frequent operation.
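The ENTRY/EXIT logging suggested above could be wrapped around the async invocation like this. A sketch only, using `System.out` in place of the project's logger; names are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: ENTRY/EXIT logging around the async invocation, so a log or thread
// dump makes it obvious whether the hook is running, finished, or stuck.
public class EntryExitLoggingSketch {
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

    static void invokePostDatastreamStateChangeAction(String datastreamName, Runnable action) {
        EXECUTOR.submit(() -> {
            System.out.println("ENTRY postDatastreamStateChangeAction: " + datastreamName);
            try {
                action.run();
            } finally {
                // finally guarantees the EXIT line even when the action throws
                System.out.println("EXIT postDatastreamStateChangeAction: " + datastreamName);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        invokePostDatastreamStateChangeAction("ds1", () -> System.out.println("doing work"));
        EXECUTOR.shutdown();
        EXECUTOR.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

An ENTRY line without a matching EXIT line in the log is then a direct signal that the hook is blocked.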

* @param stream the datastream
* @throws Exception
*/
default void postDatastreamStateChangeAction(Datastream stream) throws Exception {
Collaborator

This is tricky.
It will be interesting to understand the following scenarios:
a) The datastream is updated without restarting; will this get called then?
b) The datastream is paused, stopped, or stuck in a STOPPING state.

* Hook that can be used to perform any additional operations once the datastream has been created, updated, or deleted.
* This method will also be invoked on datastream state changes.
*
* NOTE: This method is called after the datastream is written to/deleted from ZooKeeper
Collaborator

It is very critical to document this in the javadocs, as this is a best-effort call. If there is a leadership transition or any other error, this method may not get called.


// connector1 is assigned on coordinator1 which is a leader
// and the post datastream create/delete or state change method should be invoked only for a leader instance
Assert.assertEquals(connector1.getPostDSStatechangeActionInvokeCount(), 1);
Collaborator

Check connector2 as well, expecting a value of 0.

Comment on lines 1372 to 1374
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, "datastream1");
Datastream datastream = list[0];
LOG.info("Created datastream: {}", datastream);
Collaborator

Create another datastream and ensure that your new API does not get called for this datastream when the other datastream changes.

Comment on lines 1401 to 1405
assertConnectorReceiveDatastreamUpdate(connector1, datastream);
Assert.assertTrue(PollUtils.poll(() -> connector1.getPostDSStatechangeActionInvokeCount() == 2, 200,
WAIT_TIMEOUT_MS));
Assert.assertTrue(PollUtils.poll(() -> connector2.getPostDSStatechangeActionInvokeCount() == 1, 200,
WAIT_TIMEOUT_MS));
Collaborator

Why should connector2 also publish an event? If there are 100 hosts and the tasks are assigned to all of them, an update operation will result in 100 events.
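For reference, the `PollUtils.poll(condition, period, timeout)` pattern used in the test snippet above can be sketched as a simple retry loop. This is a hypothetical simplified helper, not Brooklin's actual PollUtils implementation.

```java
import java.util.function.BooleanSupplier;

// Sketch of a PollUtils.poll-style helper: re-check a condition every periodMs
// until it holds or timeoutMs elapses.
public class PollSketch {
    static boolean poll(BooleanSupplier condition, long periodMs, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(periodMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Condition becomes true roughly 100 ms after start; well within the timeout.
        boolean ok = poll(() -> System.currentTimeMillis() - start > 100, 10, 1000);
        System.out.println("ok=" + ok);
    }
}
```

Polling like this is what lets the test assert on the invocation count of an asynchronous hook without a fixed sleep.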

Collaborator

@shrinandthakkar shrinandthakkar left a comment

Thinking out loud for a couple of cases,

When we delete a datastream,

There would be two threads,

  1. The thread that handles the datastream resource request, updates ZooKeeper, and notifies the leader of the change.
  2. The second thread invokes postDatastreamStateChangeAction.

Now, the leader would create a new assignment and invoke the respective followers to update their respective tasks.

So, wouldn't there be chances of race conditions between deleting those tasks (for a delete request) and fetching the source information from the tasks for the postDatastreamStateChangeAction thread?

When we create a new datastream,

Similar race condition?

try {
final String connectorName = datastream.getConnectorName();
final ConnectorInfo connectorInfo = _connectors.get(connectorName);
connectorInfo.getConnector().postDatastreamStateChangeAction(datastream);
Collaborator

As we decided on asynchronous calls for these updates, would individual connectors down the line invoke them in an async fashion?

If yes, then wouldn't implementing the async behavior here be simpler, instead of expecting every new connector implementation to know to run it in a separate thread?

Collaborator Author

My thinking process was the same when I started implementing this logic: have the async behavior here in the coordinator so that connectors do not need to worry about it. However, if you compare it with other existing methods from the connector interface, ideally it would be each connector's responsibility to implement it the way it wants. For our use case we are ok with a fire-and-forget call, but what if someone else needs a blocking call, similar to validateUpdate or postDatastreamInitialization? This made me push the async logic inside the connector and not at the upper level (the coordinator). What do you think?
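The connector-side async approach argued for above could look like the sketch below: the connector owns its executor and chooses fire-and-forget, while a different connector could just as easily do its work inline (blocking). All names here are illustrative assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: the async behavior lives inside the connector, so a fire-and-forget
// connector submits to its own executor while another connector could block.
public class AsyncConnectorSketch {
    private final ExecutorService postActionExecutor = Executors.newSingleThreadExecutor();

    // Fire-and-forget variant: returns immediately; the work happens on the
    // connector-owned thread.
    void postDatastreamStateChangeAction(String datastreamName) {
        postActionExecutor.submit(() ->
            System.out.println("published metadata for " + datastreamName));
    }

    // The connector also owns the executor's lifecycle.
    void stop() throws InterruptedException {
        postActionExecutor.shutdown();
        postActionExecutor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        AsyncConnectorSketch connector = new AsyncConnectorSketch();
        connector.postDatastreamStateChangeAction("ds1"); // returns immediately
        connector.stop();                                 // waits for the async work
        System.out.println("done");
    }
}
```

Keeping the executor inside the connector means the coordinator's call site stays a plain synchronous method call, and each connector decides its own blocking semantics and thread lifecycle.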

Collaborator

Although I understand that it would be difficult to manage the lifecycle of the thread from the coordinator, I was only concerned because this new method is called while handling a client request, and I don't think we should ideally support a blocking call from outside this repo there.

Can we somehow enforce a max timeout for these invocations in this implementation, so that we don't block the corresponding request due to unknown implementations outside of this repo?

Collaborator Author

You are right, this introduces yet another operation during the REST call; however, I don't think the metadata publish use case should determine blocking vs non-blocking behavior for the open source use case.

Can we somehow enforce a max timeout for these invocations in this implementation, so that we don't block the corresponding request due to unknown implementations outside of this repo?

I don't think there is a way unless we use a separate thread and make it a blocking call with a timeout (1 or 2 sec). Right now, the client request timeout will kick in, and I am not sure what that is (ideally 5 sec, or the default 30 sec). I still feel this belongs inside the connectors, as they are the ones interested in the result of this call, not the coordinator. The new method's javadoc explicitly calls out the behavior of this method; if you are worried about API call latency, make sure it is addressed in the connector implementation.

I don't think it would be difficult to manage the lifecycle of the thread. With this design, I see this as yet another helper method for connectors to do their work. If they want it to be a blocking call, they can do so. Right now we are talking about an async call for metadata publish, but that may not be the case for other connectors or other use cases. For metadata publish, we can make an async call in the respective connectors; if additional functionality is needed in the future that requires a blocking call, this approach will let us do that.

I am not strongly opposed to this idea, but design-wise it makes more sense to have this within the connector.
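The "separate thread plus a blocking call with a timeout" idea discussed above can be sketched with `Future.get(timeout)`. This is an illustrative pattern, not the PR's code; the 2-second budget and the deliberately misbehaving task are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: bound a potentially blocking hook with Future.get(timeout) so the
// REST request thread is never held longer than the budget (2 s assumed here).
public class TimeoutSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> invocation = executor.submit(() -> {
            try {
                Thread.sleep(10_000);              // a misbehaving, blocking implementation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            invocation.get(2, TimeUnit.SECONDS);   // wait at most the budget
        } catch (TimeoutException e) {
            invocation.cancel(true);               // interrupt the task and move on
            System.out.println("timed out, request thread released");
        }
        executor.shutdownNow();
    }
}
```

With this shape the coordinator could cap the cost of an unknown external implementation, at the price of owning another executor's lifecycle, which is exactly the trade-off debated in this thread.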

Collaborator

Synced up with @hshukla on this, as long as we somehow enforce that these are implemented asynchronously, within their connector implementation, we should be good.

And in cases where an implementation is blocking enough to time out the request, it won't cause much harm, since the state change will have been performed as expected, except when multiple REST calls are invoked in a combined fashion, which is rare.

@shrinandthakkar
Collaborator

Synced up with @hshukla on this, and we decided that we'd try to get all the required info for these cases from the connector level and not the task level to avoid these race conditions.

Collaborator

@shrinandthakkar shrinandthakkar left a comment

LGTM

Collaborator

@vmaheshw vmaheshw left a comment

LGTM

@hshukla hshukla merged commit 71a9dea into linkedin:master Nov 14, 2022
@hshukla hshukla deleted the postDSUpsertDelete branch November 14, 2022 19:12
@ehoner ehoner mentioned this pull request Jan 19, 2023