Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto Remove Deleted Subscribers from topic #5187

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import axios from 'axios';
import { beforeEach } from 'mocha';
import { expect } from 'chai';

import { ExternalSubscriberId, TopicId, TopicKey, TopicName } from '@novu/shared';
import { SubscriberEntity, SubscriberRepository, TopicSubscribersRepository } from '@novu/dal';
import { UserSession, SubscribersService } from '@novu/testing';

import { topicSubscriberNormalize } from './topic-subscriber-normalize.migration';

const axiosInstance = axios.create();
const TOPIC_PATH = '/v1/topics';

describe('Remove all the stale topic subscriber relations', () => {
let session: UserSession;
let subscriberService: SubscribersService;
const subscriberRepository = new SubscriberRepository();
const topicSubscribersRepository = new TopicSubscribersRepository();

beforeEach(async () => {
session = new UserSession();
await session.initialize();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
});

it('should remove topic subscriber relation record on removed subscribers', async () => {
const subscriberId = '123';
const createdSubscriber = await subscriberService.createSubscriber({ subscriberId: subscriberId });
const firstTopicKey = `topic-key-1-trigger-event`;
const firstTopicName = `topic-name-1-trigger-event`;
const newTopic = await createTopic(session, firstTopicKey, firstTopicName);
await addSubscribersToTopic(session, { _id: newTopic._id, key: newTopic.key }, [createdSubscriber]);

// create subscriber and its relation to topic
const subscriber = await subscriberRepository.findBySubscriberId(session.environment._id, subscriberId);
const topicSubscriber = await topicSubscribersRepository.findOne({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
externalSubscriberId: subscriberId,
});

if (!subscriber) {
expect(subscriber).to.be.ok;
throw new Error('Subscriber not found');
}
if (!topicSubscriber) {
expect(topicSubscriber).to.be.ok;
throw new Error('topicSubscriber not found');
}

expect(subscriber.subscriberId).to.be.equal(subscriberId);
expect(topicSubscriber.externalSubscriberId).to.be.equal(subscriberId);
// END - create subscriber and its relation to topic

await subscriberRepository.delete({
_environmentId: subscriber._environmentId,
_organizationId: subscriber._organizationId,
subscriberId: subscriber.subscriberId,
});

const subscriberAfterDeletion = await subscriberRepository.findBySubscriberId(
session.environment._id,
subscriberId
);
const topicSubscriberAfterDeletion = await topicSubscribersRepository.findOne({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
externalSubscriberId: subscriberId,
});

expect(subscriberAfterDeletion).to.not.be.ok;
expect(topicSubscriberAfterDeletion).to.be.ok;

await topicSubscriberNormalize();

const topicSubscriberAfterMigration = await topicSubscribersRepository.findOne({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
externalSubscriberId: subscriberId,
});

expect(topicSubscriberAfterMigration).to.not.be.ok;
});
});

