-
Notifications
You must be signed in to change notification settings - Fork 40
/
cache.rb
504 lines (441 loc) · 15.1 KB
/
cache.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
# frozen_string_literal: true
require 'readthis/entity'
require 'readthis/expanders'
require 'readthis/passthrough'
require 'readthis/scripts'
require 'redis'
require 'connection_pool'
module Readthis
# Readthis is a Redis backed cache client. It is a drop in replacement for
# any `ActiveSupport` compliant cache Above all Readthis emphasizes
# performance, simplicity, and explicitness.
class Cache
attr_reader :entity, :notifications, :options, :pool, :scripts
# Provide a class level lookup of the proper notifications module.
# Instrumention is expected to occur within applications that have
# ActiveSupport::Notifications available, but needs to work even when it
# isn't.
def self.notifications
ActiveSupport::Notifications if defined?(ActiveSupport::Notifications)
end
# Creates a new Readthis::Cache object with the given options.
#
# @option options [Hash] :redis Options that will be passed to the redis
# connection
# @option options [Boolean] :compress (false) Enable or disable automatic
# compression
# @option options [Number] :compression_threshold (8k) Minimum string size
# for compression
# @option options [Number] :expires_in The number of seconds until an entry
# expires
# @option options [Boolean] :refresh (false) Automatically refresh key
# expiration
# @option options [Boolean] :retain_nils (false) Whether nil values should
# be included in read_multi output
# @option options [Module] :marshal (Marshal) Module that responds to
# `dump` and `load`
# @option options [String] :namespace Prefix used to namespace entries
# @option options [Number] :pool_size (5) The number of threads in the pool
# @option options [Number] :pool_timeout (5) How long before a thread times
# out
#
# @example Create a new cache instance
#
# Readthis::Cache.new(namespace: 'cache',
# redis: { url: 'redis://localhost:6379/0' })
#
# @example Create a compressed cache instance
#
# Readthis::Cache.new(compress: true, compression_threshold: 2048)
#
def initialize(options = {})
@options = options
@entity = Readthis::Entity.new(
marshal: options.fetch(:marshal, Marshal),
compress: options.fetch(:compress, false),
threshold: options.fetch(:compression_threshold, 1024)
)
@pool = ConnectionPool.new(pool_options(options)) do
Redis.new(options.fetch(:redis, {}))
end
@scripts = Readthis::Scripts.new
end
# Fetches data from the cache, using the given key. If there is data in
# the cache with the given key, then that data is returned. Otherwise, nil
# is returned.
#
# @param [String] key Key for lookup
# @param [Hash] options Optional overrides
#
# @example
#
# cache.read('missing') # => nil
# cache.read('matched') # => 'some value'
#
def read(key, options = {})
options = merged_options(options)
invoke(:read, key) do |store|
key = namespaced_key(key, options)
refresh_entity(key, store, options)
entity.load(store.get(key))
end
end
# Writes data to the cache using the given key. Will overwrite whatever
# value is already stored at that key.
#
# @param [String] key Key for lookup
# @param [Hash] options Optional overrides
#
# @example
#
# cache.write('some-key', 'a bunch of text') # => 'OK'
# cache.write('some-key', 'short lived', expires_in: 60) # => 'OK'
# cache.write('some-key', 'lives elsehwere', namespace: 'cache') # => 'OK'
#
def write(key, value, options = {})
options = merged_options(options)
invoke(:write, key) do |store|
write_entity(key, value, store, options)
end
end
# Delete the value stored at the specified key. Returns `true` if
# anything was deleted, `false` otherwise.
#
# @param [String] key The key for lookup
# @param [Hash] options Optional overrides
#
# @example
#
# cache.delete('existing-key') # => true
# cache.delete('random-key') # => false
#
def delete(key, options = {})
namespaced = namespaced_key(key, merged_options(options))
invoke(:delete, key) do |store|
store.del(namespaced) > 0
end
end
# Delete all values that match a given pattern. The pattern must be defined
# using Redis compliant globs. The following examples are borrowed from the
# `KEYS` documentation:
#
# * `h?llo` matches hello, hallo and hxllo
# * `h*llo` matches hllo and heeeello
# * `h[ae]llo` matches hello and hallo, but not hillo
# * `h[^e]llo` matches hallo, hbllo, ... but not hello
# * `h[a-b]llo` matches hallo and hbllo
#
# Note that `delete_matched` does *not* use the `KEYS` command, making it
# safe for use in production.
#
# @param [String] pattern The glob pattern for matching keys
# @option [String] :namespace Prepend a namespace to the pattern
# @option [Number] :count Configure the number of keys deleted at once
#
# @example Delete all 'cat' keys
#
# cache.delete_matched('*cats') #=> 47
# cache.delete_matched('*dogs') #=> 0
#
def delete_matched(pattern, options = {})
namespaced = namespaced_key(pattern, merged_options(options))
invoke(:delete, pattern) do |store|
cursor = nil
count = options.fetch(:count, 1000)
deleted = 0
until cursor == '0'
cursor, matched = store.scan(cursor || 0, match: namespaced, count: count)
if matched.any?
store.del(*matched)
deleted += matched.length
end
end
deleted
end
end
# Fetches data from the cache, using the given key. If there is data in the
# cache with the given key, then that data is returned.
#
# If there is no such data in the cache (a cache miss), then `nil` will be
# returned. However, if a block has been passed, that block will be passed
# the key and executed in the event of a cache miss. The return value of
# the block will be written to the cache under the given cache key, and
# that return value will be returned.
#
# @param [String] key Key for lookup
# @param options [Hash] Optional overrides
# @option options [Boolean] :force Force a cache miss
# @yield [String] Gives a missing key to the block, which is used to
# generate the missing value
#
# @example Typical
#
# cache.write('today', 'Monday')
# cache.fetch('today') # => "Monday"
# cache.fetch('city') # => nil
#
# @example With a block
#
# cache.fetch('city') do
# 'Duckburgh'
# end
#
# cache.fetch('city') # => "Duckburgh"
#
# @example Cache Miss
#
# cache.write('today', 'Monday')
# cache.fetch('today', force: true) # => nil
#
def fetch(key, options = {})
options ||= {}
value = read(key, options) unless options[:force]
if value.nil? && block_given?
value = yield(key)
write(key, value, options)
end
value
end
# Increment a key in the store.
#
# If the key doesn't exist it will be initialized at 0. If the key exists
# but it isn't a Fixnum it will be coerced to 0.
#
# Note that this method does *not* use Redis' native `incr` or `incrby`
# commands. Those commands only work with number-like strings, and are
# incompatible with the encoded values Readthis writes to the store. The
# behavior of `incrby` is preserved as much as possible, but incrementing
# is not an atomic action. If multiple clients are incrementing the same
# key there will be a "last write wins" race condition, causing incorrect
# counts.
#
# If you absolutely require correct counts it is better to use the Redis
# client directly.
#
# @param [String] key Key for lookup
# @param [Fixnum] amount Value to increment by
# @param [Hash] options Optional overrides
#
# @example
#
# cache.increment('counter') # => 1
# cache.increment('counter', 2) # => 3
#
def increment(key, amount = 1, options = {})
invoke(:increment, key) do |store|
alter(store, key, amount, options)
end
end
# Decrement a key in the store.
#
# If the key doesn't exist it will be initialized at 0. If the key exists
# but it isn't a Fixnum it will be coerced to 0. Like `increment`, this
# does not make use of the native `decr` or `decrby` commands.
#
# @param [String] key Key for lookup
# @param [Fixnum] amount Value to decrement by
# @param [Hash] options Optional overrides
#
# @example
#
# cache.write('counter', 20) # => 20
# cache.decrement('counter') # => 19
# cache.decrement('counter', 2) # => 17
#
def decrement(key, amount = 1, options = {})
invoke(:decrement, key) do |store|
alter(store, key, -amount, options)
end
end
# Efficiently read multiple values at once from the cache. Options can be
# passed in the last argument.
#
# @overload read_multi(keys)
# Return all values for the given keys.
# @param [String] One or more keys to fetch
# @param [Hash] options Configuration to override
#
# @return [Hash] A hash mapping keys to the values found.
#
# @example
#
# cache.read_multi('a', 'b') # => { 'a' => 1 }
# cache.read_multi('a', 'b', retain_nils: true) # => { 'a' => 1, 'b' => nil }
#
def read_multi(*keys)
options = merged_options(extract_options!(keys))
mapping = keys.map { |key| namespaced_key(key, options) }
return {} if keys.empty?
invoke(:read_multi, keys) do |store|
values = store.mget(*mapping).map { |value| entity.load(value) }
refresh_entity(mapping, store, options)
zipped_results(keys, values, options)
end
end
# Write multiple key value pairs simultaneously. This is an atomic
# operation that will always succeed and will overwrite existing
# values.
#
# This is a non-standard, but useful, cache method.
#
# @param [Hash] hash Key value hash to write
# @param [Hash] options Optional overrides
#
# @example
#
# cache.write_multi({ 'a' => 1, 'b' => 2 }) # => true
#
def write_multi(hash, options = {})
options = merged_options(options)
invoke(:write_multi, hash.keys) do |store|
store.multi do
hash.each { |key, value| write_entity(key, value, store, options) }
end
end
end
# Fetches multiple keys from the cache using a single call to the server
# and filling in any cache misses. All read and write operations are
# executed atomically.
#
# @overload fetch_multi(keys)
# Return all values for the given keys, applying the block to the key
# when a value is missing.
# @param [String] One or more keys to fetch
#
# @example
#
# cache.fetch_multi('alpha', 'beta') do |key|
# "#{key}-was-missing"
# end
#
# cache.fetch_multi('a', 'b', expires_in: 60) do |key|
# key * 2
# end
#
def fetch_multi(*keys)
options = extract_options!(keys).merge(retain_nils: true)
results = read_multi(*keys, options)
missing = {}
invoke(:fetch_multi, keys) do |_store|
results.each do |key, value|
next unless value.nil?
value = yield(key)
missing[key] = value
results[key] = value
end
end
write_multi(missing, options) if missing.any?
results
end
# Returns `true` if the cache contains an entry for the given key.
#
# @param [String] key Key for lookup
# @param [Hash] options Optional overrides
#
# @example
#
# cache.exist?('some-key') # => false
# cache.exist?('some-key', namespace: 'cache') # => true
#
def exist?(key, options = {})
invoke(:exist?, key) do |store|
store.exists(namespaced_key(key, merged_options(options)))
end
end
# Clear the entire cache by flushing the current database.
#
# This flushes everything in the current database, with no globbing
# applied. Data in other numbered databases will be preserved.
#
# @option options [Hash] :async Flush the database asynchronously, only
# supported in Redis 4.0+
#
# @example
#
# cache.clear #=> 'OK'
# cache.clear(async: true) #=> 'OK'
#
def clear(options = {})
invoke(:clear, '*') do |store|
if options[:async]
store.flushdb(async: true)
else
store.flushdb
end
end
end
protected
def refresh_entity(keys, store, options)
return unless options[:refresh] && options[:expires_in]
expiration = coerce_expiration(options[:expires_in])
scripts.run('mexpire', store, keys, expiration)
end
def write_entity(key, value, store, options)
namespaced = namespaced_key(key, options)
dumped = entity.dump(value, options)
if (expiration = options[:expires_in])
store.setex(namespaced, coerce_expiration(expiration), dumped)
else
store.set(namespaced, dumped)
end
end
private
def alter(store, key, amount, options)
options = merged_options(options)
namespaced = namespaced_key(key, options)
loaded = entity.load(store.get(namespaced))
change = loaded.to_i + amount
dumped = entity.dump(change, options)
expiration = fallback_expiration(store, namespaced, options)
if expiration
store.setex(namespaced, coerce_expiration(expiration), dumped)
else
store.set(namespaced, dumped)
end
change
end
def fallback_expiration(store, key, options)
options.fetch(:expires_in) do
ttl = store.ttl(key)
ttl > 0 ? ttl : nil
end
end
def coerce_expiration(expires_in)
Float(expires_in).ceil
end
def instrument(name, key)
if self.class.notifications
name = "cache_#{name}.active_support"
payload = { key: key, name: name }
self.class.notifications.instrument(name, payload) { yield(payload) }
else
yield
end
end
def invoke(operation, key, &block)
instrument(operation, key) do
pool.with(&block)
end
rescue Redis::BaseError => error
raise error unless Readthis.fault_tolerant?
end
def extract_options!(array)
array.last.is_a?(Hash) ? array.pop : {}
end
def merged_options(options)
@options.merge(options || {})
end
def zipped_results(keys, values, options)
zipped = keys.zip(values)
zipped.select! { |(_, value)| value } unless options[:retain_nils]
zipped.to_h
end
def pool_options(options)
{ size: options.fetch(:pool_size, 5),
timeout: options.fetch(:pool_timeout, 5) }
end
def namespaced_key(key, options)
Readthis::Expanders.namespace_key(key, options[:namespace])
end
end
end