From 33a3e637600c49b118c6894b32cbf4313259bdf3 Mon Sep 17 00:00:00 2001 From: Kurt Date: Tue, 21 Aug 2018 15:23:44 -0500 Subject: [PATCH] feat: global cache del/purgeTag Purge cache entries globally using `cache.global`. ```javascript import cache from "@fly/cache" // delete a single key everywhere await cache.global.del(key) // delete every key with a tag await cache.global.purgeTag(key) ``` --- .travis.yml | 28 ++++++- src/bridge/bridge.ts | 5 +- src/bridge/fly/cache.ts | 40 +++++++--- src/cache_notifier.ts | 86 +++++++++++++++++++++ src/cache_store.ts | 14 ++-- src/cmd/server.ts | 7 +- src/memory_cache_store.ts | 42 +++++----- src/redis_adapter.ts | 14 ++++ src/redis_cache_notifier.ts | 107 ++++++++++++++++++++++++++ src/redis_cache_store.ts | 50 ++++++------ test/fly.cache.spec.ts | 8 +- test/js-fails.spec.ts | 4 +- v8env/src/fly/cache/global.ts | 30 ++++++++ v8env/src/fly/cache/index.ts | 20 +++-- v8env/test/fly.cache.notifier.spec.js | 33 ++++++++ 15 files changed, 408 insertions(+), 80 deletions(-) create mode 100644 src/cache_notifier.ts create mode 100644 src/redis_adapter.ts create mode 100644 src/redis_cache_notifier.ts create mode 100644 v8env/src/fly/cache/global.ts create mode 100644 v8env/test/fly.cache.notifier.spec.js diff --git a/.travis.yml b/.travis.yml index 63e6d0f9..334f5402 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ compiler: clang matrix: include: - os: linux + name: "Linux + node 8" addons: &linux_addons apt: sources: @@ -15,23 +16,44 @@ matrix: - nasm env: &linux_env - MATRIX_EVAL="export CXX=g++-5" - - REDIS_CACHE_URL=redis://localhost/ node_js: 8 - - ox: linux + - os: linux + name: "Linux + node 10.3" addons: *linux_addons env: *linux_env node_js: 10.3 - - ox: linux + - os: linux + name: "Linux + node 10.8" addons: *linux_addons env: *linux_env node_js: 10.8.0 + + - os: linux + name: "Linux + Redis + node 8" + addons: *linux_addons + env: &linux_env_redis + - MATRIX_EVAL="export CXX=g++-5" REDIS_CACHE_URL=redis://localhost/ REDIS_CACHE_NOTIFICATION_URL=redis://localhost + node_js: 8 + - os: linux + name: "Linux + Redis + node 10.3" + addons: *linux_addons + env: *linux_env_redis + node_js: 10.3 + - os: linux + name: "Linux + Redis + node 10.8" + addons: *linux_addons + env: *linux_env_redis + node_js: 10.8.0 - os: osx + name: "OSX + Node 8" osx_image: xcode9.4 node_js: 8 - os: osx + name: "OSX + Node 10.3" osx_image: xcode9.4 node_js: 10.3 - os: osx + name: "OSX + Node 10.8" osx_image: xcode9.4 node_js: 10.8.0 diff --git a/src/bridge/bridge.ts b/src/bridge/bridge.ts index 0df5d883..b793059b 100644 --- a/src/bridge/bridge.ts +++ b/src/bridge/bridge.ts @@ -23,6 +23,7 @@ import { catalog, BridgeFunction } from './' import { Runtime } from '../runtime'; import { DataStore } from '../data_store'; import { defaultCacheStore } from '../cache_store'; +import { defaultCacheNotifier, CacheNotifier } from '../cache_notifier'; const errNoSuchBridgeFn = "Attempted to call a unregistered bridge function." @@ -40,8 +41,8 @@ export interface BridgeOptions { cacheStore?: CacheStore fileStore?: FileStore dataStore?: DataStore + cacheNotifier?: CacheNotifier } - /** * @private */ @@ -49,11 +50,13 @@ export class Bridge { cacheStore: CacheStore fileStore?: FileStore dataStore?: DataStore + cacheNotifier?: CacheNotifier functions: Map constructor(opts: BridgeOptions = {}) { this.cacheStore = opts.cacheStore || defaultCacheStore() + this.cacheNotifier = opts.cacheNotifier || defaultCacheNotifier(this.cacheStore) this.fileStore = opts.fileStore this.dataStore = opts.dataStore this.functions = new Map( diff --git a/src/bridge/fly/cache.ts b/src/bridge/fly/cache.ts index 576f8990..6dc72c82 100644 --- a/src/bridge/fly/cache.ts +++ b/src/bridge/fly/cache.ts @@ -4,8 +4,10 @@ import log from '../../log' import { transferInto } from '../../utils/buffer' import { Bridge } from '../bridge'; import { Runtime } from '../../runtime'; +import { CacheNotifierOperation, isCacheNotifierOperation } from '../../cache_notifier'; const errCacheStoreUndefined = new Error("cacheStore is not defined in the config.") +const errCacheNotifierUndefined = new Error("cacheNotifier is not defined in the config") registerBridge('flyCacheSet', function cacheSet(rt: Runtime, bridge: Bridge, key: string, value: ArrayBuffer | string, options: string | undefined, callback: ivm.Reference) { if (!bridge.cacheStore) { @@ -27,7 +29,7 @@ registerBridge('flyCacheSet', function cacheSet(rt: Runtime, bridge: Bridge, key callback.applyIgnored(null, [err.toString()]) return } - bridge.cacheStore.set(rt, key, buf, opts).then((ok) => { + bridge.cacheStore.set(rt.app.id, key, buf, opts).then((ok) => { rt.reportUsage("cache:set", { size: buf.byteLength }) callback.applyIgnored(null, [null, ok]) }).catch((err) => { @@ -43,7 +45,7 @@ registerBridge('flyCacheExpire', function cacheExpire(rt: Runtime, bridge: Bridg return } - bridge.cacheStore.expire(rt, key, ttl).then((ok) => { + bridge.cacheStore.expire(rt.app.id, key, ttl).then((ok) => { callback.applyIgnored(null, [null, ok]) }).catch((err) => { callback.applyIgnored(null, [err.toString()]) @@ -57,7 +59,7 @@ registerBridge('flyCacheGet', return } - bridge.cacheStore.get(rt, key).then((buf) => { + bridge.cacheStore.get(rt.app.id, key).then((buf) => { rt.reportUsage("cache:get", { size: buf ? buf.byteLength : 0 }) callback.applyIgnored(null, [null, transferInto(buf)]) }).catch((err) => { @@ -73,11 +75,11 @@ registerBridge('flyCacheDel', return } - bridge.cacheStore.del(rt, key).then((result) => { + bridge.cacheStore.del(rt.app.id, key).then((result) => { callback.applyIgnored(null, [null, !!result]) }).catch((err) => { log.error("got err in cache.del", err) - callback.applyIgnored(null, [null, null]) // swallow errors on get for now + callback.applyIgnored(null, [err.toString()]) }) }) @@ -88,11 +90,11 @@ registerBridge('flyCacheSetTags', return } - bridge.cacheStore.setTags(rt, key, tags).then((result) => { + bridge.cacheStore.setTags(rt.app.id, key, tags).then((result) => { callback.applyIgnored(null, [null, result]) }).catch((err) => { log.error("got err in cache.setTags", err) - callback.applyIgnored(null, [null, null]) // swallow errors on get for now + callback.applyIgnored(null, [err.toString()]) }) }) @@ -103,12 +105,32 @@ registerBridge('flyCachePurgeTags', return } - bridge.cacheStore.purgeTags(rt, key).then((result) => { + bridge.cacheStore.purgeTag(rt.app.id, key).then((result) => { setImmediate(() => { callback.applyIgnored(null, [null, JSON.stringify(result)]) }) }).catch((err) => { log.error("got err in cache.purgeTags", err) - callback.applyIgnored(null, [null, null]) // swallow errors on get for now + callback.applyIgnored(null, [err.toString()]) }) }) + + +registerBridge('flyCacheNotify', + function cacheDel(rt: Runtime, bridge: Bridge, type: string | CacheNotifierOperation, key: string, callback: ivm.Reference) { + if (!bridge.cacheStore || !bridge.cacheNotifier) { + callback.applyIgnored(null, [errCacheNotifierUndefined.toString()]) + return + } + + if (!isCacheNotifierOperation(type)) { + callback.applyIgnored(null, ["Invalid cache notification type"]) + return + } + bridge.cacheNotifier.send(type, rt.app.id, key).then((result) => { + callback.applyIgnored(null, [null, !!result]) + }).catch((err) => { + log.error("got err in cacheNotify", err) + callback.applyIgnored(null, [err.toString()]) + }) + }) \ No newline at end of file diff --git a/src/cache_notifier.ts b/src/cache_notifier.ts new file mode 100644 index 00000000..09b21713 --- /dev/null +++ b/src/cache_notifier.ts @@ -0,0 +1,86 @@ +import { CacheStore } from "./cache_store" +import { RedisCacheNotifier } from "./redis_cache_notifier" + +export enum CacheNotifierOperation { + del = "del", + purgeTag = "purgeTag" +} + +export interface ReceiveHandler { + (op: CacheNotifierOperation, ns: string, value: string): void +} +export interface CacheNotifierAdapter { + send(op: CacheNotifierOperation, ns: string, value: string): Promise + start(handler: ReceiveHandler): void +} + +export class CacheNotifier { + constructor( + public cacheStore: CacheStore, + public adapter: CacheNotifierAdapter + ) { + adapter.start((type, ns, value) => this.handle(type, ns, value)) + } + + async send(op: CacheNotifierOperation, ns: string, value: string) { + return this.adapter.send(op, ns, value) + } + + async handle(type: CacheNotifierOperation, ns: string, value: string) { + switch (type) { + case CacheNotifierOperation.del: { + return await this.cacheStore.del(ns, value) + } + case CacheNotifierOperation.purgeTag: { + const res = await this.cacheStore.purgeTag(ns, value) + return res.length > 0 + } + default: + throw new Error(`Unknown CacheNotifierOperation: ${type}`) + } + } + +} + +export function isCacheNotifierOperation(op: any): op is CacheNotifierOperation { + if (typeof op !== "string") return false + + const v = Object.getOwnPropertyNames(CacheNotifierOperation) + + if (v.includes(op)) { + return true + } + return false +} + +export class LocalCacheNotifier implements CacheNotifierAdapter { + private _handler: ReceiveHandler | undefined + async send(op: CacheNotifierOperation, ns: string, value: string) { + if (this._handler) { + this._handler(op, ns, value) + } + return true + } + + start(handler: ReceiveHandler) { + this._handler = handler + } + /*switch(type) { + case CacheNotifierOperation.del: + return await this.cacheStore.del(ns, key) + case CacheNotifierOperation.purgeTags: + return !!(await this.cacheStore.purgeTags(ns, key)) + default: + throw new Error(`Unknown CacheNotifierOperation: ${type}`) + } + }*/ +}//*/e + +export function defaultCacheNotifier(cacheStore: CacheStore) { + let adapter: CacheNotifierAdapter = new LocalCacheNotifier() + if (process.env.REDIS_CACHE_NOTIFIER_URL) { + console.log("Using Redis Cache Notifier") + adapter = new RedisCacheNotifier(process.env.REDIS_CACHE_NOTIFIER_URL) + } + return new CacheNotifier(cacheStore, adapter) +} \ No newline at end of file diff --git a/src/cache_store.ts b/src/cache_store.ts index 8ecf0596..7a838fc7 100644 --- a/src/cache_store.ts +++ b/src/cache_store.ts @@ -7,13 +7,13 @@ export interface CacheSetOptions { tags?: string[] } export interface CacheStore { - get(rt: Runtime, key: string): Promise - set(rt: Runtime, key: string, value: any, options?: CacheSetOptions | number): Promise - del(rt: Runtime, key: string): Promise - expire(rt: Runtime, key: string, ttl: number): Promise - ttl(rt: Runtime, key: string): Promise, - setTags(rt: Runtime, key: string, tags: string[]): Promise, - purgeTags(rt: Runtime, tags: string): Promise + get(ns: string, key: string): Promise + set(ns: string, key: string, value: any, options?: CacheSetOptions | number): Promise + del(ns: string, key: string): Promise + expire(ns: string, key: string, ttl: number): Promise + ttl(ns: string, key: string): Promise, + setTags(ns: string, key: string, tags: string[]): Promise, + purgeTag(ns: string, tag: string): Promise rand?: number } diff --git a/src/cmd/server.ts b/src/cmd/server.ts index 5f90e6e2..7e33b4db 100644 --- a/src/cmd/server.ts +++ b/src/cmd/server.ts @@ -3,6 +3,9 @@ import { Command } from 'commandpost/lib'; import * as path from 'path' import * as fs from 'fs' import { spawn } from 'child_process' +import { FileAppStore } from "../file_app_store" +import { Server } from "../server" +import { RedisCacheNotifier } from '../redis_cache_notifier'; interface ServerOptions { port?: string @@ -21,8 +24,8 @@ root .option("--inspect", "use the v8 inspector on your fly app") .option("--uglify", "uglify your code like we'll use in production (warning: slow!)") .action(async function (this: Command, opts, args, rest) { - const { FileAppStore } = require('../file_app_store') - const { Server } = require('../server') + //const { FileAppStore } = require('../file_app_store') + //const { Server } = require('../server') let cwd = args.path || process.cwd() if (!fs.existsSync(cwd)) { diff --git a/src/memory_cache_store.ts b/src/memory_cache_store.ts index f29aad35..ff84d75c 100644 --- a/src/memory_cache_store.ts +++ b/src/memory_cache_store.ts @@ -1,6 +1,5 @@ import { CacheStore, CacheSetOptions } from './cache_store' import * as IORedis from 'ioredis' -import { Runtime } from './runtime'; const Redis = require('ioredis-mock') const OK = 'OK' @@ -13,15 +12,15 @@ export class MemoryCacheStore implements CacheStore { this.rand = Math.random() } - async get(rt: Runtime, key: string): Promise { - const buf = await this.redis.getBuffer(keyFor(rt, key)) + async get(ns: string, key: string): Promise { + const buf = await this.redis.getBuffer(keyFor(ns, key)) if (!buf) return null return Buffer.from(buf) } - async set(rt: Runtime, key: string, value: any, options?: CacheSetOptions | number): Promise { - const k = keyFor(rt, key) + async set(ns: string, key: string, value: any, options?: CacheSetOptions | number): Promise { + const k = keyFor(ns, key) const pipeline = this.redis.pipeline() let ttl: number | undefined if (typeof options === "number") { @@ -38,7 +37,7 @@ export class MemoryCacheStore implements CacheStore { if (typeof options !== "number" && options && options.tags instanceof Array) { pipeline.sadd(k + ":tags", ...options.tags) - this.setTags(rt, key, options.tags, pipeline) + this.setTags(ns, key, options.tags, pipeline) if (ttl) { pipeline.expire(k + ":tags", ttl) } @@ -49,27 +48,28 @@ export class MemoryCacheStore implements CacheStore { return pipelineResultOK(result) } - async del(rt: Runtime, key: string): Promise { - const result = await this.redis.del(keyFor(rt, key)) + async del(ns: string, key: string): Promise { + console.log("memory cache del", key) + const result = await this.redis.del(keyFor(ns, key)) return result === OK } - async expire(rt: Runtime, key: string, ttl: number): Promise { - return (await this.redis.expire(keyFor(rt, key), ttl)) === 1 + async expire(ns: string, key: string, ttl: number): Promise { + return (await this.redis.expire(keyFor(ns, key), ttl)) === 1 } - async ttl(rt: Runtime, key: string): Promise { - return this.redis.ttl(keyFor(rt, key)) + async ttl(ns: string, key: string): Promise { + return this.redis.ttl(keyFor(ns, key)) } - async setTags(rt: Runtime, key: string, tags: string[], pipeline?: IORedis.Pipeline): Promise { + async setTags(ns: string, key: string, tags: string[], pipeline?: IORedis.Pipeline): Promise { const doSave = !pipeline if (!pipeline) { pipeline = this.redis.pipeline() } - const k = keyFor(rt, key) + const k = keyFor(ns, key) for (let s of tags) { - s = tagKeyFor(rt, s) + s = tagKeyFor(ns, s) pipeline.sadd(s, k) } if (doSave) { @@ -80,8 +80,8 @@ export class MemoryCacheStore implements CacheStore { } } - async purgeTags(rt: Runtime, tags: string): Promise { - const s = tagKeyFor(rt, tags) + async purgeTag(ns: string, tags: string): Promise { + const s = tagKeyFor(ns, tags) const checks = this.redis.pipeline() const keysToDelete = new Array() const keysToCheck = [] @@ -120,11 +120,11 @@ async function* setScanner(redis: IORedis.Redis, key: string) { } while (cursor > 0) } -function tagKeyFor(rt: Runtime, tag: string) { - return `tag:${rt.app.name}:${tag}` +function tagKeyFor(ns: string, tag: string) { + return `tag:${ns}:${tag}` } -function keyFor(rt: Runtime, key: string) { - return `cache:${rt.app.name}:${key}` +function keyFor(ns: string, key: string) { + return `cache:${ns}:${key}` } function pipelineResultOK(result: any) { diff --git a/src/redis_adapter.ts b/src/redis_adapter.ts new file mode 100644 index 00000000..7aecda99 --- /dev/null +++ b/src/redis_adapter.ts @@ -0,0 +1,14 @@ +import { RedisClient, ClientOpts as RedisClientOpts } from "redis"; + +export type RedisConnectionOptions = RedisClientOpts | string + +/** + * Utility method for taking common redis configs and creating a RedisClient + */ +export function initRedisClient(opts: RedisConnectionOptions) { + if (opts instanceof RedisClient) return opts + if (typeof opts === "string") { + opts = { url: opts } + } + return new RedisClient(opts) +} \ No newline at end of file diff --git a/src/redis_cache_notifier.ts b/src/redis_cache_notifier.ts new file mode 100644 index 00000000..30ea8c26 --- /dev/null +++ b/src/redis_cache_notifier.ts @@ -0,0 +1,107 @@ +import { CacheNotifierAdapter, CacheNotifierOperation, ReceiveHandler } from "./cache_notifier"; +import { RedisClient, ClientOpts } from "redis"; +import { RedisConnectionOptions, initRedisClient } from "./redis_adapter"; +import { promisify } from "util"; + +export interface RedisCacheNotifierConfig { + reader: RedisConnectionOptions, + writer?: RedisConnectionOptions +} +const notifierKey = "notifier:cache" +export class RedisCacheNotifier implements CacheNotifierAdapter { + subscriber: RedisClient + reader: RedisClient + writer: RedisClient + private _handler: ReceiveHandler | undefined + private _lastEventTime = Date.now() - 10000 + + constructor(opts: RedisCacheNotifierConfig | RedisConnectionOptions) { + if (!isRedisCacheNotifierConfig(opts)) { + opts = { reader: opts } + } + if (!opts.writer) { + opts.writer = opts.reader + } + this.subscriber = initRedisClient(opts.reader) + this.reader = initRedisClient(opts.reader) + this.writer = initRedisClient(opts.writer) + } + + send(type: CacheNotifierOperation, ns: string, value: string) { + const msg = { + type: type, + ns: ns, + value: value + } + + return new Promise((resolve, reject) => { + this.writer.zadd(notifierKey, Date.now(), JSON.stringify(msg), (err, _) => { + if (err) { + return reject(err) + } + resolve(true) + }) + }) + } + async start(handler: ReceiveHandler) { + this._handler = handler + + const configAsync = promisify(this.subscriber.config).bind(this.subscriber); + const zrangebyscore = promisify(this.reader.zrangebyscore).bind(this.reader) + + let [, conf] = await configAsync("get", "notify-keyspace-events") + if (!conf.includes("E") && !conf.includes('z')) { + conf = conf + "KEz" + console.log("Enabling zset notifications in redis:", conf) + await configAsync("set", "notify-keyspace-events", conf) + } + this._lastEventTime = Date.now() + const dbIndex = parseInt((this.subscriber).selected_db || 0) + this.subscriber.subscribe(`__keyspace@${dbIndex}__:notifier:cache`) + this.subscriber.on('message', async (channel, message) => { + const start = this._lastEventTime + this._lastEventTime = Date.now() + + if (message === "zadd") { + const changes = await zrangebyscore(notifierKey, start, '+inf') + for (const raw of changes) { + try { + const msg = JSON.parse(raw) + if (this._handler && isNotifierMessage(msg)) { + this._handler(msg.type, msg.ns, msg.value) + } + } catch (err) { + console.error("Error handling cache notifier:", err) + } + } + } + }) + } +} + +function tryLock(redis: RedisClient, id: string, cb: Function) { + return new Promise((resolve, reject) => { + redis.set("notifier:cache:lock", id, "NX", (err, result) => { + if (err) { + return reject(err) + } + resolve(result === "OK") + }) + }) +} + +function isRedisCacheNotifierConfig(opts: any): opts is RedisCacheNotifierConfig { + if (typeof opts === "object" && opts.reader) { + return true + } + return false +} + +function isNotifierMessage(msg: any): msg is { type: CacheNotifierOperation, ns: string, value: string } { + if (typeof msg.type === "string" && + typeof msg.ns === "string" && + typeof msg.value === "string") { + return true + } + return false +} \ No newline at end of file diff --git a/src/redis_cache_store.ts b/src/redis_cache_store.ts index 5cddbc1f..82305c13 100644 --- a/src/redis_cache_store.ts +++ b/src/redis_cache_store.ts @@ -1,27 +1,30 @@ import { CacheStore, CacheSetOptions } from './cache_store'; import { RedisClient } from 'redis'; -import { Runtime } from "./runtime"; import { promisify } from 'util'; +import { RedisConnectionOptions, initRedisClient } from './redis_adapter'; export class RedisCacheStore implements CacheStore { redis: FlyRedis - constructor(redis: string | RedisClient) { + constructor(redis: RedisConnectionOptions | RedisClient) { if (typeof redis === "string") { - redis = new RedisClient({ url: redis, detect_buffers: true }) + redis = { url: redis, detect_buffers: true } + } + if (!(redis instanceof RedisClient)) { + redis = initRedisClient(redis) } this.redis = new FlyRedis(redis) } - async get(rt: Runtime, key: string): Promise { - const ret = await this.redis.getBufferAsync(Buffer.from(keyFor(rt, key))) + async get(ns: string, key: string): Promise { + const ret = await this.redis.getBufferAsync(Buffer.from(keyFor(ns, key))) return ret } - async set(rt: Runtime, key: string, value: any, options?: CacheSetOptions | number): Promise { + async set(ns: string, key: string, value: any, options?: CacheSetOptions | number): Promise { const start = Date.now() - const k = keyFor(rt, key) + const k = keyFor(ns, key) let ttl: number | undefined if (typeof options === "number") { ttl = options @@ -37,7 +40,7 @@ export class RedisCacheStore implements CacheStore { } if (typeof options === "object" && options && options.tags instanceof Array) { commands.push(this.redis.saddAsync(k + ":tags", options.tags)) - commands.push(this.setTags(rt, key, options.tags)) + commands.push(this.setTags(ns, key, options.tags)) if (ttl) { commands.push(this.redis.expireAsync(k + ":tags", ttl)) } @@ -48,8 +51,8 @@ export class RedisCacheStore implements CacheStore { return redisGroupOK(result) } - async expire(rt: Runtime, key: string, ttl: number): Promise { - const k = keyFor(rt, key) + async expire(ns: string, key: string, ttl: number): Promise { + const k = keyFor(ns, key) const cmds = await Promise.all([ this.redis.expireAsync(k, ttl), this.redis.expireAsync(k + ":tags", ttl) @@ -57,27 +60,27 @@ export class RedisCacheStore implements CacheStore { return redisGroupOK(cmds) } - async ttl(rt: Runtime, key: string): Promise { - return this.redis.ttlAsync(keyFor(rt, key)) + async ttl(ns: string, key: string): Promise { + return this.redis.ttlAsync(keyFor(ns, key)) } - async del(rt: Runtime, key: string): Promise { - const k = keyFor(rt, key) + async del(ns: string, key: string): Promise { + const k = keyFor(ns, key) const cmds = await Promise.all([ this.redis.delAsync(k), this.redis.delAsync(k + ":tags") ]) return redisGroupOK(cmds) } - async setTags(rt: Runtime, key: string, tags: string[]): Promise { - const k = keyFor(rt, key) - const p = tags.map((t) => this.redis.saddAsync(tagKeyFor(rt, t), k)) + async setTags(ns: string, key: string, tags: string[]): Promise { + const k = keyFor(ns, key) + const p = tags.map((t) => this.redis.saddAsync(tagKeyFor(ns, t), k)) const result = await Promise.all(p) return result.filter((r) => !r).length > 0 } - async purgeTags(rt: Runtime, tags: string): Promise { - const s = tagKeyFor(rt, tags) + async purgeTag(ns: string, tags: string): Promise { + const s = tagKeyFor(ns, tags) const keysToDelete = new Array() const keysToCheck = new Array() const checks = new Array>() @@ -106,11 +109,11 @@ export class RedisCacheStore implements CacheStore { } } -function tagKeyFor(rt: Runtime, tag: string) { - return `tag:${rt.app.id}:${tag}` +function tagKeyFor(ns: string, tag: string) { + return `tag:${ns}:${tag}` } -function keyFor(rt: Runtime, key: string) { - return `cache:${rt.app.id}:${key}` +function keyFor(ns: string, key: string) { + return `cache:${ns}:${key}` } function redisGroupOK(result: any) { @@ -168,7 +171,6 @@ class FlyRedis { async sscanShim(key: string, cursor: number, count?: number) { if (!count) count = 10 const members = await this.smembersAsync(key) - console.log("got members:", members) if (members && cursor < members.length) { let newCursor = cursor + count if (newCursor > members.length) { diff --git a/test/fly.cache.spec.ts b/test/fly.cache.spec.ts index 10c6cb58..6e7b48d4 100644 --- a/test/fly.cache.spec.ts +++ b/test/fly.cache.spec.ts @@ -13,7 +13,7 @@ describe('Cache API', function () { expect(res.status).to.equal(200) - let cached = await cacheStore.get(this.server.runtime, `${url}`) + let cached = await cacheStore.get(this.server.runtime.app.id, `${url}`) expect((cached).toString()).to.equal(url) }) @@ -24,7 +24,7 @@ describe('Cache API', function () { expect(res.status).to.equal(200) - let ttl = await cacheStore.ttl(this.server.runtime, url) + let ttl = await cacheStore.ttl(this.server.runtime.app.id, url) expect(ttl).to.equal(3600) }) @@ -36,7 +36,7 @@ describe('Cache API', function () { it('gets a value', async function () { let data = "cached:" + Math.random().toString() - await cacheStore.set(this.server.runtime, "http://test/cache-api/get", data) + await cacheStore.set(this.server.runtime.app.id, "http://test/cache-api/get", data) let res = await axios.get("http://127.0.0.1:3333/cache-api/get", { headers: { 'Host': 'test' } }) expect(res.status).to.equal(200) @@ -45,7 +45,7 @@ describe('Cache API', function () { it('getString with garbage cache data', async function () { let data = new Buffer([-1, -1, -1, -1]) - await cacheStore.set(this.server.runtime, "http://test/cache-api/get", data) + await cacheStore.set(this.server.runtime.app.id, "http://test/cache-api/get", data) let res = await axios.get("http://127.0.0.1:3333/cache-api/get", { headers: { 'Host': 'test' } }) expect(res.status).to.equal(200) diff --git a/test/js-fails.spec.ts b/test/js-fails.spec.ts index 2c66fb1d..b965aff8 100644 --- a/test/js-fails.spec.ts +++ b/test/js-fails.spec.ts @@ -22,7 +22,7 @@ describe('JS Fails', function () { describe("setTimeout fires after response", function () { before(startServer(`fails/async-app`)) before(function () { - return cacheStore.set(this.server.runtime, "long-wait-after-response", "no") + return cacheStore.set(this.server.runtime.app.id, "long-wait-after-response", "no") }) after(stopServer) @@ -44,7 +44,7 @@ describe('JS Fails', function () { await (sleep(150)) - let cached = await cacheStore.get(this.server.runtime, "long-wait-after-response") + let cached = await cacheStore.get(this.server.runtime.app.id, "long-wait-after-response") expect((cached).toString()).to.equal(cacheValue) }) }) diff --git a/v8env/src/fly/cache/global.ts b/v8env/src/fly/cache/global.ts new file mode 100644 index 00000000..97276da9 --- /dev/null +++ b/v8env/src/fly/cache/global.ts @@ -0,0 +1,30 @@ +declare var bridge: any + +export async function del(key: string): Promise { + return new Promise(function globalDelPromise(resolve, reject) { + bridge.dispatch("flyCacheNotify", "del", key, function globalDelCallback(err: string | null, ok?: boolean) { + if (err != null) { + reject(err) + return + } + resolve(ok) + }) + }) +} + +export async function purgeTag(key: string): Promise { + return new Promise(function globalDelPromise(resolve, reject) { + bridge.dispatch("flyCacheNotify", "purgeTag", key, function globalPurgeTagCallback(err: string | null, ok?: boolean) { + if (err != null) { + reject(err) + return + } + resolve(ok) + }) + }) +} + +export default { + del, + purgeTag +} \ No newline at end of file diff --git a/v8env/src/fly/cache/index.ts b/v8env/src/fly/cache/index.ts index 2cf69d4b..60adf6c4 100644 --- a/v8env/src/fly/cache/index.ts +++ b/v8env/src/fly/cache/index.ts @@ -150,6 +150,16 @@ export function del(key: string) { return expire(key, 0) } +/** + * A library for caching/retrieving Response objects + */ +export { default as responseCache } from "./response" + +/** + * Global cache operations + */ +import { default as global } from "./global" + const cache = { get, getString, @@ -157,11 +167,7 @@ const cache = { expire, del, setTags, - purgeTag + purgeTag, + global } -export default cache - -/** - * A library for caching/retrieving Response objects - */ -export { default as responseCache } from "./response" \ No newline at end of file +export default cache \ No newline at end of file diff --git a/v8env/test/fly.cache.notifier.spec.js b/v8env/test/fly.cache.notifier.spec.js new file mode 100644 index 00000000..d5bfc2e8 --- /dev/null +++ b/v8env/test/fly.cache.notifier.spec.js @@ -0,0 +1,33 @@ +import { expect } from "chai" + +import cache from "@fly/cache" + +function sleep(timeout) { + return new Promise((resolve, ) => { + setTimeout(resolve, timeout) + }) +} +describe("@fly/cache/global", () => { + it("sends del notification", async () => { + const key = "notifier_test_key_" + Math.random() + await cache.set(key, "asdf") + await cache.global.del(key) + + // TODO: this can be race-y, global del is not guaranteed to happen before the next read + await sleep(20) + const res = await cache.getString(key) + expect(res).to.eq(null) + + }) + + it("sends purgeTag notifications", async () => { + const key = "purge_test_key_" + Math.random() + await cache.set(key, "jklm", { tags: ["purge_test"] }) + + await cache.global.purgeTag("purge_test") + + await sleep(20) + const res = await cache.getString(key) + expect(res).to.eq(null) + }) +}) \ No newline at end of file