pipeline redis sets & don't await redis set by default (with new option) (#8)

* redis pipeline set

* New "awaitRedisSet" option to allow skipping awaiting the Redis set on getCached logic execution

* put "size" param first on pipelined logging

* change default value of awaitRedisSet

* remove changeset

* pipeline set

* wrap pipeline set error

* fix

* remove wrapping
PabloSzx committed Aug 8, 2022
1 parent 5d86915 commit 4f7b2b9
Showing 5 changed files with 244 additions and 29 deletions.
7 changes: 7 additions & 0 deletions .changeset/four-geese-warn.md
@@ -0,0 +1,7 @@
---
"@soundxyz/fine-grained-cache": minor
---

New "awaitRedisSet" option to allow skipping awaiting the Redis set on getCached logic execution

Default value is `process.env.NODE_ENV === "test"`
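A minimal usage sketch of opting out of the await (assuming an ioredis client; the connection details and the cached callback are placeholders):

```ts
import Redis from "ioredis";
import { FineGrainedCache } from "@soundxyz/fine-grained-cache";

const redis = new Redis(); // placeholder connection

// Hypothetical expensive operation being cached
async function expensiveQuery() {
  return "expensive result";
}

const { getCached } = FineGrainedCache({
  redis,
  // Never await the Redis SET, even in tests
  awaitRedisSet: false,
});

// Resolves as soon as the value is computed; the SET finishes in the background.
const value = await getCached(expensiveQuery, {
  keys: "users",
  ttl: "5 minutes",
});

console.log(value);
```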
5 changes: 5 additions & 0 deletions .changeset/silver-glasses-warn.md
@@ -0,0 +1,5 @@
---
"@soundxyz/fine-grained-cache": minor
---

New "pipelineRedisSET" option to pipeline redis SETs
191 changes: 170 additions & 21 deletions src/fineGrained.ts
@@ -61,6 +61,7 @@ export const Events = {
INVALIDATED_KEYS: "INVALIDATED_KEYS",
EXECUTION_TIME: "EXECUTION_TIME",
PIPELINED_REDIS_GETS: "PIPELINED_REDIS_GETS",
PIPELINED_REDIS_SET: "PIPELINED_REDIS_SET",
REDLOCK_ACQUIRED: "REDLOCK_ACQUIRED",
REDLOCK_RELEASED: "REDLOCK_RELEASED",
REDLOCK_GET_AFTER_ACQUIRE: "REDLOCK_GET_AFTER_ACQUIRE",
@@ -86,7 +87,9 @@ export function FineGrainedCache({
logEvents,
GETRedisTimeout,
pipelineRedisGET,
pipelineRedisSET,
defaultUseMemoryCache = true,
awaitRedisSet = process.env.NODE_ENV === "test",
}: {
redis: Redis;
redLock?: {
@@ -119,6 +122,14 @@
* If "number" is specified, that's the maximum amount of operations to be sent in a single pipeline
*/
pipelineRedisGET?: boolean | number;

/**
* Enable usage of redis pipelines for redis SET.
*
* If "number" is specified, that's the maximum amount of operations to be sent in a single pipeline
*/
pipelineRedisSET?: boolean | number;

/**
* Should `getCached` use memory cache by default?
*
@@ -127,6 +138,13 @@
* @default true
*/
defaultUseMemoryCache?: boolean;

/**
* Should `getCached` await the Redis set?
*
* @default process.env.NODE_ENV === "test"
*/
awaitRedisSet?: boolean;
}) {
const redLock = redLockConfig?.client;
const defaultMaxExpectedTime = redLockConfig?.maxExpectedTime || "5 seconds";
@@ -236,12 +254,12 @@ export function FineGrainedCache({

if (tracing) {
logMessage("PIPELINED_REDIS_GETS", {
size,
keys: commands.map(([, key]) => key).join(","),
cache:
results
?.map(([, result]) => (typeof result === "string" ? "HIT" : "MISS"))
.join(",") || "null",
time: tracing(),
});
}
@@ -269,6 +287,108 @@
}
}

let pendingRedisSets: { key: string; value: string; promise: DeferredPromise<void>; ttl?: number }[] = [];

let pendingRedisSetTimeout: ReturnType<typeof setTimeout> | undefined;

function pipelinedRedisSet({ key, value, ttl }: { key: string; value: string; ttl?: number }) {
if (pendingRedisSetTimeout !== undefined) {
clearTimeout(pendingRedisSetTimeout);
}

if (typeof pipelineRedisSET === "number" && pendingRedisSets.length >= pipelineRedisSET) {
executePipeline();
}

const promise = createDeferredPromise<void>();

pendingRedisSets.push({
key,
value,
promise,
ttl,
});

pendingRedisSetTimeout = setTimeout(executePipeline);

return promise.promise;

async function executePipeline() {
pendingRedisSetTimeout = undefined;

const size = pendingRedisSets.length;
const { promises, commands } = pendingRedisSets.reduce<{
promises: {
promise: DeferredPromise<void>;
index: number;
key: string;
ttl: number | undefined;
}[];
commands: Array<
| [cmd: "set", key: string, value: string]
| [cmd: "setex", key: string, ttl: number, value: string]
>;
}>(
(acc, { key, value, promise, ttl }, index) => {
acc.promises[index] = {
promise,
index,
key,
ttl,
};

if (ttl != null) {
acc.commands[index] = ["setex", key, ttl, value];
} else {
acc.commands[index] = ["set", key, value];
}

return acc;
},
{
promises: new Array(size),
commands: new Array(size),
}
);

const tracing = enabledLogEvents?.PIPELINED_REDIS_SET ? getTracing() : null;

pendingRedisSets = [];

try {
const pipeline = redis.pipeline(commands);

const results = await pipeline.exec();

if (tracing) {
logMessage("PIPELINED_REDIS_SET", {
size,
keys: promises.map(({ key }) => key).join(","),
ttl: promises.map(({ ttl }) => ttl ?? -1).join(","),
time: tracing(),
});
}

for (const { promise, index } of promises) {
const result = results?.[index];

if (!result) {
promise.resolve();
} else {
if (result[0]) {
promise.reject(result[0]);
} else {
promise.resolve();
}
}
}
} catch (err) {
for (const { promise } of promises) {
promise.reject(err);
}
}
}
}

async function getRedisCacheValue<T>(
key: string,
useSuperjson: boolean,
@@ -537,30 +657,59 @@ export function FineGrainedCache({
: JSON.stringify(newValue);

if (expirySeconds > 0) {
      if (pipelineRedisSET != null) {
        const set = pipelinedRedisSet({
          key,
          value: stringifiedValue,
          ttl: expirySeconds,
        }).catch(onError);

        if (awaitRedisSet) await set;
      } else {
        const tracing = enabledLogEvents?.REDIS_SET ? getTracing() : null;

        const set = redis
          .setex(key, expirySeconds, stringifiedValue)
          .then(() => {
            if (tracing) {
              logMessage("REDIS_SET", {
                key,
                expirySeconds,
                timedInvalidationDate: timedInvalidationDate?.toISOString(),
                time: tracing(),
              });
            }
          })
          .catch(onError);

        if (awaitRedisSet) await set;
      }
    } else if (ttl === "Infinity") {
      if (pipelineRedisSET != null) {
        const set = pipelinedRedisSet({
          key,
          value: stringifiedValue,
        }).catch(onError);

        if (awaitRedisSet) await set;
      } else {
        const tracing = enabledLogEvents?.REDIS_SET ? getTracing() : null;

        const set = redis
          .set(key, stringifiedValue)
          .then(() => {
            if (tracing) {
              logMessage("REDIS_SET", {
                key,
                expirySeconds: "Infinity",
                timedInvalidationDate: timedInvalidationDate?.toISOString(),
                time: tracing(),
              });
            }
          })
          .catch(onError);

        if (awaitRedisSet) await set;
      }
} else if (enabledLogEvents?.REDIS_SKIP_SET) {
logMessage("REDIS_SKIP_SET", {
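For orientation, the batching pattern behind `pipelinedRedisSet` can be sketched in isolation: queue each SET with a deferred promise, flush the queue through a single `redis.pipeline(...)` call, and settle each promise from its per-command result. This is a simplified standalone sketch (no debouncing or size cap, and `createDeferred` stands in for the library's `createDeferredPromise`), not the library's exact code:

```ts
import Redis from "ioredis";

type Deferred = {
  promise: Promise<void>;
  resolve: () => void;
  reject: (error: unknown) => void;
};

function createDeferred(): Deferred {
  let resolve!: () => void;
  let reject!: (error: unknown) => void;
  const promise = new Promise<void>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

const redis = new Redis(); // placeholder connection

let pending: { key: string; value: string; ttl?: number; deferred: Deferred }[] = [];
let flushScheduled = false;

// Queue a SET/SETEX; all entries queued in the same tick share one Redis pipeline.
function batchedSet(key: string, value: string, ttl?: number): Promise<void> {
  const deferred = createDeferred();
  pending.push({ key, value, ttl, deferred });

  if (!flushScheduled) {
    flushScheduled = true;
    setTimeout(async () => {
      flushScheduled = false;
      const batch = pending;
      pending = [];

      // "setex" when a TTL is present, plain "set" otherwise.
      const commands = batch.map(({ key, value, ttl }) =>
        ttl != null ? ["setex", key, ttl, value] : ["set", key, value]
      );

      try {
        const results = await redis.pipeline(commands).exec();
        batch.forEach(({ deferred }, index) => {
          const error = results?.[index]?.[0];
          if (error) deferred.reject(error);
          else deferred.resolve();
        });
      } catch (error) {
        // A failure of the whole pipeline rejects every queued promise.
        for (const { deferred } of batch) deferred.reject(error);
      }
    });
  }

  return deferred.promise;
}
```

Callers can then treat `batchedSet` like a single awaited SET, which is how the `awaitRedisSet` flag is able to choose between awaiting the write and letting it complete in the background.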
69 changes: 61 additions & 8 deletions test/fineGrained.spec.ts
@@ -3,6 +3,7 @@ import { join } from "path";
import { CachedCallback, FineGrainedCache, LogEventArgs } from "../src";
import { getCached, invalidateCache, logEverything, memoryCache, redis } from "./utils";
import { createDeferredPromise } from "../src/utils";
import { setTimeout } from "timers/promises";

test.beforeEach(async () => {
await redis.flushall();
@@ -18,7 +19,7 @@ test("fine grained - with memory cache", async (t) => {

async function cb() {
++calls;
await setTimeout(50);

return "hello world" as const;
}
@@ -55,7 +56,7 @@ test("fine grained - without memory cache", async (t) => {

async function cb() {
++calls;
await setTimeout(50);

return "hello world" as const;
}
@@ -94,7 +95,7 @@ test("fine grained - without memory cache and invalidate pattern", async (t) =>

async function cb() {
++calls;
await setTimeout(50);

return "hello world" as const;
}
@@ -189,7 +190,7 @@ test("fine grained - timed invalidation", async (t) => {
async function cb() {
const data = ++calls;

await setTimeout(10);

return data;
}
@@ -230,7 +231,7 @@ test("fine grained - timed invalidation", async (t) => {
t.assert(cacheTtl > 0 && cacheTtl <= 2, "Should use the invalidation date remaining seconds");

// Wait 1 second
await setTimeout(1000);

t.is((await redis.keys("*")).length, 1);

@@ -245,7 +246,7 @@
t.is(calls, 1);

// Wait 1 second
await setTimeout(1000);

// Redis should have invalidated correctly
t.is((await redis.keys("*")).length, 0);
@@ -271,7 +272,7 @@

async function cb() {
++calls;
await setTimeout(50);

return "hello world" + calls;
}
@@ -330,7 +331,7 @@ test("fine grained - dynamic ttl", async (t) => {

const cb: CachedCallback<unknown> = async function cb({ setTTL, getTTL }) {
++calls;
await setTimeout(50);

t.deepEqual(getTTL(), {
ttl: "10 seconds",
@@ -508,3 +509,55 @@ test("pipelined gets", async (t) => {

t.is(events[4].code, "REDIS_SET");
});

test("pipelined sets", async (t) => {
const events: LogEventArgs[] = [];

const { getCached } = FineGrainedCache({
redis,
logEvents: {
log: (args) => events.push(args),
events: logEverything,
},
pipelineRedisSET: true,
onError: (err) => {
throw err;
},
});

await Promise.all([
getCached(
async () => {
return 123;
},
{
keys: "test",
ttl: "5 minutes",
}
),
getCached(
() => {
return 123;
},
{
keys: "test2",
ttl: "Infinity",
}
),
]);

t.is(events.length, 5);

t.is(events[0].code, "REDIS_GET");
t.is(events[1].code, "REDIS_GET");

t.is(events[2].code, "EXECUTION_TIME");

t.is(events[3].code, "EXECUTION_TIME");

t.is(events[4].code, "PIPELINED_REDIS_SET");

t.is(events[4].params.size, 2);

t.is(events[4].params.ttl, "300,-1");
});
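Reading the assertions above: with `pipelineRedisSET: true`, both SETs land in one pipeline, so a single `PIPELINED_REDIS_SET` event is logged with `size` 2 and `ttl` `"300,-1"` ("5 minutes" becomes 300 seconds, and the "Infinity" entry has no TTL, logged as -1). A hedged sketch of wiring a similar listener outside of tests, reusing the client and imports from the first sketch; the value shape accepted by `events` is assumed from the `enabledLogEvents` checks in the diff:

```ts
import type { LogEventArgs } from "@soundxyz/fine-grained-cache";

const seenEvents: LogEventArgs[] = [];

const { getCached } = FineGrainedCache({
  redis,
  pipelineRedisSET: true,
  logEvents: {
    log: (args) => seenEvents.push(args),
    // Assumption: enable only the events you care about, keyed by event code.
    events: { PIPELINED_REDIS_SET: true, REDIS_GET: true },
  },
});
```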