Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): implement distributed lock to fix digest concurrency issues #2719

Merged
merged 4 commits into from
Feb 15, 2023

Conversation

p-fernandez
Copy link
Contributor

What change does this PR introduce?

Fixes the Digest implementation in a way that avoids when concurrent calls the notifications are not duplicated anymore and the digest is working as expected.
For that takes advantage of a distributed lock implementation with Redlock (Redis based).

Why was this change needed?

We were having multiple reports of Digest not working as expected. This is one step forward to fix some of the problems addressed.

Other information (Screenshots)

@p-fernandez p-fernandez self-assigned this Feb 9, 2023
@linear
Copy link

linear bot commented Feb 9, 2023

NV-1561 🐛 Bug Report: Duplicate emails when using digest

📜 Description

When more than 1 event is triggered in the same second (not exactly same timestamp) and there is a digest, sometimes I receive 2 or more emails for the same digest (events are in different order).

👟 Reproduction steps

  1. create template with digest
  2. trigger more than 1 event to that template in less than 1 second

👍 Expected behavior

send only 1 notification

👎 Actual Behavior with Screenshots

sends more than 1 email, with different event order.

🤖 Node Version

v16.13.1

📃 Provide any additional context for the Bug.

  • The digest time is 5 minutes but it happened also when tested on other times.
  • When triggering the event with novu sdk we send the recipient where the email and subscriberId are both the email address.
  • We trigger 1 event for each recipient separately as we though it might fix this bug but it didn't. before that we triggered once with a recipient list.
  • currently running our own hosting of Novu version 0.9.0 but this also happened when we used cloud hosted novu version 0.10.0

👀 Have you spent some time to check if this bug has been raised before?

  • I checked and didn't find similar issue

🏢 Have you read the Contributing Guidelines?

Are you willing to submit PR?

Yes I am willing to submit a PR!

@p-fernandez p-fernandez force-pushed the nv-1561-bug-report-duplicate-emails-when-using branch 2 times, most recently from f6ba8ed to c33f6ab Compare February 9, 2023 16:51
Comment on lines +57 to +64
workflowQueueService = session.testServer?.getService(WorkflowQueueService);

runJob = new RunJob(
jobRepository,
session.testServer.getService(SendMessage),
session.testServer.getService(QueueNextJob),
session.testServer.getService(StorageHelperService)
session.testServer?.getService(SendMessage),
session.testServer?.getService(QueueNextJob),
session.testServer?.getService(StorageHelperService)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linter warnings.

Comment on lines +125 to +139
const digestJob = jobs.find((job) => job.step?.template?.type === StepTypeEnum.DIGEST);
expect(digestJob?.digest?.amount).to.equal(5);
expect(digestJob?.digest?.unit).to.equal(DigestUnitEnum.MINUTES);
const job = jobs.find((item) => item.digest?.events?.length && item.digest.events.length > 0);
expect(job?.digest?.events?.length).to.equal(2);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Linter warnings

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all heroes wear capes

}),
]);

await session.awaitRunningJobs(template?._id, false, 5);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We trigger 5 events so we would expect 5 running jobs, therefore the third param of this helper, unfinishedJobs is set to 5 to move on with the test execution.

@@ -52,9 +52,9 @@ export class AddDelayJob {
throw new ApiException(`Delay date at path ${delayPath} must be a future date`);
}

const noiIdenticalDelay = await this.noExistingDelayedJobForDate(data, delayPath, delayDate);
const noIdenticalDelay = await this.noExistingDelayedJobForDate(data, delayPath, delayDate);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

Comment on lines +77 to +78
currentDelayPath: string,
currentDelayDate: string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid cSpell complaining.

Comment on lines 73 to 92
private async shouldDelayDigestOrMergeWithLock(job: JobEntity): Promise<IFindAndUpdateResponse> {
const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
const TTL = 250;

const shouldDelayDigestDigestJobOrMerge = async () => this.jobRepository.shouldDelayDigestJobOrMerge(job);

return await applyLock<IFindAndUpdateResponse>(
{
resource,
ttl: TTL,
},
shouldDelayDigestDigestJobOrMerge
);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock application. We surround the starting point of code we want to lock, so any call to that point of code, if the lock is taken, will be halted and retried once the lock is being freed.

const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
const TTL = 250;

const shouldDelayDigestDigestJobOrMerge = async () => this.jobRepository.shouldDelayDigestJobOrMerge(job);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason of setting the arrow function like this instead directly as an argument is to have the function named and therefore use the name of the functions for the logs inside of the distributed lock to identify in the future potential failures.

}

