Skip to content

Commit

Permalink
ActiveSupport instrument DalliStore errors
Browse files Browse the repository at this point in the history
Previously, one could use raise_errors to either handle execptions
in client code or have them handled by dalli. In the later case,
logging was the only way to be notified of an exception. This
commit adds the ability to be notified via active support
instrumentation via a cache_error.active_support event.

I've added this code because I want to know when errors are
occuring to trigger external monitoring and I still want
dalli to fail without raising errors so that cached values
are still generated.
  • Loading branch information
tipair committed Jan 4, 2017
1 parent d2561b5 commit 4b4a1f7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 19 deletions.
2 changes: 2 additions & 0 deletions History.md
@@ -1,6 +1,8 @@
Dalli Changelog
=====================

- Instrument DalliStore errors with instrument_errors configuration option. (btatnall)

2.7.6
==========
- Rails 5.0.0.beta2 compatibility (yui-knk, petergoldstein)
Expand Down
62 changes: 43 additions & 19 deletions lib/active_support/cache/dalli_store.rb
Expand Up @@ -99,7 +99,7 @@ def fetch(name, options=nil)
if block_given?
entry = not_found
unless options[:force]
entry = instrument(:read, namespaced_name, options) do |payload|
entry = instrument_with_log(:read, namespaced_name, options) do |payload|
read_entry(namespaced_name, options).tap do |result|
if payload
payload[:super_operation] = :fetch
Expand All @@ -110,13 +110,13 @@ def fetch(name, options=nil)
end

if entry == not_found
result = instrument(:generate, namespaced_name, options) do |payload|
result = instrument_with_log(:generate, namespaced_name, options) do |payload|
yield
end
write(name, result, options)
result
else
instrument(:fetch_hit, namespaced_name, options) { |payload| }
instrument_with_log(:fetch_hit, namespaced_name, options) { |payload| }
entry
end
else
Expand All @@ -128,7 +128,7 @@ def read(name, options=nil)
options ||= {}
name = namespaced_key(name, options)

instrument(:read, name, options) do |payload|
instrument_with_log(:read, name, options) do |payload|
entry = read_entry(name, options)
payload[:hit] = !entry.nil? if payload
entry
Expand All @@ -139,7 +139,7 @@ def write(name, value, options=nil)
options ||= {}
name = namespaced_key(name, options)

instrument(:write, name, options) do |payload|
instrument_with_log(:write, name, options) do |payload|
with do |connection|
options = options.merge(:connection => connection)
write_entry(name, value, options)
Expand All @@ -159,7 +159,7 @@ def delete(name, options=nil)
options ||= {}
name = namespaced_key(name, options)

instrument(:delete, name, options) do |payload|
instrument_with_log(:delete, name, options) do |payload|
delete_entry(name, options)
end
end
Expand All @@ -169,7 +169,7 @@ def delete(name, options=nil)
def read_multi(*names)
options = names.extract_options!
mapping = names.inject({}) { |memo, name| memo[namespaced_key(name, options)] = name; memo }
instrument(:read_multi, mapping.keys) do
instrument_with_log(:read_multi, mapping.keys) do
results = {}
if local_cache
mapping.each_key do |key|
Expand Down Expand Up @@ -200,7 +200,7 @@ def fetch_multi(*names)
options = names.extract_options!
mapping = names.inject({}) { |memo, name| memo[namespaced_key(name, options)] = name; memo }

instrument(:fetch_multi, mapping.keys) do
instrument_with_log(:fetch_multi, mapping.keys) do
with do |connection|
results = connection.get_multi(mapping.keys)

Expand Down Expand Up @@ -231,11 +231,12 @@ def increment(name, amount = 1, options=nil)
name = namespaced_key(name, options)
initial = options.has_key?(:initial) ? options[:initial] : amount
expires_in = options[:expires_in]
instrument(:increment, name, :amount => amount) do
instrument_with_log(:increment, name, :amount => amount) do
with { |c| c.incr(name, amount, expires_in, initial) }
end
rescue Dalli::DalliError => e
logger.error("DalliError: #{e.message}") if logger
log_dalli_error(e)
instrument_error(e) if instrument_errors?
raise if raise_errors?
nil
end
Expand All @@ -250,23 +251,25 @@ def decrement(name, amount = 1, options=nil)
name = namespaced_key(name, options)
initial = options.has_key?(:initial) ? options[:initial] : 0
expires_in = options[:expires_in]
instrument(:decrement, name, :amount => amount) do
instrument_with_log(:decrement, name, :amount => amount) do
with { |c| c.decr(name, amount, expires_in, initial) }
end
rescue Dalli::DalliError => e
logger.error("DalliError: #{e.message}") if logger
log_dalli_error(e)
instrument_error(e) if instrument_errors?
raise if raise_errors?
nil
end

# Clear the entire cache on all memcached servers. This method should
# be used with care when using a shared cache.
def clear(options=nil)
instrument(:clear, 'flushing all keys') do
instrument_with_log(:clear, 'flushing all keys') do
with { |c| c.flush_all }
end
rescue Dalli::DalliError => e
logger.error("DalliError: #{e.message}") if logger
log_dalli_error(e)
instrument_error(e) if instrument_errors?
raise if raise_errors?
nil
end
Expand Down Expand Up @@ -300,7 +303,8 @@ def read_entry(key, options) # :nodoc:
# NB Backwards data compatibility, to be removed at some point
entry.is_a?(ActiveSupport::Cache::Entry) ? entry.value : entry
rescue Dalli::DalliError => e
logger.error("DalliError: #{e.message}") if logger
log_dalli_error(e)
instrument_error(e) if instrument_errors?
raise if raise_errors?
nil
end
Expand All @@ -314,7 +318,8 @@ def write_entry(key, value, options) # :nodoc:
connection = options.delete(:connection)
connection.send(method, key, value, expires_in, options)
rescue Dalli::DalliError => e
logger.error("DalliError: #{e.message}") if logger
log_dalli_error(e)
instrument_error(e) if instrument_errors?
raise if raise_errors?
false
end
Expand All @@ -323,7 +328,8 @@ def write_entry(key, value, options) # :nodoc:
def delete_entry(key, options) # :nodoc:
with { |c| c.delete(key) }
rescue Dalli::DalliError => e
logger.error("DalliError: #{e.message}") if logger
log_dalli_error(e)
instrument_error(e) if instrument_errors?
raise if raise_errors?
false
end
Expand Down Expand Up @@ -364,12 +370,26 @@ def expanded_key(key) # :nodoc:
key
end

def instrument(operation, key, options=nil)
def log_dalli_error(error)
logger.error("DalliError: #{error.message}") if logger
end

def instrument_with_log(operation, key, options=nil)
log(operation, key, options)

payload = { :key => key }
payload.merge!(options) if options.is_a?(Hash)
ActiveSupport::Notifications.instrument("cache_#{operation}.active_support", payload){ yield(payload) }
instrument(operation, payload) { |p| yield(p) }
end

def instrument_error(error)
instrument(:error, { :key => 'DalliError', :message => error.message })
end

def instrument(operation, payload)
ActiveSupport::Notifications.instrument("cache_#{operation}.active_support", payload) do
yield(payload) if block_given?
end
end

def log(operation, key, options=nil)
Expand All @@ -381,6 +401,10 @@ def raise_errors?
!!@options[:raise_errors]
end

def instrument_errors?
!!@options[:instrument_errors]
end

# Make sure LocalCache is giving raw values, not `Entry`s, and
# respect `raw` option.
module LocalCacheEntryUnwrapAndRaw # :nodoc:
Expand Down
13 changes: 13 additions & 0 deletions test/test_active_support.rb
Expand Up @@ -394,6 +394,19 @@ def self.it_with_and_without_local_cache(message, &block)
end

describe 'instruments' do
it 'notifies errors' do
new_port = 29333
key = 'foo'
with_cache port: new_port, :instrument_errors => true do
memcached_kill(new_port)
payload_proc = Proc.new { |payload| payload }
@dalli.expects(:instrument).with(:read, { :key => key }).yields(&payload_proc).once
@dalli.expects(:instrument).with(:error, { :key => "DalliError",
:message => "No server available" }).once
@dalli.read(key)
end
end

it 'payload hits' do
with_cache do
payload = {}
Expand Down

0 comments on commit 4b4a1f7

Please sign in to comment.