Skip to content

Commit

Permalink
feat: improve cache and ensure requests are deduped
Browse files Browse the repository at this point in the history
  • Loading branch information
AVVS committed Mar 19, 2022
1 parent 3039c89 commit 16c5db9
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 84 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"@makeomatic/deploy": "^12.6.3",
"@node-rs/xxhash-linux-x64-musl": "^1.2.0",
"@swc-node/register": "^1.4.2",
"@swc/core-linux-x64-musl": "^1.2.156",
"@swc/core-linux-x64-musl": "^1.2.158",
"multi-semantic-release": "^2.11.1",
"tsconfig-paths": "^3.14.0",
"typescript": "^4.6.2"
Expand Down
7 changes: 4 additions & 3 deletions packages/amqp-codec/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"devDependencies": {
"@node-rs/xxhash-linux-x64-musl": "^1.2.0",
"@swc-node/register": "^1.4.2",
"@swc/core-linux-x64-musl": "^1.2.155",
"@swc/core-linux-x64-musl": "^1.2.158",
"@types/debug": "^4.1.7",
"@types/jest": "^27.4.1",
"@types/node": "^17.0.21",
Expand All @@ -46,9 +46,10 @@
"typescript": "^4.6.2"
},
"dependencies": {
"debug": "^4.3.3"
"debug": "^4.3.4"
},
"files": [
"lib/"
"lib/",
"src/"
]
}
5 changes: 3 additions & 2 deletions packages/amqp-coffee/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"async": "^3.2.3",
"bson": "^4.6.1",
"bytes": "^3.1.2",
"debug": "^4.3.3",
"debug": "^4.3.4",
"fastq": "^1.13.0",
"ffi-napi": "^4.0.3",
"lodash": "^4.17.21",
Expand All @@ -47,7 +47,7 @@
"@makeomatic/deploy": "^12.6.3",
"@node-rs/xxhash-linux-x64-musl": "^1.2.0",
"@swc-node/register": "^1.4.2",
"@swc/core-linux-x64-musl": "^1.2.155",
"@swc/core-linux-x64-musl": "^1.2.158",
"@types/async": "^3.2.12",
"@types/bytes": "^3.1.1",
"@types/debug": "^4.1.7",
Expand Down Expand Up @@ -86,6 +86,7 @@
},
"files": [
"bin/src/",
"src/",
"test/*.js"
]
}
9 changes: 6 additions & 3 deletions packages/transport-amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"eslint-plugin-import": "^2.25.4",
"eslint-plugin-mocha": "^10.0.3",
"eslint-plugin-promise": "^6.0.0",
"eslint-plugin-unicorn": "^41.0.0",
"eslint-plugin-unicorn": "^41.0.1",
"jaeger-client": "^3.19.0",
"microtime": "^3.0.0",
"mocha": "^9.2.2",
Expand All @@ -65,23 +65,26 @@
"dependencies": {
"@microfleet/amqp-coffee": "workspace:^",
"@types/common-errors": "^1.0.2",
"async-cache-dedupe": "^1.2.2",
"bluebird": "^3.7.2",
"debug": "^4.3.3",
"debug": "^4.3.4",
"eventemitter3": "^4.0.7",
"flatstr": "^1.0.12",
"hashlru": "^2.3.0",
"hyperid": "^3.0.1",
"is": "^3.3.0",
"joi": "^17.6.0",
"json-stringify-safe": "^5.0.1",
"lodash": "^4.17.21",
"object-hash": "^3.0.0",
"pino": "^7.8.1",
"pino": "^7.9.1",
"read-pkg": "^5.2.0",
"type-fest": "^2.12.0",
"uuid": "^8.3.2"
},
"files": [
"lib/",
"src/",
"types/"
]
}
22 changes: 19 additions & 3 deletions packages/transport-amqp/src/amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { once } from 'events'
import os = require('os')
import is = require('is')
import assert = require('assert')
import hyperid = require('hyperid')
import {
NotPermittedError,
ArgumentError,
Expand Down Expand Up @@ -127,6 +128,7 @@ export class AMQPTransport extends EventEmitter {
private _replyTo: false | string | null = null
private _consumers = new Set<Consumer>()
private _boundEmit = this.emit.bind(this)
private getCorrelationId: hyperid.Instance

constructor(opts: PartialDeep<Configuration> = { version: 'n/a' }) {
super()
Expand All @@ -137,6 +139,7 @@ export class AMQPTransport extends EventEmitter {
})

this.config = config
this.getCorrelationId = hyperid({ urlSafe: true })

// prepares logger
this.log = loggerUtils.prepareLogger(config)
Expand All @@ -150,7 +153,7 @@ export class AMQPTransport extends EventEmitter {
* reply storage, where we'd save correlation ids
* and callbacks to be called once we are done
*/
this.replyStorage = new ReplyStorage()
this.replyStorage = new ReplyStorage(this.cache)

/**
* delay settings for reconnect
Expand Down Expand Up @@ -923,28 +926,41 @@ export class AMQPTransport extends EventEmitter {

// work with cache if options.cache is set and is number
// otherwise cachedResponse is always null
const cachedResponse = this.cache.get(message, options.cache)
const cachedResponse = this.cache.get({
route,
message,
headers: options.headers,
exchange: options.exchange,
routingKey: options.routingKey,
}, options.cache)

if (cachedResponse !== null && typeof cachedResponse === 'object') {
return adaptResponse(cachedResponse.value, replyOptions)
}

const { replyStorage } = this

// generate response id
const correlationId = options.correlationId || uuid.v4()
const correlationId = options.correlationId || this.getCorrelationId()

// timeout before RPC times out
const timeout = options.timeout || this.config.timeout

// slightly longer timeout, if message was not consumed in time, it will return with expiration
const future = replyStorage.push(correlationId, {
timer: null,
future: null,
timeout,
time,
routing: route,
replyOptions,
cache: cachedResponse,
})

if (future.deduped) {
return future.promise
}

// debugging
if (this.log.isLevelEnabled('trace')) {
this.log.trace('message pushed into reply queue in %s', latency(time))
Expand Down
26 changes: 19 additions & 7 deletions packages/transport-amqp/src/utils/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { latency } from './latency'

export class Cache {
public readonly enabled: boolean

private readonly cache!: ReturnType<typeof HLRU>
private readonly dedupes = new Map<string, Promise<any>>()

/**
* @param size
Expand All @@ -20,11 +22,10 @@ export class Cache {

/**
*
* @param {any} message
* @param message
* @param maxAge
* @returns
*/
get(message: any, maxAge: number | undefined) {
get(message: any, maxAge: number | undefined): null | string | { maxAge: number, value: any } {
if (this.enabled === false) {
return null
}
Expand All @@ -49,11 +50,10 @@ export class Cache {

/**
*
* @param {string} key
* @param {any} data
* @returns
* @param key
* @param data
*/
set(key: string, data: any) {
set(key: string | undefined | null, data: any): null | void {
if (this.enabled === false) {
process.emitWarning('tried to use disabled cache', {
code: 'MF_AMQP_CACHE_0001',
Expand All @@ -69,4 +69,16 @@ export class Cache {

return this.cache.set(key, { maxAge: process.hrtime(), value: data })
}

dedupe(key: string): void | Promise<any> {
return this.dedupes.get(key)
}

storeDedupe(key: string, future: Promise<any>): void {
this.dedupes.set(key, future)
}

cleanDedupe(key?: string): void {
if (key) this.dedupes.delete(key)
}
}
54 changes: 38 additions & 16 deletions packages/transport-amqp/src/utils/reply-storage.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import Errors = require('common-errors');
import Errors = require('common-errors')
import { Cache } from './cache'
import { generateErrorMessage } from './error'

export interface Future<T = any> {
promise: Promise<T>
deduped: false
resolve(result?: T | PromiseLike<T>): void // promise resolve action.
reject(err?: Error | null): void // promise reject action.
}
Expand All @@ -15,7 +17,7 @@ export interface PushOptions {
replyOptions: Record<string, any>
timer: NodeJS.Timer | null
future: Future | null
cache?: any
cache?: string | null
}

const getFuture = <T = any>(): Future<T> => {
Expand All @@ -25,17 +27,20 @@ const getFuture = <T = any>(): Future<T> => {
future.reject = reject
})
future.promise = promise
future.deduped = false
return future
}

/**
* In-memory reply storage
*/
export class ReplyStorage {
private storage: Map<string, { [K in keyof PushOptions]: NonNullable<PushOptions[K]> }>
private readonly storage: Map<string, { [K in keyof PushOptions]: NonNullable<PushOptions[K]> }>
private readonly cache: Cache

constructor(Type = Map) {
this.storage = new Type()
constructor(cache: Cache) {
this.storage = new Map()
this.cache = cache
this.onTimeout = this.onTimeout.bind(this)
}

Expand All @@ -51,32 +56,47 @@ export class ReplyStorage {
return
}

const { routing, timeout, future } = rpcCall
const { routing, timeout, future, cache } = rpcCall

// clean-up
storage.delete(correlationId)
this.cache.cleanDedupe(cache)

// reject with a timeout error
future.reject(new Errors.TimeoutError(generateErrorMessage(routing, timeout)))
}

/**
* Stores correlation ID in the memory storage
* @param {string} correlationId
* @param {Object} opts
* @param correlationId
* @param opts
*/
push<T = any>(correlationId: string, opts: Omit<PushOptions, 'timer' | 'future'>): Future<T> {
const future = getFuture<T>()
const timer = setTimeout(this.onTimeout, opts.timeout, correlationId)
this.storage.set(correlationId, { ...opts, timer, future })
return future
push<T = any>(correlationId: string, opts: PushOptions): Future<T> | { deduped: true, promise: Promise<any> } {
if (typeof opts.cache === 'string') {
const pendingFuture = this.cache.dedupe(opts.cache)
if (pendingFuture) {
return { promise: pendingFuture, deduped: true }
}

opts.future = getFuture<T>()
this.cache.storeDedupe(opts.cache, opts.future.promise)
} else {
opts.future = getFuture<T>()
}

opts.timer = setTimeout(this.onTimeout, opts.timeout, correlationId)

// @ts-expect-error ^ we know timer is defined
this.storage.set(correlationId, opts)

return opts.future
}

/**
* Rejects stored promise with an error & cleans up
* Timeout error
* @param {string} correlationId
* @param {Error} error
* @param correlationId
* @param error
*/
reject(correlationId: string, error: Error): void {
const { storage } = this
Expand All @@ -85,13 +105,14 @@ export class ReplyStorage {
return
}

const { timer, future } = rpcCall
const { timer, future, cache } = rpcCall

// remove timer
clearTimeout(timer)

// remove reference
storage.delete(correlationId)
this.cache.cleanDedupe(cache)

// now resolve promise and return an error
future.reject(error)
Expand All @@ -114,6 +135,7 @@ export class ReplyStorage {

// remove reference to it
this.storage.delete(correlationId)
this.cache.cleanDedupe(future.cache)

// return data
return future
Expand Down
21 changes: 20 additions & 1 deletion packages/transport-amqp/test/amqp-transport.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,32 @@ describe('AMQPTransport', function AMQPTransportTestSuite() {

it('publishes batches of messages, they must return cached values and then new ones', async () => {
const publish = () => cached.publishAndWait('test.default', 1, { cache: 2000 })
const spy = sinon.spy(cached, 'publish')
const promises = [
publish(),
publish(),
publish(),
publish(),
publish(),
setTimeout(300).then(publish),
setTimeout(5000).then(publish),
]

const [initial, cachedP, nonCached] = await Promise.all(promises)
const [
initial,
one, two, three, four,
cachedP,
nonCached
] = await Promise.all(promises)

// only called twice - once after expiration and everything else is deduped or read from cache
assert.equal(spy.callCount, 2)

// all identical
assert.equal(toMiliseconds(initial.time), toMiliseconds(one.time))
assert.equal(toMiliseconds(initial.time), toMiliseconds(two.time))
assert.equal(toMiliseconds(initial.time), toMiliseconds(three.time))
assert.equal(toMiliseconds(initial.time), toMiliseconds(four.time))

assert.equal(toMiliseconds(initial.time), toMiliseconds(cachedP.time))
assert(toMiliseconds(initial.time) < toMiliseconds(nonCached.time))
Expand Down
Loading

0 comments on commit 16c5db9

Please sign in to comment.