New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature : Ability to cancel any arbitrary events from digest #4888
Feature : Ability to cancel any arbitrary events from digest #4888
Conversation
NV-3169 bug: ability to cancel any arbitrary events from digest
Usecase:- Subscriber receives messages in a discord like platform. Each time a new message is sent to user, workflow is triggered and added into digest. This digest is send to user at some defined time of the day. before that defined time, if user sees any of the message, it should be removed from digest and rest of unseen messages should stay in digest Current Working:- Novu allows canceling only first event from digest. and if first event is cancelled then digest is deleted and all events in digest got deleted. Defintion of Done.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is some duplication in the new tests and comments as I thought it would be easier to read in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to dedicated file cancel-event.e2e.ts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to dedicated file cancel-event.e2e.ts
apps/api/src/app/events/usecases/cancel-delayed/cancel-delayed.usecase.ts
Outdated
Show resolved
Hide resolved
@@ -34,6 +37,71 @@ export class CancelDelayed { | |||
} | |||
); | |||
|
|||
const mainDigestJob = transactionJobs.find((job) => isMainDigest(job.type, job.status)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here i want to point to the naming issue I had, i am not entirely sure what to call the digest that is responsible for executing the digest in the end of the digest time.
At the current state, I referred to this digest job as main
, and if we would find a new transaction that will place it a follower
.
If someone has any other ideas I'd love to hear them.
} | ||
); | ||
|
||
// meaning that only one trigger was send, and it was cancelled in the CancelDelayed.execute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if keep those comments in the code or not, because IMO this flow is very complicated to grasp.
* that mean that we need to assign a new active digest follower job to replace it. | ||
* so from now on we will continue the follower transaction as main digest job. | ||
*/ | ||
private assignNewDigestExecutor(activeDigestFollower: JobEntity): JobEntity { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created this redundant function in order to create a place where i can easily explain what is happening once we assign this main follower
.
} | ||
|
||
const count = await this.jobRepository.count({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this query because we already have the newly fetched job and its status.
const canceled = job.status === JobStatusEnum.CANCELED; | ||
|
||
if (job.status === JobStatusEnum.CANCELED) { | ||
activeDigestFollower = await this.activeDigestMainFollowerExist(job); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the current digest is cancelled we are looking for follower.
apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts
Outdated
Show resolved
Hide resolved
const value = 'true'; | ||
// const value = process.env.IS_USE_MERGED_DIGEST_ID_ENABLED; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🗑️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really amazing work with this 👑
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remembered some doubts i had here, i wonder if we should add lock in this use case
@davidsoderberg what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And use the main digest as lock object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well to tell the truth I am not sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you George, amazing work! 🔥 👏
left a few small suggestions and questions ;)
let delayedJobs = await jobRepository.find({ | ||
_environmentId: session.environment._id, | ||
_templateId: template._id, | ||
type: StepTypeEnum.DELAY, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you miss the check for delayed jobs length?
delayedJobs = await jobRepository.find({ | ||
_environmentId: session.environment._id, | ||
_templateId: template._id, | ||
type: StepTypeEnum.DELAY, | ||
transactionId: id, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here?
expect(fourthMergedTrigger.digest?.events?.length).to.eql(0); | ||
expect(fourthMergedTrigger.status).to.eql(JobStatusEnum.CANCELED); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we please have also a few tests for the digest with backoff?
const possiblePendingJobs: PartialJob[] = await this.jobRepository.find( | ||
{ | ||
_environmentId: command.environmentId, | ||
transactionId: command.transactionId, | ||
status: [JobStatusEnum.PENDING], | ||
}, | ||
'_id type status _environmentId _subscriberId' | ||
); | ||
|
||
transactionJobs = [...transactionJobs, ...possiblePendingJobs]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting to see that we have pending jobs after delayed digest... wondering why we do create them in advance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the current state while we process a notification we create all its step's jobs as pending.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@LetItRock If you think we should rethink the flow please create an initial ticket for us so we can track this thought.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the ticket actually: https://linear.app/novu/issue/NV-2671/optimize-regular-digest ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry, now i understand the comment you raised in discord better. Thanks for raising it up.
apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts
Outdated
Show resolved
Hide resolved
apps/worker/src/app/workflow/usecases/run-job/run-job.usecase.ts
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function is not digest filter related and we can move to utils/object in order to reuse it.
@@ -24,30 +22,12 @@ const LOG_CONTEXT = 'GetDigestEvents'; | |||
export abstract class GetDigestEvents { | |||
constructor(protected jobRepository: JobRepository, private createExecutionDetails: CreateExecutionDetails) {} | |||
|
|||
protected getJobDigest(job: JobEntity): { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function is not get-digest-events related and we can move to utils/digest in order to reuse it.
return null; | ||
} | ||
|
||
const { digestKey, digestValue } = getJobDigest(job); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after we moved the getNestedValue getJobDigest function into separated stateless functions now we can reuse getJobDigest here.
@@ -21,7 +21,7 @@ export class JobEntity { | |||
_notificationId: string; | |||
subscriberId: string; | |||
_subscriberId: string; | |||
_mergedDigestId?: string; | |||
_mergedDigestId?: string | null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, we can remove assignments from _mergedDigestId. When a merged digest becomes a main follower
, it's no longer merged, so it can be empty/null.
…o-cancel-any-arbitrary-events-from-digest # Conflicts: # apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts # packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts
…o-cancel-any-arbitrary-events-from-digest # Conflicts: # apps/api/src/app/events/e2e/digest-events.e2e.ts
What change does this PR introduce?
Subscriber receives messages in a discord like platform. Each time a new message is sent to user, workflow is triggered and added into digest. This digest is send to user at some defined time of the day. before that defined time, if user sees any of the message, it should be removed from digest and rest of unseen messages should stay in digest
Current Working
Novu allows canceling only first event from digest. and if first event is cancelled then digest is deleted and all events in digest got deleted.
Why was this change needed?
Additional cancellation support.
Other information (Screenshots)
Defintion of Done.
if there are 4 events in digest, user should be able to cancel 3rd or 2nd or 4th event before digest time completion and that cancelled event should not be included in final digest events array
The current solution I came up with help from Gali, David, and Dima.
The resolution involves:
That means executing its digest, updating all associated child job steps, and queuing their jobs for execution under 'QueueNextJob'.
Another note that i want to raise is that i was trying to avoid creating a new job in the API level as it would mean that we would need to add
AddJob
use case and its queue service dep that connects tostandard
queue. Creating those changes in the API would corrupt its responsibility and couple us to thestandard
queue that we just decoupled.Miro REF - https://miro.com/app/board/uXjVNL6VbtQ=/
The main concern is about other than regular digest flow, as at his task i was mainly working with regular digest. would love to hear your opinion on the matter @davidsoderberg @ainouzgali