implement event listener caching
poelzi committed Sep 14, 2011
1 parent 6a760ea commit 1fdda86
Showing 3 changed files with 165 additions and 18 deletions.
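The change adds an emitter option to Flexcache's cache() wrapper: besides callback-style functions, the cache can now wrap a function that returns an EventEmitter. On a miss the wrapper forwards each 'data' event to the caller while buffering it and stores the buffer when 'end' fires; on a hit it replays the buffered chunks as 'data' events followed by 'end'. A minimal usage sketch, based on the new test in test/test-redis.coffee (RedisBackend and my_emitter stand in for your own backend and emitter-returning function):

    back = new RedisBackend()
    fc = new Flexcache back, debug: 1
    cached = fc.cache my_emitter,
      name: "emit"     # key prefix; required when the wrapped function is anonymous
      emitter: true    # hand back a plain EventEmitter instead of calling a callback

    ee = cached(1, "test2")
    ee.on 'data', (chunk) -> console.log("chunk:", chunk)
    ee.on 'end', () -> console.log("done")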
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{ "name": "flexcache"
, "description": "flexible cacher for async functions with switchable backends. redis/memory"
, "version": "0.0.5"
, "version": "0.0.6"
, "homepage": "https://github.com/poelzi/node-flexcache"
, "author": "poelzi (http://poelzi.org)"
, "repository": {"type": "git", "url": "git://github.com/poelzi/node-flexcache.git"}
100 changes: 86 additions & 14 deletions src/flexcache.coffee
@@ -13,6 +13,7 @@ quack = require 'quack-array'
hexy = require('hexy').hexy
hashlib = require('hashlib')
assert = require('assert')
{ EventEmitter } = require('events')


class Flexcache
@@ -69,48 +70,119 @@ class Flexcache
  clear_hash: (group, hash, cb) =>
    @backend.clear_hash group, hash, cb

  calculate_size: (obj) ->
    if typeof(obj) == "string" or Buffer.isBuffer(obj)
      return obj.length
    else if typeof(obj) == "object"
      rv = 0
      for own key, value of obj
        rv += @calculate_size(value)
        rv += @calculate_size(key)
      return rv
    else if obj.length
      return obj.length
    return 0
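  # calculate_size is meant to estimate how much a result occupies, so the
  # emitter path can stop buffering once max_object_size is exceeded (see
  # over_limit below; the call there is still commented out). Illustrative
  # example, not from the original source: {ab: "xyz"} counts 2 + 3 = 5,
  # and a Buffer counts its byte length.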

  cache: (fn, loptions = {}) =>
    hasher = loptions.hash or @hash
    grouper = loptions.group or @group
    ttl = loptions.ttl or @options.ttl
    hash_name = loptions.name or fn.name

    # test validity of options
    if not hash_name
      throw new Error("Flexcache: name missing in options on anonymous function")
    if @used_names[hash_name] and not loptions.multi
      throw new Error("Name is already in use and multi is false")
    @used_names[hash_name] = true

    # prepare event emitter if set
    if loptions.emitter
      if typeof(loptions.emitter) == 'boolean'
        emitter = EventEmitter
      else if typeof(loptions.emitter) == 'function'
        emitter = loptions.emitter
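    # loptions.emitter accepts either true (wrap results in a plain
    # EventEmitter) or a constructor such as an EventEmitter subclass,
    # which later receives the original call arguments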


    wrapper = (wargs..., callback) =>
      # in case no callback was defined, push it back onto the arguments list
      if typeof(callback) != 'function'
        wargs.push(callback)
      if @options.debug > 1
        console.log("try cache call. args:", wargs)

      # calculate group and hash keys
      group_prefix = loptions.group_prefix or @options.group_prefix
      group = group_prefix + grouper(wargs...)
      hash = hash_name + "_" + hasher(wargs...)
      # create event emitter return value
      if emitter
        ee = new emitter(wargs...)

      @backend.get group, hash, (err, cached) =>
        # not decodeable means not cached
        if err or not cached
          if @options.debug
            console.log("cache MISS group:", group, " hash:", hash)
          # call the masked function
          fn wargs..., (results...) =>
            if results[0] # error case
              return callback.apply(null, results)
            # cache the result
            opt = ttl:ttl, max_object_size:@options.max_object_size
            @backend.set group, hash, results, opt, (err, res) =>
              # don't care if succeeded
              if @options.debug
                console.log("save cache", group, hash)
              #console.log(wargs)
              #console.log(results)
              # call real callback function
              callback.apply(null, results)
          if emitter
            total_buffer = []
            total_size = 0
            over_limit = false
            # call the masked function.
            realee = fn wargs...
            realee.on 'data', (data) =>
              ee.emit 'data', data
              if not over_limit
                total_buffer.push(data)
                #total_size += @calculate_size(data)
                over_limit = total_size/2 > @options.max_object_size
            realee.on 'end', () =>
              # save result in cache
              #total_buffer.push(data)
              opt = ttl:ttl, max_object_size:@options.max_object_size
              @backend.set group, hash, total_buffer, opt, (err, res) =>
                if @options.debug
                  console.log("save cache", group, hash, "err:", err)
                ee.emit 'end'


          else
            fn wargs..., (results...) =>
              if results[0] # error case
                return callback.apply(null, results)
              # cache the result
              opt = ttl:ttl, max_object_size:@options.max_object_size
              @backend.set group, hash, results, opt, (err, res) =>
                # don't care if succeeded
                if @options.debug
                  console.log("save cache", group, hash)
                #console.log(wargs)
                #console.log(results)
                # call real callback function
                if not ee
                  callback.apply(null, results)

        else
          if @options.debug
            console.log("cache HIT group:", group, " hash:", hash)
          #console.log(cached)
          callback.apply(null, cached)
          if not ee
            callback.apply(null, cached)
          else
            # handle event emitter
            #console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%cached", cached)
            atest = () ->
              cached.length
            adata = (callback) ->
              mydata = cached.splice(0,1)
              console.log(mydata)
              ee.emit 'data', mydata
              setTimeout callback, 0
            aend = () ->
              ee.emit('end')
            async.whilst(atest, adata, aend)
      ee or null
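      # in emitter mode the wrapper hands the emitter back right away so
      # listeners can be attached before the backend lookup completes;
      # in plain callback mode it returns null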

    wrapper.get_group = (args...) =>
      grouper = loptions.group or @group
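The wrapper above only relies on a small backend interface: get(group, hash, cb), set(group, hash, value, opts, cb) and clear_hash(group, hash, cb). A rough in-memory sketch of that contract, purely illustrative (the class name and details are assumptions, not the shipped memory or redis backends):

    class MemoryBackend
      constructor: () ->
        @data = {}

      get: (group, hash, cb) ->
        # a miss is reported as a falsy value, which makes the wrapper call the real function
        cb null, @data[group]?[hash]

      set: (group, hash, value, opts, cb) ->
        # opts carries ttl and max_object_size; this sketch ignores both
        @data[group] ?= {}
        @data[group][hash] = value
        cb(null, value) if cb?

      clear_hash: (group, hash, cb) ->
        delete @data[group][hash] if @data[group]?
        cb(null) if cb?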
81 changes: 78 additions & 3 deletions test/test-redis.coffee
@@ -8,6 +8,7 @@ STRESS_RUNS = 10000
async = require 'async'
garbage = require 'garbage'
inspect = require('eyes').inspector({styles: {all: 'magenta'}})
{ EventEmitter } = require('events')

`function _deepEqual(actual, expected) {
// 7.1. All identical values are equivalent, as determined by ===.
@@ -375,8 +376,82 @@ gentests "Stress", (test, backend) ->
  for i in [0...RUNS]
    queue.push {}
  queue.drain = (err, done) ->
    back.close()
    took = (new Date().getTime() - start)/1000
    console.log("took:", took, "s req/sec:", RUNS/took)
    test.done()

    back.close(test.done)

#redis = require('redis')
#redis.debug_mode = true

module.exports.TestEmmiter = (test) ->
  back = new RedisBackend()
  fc = new Flexcache back, debug:2

  emitter_run = 0
  class TestEmitter extends EventEmitter
    constructor: (args...) ->
      console.log(args)
      @args = args
      setTimeout(@run, 0)

    run: () =>
      emitter_run++
      console.trace()
      async.waterfall [
        (next) =>
          @emit 'data', "emitter run: " + emitter_run
          setTimeout(next, 4)
      ,
        (next) =>
          @emit 'data', "\nsecond\n"
          setTimeout(next, 0)
      ,
        (next) =>
          @emit 'end'
      ]
      @

    special: () =>
      return 42

  my_emitter = (args...) ->
    ee = new TestEmitter args...
    return ee

  cached_emitter = fc.cache my_emitter,
    name: "emit",
    emitter: TestEmitter

  cached_emitter2 = fc.cache my_emitter,
    name: "emit2",
    emitter: true

  rvp = cached_emitter2(1, "test2")
  test.ok(rvp instanceof EventEmitter, "not event emitter")

  rv1 = cached_emitter(1, "test2")
  test.ok(rv1 instanceof TestEmitter, "not test emitter")
  cached_emitter.clear_group 1, () ->
    async.waterfall [
      (next) ->
        rv1 = cached_emitter(1, "test2")
        test.ok(rv1 instanceof TestEmitter, "not test emitter")
        rv1.on 'data', (data) ->
          console.log("got data:", data)
        rv1.on 'end', () ->
          console.log("end rcv, test1")
          next(null, rv1)
    ,
      (rv1, next) ->
        rv2 = cached_emitter(1, "test2")
        test.ok(rv2 instanceof TestEmitter, "not test emitter")
        should_results = [ 'emitter run: 5', '\nsecond\n' ]
        rv2.on 'data', (data) ->
          test.equal(data, should_results.splice(0,1), "results differ")
          console.log("got data", data)
        rv2.on 'end', () ->
          console.log("end rcv, test2")
          next()
    ], () ->
      back.close()
      test.done()
