Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Stopping Logic and Maintain Stopping Latch Counter #877

Conversation

shrinandthakkar
Copy link
Collaborator

@shrinandthakkar shrinandthakkar commented Dec 13, 2021

Fixing stopping logic for tasks by preventing calling stop on dead-thread tasks.

Also maintaining the stopping latch counter irrespective of success or failure in acquiring task lock otherwise it could lead to a state which causes multiple instances to work on the same task concurrently.


Important: DO NOT REPORT SECURITY ISSUES DIRECTLY ON GITHUB.
For reporting security issues and contributing security fixes,
please, email security@linkedin.com instead, as described in
the contribution guidelines.

Please, take a minute to review the contribution guidelines at:
https://github.com/linkedin/Brooklin/blob/master/CONTRIBUTING.md

@vmaheshw
Copy link
Collaborator

vmaheshw commented Feb 1, 2022

@shrinandthakkar Can you please drive this to completion?

Copy link
Collaborator

@vmaheshw vmaheshw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

_logger.info("{} stopped", _taskName);
}
}

protected void countDownStoppedLatch() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a weird api to expose to subclasses. How would an implr know when it is safe to invoke this?

@@ -352,9 +352,14 @@ public void run() {
LOG.info("Trying to acquire the lock on datastreamTask: {}", _datastreamTask);
_datastreamTask.acquire(LOCK_ACQUIRE_TIMEOUT);
} catch (DatastreamRuntimeException ex) {
// setting _stoppedLatch count to 0 since the lock couldn't be acquired,
// as a non-zero stoppedLatch value won't let the task to be stopped.
countDownStoppedLatch();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is being re-thrown anyway, so why not handle it in the super class? I don't like how tightly the super and sub classes are being coupled here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmaheshw @surajkn @ryannedolan

Let me take this up in a separate PR but to reduce the coupling here, could we do something like this in the AbstractKafkaConnector(from where the task threads are initiated).

  private ConnectorTaskEntry createKafkaConnectorTask(DatastreamTask task) {
    _logger.info("Creating connector task for datastream task {}.", task);
    AbstractKafkaBasedConnectorTask connectorTask = createKafkaBasedConnectorTask(task);
    Thread taskThread = createTaskThread(connectorTask);
    taskThread.start();

    // task cleanup thread cleans up the task resources like stopped latch counts post shutdown of the task
    Thread taskCleanUpThread = new Thread(() -> {
      try {
        taskThread.join();
      } catch (InterruptedException exception) {
        _logger.error(String.format("Got interrupted exception while waiting for the completion of task : %s ", connectorTask), exception);
      } finally {
        connectorTask.postShutdownCleanUp();
      }
    });
    taskCleanUpThread.start();
    return new ConnectorTaskEntry(connectorTask, taskThread);
  } 

the postShutdownCleanUp method here would take care of counting the latch down so we don't have to handle it in run() functions of either derived or base class implementations.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically it will create extra thread for each thread. I think it should work, but with the constraint of double task threads.

_logger.info("{} stopped", _taskName);
}
}

protected void countDownStoppedLatch() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vmaheshw I remember you had expressed concern over this way of doing the count down. IIRC your concern was that any subclass of this can forget to call countDownStopppedLatch (although in this patch Shrinand added a call to it in KafkaMirrorMakerConnectorTask). So just wanted to check if that was considered and concluded that this is the most reasonable way of doing it?
cc @shrinandthakkar

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was not in support of doing it this way, but also did not have a better idea. So, we decided to do it, until we have a better idea.

@shrinandthakkar shrinandthakkar merged commit 5b30f54 into linkedin:master Jun 8, 2022
shrinandthakkar pushed a commit to shrinandthakkar/brooklin that referenced this pull request Jun 9, 2022
shrinandthakkar added a commit that referenced this pull request Jun 9, 2022
…fter merging #877 (#908)

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn1.linkedin.biz>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants