From 4f7b2b930ad6bd793d88c9466209805962d7a0b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20S=C3=A1ez?= Date: Mon, 8 Aug 2022 17:24:01 -0400 Subject: [PATCH] pipeline redis sets & don't await redis set by default (with new option) (#8) * redis pipeline set * New "awaitRedisSet" option to allow skipping awaiting the Redis set on getCached logic execution * put "size" param first on pipelined logging * change default value of awaitRedisSet * remove changeset * pipeline set * wrap pipeline set error * fix * remove wrapping --- .changeset/four-geese-warn.md | 7 ++ .changeset/silver-glasses-warn.md | 5 + src/fineGrained.ts | 191 ++++++++++++++++++++++++++---- test/fineGrained.spec.ts | 69 +++++++++-- test/utils.ts | 1 + 5 files changed, 244 insertions(+), 29 deletions(-) create mode 100644 .changeset/four-geese-warn.md create mode 100644 .changeset/silver-glasses-warn.md diff --git a/.changeset/four-geese-warn.md b/.changeset/four-geese-warn.md new file mode 100644 index 0000000..116f073 --- /dev/null +++ b/.changeset/four-geese-warn.md @@ -0,0 +1,7 @@ +--- +"@soundxyz/fine-grained-cache": minor +--- + +New "awaitRedisSet" option to allow skipping awaiting the Redis set on getCached logic execution + +Default value is `process.env.NODE_ENV === "test"` diff --git a/.changeset/silver-glasses-warn.md b/.changeset/silver-glasses-warn.md new file mode 100644 index 0000000..bbef517 --- /dev/null +++ b/.changeset/silver-glasses-warn.md @@ -0,0 +1,5 @@ +--- +"@soundxyz/fine-grained-cache": minor +--- + +New "pipelineRedisSET" option to pipeline redis SETs diff --git a/src/fineGrained.ts b/src/fineGrained.ts index 1695bb7..bd4bc90 100644 --- a/src/fineGrained.ts +++ b/src/fineGrained.ts @@ -61,6 +61,7 @@ export const Events = { INVALIDATED_KEYS: "INVALIDATED_KEYS", EXECUTION_TIME: "EXECUTION_TIME", PIPELINED_REDIS_GETS: "PIPELINED_REDIS_GETS", + PIPELINED_REDIS_SET: "PIPELINED_REDIS_SET", REDLOCK_ACQUIRED: "REDLOCK_ACQUIRED", REDLOCK_RELEASED: "REDLOCK_RELEASED", REDLOCK_GET_AFTER_ACQUIRE: "REDLOCK_GET_AFTER_ACQUIRE", @@ -86,7 +87,9 @@ export function FineGrainedCache({ logEvents, GETRedisTimeout, pipelineRedisGET, + pipelineRedisSET, defaultUseMemoryCache = true, + awaitRedisSet = process.env.NODE_ENV === "test", }: { redis: Redis; redLock?: { @@ -119,6 +122,14 @@ export function FineGrainedCache({ * If "number" is specified, that's the maximum amount of operations to be sent in a single pipeline */ pipelineRedisGET?: boolean | number; + + /** + * Enable usage of redis pipelines for redis SET. + * + * If "number" is specified, that's the maximum amount of operations to be sent in a single pipeline + */ + pipelineRedisSET?: boolean | number; + /** * Should `getCached` use memory cache by default? * @@ -127,6 +138,13 @@ export function FineGrainedCache({ * @default true */ defaultUseMemoryCache?: boolean; + + /** + * Should `getCached` await the Redis set + * + * @default process.env.NODE_ENV === "test" + */ + awaitRedisSet?: boolean; }) { const redLock = redLockConfig?.client; const defaultMaxExpectedTime = redLockConfig?.maxExpectedTime || "5 seconds"; @@ -236,12 +254,12 @@ export function FineGrainedCache({ if (tracing) { logMessage("PIPELINED_REDIS_GETS", { + size, keys: commands.map(([, key]) => key).join(","), cache: results ?.map(([, result]) => (typeof result === "string" ? "HIT" : "MISS")) .join(",") || "null", - size, time: tracing(), }); } @@ -269,6 +287,108 @@ export function FineGrainedCache({ } } + let pendingRedisSets: { key: string; promise: DeferredPromise; ttl?: number }[] = []; + + let pendingRedisSetTimeout: ReturnType | undefined; + + function pipelinedRedisSet({ key, value, ttl }: { key: string; value: string; ttl?: number }) { + if (pendingRedisSetTimeout !== undefined) { + clearTimeout(pendingRedisSetTimeout); + } + + if (typeof pipelineRedisSET === "number" && pendingRedisSets.length >= pipelineRedisSET) { + executePipeline(); + } + + const promise = createDeferredPromise(); + + pendingRedisSets.push({ + key, + promise, + ttl, + }); + + pendingRedisSetTimeout = setTimeout(executePipeline); + + return promise.promise; + + async function executePipeline() { + pendingRedisSetTimeout = undefined; + + const size = pendingRedisSets.length; + const { promises, commands } = pendingRedisSets.reduce<{ + promises: { + promise: DeferredPromise; + index: number; + key: string; + ttl: number | undefined; + }[]; + commands: Array< + | [cmd: "set", key: string, value: string] + | [cmd: "setex", key: string, ttl: number, value: string] + >; + }>( + (acc, { key, promise, ttl }, index) => { + acc.promises[index] = { + promise, + index, + key, + ttl, + }; + + if (ttl != null) { + acc.commands[index] = ["setex", key, ttl, value]; + } else { + acc.commands[index] = ["set", key, value]; + } + + return acc; + }, + { + promises: new Array(size), + commands: new Array(size), + } + ); + + const tracing = enabledLogEvents?.PIPELINED_REDIS_SET ? getTracing() : null; + + pendingRedisSets = []; + + try { + const pipeline = redis.pipeline(commands); + + const results = await pipeline.exec(); + + if (tracing) { + logMessage("PIPELINED_REDIS_SET", { + size, + keys: promises.map(({ key }) => key).join(","), + ttl: promises.map(({ ttl }) => ttl ?? -1).join(","), + time: tracing(), + }); + } + + for (const { promise, index } of promises) { + const result = results?.[index]; + + if (!result) { + promise.resolve(); + } else { + if (result[0]) { + promise.reject(result[0]); + } else { + promise.resolve(); + } + } + } + } catch (err) { + for (const { promise } of promises) { + promise.reject(err); + } + } + } + } + async function getRedisCacheValue( key: string, useSuperjson: boolean, @@ -537,30 +657,59 @@ export function FineGrainedCache({ : JSON.stringify(newValue); if (expirySeconds > 0) { - const tracing = enabledLogEvents?.REDIS_SET ? getTracing() : null; - - await redis.setex(key, expirySeconds, stringifiedValue); - - if (tracing) { - logMessage("REDIS_SET", { + if (pipelineRedisSET != null) { + const set = pipelinedRedisSet({ key, - expirySeconds, - timedInvalidationDate: timedInvalidationDate?.toISOString(), - time: tracing(), - }); + value: stringifiedValue, + ttl: expirySeconds, + }).catch(onError); + + if (awaitRedisSet) await set; + } else { + const tracing = enabledLogEvents?.REDIS_SET ? getTracing() : null; + + const set = redis + .setex(key, expirySeconds, stringifiedValue) + .then(() => { + if (tracing) { + logMessage("REDIS_SET", { + key, + expirySeconds, + timedInvalidationDate: timedInvalidationDate?.toISOString(), + time: tracing(), + }); + } + }) + .catch(onError); + + if (awaitRedisSet) await set; } } else if (ttl === "Infinity") { - const tracing = enabledLogEvents?.REDIS_SET ? getTracing() : null; - - await redis.set(key, stringifiedValue); - - if (tracing) { - logMessage("REDIS_SET", { + if (pipelineRedisSET != null) { + const set = pipelinedRedisSet({ key, - expirySeconds: "Infinity", - timedInvalidationDate: timedInvalidationDate?.toISOString(), - time: tracing(), - }); + value: stringifiedValue, + }).catch(onError); + + if (awaitRedisSet) await set; + } else { + const tracing = enabledLogEvents?.REDIS_SET ? getTracing() : null; + + const set = redis + .set(key, stringifiedValue) + .then(() => { + if (tracing) { + logMessage("REDIS_SET", { + key, + expirySeconds: "Infinity", + timedInvalidationDate: timedInvalidationDate?.toISOString(), + time: tracing(), + }); + } + }) + .catch(onError); + + if (awaitRedisSet) await set; } } else if (enabledLogEvents?.REDIS_SKIP_SET) { logMessage("REDIS_SKIP_SET", { diff --git a/test/fineGrained.spec.ts b/test/fineGrained.spec.ts index f453bb1..bc8e8e7 100644 --- a/test/fineGrained.spec.ts +++ b/test/fineGrained.spec.ts @@ -3,6 +3,7 @@ import { join } from "path"; import { CachedCallback, FineGrainedCache, LogEventArgs } from "../src"; import { getCached, invalidateCache, logEverything, memoryCache, redis } from "./utils"; import { createDeferredPromise } from "../src/utils"; +import { setTimeout } from "timers/promises"; test.beforeEach(async () => { await redis.flushall(); @@ -18,7 +19,7 @@ test("fine grained - with memory cache", async (t) => { async function cb() { ++calls; - await new Promise((resolve) => setTimeout(resolve, 50)); + await setTimeout(50); return "hello world" as const; } @@ -55,7 +56,7 @@ test("fine grained - without memory cache", async (t) => { async function cb() { ++calls; - await new Promise((resolve) => setTimeout(resolve, 50)); + await setTimeout(50); return "hello world" as const; } @@ -94,7 +95,7 @@ test("fine grained - without memory cache and invalidate pattern", async (t) => async function cb() { ++calls; - await new Promise((resolve) => setTimeout(resolve, 50)); + await setTimeout(50); return "hello world" as const; } @@ -189,7 +190,7 @@ test("fine grained - timed invalidation", async (t) => { async function cb() { const data = ++calls; - await new Promise((resolve) => setTimeout(resolve, 10)); + await setTimeout(10); return data; } @@ -230,7 +231,7 @@ test("fine grained - timed invalidation", async (t) => { t.assert(cacheTtl > 0 && cacheTtl <= 2, "Should use the invalidation date remaining seconds"); // Wait 1 second - await new Promise((resolve) => setTimeout(resolve, 1000)); + await setTimeout(1000); t.is((await redis.keys("*")).length, 1); @@ -245,7 +246,7 @@ test("fine grained - timed invalidation", async (t) => { t.is(calls, 1); // Wait 1 second - await new Promise((resolve) => setTimeout(resolve, 1000)); + await setTimeout(1000); // Redis should have invalidated correctly t.is((await redis.keys("*")).length, 0); @@ -271,7 +272,7 @@ test("fine grained - forceUpdate", async (t) => { async function cb() { ++calls; - await new Promise((resolve) => setTimeout(resolve, 50)); + await setTimeout(50); return "hello world" + calls; } @@ -330,7 +331,7 @@ test("fine grained - dynamic ttl", async (t) => { const cb: CachedCallback = async function cb({ setTTL, getTTL }) { ++calls; - await new Promise((resolve) => setTimeout(resolve, 50)); + await setTimeout(50); t.deepEqual(getTTL(), { ttl: "10 seconds", @@ -508,3 +509,55 @@ test("pipelined gets", async (t) => { t.is(events[4].code, "REDIS_SET"); }); + +test("pipelined sets", async (t) => { + const events: LogEventArgs[] = []; + + const { getCached } = FineGrainedCache({ + redis, + logEvents: { + log: (args) => events.push(args), + events: logEverything, + }, + pipelineRedisSET: true, + onError: (err) => { + throw err; + }, + }); + + await Promise.all([ + getCached( + async () => { + return 123; + }, + { + keys: "test", + ttl: "5 minutes", + } + ), + getCached( + () => { + return 123; + }, + { + keys: "test2", + ttl: "Infinity", + } + ), + ]); + + t.is(events.length, 5); + + t.is(events[0].code, "REDIS_GET"); + t.is(events[1].code, "REDIS_GET"); + + t.is(events[2].code, "EXECUTION_TIME"); + + t.is(events[3].code, "EXECUTION_TIME"); + + t.is(events[4].code, "PIPELINED_REDIS_SET"); + + t.is(events[4].params.size, 2); + + t.is(events[4].params.ttl, "300,-1"); +}); diff --git a/test/utils.ts b/test/utils.ts index bd2c9e5..a4e5c8e 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -29,4 +29,5 @@ export const logEverything: Required = { REDLOCK_ACQUIRED: true, REDLOCK_RELEASED: true, REDLOCK_GET_AFTER_ACQUIRE: true, + PIPELINED_REDIS_SET: true, };