Skip to content

Commit

Permalink
wip - redis references
Browse files Browse the repository at this point in the history
  • Loading branch information
Simone Sanfratello committed Nov 15, 2021
1 parent 95f5525 commit 0e826d5
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 27 deletions.
44 changes: 44 additions & 0 deletions examples/redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict'

const createStorage = require('../storage')
const { Cache } = require('../')
const Redis = require('ioredis')

// TODO

async function main () {
const redisClient = new Redis()

const cache = new Cache({
ttl: 2, // default ttl, in seconds
storage: createStorage('redis', { client: redisClient, log: console }),
onDedupe: (key) => {
console.log('deduped', key)
}
})

cache.define('fetchSomething', {
references: (args, key, result) => ['somethings', `some-${result}`]
}, async (k) => {
console.log('query', k)
// query 42
// query 24

return { k }
})

const p1 = cache.fetchSomething(42)
const p2 = cache.fetchSomething(24)
const p3 = cache.fetchSomething(42)

const res = await Promise.all([p1, p2, p3])

console.log(res)
}
// [
// { k: 42 },
// { k: 24 }
// { k: 42 }
// ]

main()
1 change: 1 addition & 0 deletions storage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ function createStorage (type, options) {
if (type === StorageOptionsType.redis) {
return new StorageRedis(options)
}
// TODO memory without invalidation
return new StorageMemory(options)
}

Expand Down
4 changes: 3 additions & 1 deletion storage/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const DEFAULT_CACHE_SIZE = 1024
* @property {?Logger} [log]
*/

// TODO use a worker thread to sync references as well as redis

class StorageMemory extends StorageInterface {
/**
* in-memory storage
Expand All @@ -30,7 +32,7 @@ class StorageMemory extends StorageInterface {
init () {
this.store = new LRUCache(this.size)
// key -> references, keys are strings, references are sorted array strings
// TODO use a btree instead of array to speed up writes
// TODO use a btree instead of array to speed up writes? do benchmarks before
this.keysReferences = new Map()
// same as above, bunt inverted
this.referencesKeys = new Map()
Expand Down
83 changes: 83 additions & 0 deletions storage/redis-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'use strict'

const { workerData } = require('worker_threads')
const Redis = require('ioredis')

async function sync (options) {
options = options ? JSON.parse(options) : {}
const db = options.db || 0

const redisSubscription = new Redis({
...options,
db,
connectionName: 'acd-redis-worker-sub',
readOnly: true
})

const redisClient = new Redis({
...options,
db,
connectionName: 'acd-redis-worker-rw'
})

try {
const subscription = await redisSubscription.subscribe(`__keyevent@${db}__:expire`)
console.log({ subscription })
// TODO if subscription !== 1 throw new Error('cant subscribe to redis')
} catch (err) {
console.error(err)
// TODO log.error
throw err
}

console.log('****************')

redisSubscription.on('message', (channel, key) => {
clearReferences(redisClient, key)
})

// @see https://redis.io/topics/notifications
// redis-cli config set notify-keyspace-events KEA
// redis-cli --csv psubscribe '__key*__:*'
}

async function clearReferences (redisClient, key) {
console.log(`redis-worker, removed ${key}`)
try {
const references = await redisClient.smembers(`k:${key}`)

if (!references || references.length < 1) {
console.log('no references for:', key)
return
}

console.log({ references })

const reads = references.map(reference => (['smembers', `r:${reference}`]))

console.log({ reads })

const referencesKeys = await redisClient.pipeline(reads).exec()

console.log({ referencesKeys })

const writes = []
for (let i = 0; i < referencesKeys.length; i++) {
const rk = referencesKeys[i]
rk[1].forEach(r => {
console.log({ r })
writes.push(['srem', 'r:' + references[i], key])
})
}
writes.push(['del', 'k:' + key])

console.log({ writes })

await redisClient.pipeline(writes).exec()
} catch (err) {
// TODO log.error
console.error(err)
}
}

sync(workerData)
49 changes: 39 additions & 10 deletions storage/redis.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
'use strict'

const path = require('path')
const stringify = require('safe-stable-stringify')
const nullLogger = require('abstract-logging')
const StorageInterface = require('./interface')
const { worker } = require('../util')

/**
* @typedef StorageRedisOptions
Expand All @@ -16,9 +18,33 @@ class StorageRedis extends StorageInterface {
*/
constructor (options) {
// TODO validate options
// if (!options.client) {
// throw new Error('Redis client is required')
// }
super(options)
this.store = options.client

this.log = options.log || nullLogger

this.store = options.client

// TODO option disable invalidation

if (options && options.client && options.client.options) {
// TODO check "notify-keyspace-events KEA" on redis
// TODO tried piscina with no luck
const sync = worker(path.resolve(__dirname, 'redis-worker.js'), stringify(options.client.options))
this.worker = sync.worker
sync.promise
.then(data => { this.log.debug({ msg: 'acd/storage/redis worker ends', data }) })
.catch(err => { this.log.error({ msg: 'acd/storage/redis worker error', err }) })
}
}

async end () {
if (!this.worker) {
return
}
this.worker.terminate()
}

/**
Expand Down Expand Up @@ -46,6 +72,7 @@ class StorageRedis extends StorageInterface {
* @param {?string[]} references
*/
async set (key, value, ttl, references) {
// TODO can keys contains * or so?
this.log.debug({ msg: 'acd/storage/redis.set key', key, value, ttl, references })

ttl = Number(ttl)
Expand All @@ -56,43 +83,45 @@ class StorageRedis extends StorageInterface {
try {
await this.store.set(key, stringify(value), 'EX', ttl)

if (!references) {
if (!references) { // TODO || !this.options.invalidation
return
}

const writes = []
for (let i = 0; i < references.length; i++) {
const reference = references[i]
this.log.debug({ msg: 'acd/storage/redis.set reference', key, reference })
// r: -> reference->keys
writes.push(['sadd', 'r:' + reference, key])
}
// k: -> key-references
writes.push(['sadd', 'k:' + key, references])

await this.store.pipeline(writes).exec()
} catch (err) {
this.log.error({ msg: 'acd/storage/redis.set error', err, key, ttl, references })
}
// TODO clear references of expired keys
}

/**
* remove all entries if name is not provided
* remove entries where key starts with name if provided
* TODO sync references
* @param {?string} name
* remove an entry by key
* @param {string} key
* @returns {boolean} indicates if key was removed
*/
async remove (key) {
this.log.debug({ msg: 'acd/storage/redis.remove', key })
try {
this.store.del(key)
return await this.store.del(key) > 0
} catch (err) {
this.log.error({ msg: 'acd/storage/redis.remove error', err, key })
}
}

/**
* TODO sync references
* @param {string[]} references
*/
async invalidate (references) {
// TODO if(!this.options.invalidation) { return }
this.log.debug({ msg: 'acd/storage/redis.invalidate', references })

try {
Expand Down Expand Up @@ -130,7 +159,7 @@ class StorageRedis extends StorageInterface {
return
}

const keys = await this.store.keys(name + '*')
const keys = await this.store.keys(`${name}*`)
this.log.debug({ msg: 'acd/storage/redis.clear keys', keys })

const removes = keys.map(key => ['del', key])
Expand Down
6 changes: 3 additions & 3 deletions test/storage-memory.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ test('storage memory', async (t) => {
const storage = createStorage('memory')
await storage.set('foo', 'bar', 10, ['fooers'])

await storage.remove('foo')
t.equal(await storage.remove('foo'), true)

t.equal(await storage.get('foo'), undefined)
})
Expand All @@ -193,7 +193,7 @@ test('storage memory', async (t) => {
const storage = createStorage('memory')
await storage.set('foo', 'bar', 10, ['fooers'])

await storage.remove('fooz')
t.equal(await storage.remove('fooz'), false)

t.equal(await storage.get('foo'), 'bar')
t.equal(await storage.get('fooz'), undefined)
Expand All @@ -207,7 +207,7 @@ test('storage memory', async (t) => {
await storage.set('d', 1, 10, ['consonantes'])
await storage.set('e', 1, 10, ['vowels'])

await storage.remove('a')
t.equal(await storage.remove('a'), true)

t.equal(await storage.get('a'), undefined)
t.equal(await storage.get('b'), 1)
Expand Down
Loading

0 comments on commit 0e826d5

Please sign in to comment.