Skip to content

Commit

Permalink
feat: global cache del/purgeTag
Browse files Browse the repository at this point in the history
Purge cache entries globally using `cache.global`.

```javascript
import cache from "@fly/cache"

// delete a single key everywhere
await cache.global.del(key)

// delete every key with a tag
await cache.global.purgeTag(key)
```
  • Loading branch information
mrkurt committed Aug 22, 2018
1 parent 7561320 commit 33a3e63
Show file tree
Hide file tree
Showing 15 changed files with 408 additions and 80 deletions.
28 changes: 25 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ compiler: clang
matrix:
include:
- os: linux
name: "Linux + node 8"
addons: &linux_addons
apt:
sources:
Expand All @@ -15,23 +16,44 @@ matrix:
- nasm
env: &linux_env
- MATRIX_EVAL="export CXX=g++-5"
- REDIS_CACHE_URL=redis://localhost/
node_js: 8
- ox: linux
- os: linux
name: "Linux + node 10.3"
addons: *linux_addons
env: *linux_env
node_js: 10.3
- ox: linux
- os: linux
name: "Linux + node 10.8"
addons: *linux_addons
env: *linux_env
node_js: 10.8.0

- os: linux
name: "Linux + Redis + node 8"
addons: *linux_addons
env: &linux_env_redis
- MATRIX_EVAL="export CXX=g++-5" REDIS_CACHE_URL=redis://localhost/ REDIS_CACHE_NOTIFICATION_URL=redis://localhost
node_js: 8
- os: linux
name: "Linux + Redis + node 10.3"
addons: *linux_addons
env: *linux_env_redis
node_js: 10.3
- os: linux
name: "Linux + Redis + node 10.8"
addons: *linux_addons
env: *linux_env_redis
node_js: 10.8.0
- os: osx
name: "OSX + Node 8"
osx_image: xcode9.4
node_js: 8
- os: osx
name: "OSX + Node 10.3"
osx_image: xcode9.4
node_js: 10.3
- os: osx
name: "OSX + Node 10.8"
osx_image: xcode9.4
node_js: 10.8.0

Expand Down
5 changes: 4 additions & 1 deletion src/bridge/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { catalog, BridgeFunction } from './'
import { Runtime } from '../runtime';
import { DataStore } from '../data_store';
import { defaultCacheStore } from '../cache_store';
import { defaultCacheNotifier, CacheNotifier } from '../cache_notifier';

const errNoSuchBridgeFn = "Attempted to call a unregistered bridge function."

Expand All @@ -40,20 +41,22 @@ export interface BridgeOptions {
cacheStore?: CacheStore
fileStore?: FileStore
dataStore?: DataStore
cacheNotifier?: CacheNotifier
}

/**
* @private
*/
export class Bridge {
cacheStore: CacheStore
fileStore?: FileStore
dataStore?: DataStore
cacheNotifier?: CacheNotifier

functions: Map<string, BridgeFunction>

constructor(opts: BridgeOptions = {}) {
this.cacheStore = opts.cacheStore || defaultCacheStore()
this.cacheNotifier = opts.cacheNotifier || defaultCacheNotifier(this.cacheStore)
this.fileStore = opts.fileStore
this.dataStore = opts.dataStore
this.functions = new Map<string, BridgeFunction>(
Expand Down
40 changes: 31 additions & 9 deletions src/bridge/fly/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import log from '../../log'
import { transferInto } from '../../utils/buffer'
import { Bridge } from '../bridge';
import { Runtime } from '../../runtime';
import { CacheNotifierOperation, isCacheNotifierOperation } from '../../cache_notifier';

const errCacheStoreUndefined = new Error("cacheStore is not defined in the config.")
const errCacheNotifierUndefined = new Error("cacheNotifier is not defined in the config")

registerBridge('flyCacheSet', function cacheSet(rt: Runtime, bridge: Bridge, key: string, value: ArrayBuffer | string, options: string | undefined, callback: ivm.Reference<Function>) {
if (!bridge.cacheStore) {
Expand All @@ -27,7 +29,7 @@ registerBridge('flyCacheSet', function cacheSet(rt: Runtime, bridge: Bridge, key
callback.applyIgnored(null, [err.toString()])
return
}
bridge.cacheStore.set(rt, key, buf, opts).then((ok) => {
bridge.cacheStore.set(rt.app.id, key, buf, opts).then((ok) => {
rt.reportUsage("cache:set", { size: buf.byteLength })
callback.applyIgnored(null, [null, ok])
}).catch((err) => {
Expand All @@ -43,7 +45,7 @@ registerBridge('flyCacheExpire', function cacheExpire(rt: Runtime, bridge: Bridg
return
}

bridge.cacheStore.expire(rt, key, ttl).then((ok) => {
bridge.cacheStore.expire(rt.app.id, key, ttl).then((ok) => {
callback.applyIgnored(null, [null, ok])
}).catch((err) => {
callback.applyIgnored(null, [err.toString()])
Expand All @@ -57,7 +59,7 @@ registerBridge('flyCacheGet',
return
}

bridge.cacheStore.get(rt, key).then((buf) => {
bridge.cacheStore.get(rt.app.id, key).then((buf) => {
rt.reportUsage("cache:get", { size: buf ? buf.byteLength : 0 })
callback.applyIgnored(null, [null, transferInto(buf)])
}).catch((err) => {
Expand All @@ -73,11 +75,11 @@ registerBridge('flyCacheDel',
return
}

bridge.cacheStore.del(rt, key).then((result) => {
bridge.cacheStore.del(rt.app.id, key).then((result) => {
callback.applyIgnored(null, [null, !!result])
}).catch((err) => {
log.error("got err in cache.del", err)
callback.applyIgnored(null, [null, null]) // swallow errors on get for now
callback.applyIgnored(null, [err.toString()])
})
})

Expand All @@ -88,11 +90,11 @@ registerBridge('flyCacheSetTags',
return
}

bridge.cacheStore.setTags(rt, key, tags).then((result) => {
bridge.cacheStore.setTags(rt.app.id, key, tags).then((result) => {
callback.applyIgnored(null, [null, result])
}).catch((err) => {
log.error("got err in cache.setTags", err)
callback.applyIgnored(null, [null, null]) // swallow errors on get for now
callback.applyIgnored(null, [err.toString()])
})
})

Expand All @@ -103,12 +105,32 @@ registerBridge('flyCachePurgeTags',
return
}

bridge.cacheStore.purgeTags(rt, key).then((result) => {
bridge.cacheStore.purgeTag(rt.app.id, key).then((result) => {
setImmediate(() => {
callback.applyIgnored(null, [null, JSON.stringify(result)])
})
}).catch((err) => {
log.error("got err in cache.purgeTags", err)
callback.applyIgnored(null, [null, null]) // swallow errors on get for now
callback.applyIgnored(null, [err.toString()])
})
})


registerBridge('flyCacheNotify',
function cacheDel(rt: Runtime, bridge: Bridge, type: string | CacheNotifierOperation, key: string, callback: ivm.Reference<Function>) {
if (!bridge.cacheStore || !bridge.cacheNotifier) {
callback.applyIgnored(null, [errCacheNotifierUndefined.toString()])
return
}

if (!isCacheNotifierOperation(type)) {
callback.applyIgnored(null, ["Invalid cache notification type"])
return
}
bridge.cacheNotifier.send(type, rt.app.id, key).then((result) => {
callback.applyIgnored(null, [null, !!result])
}).catch((err) => {
log.error("got err in cacheNotify", err)
callback.applyIgnored(null, [err.toString()])
})
})
86 changes: 86 additions & 0 deletions src/cache_notifier.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { CacheStore } from "./cache_store"
import { RedisCacheNotifier } from "./redis_cache_notifier"

export enum CacheNotifierOperation {
del = "del",
purgeTag = "purgeTag"
}

export interface ReceiveHandler {
(op: CacheNotifierOperation, ns: string, value: string): void
}
export interface CacheNotifierAdapter {
send(op: CacheNotifierOperation, ns: string, value: string): Promise<boolean>
start(handler: ReceiveHandler): void
}

export class CacheNotifier {
constructor(
public cacheStore: CacheStore,
public adapter: CacheNotifierAdapter
) {
adapter.start((type, ns, value) => this.handle(type, ns, value))
}

async send(op: CacheNotifierOperation, ns: string, value: string) {
return this.adapter.send(op, ns, value)
}

async handle(type: CacheNotifierOperation, ns: string, value: string) {
switch (type) {
case CacheNotifierOperation.del: {
return await this.cacheStore.del(ns, value)
}
case CacheNotifierOperation.purgeTag: {
const res = await this.cacheStore.purgeTag(ns, value)
return res.length > 0
}
default:
throw new Error(`Unknown CacheNotifierOperation: ${type}`)
}
}

}

export function isCacheNotifierOperation(op: any): op is CacheNotifierOperation {
if (typeof op !== "string") return false

const v = Object.getOwnPropertyNames(CacheNotifierOperation)

if (v.includes(op)) {
return true
}
return false
}

export class LocalCacheNotifier implements CacheNotifierAdapter {
private _handler: ReceiveHandler | undefined
async send(op: CacheNotifierOperation, ns: string, value: string) {
if (this._handler) {
this._handler(op, ns, value)
}
return true
}

start(handler: ReceiveHandler) {
this._handler = handler
}
/*switch(type) {
case CacheNotifierOperation.del:
return await this.cacheStore.del(ns, key)
case CacheNotifierOperation.purgeTags:
return !!(await this.cacheStore.purgeTags(ns, key))
default:
throw new Error(`Unknown CacheNotifierOperation: ${type}`)
}
}*/
}//*/e

export function defaultCacheNotifier(cacheStore: CacheStore) {
let adapter: CacheNotifierAdapter = new LocalCacheNotifier()
if (process.env.REDIS_CACHE_NOTIFIER_URL) {
console.log("Using Redis Cache Notifier")
adapter = new RedisCacheNotifier(process.env.REDIS_CACHE_NOTIFIER_URL)
}
return new CacheNotifier(cacheStore, adapter)
}
14 changes: 7 additions & 7 deletions src/cache_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ export interface CacheSetOptions {
tags?: string[]
}
export interface CacheStore {
get(rt: Runtime, key: string): Promise<Buffer | null>
set(rt: Runtime, key: string, value: any, options?: CacheSetOptions | number): Promise<boolean>
del(rt: Runtime, key: string): Promise<boolean>
expire(rt: Runtime, key: string, ttl: number): Promise<boolean>
ttl(rt: Runtime, key: string): Promise<number>,
setTags(rt: Runtime, key: string, tags: string[]): Promise<boolean>,
purgeTags(rt: Runtime, tags: string): Promise<string[]>
get(ns: string, key: string): Promise<Buffer | null>
set(ns: string, key: string, value: any, options?: CacheSetOptions | number): Promise<boolean>
del(ns: string, key: string): Promise<boolean>
expire(ns: string, key: string, ttl: number): Promise<boolean>
ttl(ns: string, key: string): Promise<number>,
setTags(ns: string, key: string, tags: string[]): Promise<boolean>,
purgeTag(ns: string, tag: string): Promise<string[]>
rand?: number
}

Expand Down
7 changes: 5 additions & 2 deletions src/cmd/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import { Command } from 'commandpost/lib';
import * as path from 'path'
import * as fs from 'fs'
import { spawn } from 'child_process'
import { FileAppStore } from "../file_app_store"
import { Server } from "../server"
import { RedisCacheNotifier } from '../redis_cache_notifier';

interface ServerOptions {
port?: string
Expand All @@ -21,8 +24,8 @@ root
.option("--inspect", "use the v8 inspector on your fly app")
.option("--uglify", "uglify your code like we'll use in production (warning: slow!)")
.action(async function (this: Command<ServerOptions, ServerArguments>, opts, args, rest) {
const { FileAppStore } = require('../file_app_store')
const { Server } = require('../server')
//const { FileAppStore } = require('../file_app_store')
//const { Server } = require('../server')

let cwd = args.path || process.cwd()
if (!fs.existsSync(cwd)) {
Expand Down

0 comments on commit 33a3e63

Please sign in to comment.