Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API Part 1 #928

Merged

Conversation

shrinandthakkar (Collaborator, Author):

The EventProducer of every DatastreamTask reports SLA and latency metrics for every datastream record. But when a topic (or even a single one of its partitions) exceeds the configured throughput thresholds, it introduces latency and SLA misses in the mirroring pipeline.

This pull request is the first part of the changes to handle metrics and SLA reporting for throughput-violating topics via the datastream update API. It introduces the following changes:

  1. The datastream update endpoint now accepts a datastream together with a list of throughput-violating topics, carried in the datastream metadata (see the sketch after this list). The ZkAdapter persists this information in the DatastreamStore.
  2. The update API touches every server host in its normal code path; in that path, every host (Coordinator) maintains a shared cache (a Datastream -> Violating Topics map) holding the latest violations for every datastream.
  3. A new callback lets the EventProducer fetch the latest list of offending topics from the Coordinator, so that the correct set of topics is excluded from metrics and SLA reporting for every record.
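
A minimal sketch of how such a list could be carried in the datastream's (string -> string) metadata and read back on the server. The metadata key name, the comma-separated encoding, and the helper class are assumptions made for illustration, not the exact implementation in this PR.

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: encodes/decodes the violating-topic list in datastream metadata.
public final class ViolatingTopicsMetadataSketch {
  static final String VIOLATING_TOPICS_KEY = "throughputViolatingTopics"; // assumed key name

  // Caller side: encode the topic set before invoking the datastream update endpoint.
  static void write(Map<String, String> datastreamMetadata, Set<String> violatingTopics) {
    datastreamMetadata.put(VIOLATING_TOPICS_KEY, String.join(",", violatingTopics));
  }

  // Server side: decode the topic set; absent or empty metadata means no current violations.
  static Set<String> read(Map<String, String> datastreamMetadata) {
    String encoded = datastreamMetadata.get(VIOLATING_TOPICS_KEY);
    if (encoded == null || encoded.isEmpty()) {
      return Collections.emptySet();
    }
    return Arrays.stream(encoded.split(",")).map(String::trim).collect(Collectors.toSet());
  }
}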

Part 2 of this series will take care of:

  1. Excluding the reporting of metrics for these throughput-violating topics within the EventProducer.
  2. Introducing separate metrics for throughput-violating topics.


@shrinandthakkar force-pushed the inlogs-bad-actors-update-api branch 2 times, most recently from eb4de71 to ba7ba60 on March 17, 2023 07:32
@shrinandthakkar changed the title from "Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API" to "Handling Metrics and SLA Reporting for Throughput Violating Topics via Datastream Update API Part 1" on Mar 17, 2023
@shrinandthakkar force-pushed the inlogs-bad-actors-update-api branch 8 times, most recently from ee784a5 to 6622bed on March 22, 2023 23:41
(t) -> getThroughputViolatingTopics(t.getDatastreams()) :
(t) -> new HashSet<>();

EventProducer producer =
shrinandthakkar (Collaborator, Author):

From @vmaheshw

Would it make sense to simplify this at the Datastream level rather than the DatastreamTask level? For example, in digest there is only one datastream, so this function will result in 10 copies for the same datastream.

Also, hashing based on the datastream name would be cheaper and more efficient.

shrinandthakkar (Collaborator, Author):

Instead of moving this to the Datastream level, I refactored it so that this callback is initialized only once in the Coordinator, and that single instance is passed to each EventProducer at init.
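
A rough sketch of that refactor with stand-in types (the real Coordinator, DatastreamTask, and EventProducer signatures differ): the Coordinator builds the callback once, and every producer it creates shares the same instance.

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class CoordinatorCallbackSketch {
  // Stand-in for Brooklin's DatastreamTask, reduced to the one accessor the callback needs.
  interface Task {
    List<String> getDatastreams();
  }

  // Stand-in for the real EventProducer, which would receive the callback at init.
  static class Producer {
    private final Function<Task, Set<String>> _violatingTopicsProvider;

    Producer(Function<Task, Set<String>> violatingTopicsProvider) {
      _violatingTopicsProvider = violatingTopicsProvider;
    }

    Set<String> violatingTopicsFor(Task task) {
      return _violatingTopicsProvider.apply(task);
    }
  }

  // Built exactly once; shared by every EventProducer created by this Coordinator.
  private final Function<Task, Set<String>> _violatingTopicsProvider;

  CoordinatorCallbackSketch(boolean throughputViolationHandlingEnabled) {
    _violatingTopicsProvider = throughputViolationHandlingEnabled
        ? (t) -> getThroughputViolatingTopics(t.getDatastreams())
        : (t) -> Collections.emptySet();
  }

  // Stand-in for the Coordinator's lookup into its violating-topics cache.
  Set<String> getThroughputViolatingTopics(List<String> datastreams) {
    return Collections.emptySet();
  }

  Producer createProducer() {
    return new Producer(_violatingTopicsProvider);
  }
}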

private final ReadWriteLock _throughputViolatingTopicsMapReadWriteLock = new ReentrantReadWriteLock();
private final Lock _throughputViolatingTopicsMapWriteLock = _throughputViolatingTopicsMapReadWriteLock.writeLock();
private final Lock _throughputViolatingTopicsMapReadLock = _throughputViolatingTopicsMapReadWriteLock.readLock();

shrinandthakkar (Collaborator, Author):

From @vmaheshw

Do we need to go to low-level locks for this? Can we not rely on ConcurrentHashMap? The violation calculation does not have to be precise down to the second.

shrinandthakkar (Collaborator, Author):

I would prefer not to use a ConcurrentHashMap: while a "replace-all" on this map is in progress, there is a chance that metrics and SLAs for these violating topics would still get reported. To keep things consistent, I went with this approach.
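
A minimal sketch of the pattern being described; the field names mirror the snippet above, while the map shape and method names are assumptions. Lookups take the read lock, and the "replace-all" from an update swaps the whole view under the write lock, so readers never observe a half-replaced map.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ViolatingTopicsCacheSketch {
  private final ReadWriteLock _throughputViolatingTopicsMapReadWriteLock = new ReentrantReadWriteLock();
  private final Lock _throughputViolatingTopicsMapWriteLock = _throughputViolatingTopicsMapReadWriteLock.writeLock();
  private final Lock _throughputViolatingTopicsMapReadLock = _throughputViolatingTopicsMapReadWriteLock.readLock();

  // datastream name -> topics currently violating throughput thresholds
  private final Map<String, Set<String>> _throughputViolatingTopicsMap = new HashMap<>();

  // Called from the update path: replace the whole view atomically with respect to readers.
  void updateViolatingTopics(Map<String, Set<String>> latest) {
    _throughputViolatingTopicsMapWriteLock.lock();
    try {
      _throughputViolatingTopicsMap.clear();
      latest.forEach((datastream, topics) -> _throughputViolatingTopicsMap.put(datastream, new HashSet<>(topics)));
    } finally {
      _throughputViolatingTopicsMapWriteLock.unlock();
    }
  }

  // Called from the EventProducer callback: read a consistent snapshot for one datastream.
  Set<String> getViolatingTopics(String datastreamName) {
    _throughputViolatingTopicsMapReadLock.lock();
    try {
      return new HashSet<>(_throughputViolatingTopicsMap.getOrDefault(datastreamName, Collections.emptySet()));
    } finally {
      _throughputViolatingTopicsMapReadLock.unlock();
    }
  }
}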

Collaborator:

I recommend simplicity, especially since this is not on a critical path.

@vmaheshw (Collaborator) left a comment:

I am okay with the read-write lock if the other approver is also fine.

Please address the validation-check comment.

Comment on lines +41 to +42
private static final Double ONE_MEBIBYTE = (double) (1024 * 1024);
private static final Double ZNODE_BLOB_SIZE_LIMIT = ONE_MEBIBYTE;
Collaborator:

Do you really need 2 variables?

shrinandthakkar (Collaborator, Author):

The purposes of these two variables are slightly different:

  • ZNODE_BLOB_SIZE_LIMIT --> validates the data size per znode against the znode limit (1 MiB).
  • ONE_MEBIBYTE --> converts bytes to MiB to get the encoded data size in MiB.

Ideally I could use the same variable, but I kept both so that the two different operations happening on the same value are easier to follow.
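
For reference, a sketch of the byte-to-MiB conversion being described, using hypothetical names rather than the helper actually added in this PR.

import java.nio.charset.StandardCharsets;

// Sketch with hypothetical names: illustrates converting a serialized blob's size to MiB.
final class BlobSizeSketch {
  private static final double ONE_MEBIBYTE = 1024 * 1024;

  // UTF-8 byte length of the serialized datastream JSON, expressed in MiB.
  static double blobSizeInMebibytes(String datastreamJson) {
    return datastreamJson.getBytes(StandardCharsets.UTF_8).length / ONE_MEBIBYTE;
  }
}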

@@ -127,6 +128,12 @@ public void updateDatastream(String key, Datastream datastream, boolean notifyLe
throw new DatastreamException("Datastream does not exists, can not be updated: " + key);
}

// As this limit is ZK specific, adding this validation check specifically in ZookeeperBackedDatastreamStore.
double datastreamBlobSizeInMBs = getBlobSizeInMBs(DatastreamUtils.toJSON(datastream));
Validate.isTrue(datastreamBlobSizeInMBs <= ZNODE_BLOB_SIZE_LIMIT,
Collaborator:

The validation check is tricky, especially in the case of a programmatic update. This will block the datastream update until the logic in the caller is fixed. Can you think of a less disruptive way?

Collaborator:

What are your concerns, @vmaheshw? I understand that we need to add similar validation on the client side as well. If a client request breaches the ZK node size limit, it will eventually fail the update anyway; the failure will just happen in the ZkAdapter/ZkClient layer. To me, this is a more explicit and descriptive way to fail.

Collaborator:

I initially thought that it could impact datastream restarts, but I was wrong.

However, in the scenario where the blob size goes above 1 MiB because of the violation list, we will not be able to disable this at the server level and will have to rely on the external service to disable the feature. Until then, any other update/allowlisting will not go through.

shrinandthakkar (Collaborator, Author):

@vmaheshw, understood your concern.

I have added steps in both the update and create paths so that the throughputViolatingTopic metadata is honored only if the corresponding config is enabled on our server.
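
A sketch of that gating with assumed names (the config flag, method, and metadata key are illustrative, not the PR's actual identifiers): when the feature is disabled on a server, the violating-topics metadata is simply dropped before the datastream is persisted.

import java.util.Map;

// Illustrative only: all names here are assumptions, not the PR's actual config or metadata keys.
final class ViolatingTopicsGatingSketch {
  private static final String VIOLATING_TOPICS_KEY = "throughputViolatingTopics"; // assumed key

  private final boolean _throughputViolatingTopicsHandlingEnabled; // assumed server config flag

  ViolatingTopicsGatingSketch(boolean enabled) {
    _throughputViolatingTopicsHandlingEnabled = enabled;
  }

  // Applied in both the create and update paths before the datastream is persisted.
  void sanitizeMetadata(Map<String, String> datastreamMetadata) {
    if (!_throughputViolatingTopicsHandlingEnabled) {
      datastreamMetadata.remove(VIOLATING_TOPICS_KEY);
    }
  }
}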

@jzakaryan (Collaborator) left a comment:

Left a few comments. Overall looks good.

@jzakaryan (Collaborator) left a comment:

Going forward, please retain the commit history in your PRs. Don't rebase/squash those commits locally. Having a commit history retained in the PR helps reviewers see changes over time.

@shrinandthakkar merged commit 6ebb701 into linkedin:master on Mar 31, 2023
shrinandthakkar added a commit that referenced this pull request on Apr 13, 2023
* Releasing a new version And Minor improvements

* Using immutable empty set & keeping SNAPSHOT so as not to accidentally release any version

---------

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>