Permalink
Browse files

avoid storing huge data in session. let ff_daemon cache it. closes #194.

  • Loading branch information...
1 parent 029b127 commit 5bf152a6e4dbceb267b964ebd97148346555a808 @nahi committed Jul 14, 2009
@@ -563,12 +563,10 @@ def unpin
private
def find_opt(ctx = @ctx)
- cache = session[:cached_entries] ||= CachedEntries.new
updated_id = updated_id_in_flash()
ctx.find_opt.merge(
:allow_cache => flash[:allow_cache],
- :updated_id => updated_id,
- :cached_entries => cache
+ :updated_id => updated_id
)
end
@@ -1,3 +1,6 @@
+require 'entry'
+
+
class CachedEntries < Array
attr_accessor :opt
end
View
@@ -1,4 +1,12 @@
require 'hash_utils'
+require 'comment'
+require 'entry_user'
+require 'geo'
+require 'like'
+require 'media'
+require 'room'
+require 'service'
+require 'via'
class Entry
View
@@ -1,4 +1,6 @@
require 'task'
+require 'entry'
+require 'cached_entries'
class EntryThread
@@ -13,15 +15,13 @@ class << self
def find(opt = {})
auth = opt[:auth]
- cache = opt[:cached_entries]
return nil unless auth
unless opt.key?(:merge_entry)
opt[:merge_entry] = true
end
opt.delete(:auth)
- opt.delete(:cached_entries)
logger.info('[perf] start entries fetch')
- original = entries = fetch_entries(auth, opt, cache)
+ original = entries = fetch_entries(auth, opt)
logger.info('[perf] start internal data handling')
record_last_modified(entries)
logger.info('[perf] record_last_modified done')
@@ -98,15 +98,15 @@ def logger
ActiveRecord::Base.logger
end
- def fetch_entries(auth, opt, cache)
+ def fetch_entries(auth, opt)
if opt[:id]
- fetch_single_entry_as_array(auth, opt, cache)
+ fetch_single_entry_as_array(auth, opt)
else
- entries = fetch_list_entries(auth, opt, cache)
+ entries = fetch_list_entries(auth, opt)
if updated_id = opt[:updated_id]
entry = wrap(get_entry(auth, :id => updated_id)).first
if entry
- update_cache_entry(cache, entry)
+ update_cache_entry(auth, entry)
if entries.find { |e| e.id == updated_id }
replace_entry(entries, entry)
else
@@ -118,18 +118,20 @@ def fetch_entries(auth, opt, cache)
end
end
- def fetch_single_entry_as_array(auth, opt, cache)
- if opt[:allow_cache] and cache
- if found = cache.find { |e| e.id == opt[:id] && e.id != opt[:updated_id] }
- logger.info("[cache] entry cache found for #{opt[:id]}")
- return [found]
+ def fetch_single_entry_as_array(auth, opt)
+ if opt[:allow_cache]
+ if cache = get_cached_entries(auth)
+ if found = cache.find { |e| e.id == opt[:id] && e.id != opt[:updated_id] }
+ logger.info("[cache] entry cache found for #{opt[:id]}")
+ return [found]
+ end
end
end
wrap(Task.run { get_entry(auth, opt) }.result)
end
- def fetch_list_entries(auth, opt, cache)
- cache_entries(opt, cache) {
+ def fetch_list_entries(auth, opt)
+ cache_entries(auth, opt) {
if opt[:inbox]
start = opt[:start]
num = opt[:num]
@@ -176,34 +178,45 @@ def fetch_list_entries(auth, opt, cache)
}
end
- def cache_entries(opt, cache, &block)
+ def cache_entries(auth, opt, &block)
allow_cache = opt[:allow_cache]
opt = opt.dup
opt.delete(:allow_cache)
opt.delete(:updated_id)
opt.delete(:merge_entry)
opt.delete(:merge_service)
opt.delete(:filter_inbox_except)
- if allow_cache and cache
- if opt == cache.opt
- logger.info("[cache] entries cache found for #{opt.inspect}")
- return cache
+ if allow_cache
+ if cache = get_cached_entries(auth)
+ if opt == cache.opt
+ logger.info("[cache] entries cache found for #{opt.inspect}")
+ return cache
+ end
end
end
entries = yield
- if cache
- cache.opt = opt
- cache.replace(entries)
- end
+ cache = CachedEntries.new
+ cache.opt = opt
+ cache.replace(entries)
+ set_cached_entries(auth, cache)
entries
end
- def update_cache_entry(cache, entry)
- if cache
+ def update_cache_entry(auth, entry)
+ if cache = get_cached_entries(auth)
replace_entry(cache, entry)
+ set_cached_entries(auth, cache)
end
end
+ def get_cached_entries(auth)
+ ff_client.get_cached_entries(auth.name)
+ end
+
+ def set_cached_entries(auth, cache)
+ ff_client.set_cached_entries(auth.name, cache)
+ end
+
def record_last_modified(entries)
found = LastModified.find_all_by_eid(entries.map { |e| e.id })
found_map = found.inject({}) { |r, e|
View
@@ -56,6 +56,7 @@ class BaseClient
attr_accessor :logger
attr_accessor :apikey
attr_accessor :http_proxy
+ attr_accessor :httpclient_max_keepalive
class LShiftLogger
def initialize(logger)
@@ -72,21 +73,36 @@ def method_missing(msg_id, *a, &b)
end
class UserClient
+ attr_accessor :httpclient_max_keepalive
+
def initialize(name, remote_key, logger, http_proxy)
@client = HTTPClient.new(http_proxy)
@name = name
@remote_key = remote_key
#@client.debug_dev = LShiftLogger.new(logger)
+ @logger = logger
@client.extend(MonitorMixin)
+ @last_accessed = Time.now
reset_auth
end
+ def idle?
+ if @httpclient_max_keepalive
+ elapsed = Time.now - @last_accessed
+ if elapsed > @httpclient_max_keepalive
+ @client.reset_all rescue nil
+ true
+ end
+ end
+ end
+
def client(remote_key)
@client.synchronize do
if remote_key != @remote_key
@remote_key = remote_key
reset_auth
end
+ @last_accessed = Time.now
@client
end
end
@@ -106,7 +122,9 @@ def initialize(logger = nil, apikey = nil)
@logger = logger || NullLogger.new
@apikey = apikey
@http_proxy = nil
+ @httpclient_max_keepalive = 5 * 60
@clients = {}
+ @mutex = Monitor.new
end
private
@@ -119,10 +137,23 @@ def uri(part)
end
def create_client(name, remote_key)
- UserClient.new(name, remote_key, @logger, @http_proxy)
+ client = UserClient.new(name, remote_key, @logger, @http_proxy)
+ client.httpclient_max_keepalive = @httpclient_max_keepalive
+ client
end
def client_sync(uri, name, remote_key)
+ @mutex.synchronize do
+ clients = {}
+ @clients.each do |key, value|
+ if value.idle?
+ @logger.info("removed idle HTTPClient for #{key}")
+ else
+ clients[key] = value
+ end
+ end
+ @clients = clients
+ end
user_client = @clients[name] ||= create_client(name, remote_key)
client = user_client.client(remote_key)
logger.info("#{user_client.inspect} is accessing to #{uri.to_s} for #{name}")
View
@@ -70,6 +70,8 @@ class APIClientProxy
define_proxy_method :get_list_profile
define_proxy_method :get_room_status
define_proxy_method :purge_cache
+ define_proxy_method :get_cached_entries
+ define_proxy_method :set_cached_entries
def initialize
@client = DRb::DRbObject.new(nil, F2P::Config.friendfeed_api_daemon_drb_uri)
@@ -230,6 +232,19 @@ def initialize(logger = nil)
@cache = {}
end
+ def set_cached_entries(name, entries)
+ basekey = name
+ cache = ((@cache ||= {})[basekey] ||= {})
+ cache[:last_entries] = entries
+ nil
+ end
+
+ def get_cached_entries(name)
+ basekey = name
+ cache = ((@cache ||= {})[basekey] ||= {})
+ cache[:last_entries]
+ end
+
def purge_cache(key)
@cache.delete(key)
if @channel.key?(key)
@@ -8,6 +8,15 @@ def setup
@ff.stubs(:get_profile).returns(profile)
@ff.stubs(:get_user_status).returns({})
@ff.stubs(:get_room_status).returns({})
+ class << @ff
+ def set_cached_entries(auth, cache)
+ @cache = cache
+ end
+
+ def get_cached_entries(auth)
+ @cache
+ end
+ end
ApplicationController.ff_client = @ff
end
Oops, something went wrong.

0 comments on commit 5bf152a

Please sign in to comment.