diff --git a/src/helpers/cache/index.js b/src/helpers/cache/index.js index 03cf099..7880da6 100644 --- a/src/helpers/cache/index.js +++ b/src/helpers/cache/index.js @@ -36,9 +36,15 @@ export default class Cache { this.#workerManager.setOnCacheUpdates((updates) => this.#handleCacheUpdates(updates)); + this.#workerManager.setOnCacheDeletions((deletions) => + this.#handleCacheDeletions(deletions)); + this.#workerManager.setOnCacheVersionRequest((domainId) => this.#handleCacheVersionRequest(domainId)); + this.#workerManager.setOnCachedDomainIdsRequest(() => + this.#handleCachedDomainIdsRequest()); + this.#workerManager.setOnError((error) => { Logger.error('Cache worker error:', error); }); @@ -47,10 +53,8 @@ export default class Cache { } async stopScheduledUpdates() { - if (this.#workerManager) { - await this.#workerManager.stop(); - this.#workerManager = null; - } + await this.#workerManager?.stop(); + this.#workerManager = null; } async #updateCache(domain) { @@ -62,7 +66,6 @@ export default class Cache { this.#set(domain._id, { data: reduceSnapshot(result.data.domain), - lastUpdate: domain.lastUpdate, version: result.data.domain.version }); } @@ -71,19 +74,25 @@ export default class Cache { for (const update of updates) { this.#set(update.domainId, { data: update.data, - lastUpdate: update.lastUpdate, version: update.version }); } } + #handleCacheDeletions(deletions) { + for (const domainId of deletions) { + this.#instance.delete(String(domainId)); + } + } + #handleCacheVersionRequest(domainId) { const cached = this.#instance.get(String(domainId)); - const cachedVersion = cached?.lastUpdate || null; - - if (this.#workerManager) { - this.#workerManager.sendCacheVersionResponse(domainId, cachedVersion); - } + this.#workerManager.sendCacheVersionResponse(domainId, cached?.version); + } + + #handleCachedDomainIdsRequest() { + const domainIds = Array.from(this.#instance.keys()); + this.#workerManager.sendCachedDomainIdsResponse(domainIds); } #set(key, value) { diff --git a/src/helpers/cache/worker-manager.js b/src/helpers/cache/worker-manager.js index 7054db7..7bd3dcb 100644 --- a/src/helpers/cache/worker-manager.js +++ b/src/helpers/cache/worker-manager.js @@ -12,8 +12,11 @@ export const EVENT_TYPE = { STOPPED: 'stopped', READY: 'ready', CACHE_UPDATES: 'cache-updates', + CACHE_DELETIONS: 'cache-deletions', REQUEST_CACHE_VERSION: 'request-cache-version', CACHE_VERSION_RESPONSE: 'cache-version-response', + REQUEST_CACHED_DOMAIN_IDS: 'request-cached-domain-ids', + CACHED_DOMAIN_IDS_RESPONSE: 'cached-domain-ids-response', ERROR: 'error' }; @@ -30,53 +33,69 @@ export class CacheWorkerManager { constructor(options = {}) { this.worker = null; this.status = STATUS_TYPE.STOPPED; + this.onCacheUpdates = null; + this.onCacheDeletions = null; + this.onCachedDomainIdsRequest = null; + this.onError = null; this.options = { interval: this.DEFAULT_INTERVAL, ...options }; - this.onCacheUpdates = null; - this.onError = null; + } + + #buildEvents(resolve) { + return new Map([ + [EVENT_TYPE.READY, () => { + this.worker.postMessage({ type: EVENT_TYPE.START }); + }], + [EVENT_TYPE.STARTED, () => { + this.status = STATUS_TYPE.RUNNING; + resolve(); + }], + [EVENT_TYPE.STOPPED, () => { + this.status = STATUS_TYPE.STOPPED; + }], + [EVENT_TYPE.CACHE_UPDATES, (message) => { + if (this.onCacheUpdates) { + this.onCacheUpdates(message.updates); + } + }], + [EVENT_TYPE.CACHE_DELETIONS, (message) => { + if (this.onCacheDeletions) { + this.onCacheDeletions(message.deletions); + } + }], + [EVENT_TYPE.REQUEST_CACHE_VERSION, (message) => { + if (this.onCacheVersionRequest) { + this.onCacheVersionRequest(message.domainId); + } + }], + [EVENT_TYPE.REQUEST_CACHED_DOMAIN_IDS, () => { + if (this.onCachedDomainIdsRequest) { + this.onCachedDomainIdsRequest(); + } + }], + [EVENT_TYPE.ERROR, (message) => { + if (this.onError) { + this.onError(new Error(message.error)); + } + }] + ]); } start() { return new Promise((resolve, reject) => { const workerPath = join(__dirname, 'worker.js'); + const eventHandlers = this.#buildEvents(resolve); + this.worker = new Worker(workerPath, { workerData: this.options }); this.worker.on('message', (message) => { - switch (message.type) { - case EVENT_TYPE.READY: - this.worker.postMessage({ type: EVENT_TYPE.START }); - break; - - case EVENT_TYPE.STARTED: - this.status = STATUS_TYPE.RUNNING; - resolve(); - break; - - case EVENT_TYPE.STOPPED: - this.status = STATUS_TYPE.STOPPED; - break; - - case EVENT_TYPE.CACHE_UPDATES: - if (this.onCacheUpdates) { - this.onCacheUpdates(message.updates); - } - break; - - case EVENT_TYPE.REQUEST_CACHE_VERSION: - if (this.onCacheVersionRequest) { - this.onCacheVersionRequest(message.domainId); - } - break; - - case EVENT_TYPE.ERROR: - if (this.onError) { - this.onError(new Error(message.error)); - } - break; + const handler = eventHandlers.get(message.type); + if (handler) { + handler(message); } }); @@ -129,8 +148,6 @@ export class CacheWorkerManager { }; this.worker.on('message', onMessage); - - // Send stop message this.worker.postMessage({ type: EVENT_TYPE.STOP }); }); } @@ -143,10 +160,18 @@ export class CacheWorkerManager { this.onCacheUpdates = callback; } + setOnCacheDeletions(callback) { + this.onCacheDeletions = callback; + } + setOnCacheVersionRequest(callback) { this.onCacheVersionRequest = callback; } + setOnCachedDomainIdsRequest(callback) { + this.onCachedDomainIdsRequest = callback; + } + sendCacheVersionResponse(domainId, cachedVersion) { if (this.worker && this.status === STATUS_TYPE.RUNNING) { this.worker.postMessage({ @@ -157,6 +182,15 @@ export class CacheWorkerManager { } } + sendCachedDomainIdsResponse(domainIds) { + if (this.worker && this.status === STATUS_TYPE.RUNNING) { + this.worker.postMessage({ + type: EVENT_TYPE.CACHED_DOMAIN_IDS_RESPONSE, + domainIds + }); + } + } + setOnError(callback) { this.onError = callback; } diff --git a/src/helpers/cache/worker.js b/src/helpers/cache/worker.js index 2573b08..f0f2c0f 100644 --- a/src/helpers/cache/worker.js +++ b/src/helpers/cache/worker.js @@ -9,7 +9,9 @@ let intervalId = null; let dbInitialized = false; let getAllDomains = null; -// Initialize worker and send ready signal +/** + * Initialize worker and send ready signal + */ (async () => { await initializeWorker(); if (dbInitialized) { @@ -17,10 +19,14 @@ let getAllDomains = null; } })(); +/** + * Initialize worker by opening database connection and loading services + */ async function initializeWorker() { try { await import('../../db/mongoose.js'); const domainService = await import('../../services/domain.js'); + getAllDomains = domainService.getAllDomains; dbInitialized = true; Logger.info('Worker database connection initialized'); @@ -33,7 +39,10 @@ async function initializeWorker() { } } -async function checkForUpdates() { +/** + * Refresh the cache by checking for updates and deletions. + */ +async function refreshCache() { if (isRunning || !dbInitialized) { return; } @@ -42,22 +51,8 @@ async function checkForUpdates() { try { const domains = await getAllDomains(); - const updates = []; - - for (const domain of domains) { - try { - const cacheCheckResult = await fetchCacheVersion(domain); - const dbVersion = domain.lastUpdate; - const cachedVersion = cacheCheckResult; - - if (isCacheOutdated(cachedVersion, dbVersion)) { - await updateDomainCacheSnapshot(domain, updates); - } - } catch (domainError) { - Logger.error(`Error processing domain ${domain._id}:`, domainError.message); - // Continue with next domain instead of failing completely - } - } + const deletions = await checkForDeletions(domains); + const updates = await checkForDomainUpdates(domains); if (updates.length > 0) { parentPort.postMessage({ @@ -65,6 +60,13 @@ async function checkForUpdates() { updates }); } + + if (deletions.length > 0) { + parentPort.postMessage({ + type: EVENT_TYPE.CACHE_DELETIONS, + deletions + }); + } } catch (error) { Logger.error('Worker checkForUpdates error:', error); parentPort.postMessage({ @@ -76,6 +78,46 @@ async function checkForUpdates() { } } +async function checkForDeletions(domains) { + const deletions = []; + + try { + const currentDomainIds = new Set(domains.map(domain => domain._id.toString())); + const cachedDomainIds = await getAllCachedDomainIds(); + + for (const cachedDomainId of cachedDomainIds) { + if (!currentDomainIds.has(cachedDomainId)) { + deletions.push(cachedDomainId); + } + } + } catch (error) { + Logger.error('Error checking for deletions:', error); + } + + return deletions; +} + +async function checkForDomainUpdates(domains) { + const updates = []; + + for (const domain of domains) { + try { + const cacheCheckResult = await fetchCacheVersion(domain); + const dbVersion = domain.lastUpdate; + const cachedVersion = cacheCheckResult; + + if (isCacheOutdated(cachedVersion, dbVersion)) { + await updateDomainCacheSnapshot(domain, updates); + } + } catch (domainError) { + Logger.error(`Error processing domain ${domain._id}:`, domainError.message); + // Continue with next domain instead of failing completely + } + } + + return updates; +} + function isCacheOutdated(cachedVersion, dbVersion) { return !cachedVersion || dbVersion !== cachedVersion; } @@ -95,7 +137,6 @@ async function updateDomainCacheSnapshot(domain, updates) { updates.push({ domainId: domain._id.toString(), data: reduceSnapshot(result.data.domain), - lastUpdate: domain.lastUpdate, version: result.data.domain.version }); } @@ -109,7 +150,7 @@ async function fetchCacheVersion(domain) { }, 1000); const messageHandler = (message) => { - if (message.type === EVENT_TYPE.CACHE_VERSION_RESPONSE && message.domainId === domain._id.toString()) { + if (isCacheVersionResponseFromDomain(message, domain)) { clearTimeout(timeout); parentPort.off('message', messageHandler); resolve(message.cachedVersion); @@ -124,13 +165,42 @@ async function fetchCacheVersion(domain) { }); } -// Handle messages from main thread +function isCacheVersionResponseFromDomain(message, domain) { + return message.type === EVENT_TYPE.CACHE_VERSION_RESPONSE && + message.domainId === domain._id.toString(); +} + +async function getAllCachedDomainIds() { + return await new Promise((resolve) => { + const timeout = setTimeout(() => { + parentPort.off('message', messageHandler); + resolve([]); + }, 1000); + + const messageHandler = (message) => { + if (message.type === EVENT_TYPE.CACHED_DOMAIN_IDS_RESPONSE) { + clearTimeout(timeout); + parentPort.off('message', messageHandler); + resolve(message.domainIds); + } + }; + + parentPort.on('message', messageHandler); + parentPort.postMessage({ + type: EVENT_TYPE.REQUEST_CACHED_DOMAIN_IDS + }); + }); +} + +/** + * Handle messages from main thread + */ parentPort.on('message', async (message) => { try { switch (message.type) { case EVENT_TYPE.START: if (!intervalId && dbInitialized) { - intervalId = setInterval(() => checkForUpdates(), interval); + intervalId = setInterval(() => refreshCache(), interval); parentPort.postMessage({ type: EVENT_TYPE.STARTED }); } else if (!dbInitialized) { parentPort.postMessage({ diff --git a/tests/unit-test/cache.test.js b/tests/unit-test/cache.test.js index ae2e88e..9d937c4 100644 --- a/tests/unit-test/cache.test.js +++ b/tests/unit-test/cache.test.js @@ -47,11 +47,11 @@ describe('Test cache', () => { expect(cache.status()).toBe('running'); }); - test('UNIT_SUITE - Should update cache when new domain version is available', async () => { + test('UNIT_SUITE - Should update cache when new Domain version is available', async () => { // test cache = Cache.getInstance(); await cache.initializeCache(); - await cache.startScheduledUpdates(); + await cache.startScheduledUpdates({ interval: 500 }); // assert expect(cache.status()).toBe('running'); @@ -59,17 +59,59 @@ describe('Test cache', () => { // update DB Domain version await Domain.findByIdAndUpdate(domainId, { $inc: { lastUpdate: 1 } }); - const { updatedSuccessfully, domainFromCache } = await waitForDomainUpdate(domain.version, 10, 1000); + const { updatedSuccessfully, domainFromCache } = await waitForDomainUpdate(domainId, domain.version, 10, 500); + + expect(domainFromCache).toBeDefined(); + expect(updatedSuccessfully).toBe(true); + }, 20000); + + test('UNIT_SUITE - Should update cache when new Domain is created', async () => { + // test + cache = Cache.getInstance(); + await cache.initializeCache(); + await cache.startScheduledUpdates({ interval: 500 }); + + // assert + expect(cache.status()).toBe('running'); + + // create new DB Domain + const newDomain = await Domain.create({ name: 'New Domain', lastUpdate: 1, owner: new mongoose.Types.ObjectId() }); + await Domain.findByIdAndUpdate(newDomain._id, { $inc: { lastUpdate: 1 } }); + const { updatedSuccessfully, domainFromCache } = await waitForDomainUpdate(newDomain._id, newDomain.lastUpdate, 10, 500); expect(domainFromCache).toBeDefined(); expect(updatedSuccessfully).toBe(true); }, 20000); + test('UNIT_SUITE - Should update cache when Domain is deleted', async () => { + // test + cache = Cache.getInstance(); + await cache.initializeCache(); + await cache.startScheduledUpdates({ interval: 500 }); + + // assert + expect(cache.status()).toBe('running'); + + // create new DB Domain + const newDomain = await Domain.create({ name: 'Delete Me', lastUpdate: 1, owner: new mongoose.Types.ObjectId() }); + await Domain.findByIdAndUpdate(newDomain._id, { $inc: { lastUpdate: 1 } }); + let { updatedSuccessfully, domainFromCache } = await waitForDomainUpdate(newDomain._id, newDomain.lastUpdate, 10, 500); + + expect(domainFromCache).toBeDefined(); + expect(updatedSuccessfully).toBe(true); + + // delete DB Domain + await Domain.findByIdAndDelete(newDomain._id); + await new Promise(resolve => setTimeout(resolve, 1000)); + + domainFromCache = cache.get(newDomain._id); + expect(domainFromCache).toBeUndefined(); + }, 20000); }); // Helpers -async function waitForDomainUpdate(currentDomainVersion, maxAttempts, delay) { +async function waitForDomainUpdate(domainId, currentVersion, maxAttempts, delay) { let domainFromCache; let attempt = 0; let updatedSuccessfully = false; @@ -78,10 +120,13 @@ async function waitForDomainUpdate(currentDomainVersion, maxAttempts, delay) { await new Promise(resolve => setTimeout(resolve, delay)); attempt++; - domainFromCache = cache.get(domainId); - if (domainFromCache.version != currentDomainVersion) { - updatedSuccessfully = true; + domainFromCache = cache.get(domainId) + if (domainFromCache != null) { + if (domainFromCache.version != currentVersion) { + updatedSuccessfully = true; + } } } + return { updatedSuccessfully, domainFromCache }; }