Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cache for event writer. #1032

Merged
merged 2 commits into from Jul 25, 2023

Conversation

Technoboy-
Copy link
Contributor

@Technoboy- Technoboy- commented Jul 25, 2023

Motivation

Add cache for the event writer to avoid creating event writer every time.
image

Modifications

Describe the modifications you've done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@github-actions
Copy link

@Technoboy-:Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

mattisonchao
mattisonchao previously approved these changes Jul 25, 2023
@@ -112,15 +125,14 @@ public CompletableFuture<Void> sendPSKEvent(PSKEvent event) {

@Override
public CompletableFuture<Void> sendEvent(MqttEvent event) {
CompletableFuture<SystemTopicClient.Writer<MqttEvent>> writerFuture = systemTopicClient.newWriterAsync();
CompletableFuture<SystemTopicClient.Writer<MqttEvent>> writerFuture = writerCaches.get(WRITER_KEY);
return writerFuture.thenCompose(writer -> {
CompletableFuture<MessageId> writeFuture = ActionType.DELETE.equals(event.getActionType())
? writer.deleteAsync(event.getKey(), event) : writer.writeAsync(event.getKey(), event);
writeFuture.whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] send event error.", SYSTEM_EVENT_TOPIC, ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be invalidated when the writer has any exceptions.

@Technoboy- Technoboy- merged commit a440913 into streamnative:master Jul 25, 2023
45 checks passed
Technoboy- added a commit that referenced this pull request Jul 25, 2023
Technoboy- added a commit that referenced this pull request Jul 25, 2023
Technoboy- added a commit that referenced this pull request Jul 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants