Skip to content

Commit

Permalink
feat: adds cache.set onlyIfEmpty option
Browse files Browse the repository at this point in the history
This keeps the cache from overwriting data at an existing key.

```javascript

const res = await cache.set("key", "asdf", { onlyIfEmpty: true })
if(!res){
  console.log("cache key has data, didn't overwrite")
}

```

It can also be used as a regional lock.

```javascript
const pid = "instance-" + Math.random()
// ttl lets the lock expire after 5s
const opts = { ttl: 5, onlyIfEmpty: true }
const hasLock = await cache.set("lock-key", pid, opts)

if(hasLock){
  // do some stuff
  //release lock
  await cache.del("lock-key")
}

```
  • Loading branch information
mrkurt committed Aug 22, 2018
1 parent 33a3e63 commit 986cefc
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 67 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ All notable changes to this project will be documented in this file. See [standa

<a name="0.40.0-caching.6"></a>
# [0.40.0-caching.6](https://github.com/superfly/fly/compare/v0.39.0...v0.40.0-caching.6) (2018-08-22)
<a name="0.40.0-caching.3"></a>
# [0.40.0-caching.3](https://github.com/superfly/fly/compare/v0.39.0...v0.40.0-caching.3) (2018-08-22)


### Bug Fixes
Expand All @@ -19,7 +21,9 @@ All notable changes to this project will be documented in this file. See [standa

### Features

* adds cache.set onlyIfEmpty option ([43e9601](https://github.com/superfly/fly/commit/43e9601))
* cache tagging, expire related cache entries ([#114](https://github.com/superfly/fly/issues/114)) ([2242a5a](https://github.com/superfly/fly/commit/2242a5a))
* global cache del/purgeTags ([703cfe6](https://github.com/superfly/fly/commit/703cfe6))



Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,4 @@
],
"all": true
}
}
}
6 changes: 3 additions & 3 deletions src/bridge/fly/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import log from '../../log'
import { transferInto } from '../../utils/buffer'
import { Bridge } from '../bridge';
import { Runtime } from '../../runtime';
import { CacheNotifierOperation, isCacheNotifierOperation } from '../../cache_notifier';
import { CacheOperation, isCacheOperation } from '../../cache_notifier';

const errCacheStoreUndefined = new Error("cacheStore is not defined in the config.")
const errCacheNotifierUndefined = new Error("cacheNotifier is not defined in the config")
Expand Down Expand Up @@ -117,13 +117,13 @@ registerBridge('flyCachePurgeTags',


registerBridge('flyCacheNotify',
function cacheDel(rt: Runtime, bridge: Bridge, type: string | CacheNotifierOperation, key: string, callback: ivm.Reference<Function>) {
function cacheDel(rt: Runtime, bridge: Bridge, type: string | CacheOperation, key: string, callback: ivm.Reference<Function>) {
if (!bridge.cacheStore || !bridge.cacheNotifier) {
callback.applyIgnored(null, [errCacheNotifierUndefined.toString()])
return
}

if (!isCacheNotifierOperation(type)) {
if (!isCacheOperation(type)) {
callback.applyIgnored(null, ["Invalid cache notification type"])
return
}
Expand Down
68 changes: 44 additions & 24 deletions src/cache_notifier.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,68 @@
import { CacheStore } from "./cache_store"
import { RedisCacheNotifier } from "./redis_cache_notifier"
import { hostname } from "os";

export enum CacheNotifierOperation {
export enum CacheOperation {
del = "del",
purgeTag = "purgeTag"
}

export interface CacheNotifyMessage {
type: CacheOperation,
ns: string,
value: string,
ts: number
}

export interface ReceiveHandler {
(op: CacheNotifierOperation, ns: string, value: string): void
(msg: CacheNotifyMessage): void
}
export interface CacheNotifierAdapter {
send(op: CacheNotifierOperation, ns: string, value: string): Promise<boolean>
send(op: CacheNotifyMessage): Promise<boolean>
start(handler: ReceiveHandler): void
}

export class CacheNotifier {
pid: string
constructor(
public cacheStore: CacheStore,
public adapter: CacheNotifierAdapter
) {
adapter.start((type, ns, value) => this.handle(type, ns, value))
this.pid = `${hostname()}-${process.pid}`
adapter.start(this.handle.bind(this))
}

async send(op: CacheNotifierOperation, ns: string, value: string) {
return this.adapter.send(op, ns, value)
async send(type: CacheOperation, ns: string, value: string) {
return this.adapter.send({ type, ns, value, ts: Date.now() })
}

async handle(type: CacheNotifierOperation, ns: string, value: string) {
switch (type) {
case CacheNotifierOperation.del: {
return await this.cacheStore.del(ns, value)
async handle({ type, ns, value, ts }: CacheNotifyMessage): Promise<boolean> {
const lockKey = "lock:" + [type, value, ts].join(":")
const hasLock = await this.cacheStore.set(ns, lockKey, this.pid, { ttl: 10, onlyIfEmpty: true })
if (!hasLock) return false
try {
switch (type) {
case CacheOperation.del: {
return await this.cacheStore.del(ns, value)
}
case CacheOperation.purgeTag: {
const res = await this.cacheStore.purgeTag(ns, value)
return res.length > 0
}
default:
throw new Error(`Unknown CacheNotifierOperation: ${type}`)
}
case CacheNotifierOperation.purgeTag: {
const res = await this.cacheStore.purgeTag(ns, value)
return res.length > 0
}
default:
throw new Error(`Unknown CacheNotifierOperation: ${type}`)
} finally {
// clean up lock
await this.cacheStore.del(ns, lockKey)
}
}

}

export function isCacheNotifierOperation(op: any): op is CacheNotifierOperation {
export function isCacheOperation(op: any): op is CacheOperation {
if (typeof op !== "string") return false

const v = Object.getOwnPropertyNames(CacheNotifierOperation)
const v = Object.getOwnPropertyNames(CacheOperation)

if (v.includes(op)) {
return true
Expand All @@ -55,11 +72,14 @@ export function isCacheNotifierOperation(op: any): op is CacheNotifierOperation

export class LocalCacheNotifier implements CacheNotifierAdapter {
private _handler: ReceiveHandler | undefined
async send(op: CacheNotifierOperation, ns: string, value: string) {
if (this._handler) {
this._handler(op, ns, value)
}
return true
async send(msg: CacheNotifyMessage) {
// using setImmediate here to fake an async adapter
return new Promise<boolean>((resolve, ) => {
setImmediate(() => {
this._handler && this._handler(msg)
resolve(true)
})
})
}

start(handler: ReceiveHandler) {
Expand Down
3 changes: 2 additions & 1 deletion src/cache_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { MemoryCacheStore } from "./memory_cache_store";

export interface CacheSetOptions {
ttl?: number,
tags?: string[]
tags?: string[],
onlyIfEmpty?: boolean
}
export interface CacheStore {
get(ns: string, key: string): Promise<Buffer | null>
Expand Down
26 changes: 22 additions & 4 deletions src/memory_cache_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@ export class MemoryCacheStore implements CacheStore {
const k = keyFor(ns, key)
const pipeline = this.redis.pipeline()
let ttl: number | undefined
let mode: string | undefined
if (typeof options === "number") {
ttl = options
} else if (options) {
ttl = options.ttl
mode = options.onlyIfEmpty && "NX" || undefined
}

if (mode) {
const p = ttl ?
this.redis.set(k, value, "EX", ttl, "NX") :
this.redis.set(k, value, "NX")
const result = await p
// this happens if the key already exists
if (result !== "OK") return false
}
if (ttl && !isNaN(ttl)) {
pipeline.set(k, value, 'EX', ttl)
} else {
Expand All @@ -49,13 +59,21 @@ export class MemoryCacheStore implements CacheStore {
}

async del(ns: string, key: string): Promise<boolean> {
console.log("memory cache del", key)
const result = await this.redis.del(keyFor(ns, key))
return result === OK
key = keyFor(ns, key)
await Promise.all([
this.redis.del(key),
this.redis.del(key + ":tags")
])
return true
}

async expire(ns: string, key: string, ttl: number): Promise<boolean> {
return (await this.redis.expire(keyFor(ns, key), ttl)) === 1
key = keyFor(ns, key)
await Promise.all([
this.redis.expire(key, ttl),
this.redis.expire(key + ":tags", ttl)
])
return true
}

async ttl(ns: string, key: string): Promise<number> {
Expand Down
37 changes: 11 additions & 26 deletions src/redis_cache_notifier.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CacheNotifierAdapter, CacheNotifierOperation, ReceiveHandler } from "./cache_notifier";
import { RedisClient, ClientOpts } from "redis";
import { CacheNotifierAdapter, CacheOperation, ReceiveHandler, CacheNotifyMessage } from "./cache_notifier";
import { RedisClient } from "redis";
import { RedisConnectionOptions, initRedisClient } from "./redis_adapter";
import { promisify } from "util";

Expand Down Expand Up @@ -27,18 +27,13 @@ export class RedisCacheNotifier implements CacheNotifierAdapter {
this.writer = initRedisClient(opts.writer)
}

send(type: CacheNotifierOperation, ns: string, value: string) {
const msg = {
type: type,
ns: ns,
value: value
}

send(msg: CacheNotifyMessage) {
return new Promise<boolean>((resolve, reject) => {
this.writer.zadd(notifierKey, Date.now(), JSON.stringify(msg), (err, _) => {
this.writer.zadd(notifierKey, msg.ts, JSON.stringify(msg), (err, _) => {
if (err) {
return reject(err)
}
this.writer.zremrangebyscore(notifierKey, 0, Date.now() - 600) // only keep 10 min
resolve(true)
})
})
Expand All @@ -47,10 +42,10 @@ export class RedisCacheNotifier implements CacheNotifierAdapter {
this._handler = handler

const configAsync = promisify(this.subscriber.config).bind(this.subscriber);
const zrangebyscore = promisify(this.reader.zrangebyscore).bind(this.reader)
const zrangebyscore = promisify(this.reader.zrangebyscore).bind(this.reader);

let [, conf] = await configAsync("get", "notify-keyspace-events")
if (!conf.includes("E") && !conf.includes('z')) {
if (!conf.includes("E") || !conf.includes('z')) {
conf = conf + "KEz"
console.log("Enabling zset notifications in redis:", conf)
await configAsync("set", "notify-keyspace-events", conf)
Expand All @@ -68,7 +63,7 @@ export class RedisCacheNotifier implements CacheNotifierAdapter {
try {
const msg = JSON.parse(raw)
if (this._handler && isNotifierMessage(msg)) {
this._handler(msg.type, msg.ns, msg.value)
this._handler(msg)
}
} catch (err) {
console.error("Error handling cache notifier:", err)
Expand All @@ -79,28 +74,18 @@ export class RedisCacheNotifier implements CacheNotifierAdapter {
}
}

function tryLock(redis: RedisClient, id: string, cb: Function) {
return new Promise<boolean>((resolve, reject) => {
redis.set("notifier:cache:lock", id, "NX", (err, result) => {
if (err) {
return reject(err)
}
resolve(result === "OK")
})
})
}

function isRedisCacheNotifierConfig(opts: any): opts is RedisCacheNotifierConfig {
if (typeof opts === "object" && opts.reader) {
return true
}
return false
}

function isNotifierMessage(msg: any): msg is { type: CacheNotifierOperation, ns: string, value: string } {
function isNotifierMessage(msg: any): msg is CacheNotifyMessage {
if (typeof msg.type === "string" &&
typeof msg.ns === "string" &&
typeof msg.value === "string") {
typeof msg.value === "string" &&
typeof msg.ts === "number") {
return true
}
return false
Expand Down
23 changes: 17 additions & 6 deletions src/redis_cache_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export class RedisCacheStore implements CacheStore {
}

async set(ns: string, key: string, value: any, options?: CacheSetOptions | number): Promise<boolean> {
const start = Date.now()
const k = keyFor(ns, key)
let ttl: number | undefined
if (typeof options === "number") {
Expand All @@ -32,11 +31,23 @@ export class RedisCacheStore implements CacheStore {
ttl = options.ttl
}
const commands = new Array<any>()
if (ttl) {
commands.push(this.redis.setAsync(k, value, 'EX', ttl))
if (typeof options === "object" && options.onlyIfEmpty === true) {
// can't pipeline set NX
const p = ttl ?
this.redis.setAsync(k, value, "EX", ttl, "NX") :
this.redis.setAsync(k, value, "NX")
const result = await p
// this happens if the key already exists
if (result !== "OK") return false

// otherwise we carry on
} else {

commands.push(this.redis.setAsync(k, value))
if (ttl) {
this.redis.setAsync(k, value, "EX", ttl, "NX")
commands.push(this.redis.setAsync(k, value, 'EX', ttl))
} else {
commands.push(this.redis.setAsync(k, value))
}
}
if (typeof options === "object" && options && options.tags instanceof Array) {
commands.push(this.redis.saddAsync(k + ":tags", options.tags))
Expand Down Expand Up @@ -143,7 +154,7 @@ async function* setScanner(redis: FlyRedis, key: string) {

class FlyRedis {
getBufferAsync: (key: Buffer | string) => Promise<Buffer>
setAsync: (key: string, value: Buffer, mode?: number | string, duration?: number) => Promise<"OK" | undefined>
setAsync: (key: string, value: Buffer, mode?: number | string, duration?: number, exists?: string) => Promise<"OK" | undefined>
expireAsync: (key: string, ttl: number) => Promise<boolean>
ttlAsync: (key: string) => Promise<number>
delAsync: (...keys: string[]) => Promise<boolean>
Expand Down
13 changes: 11 additions & 2 deletions v8env/src/fly/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
declare var bridge: any
export interface CacheSetOptions {
ttl?: number,
tags?: string[]
tags?: string[],
onlyIfEmpty?: boolean
}


Expand Down Expand Up @@ -147,7 +148,15 @@ export function purgeTag(tag: string) {
* @returns true if delete was successful
*/
export function del(key: string) {
return expire(key, 0)
return new Promise<boolean>(function cacheDelPromise(resolve, reject) {
bridge.dispatch("flyCacheDel", key, function cacheDelCallback(err: string | null, ok?: boolean) {
if (err != null) {
reject(err)
return
}
resolve(ok)
})
})
}

/**
Expand Down
11 changes: 11 additions & 0 deletions v8env/test/fly.cache.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,15 @@ describe("@fly/cache", () => {
const result = await cache.getString(k)
expect(result).to.eq('')
})

it("handles set.onlyIfEmpty", async () => {
const k = `cache-test${Math.random()}`
await cache.set(k, 'asdf')

const setResult = await cache.set(k, 'jklm', { onlyIfEmpty: true })
const v = await cache.getString(k)

expect(setResult).to.eq(false)
expect(v).to.eq("asdf")
})
})

0 comments on commit 986cefc

Please sign in to comment.