const createTopic = async (
session: UserSession,
key: TopicKey,
name: TopicName
): Promise<{ _id: TopicId; key: TopicKey }> => {
const response = await axiosInstance.post(
`${session.serverUrl}${TOPIC_PATH}`,
{
key,
name,
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

expect(response.status).to.eql(201);
const body = response.data;
expect(body.data._id).to.exist;
expect(body.data.key).to.eql(key);

return body.data;
};

const addSubscribersToTopic = async (
session: UserSession,
createdTopicDto: { _id: TopicId; key: TopicKey },
subscribers: SubscriberEntity[]
) => {
const subscriberIds: ExternalSubscriberId[] = subscribers.map(
(subscriber: SubscriberEntity) => subscriber.subscriberId
);

const response = await axiosInstance.post(
`${session.serverUrl}${TOPIC_PATH}/${createdTopicDto.key}/subscribers`,
{
subscribers: subscriberIds,
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

expect(response.status).to.be.eq(200);
expect(response.data.data).to.be.eql({
succeeded: subscriberIds,
});
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import '../../src/config';

import { NestFactory } from '@nestjs/core';
import { SubscriberRepository, TopicSubscribersRepository } from '@novu/dal';

import { AppModule } from '../../src/app.module';

/*
* topic subscriber normalize - will remove deleted subscribers from topic subscribers
*/
export async function topicSubscriberNormalize() {
// eslint-disable-next-line no-console
console.log('start migration - topic subscriber normalize - will remove deleted subscribers from topic subscribers');

const app = await NestFactory.create(AppModule, {
logger: false,
});
const topicSubscribersRepository = app.get(TopicSubscribersRepository);
const subscriberRepository = app.get(SubscriberRepository);

const cursor = await topicSubscribersRepository._model
.find({} as any)
.batchSize(1000)
.cursor();

for await (const topicSubscriber of cursor) {
const subscriber = await subscriberRepository.findBySubscriberId(
topicSubscriber._environmentId.toString(),
topicSubscriber.externalSubscriberId
);

if (!subscriber) {
// eslint-disable-next-line no-console
console.log(
`remove relation topic subscriber ${topicSubscriber.externalSubscriberId} from topic ${topicSubscriber._topicId}`
);

await topicSubscribersRepository.delete({
_environmentId: topicSubscriber._environmentId.toString(),
_organizationId: topicSubscriber._organizationId,
externalSubscriberId: topicSubscriber.externalSubscriberId,
});
}
}

// eslint-disable-next-line no-console
console.log('end migration- topic subscriber normalize');

app.close();
}
98 changes: 95 additions & 3 deletions apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import { UserSession } from '@novu/testing';
import { SubscriberRepository } from '@novu/dal';
import { SubscribersService, UserSession } from '@novu/testing';
import { SubscriberEntity, SubscriberRepository, TopicSubscribersRepository } from '@novu/dal';
import { expect } from 'chai';
import axios from 'axios';
import { ExternalSubscriberId, TopicId, TopicKey, TopicName } from '@novu/shared';

const axiosInstance = axios.create();
const TOPIC_PATH = '/v1/topics';

describe('Delete Subscriber - /subscribers/:subscriberId (DELETE)', function () {
let session: UserSession;
let subscriberService: SubscribersService;
const subscriberRepository = new SubscriberRepository();
const topicSubscribersRepository = new TopicSubscribersRepository();

beforeEach(async () => {
session = new UserSession();
await session.initialize();
subscriberService = new SubscribersService(session.organization._id, session.environment._id);
});

it('should delete an existing subscriber', async function () {
Expand Down Expand Up @@ -49,8 +54,95 @@ describe('Delete Subscriber - /subscribers/:subscriberId (DELETE)', function ()
_environmentId: session.environment._id,
subscriberId: '123',
})
)[0];
)?.[0];

expect(deletedSubscriber.deleted).to.equal(true);
});

it('should dispose subscriber relations to topic once he was removed', async () => {
const subscriberId = '123';

const subscriber = await subscriberService.createSubscriber({ subscriberId: subscriberId });
for (let i = 0; i < 50; i++) {
const firstTopicKey = `topic-key-${i}-trigger-event`;
const firstTopicName = `topic-name-${i}-trigger-event`;
const newTopic = await createTopic(session, firstTopicKey, firstTopicName);
await addSubscribersToTopic(session, { _id: newTopic._id, key: newTopic.key }, [subscriber]);
}

const createdRelations = await topicSubscribersRepository.find({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
externalSubscriberId: subscriberId,
});

expect(createdRelations.length).to.equal(50);

await axiosInstance.delete(`${session.serverUrl}/v1/subscribers/${subscriberId}`, {
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
});

const deletedRelations = await topicSubscribersRepository.find({
_environmentId: session.environment._id,
_organizationId: session.organization._id,
externalSubscriberId: subscriberId,
});

expect(deletedRelations.length).to.equal(0);
});
});

const createTopic = async (
session: UserSession,
key: TopicKey,
name: TopicName
): Promise<{ _id: TopicId; key: TopicKey }> => {
const response = await axiosInstance.post(
`${session.serverUrl}${TOPIC_PATH}`,
{
key,
name,
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

expect(response.status).to.eql(201);
const body = response.data;
expect(body.data._id).to.exist;
expect(body.data.key).to.eql(key);

return body.data;
};

const addSubscribersToTopic = async (
session: UserSession,
createdTopicDto: { _id: TopicId; key: TopicKey },
subscribers: SubscriberEntity[]
) => {
const subscriberIds: ExternalSubscriberId[] = subscribers.map(
(subscriber: SubscriberEntity) => subscriber.subscriberId
);

const response = await axiosInstance.post(
`${session.serverUrl}${TOPIC_PATH}/${createdTopicDto.key}/subscribers`,
{
subscribers: subscriberIds,
},
{
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
}
);

expect(response.status).to.be.eq(200);
expect(response.data.data).to.be.eql({
succeeded: subscriberIds,
});
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { SubscriberRepository, DalException } from '@novu/dal';
import { SubscriberRepository, DalException, TopicSubscribersRepository } from '@novu/dal';
import { buildSubscriberKey, InvalidateCacheService } from '@novu/application-generic';

import { RemoveSubscriberCommand } from './remove-subscriber.command';
Expand All @@ -11,7 +11,8 @@ export class RemoveSubscriber {
constructor(
private invalidateCache: InvalidateCacheService,
private subscriberRepository: SubscriberRepository,
private getSubscriber: GetSubscriber
private getSubscriber: GetSubscriber,
private topicSubscribersRepository: TopicSubscribersRepository
) {}

async execute(command: RemoveSubscriberCommand) {
Expand All @@ -35,6 +36,12 @@ export class RemoveSubscriber {
_organizationId: subscriber._organizationId,
subscriberId: subscriber.subscriberId,
});

await this.topicSubscribersRepository.delete({
_environmentId: subscriber._environmentId,
_organizationId: subscriber._organizationId,
externalSubscriberId: subscriber.subscriberId,
});
Comment on lines +40 to +44
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, that was easy peasy :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

easy peasy but we postponed the async matter here, but imo we definitely should start the discussion in order to support possible long processing into an async service like an external queue.
REF https://linear.app/novu/issue/NV-3230/auto-remove-deleted-subscribers-from-topic#comment-13955c84

} catch (e) {
if (e instanceof DalException) {
throw new ApiException(e.message);
Expand Down
Loading