private async shouldDelayDigestOrMergeWithLock(job: JobEntity): Promise<IFindAndUpdateResponse> {
const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opinionated structure. But the key params that identify if we have to merge digests that happen at the same time is based on these 3 values: environmentId, templateId and subscriberId.
If not using them we would expect faulty behaviour of the lock and the digest implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add here the digestKey as part of the lock? Since different digest keys might not affect each other and other is no need to lock them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can temporarily do it in case digestKey is provided. But I need to revisit the digestKey feature after this anyway. There have been feedback is not working properly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let's leave it for later than


private async shouldDelayDigestOrMergeWithLock(job: JobEntity): Promise<IFindAndUpdateResponse> {
const resource = `environment:${job._environmentId}:template:${job._templateId}:subscriber:${job._subscriberId}`;
const TTL = 250;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrary. Can be tuned.


if (job.type === StepTypeEnum.DIGEST && digestAmount === undefined) {
return;
}

if (digestAmount === undefined && delayAmount == undefined) {
if (digestAmount === undefined && delayAmount === undefined) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was simplified based on we are returning now undefined in the negate brace of the ternaries of calculating digestAmount and delayAmount.

Comment on lines -7 to -15
export class ShouldAddDigestJob {
constructor(private jobRepository: JobRepository) {}

public async execute(command: AddJobCommand): Promise<boolean> {
const data = await this.jobRepository.findById(command.jobId);

const isValidDigestStep = data.type === StepTypeEnum.DIGEST && data.digest.amount && data.digest.unit;
if (!isValidDigestStep || !data) {
return true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrarily I decided to remove this use case and integrate the logic in the AddDigestJob use case to be able to simplify the code. We were hitting the database extra times without any need (in my opinion for this flow) and therefore having to do many validations that were already checked, just because in the case this use case was used standalone.

apps/api/src/bootstrap.ts Outdated Show resolved Hide resolved
Comment on lines 19 to 23
automaticExtensionThreshold: 500,
driftFactor: 0.01,
retryCount: 50,
retryDelay: 100,
retryJitter: 200,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default settings (as recommended by Redlock).

Comment on lines 30 to 31
// TODO: Implement distributed nodes (at least 3 Redis instances)
const rawInstances = [redisUrl].filter((instance) => !!instance).map((url) => new Redis(url));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs an architecture and resource change needed to discuss. We might be able to sustain the functionality right now for a while.
@Cliftonz @scopsy we need to discuss this.

// Ignore cases where a resource is explicitly marked as locked on a client.
if (error instanceof ResourceLockedError) {
// TODO: Hide or move to trace.
Logger.error(error, LOG_CONTEXT);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logs the times some call hits an used lock. We do not need to log them but right now while we see how well the implementation works, might be useful to understand the process. Move it to debug level could be a good idea.

Comment on lines 122 to 135
export const applyLock = async <T>({ resource, ttl }: ILockOptions, handler: () => Promise<T>): Promise<T> => {
const releaseLock = await lock(resource, ttl);

try {
Logger.log(`Lock ${resource} for ${handler.name}`, LOG_CONTEXT);

const result = await handler();

return result;
} finally {
await releaseLock();
Logger.log(`Lock ${resource} released for ${handler.name}`, LOG_CONTEXT);
}
};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically a wrapper that creates the needed lock, executes the closure given and releases the lock.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love the implementtation with the callback here

Comment on lines +170 to +206
public async shouldDelayDigestJobOrMerge(job: JobEntity): Promise<{ matched: number; modified: number }> {
const execution = {
matched: 0,
modified: 0,
};

const delayedDigestJobs = await this._model.find({
status: JobStatusEnum.DELAYED,
type: StepTypeEnum.DIGEST,
_templateId: job._templateId,
_environmentId: this.convertStringToObjectId(job._environmentId),
_subscriberId: this.convertStringToObjectId(job._subscriberId),
});

const matched = delayedDigestJobs.length;
execution.matched = matched;

if (execution.matched === 0) {
const updatedDigestJob = await this._model.updateOne(
{
_environmentId: job._environmentId,
_templateId: job._templateId,
_subscriberId: job._subscriberId,
_id: job._id,
},
{
$set: {
status: JobStatusEnum.DELAYED,
},
}
);

execution.modified = updatedDigestJob.modifiedCount;
}

return execution;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We moved the previous domain logic to the DAL layer to be able to fully lock it at DAL layer with the distributed lock. Unfortunately I haven't been able to implement with Mongoose/MongoDB functionalities the logic:
find if there is a existing delayed job for this subscriber-template-enviroment -> if not found make this certain job as the digest one delayed.
That was my goal. But weren't able to fix it with workflow changes or MongoDB functionalities (API, transactions and locks, etc)

That logic was implemented before in the domain. When concurrent calls hit it, there was a race condition as both calls when trying to find a existing delayed job didn't find anything, creating more than one single delayed job, therefore digest not working properly and duplicating the number of notifications sent.

@p-fernandez p-fernandez force-pushed the nv-1561-bug-report-duplicate-emails-when-using branch 7 times, most recently from eeafbf5 to f814a83 Compare February 9, 2023 19:07
@p-fernandez p-fernandez force-pushed the nv-1561-bug-report-duplicate-emails-when-using branch 16 times, most recently from b4e09ca to c240993 Compare February 14, 2023 12:32

const delayedJob = delayedJobs[0];

const updatedAt = delayedJob?.updatedAt as string;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TS warnings removal

Comment on lines -19 to -21
if (delayedDigests) {
continue;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit with the old shouldAddDigestUseCase were creating a double race condition that were mutually dependent.
When creating steps we were checking if there was a delayed digest and when deciding if adding the digest or not we were checking if existed or not.
We have yet to clarify the digest code flow a bit (in a forthcoming PR) but we decided to go through the direction of always creating all the steps for a digest job, no matter what, and at the time of deciding if adding the Digest job or not to the queue, either mark as merged/digested the steps for the digest jobs that are merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -42,7 +42,7 @@ export class DigestFilterSteps {
_creatorId: command.userId,
type: StepTypeEnum.TRIGGER,
content: '',
} as any,
} as MessageTemplateEntity,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to remove the any and the warning. This object misses 2 properties to not raise the warn.

@p-fernandez p-fernandez force-pushed the nv-1561-bug-report-duplicate-emails-when-using branch from c240993 to be3cbbd Compare February 14, 2023 12:52
Copy link
Contributor

@scopsy scopsy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing work! Added a few minor comments, other than that feel free to merge

Comment on lines +125 to +139
const digestJob = jobs.find((job) => job.step?.template?.type === StepTypeEnum.DIGEST);
expect(digestJob?.digest?.amount).to.equal(5);
expect(digestJob?.digest?.unit).to.equal(DigestUnitEnum.MINUTES);
const job = jobs.find((item) => item.digest?.events?.length && item.digest.events.length > 0);
expect(job?.digest?.events?.length).to.equal(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all heroes wear capes

Comment on lines +624 to +626
expect(delayedJobs.length).to.eql(1);

const delayedJob = delayedJobs[0];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are great test additions to make sure we are not generating more than needed

Comment on lines 7 to 11
private readonly distributedLockService: DistributedLockService;

constructor() {
this.distributedLockService = new DistributedLockService();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private readonly distributedLockService: DistributedLockService;
constructor() {
this.distributedLockService = new DistributedLockService();
}
private readonly distributedLockService = new DistributedLockService();
constructor() {
}

Minor not important cleaning could be made :)

this.distributedLockService = new DistributedLockService();
}

public async applyLock<T>(settings, handler): Promise<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any TS annotations we can add here for settings and handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For settings we had it in Redlock 5. When downgrading we lost it. I can add it for handler.

Comment on lines -19 to -21
if (delayedDigests) {
continue;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -41,16 +38,17 @@ export class DigestFilterStepsRegular {
}

private async getDigest(command: DigestFilterStepsCommand, step: NotificationStepEntity) {
const where: any = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏

const redisUrl =
process.env.REDIS_HOST && process.env.REDIS_PORT
? `${process.env.REDIS_HOST}:${process.env.REDIS_PORT}`
: 'localhost:6379';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think we can throw an error here instead of falling back to localhost, wdyt? We also have a validation in the env-validator for this value as a required value. wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go for it, to have extra safety in case environment validator somehow stops working.

export class DistributedLockService {
public distributedLock: Redlock;
public instances: Redis[];
public lockCounter = {};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the system is distributed and this will be on each instance of the service, what is the user of this lockCounter? Should we maybe remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lockCounter is needed for the shutdown operation to avoid the system to shutdown while a lock is in operation (safety measure). What we need to do, and I think it got lost in my previous comments, is to know where I can engage the shutdown functionality in our Nest app, because I haven't found any way where we do operations before app Nest shutdown. Can you point me where we could do that?

Comment on lines +30 to +33
driftFactor: 0.01,
retryCount: 50,
retryDelay: 100,
retryJitter: 200,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make the TTL and retry delay and count higher? (maybe just retryCount)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retryDelay might be ok to increase it. retryCount is that it will try 50 times to execute the operation before dismissing it so potentially a total amount of retryCount * retryDelay = 5000 ms now. So I would even decrease the retryCount.

@p-fernandez p-fernandez force-pushed the nv-1561-bug-report-duplicate-emails-when-using branch from 1a57de7 to 6f0750c Compare February 15, 2023 15:48
@p-fernandez p-fernandez added this pull request to the merge queue Feb 15, 2023
Merged via the queue into next with commit e1e1603 Feb 15, 2023
@p-fernandez p-fernandez deleted the nv-1561-bug-report-duplicate-emails-when-using branch February 15, 2023 17:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants