Add pg_statement_cache extension, for automatically preparing queries when using postgres adapter with pg driver

When combined with the pg_auto_parameterize extension, this can
take an entire Sequel application and turn most/all of its queries
into prepared statements.
commit cbfa4e9315e900192ca6eab97af38b13cea6cd0a 1 parent 99629c7
@jeremyevans authored
2  CHANGELOG
@@ -1,5 +1,7 @@
=== HEAD
+* Add pg_statement_cache extension, for automatically preparing queries when using postgres adapter with pg driver (jeremyevans)
+
* Add pg_auto_parameterize extension, for automatically parameterizing queries when using postgres adapter with pg driver (jeremyevans)
* Add ConnectionPool#disconnection_proc= method, to modify disconnection_proc after the pool has been created (jeremyevans)
316 lib/sequel/extensions/pg_statement_cache.rb
@@ -0,0 +1,316 @@
+# This extension adds a statement cache to Sequel's postgres adapter,
+# with the ability to automatically prepare statements that are
+# executed repeatedly. When combined with the pg_auto_parameterize
+# extension, it can take Sequel code such as:
+#
+# DB.extend Sequel::Postgres::AutoParameterize::DatabaseMethods
+# DB.extend Sequel::Postgres::StatementCache::DatabaseMethods
+# DB[:table].filter(:a=>1)
+# DB[:table].filter(:a=>2)
+# DB[:table].filter(:a=>3)
+#
+# And use the same prepared statement to execute the queries.
+#
+# The backbone of this extension is a modified LRU cache. It considers
+# both the last executed time and the number of executions when
+# determining which queries to keep in the cache. It only cleans the
+# cache when a high water mark has been passed, and removes queries
+# until it reaches the low water mark, in order to avoid thrashing when
+# you are using more than the maximum number of queries. To avoid
+# preparing queries when it isn't necessary, it does not prepare them
+# on the server side unless they are being executed more than once.
+# The cache is very tunable, allowing you to set the high and low
+# water marks, the number of executions before preparing the query,
+# and even use a custom callback to determine which queries to keep
+# in the cache.
+#
+# Note that automatically preparing statements does have some issues.
+# Most notably, if you change the result type that the query returns,
+# PostgreSQL will raise an error. This can happen if you have
+# prepared a statement that selects all columns from a table, and then
+# you add or remove a column from that table. This extension does
+# attempt to check that case and clear the statement caches if you use
+# alter_table from within Sequel, but it cannot fix the case when such
+# a change is made externally.
+#
+# This extension only works when the pg driver is used as the backend
+# for the postgres adapter.
+
+module Sequel
+ module Postgres
+ module StatementCache
+ # A simple structure used for the values in the StatementCache's hash.
+ # It does not hold the related SQL, since that is used as the key for
+ # the StatementCache's hash.
+ class Statement
+ # The last time this statement was seen by the cache, presumably the
+ # last time it was executed.
+ attr_accessor :last_seen
+
+ # The total number of executions since the statement entered the cache.
+ attr_accessor :num_executes
+
+ # The id related to the statement, used as part of the prepared statement
+ # name.
+ attr_reader :cache_id
+
+ # Used when adding entries to the cache, just sets their id. Uses
+ # 0 for num_executes since that is incremented elsewhere. Does not
+ # set last_seen since that is set elsewhere to reduce branching.
+ def initialize(cache_id)
+ @num_executes = 0
+ @cache_id = cache_id
+ end
+
+ # The name to use for the server side prepared statement. Note that this
+ # statement might not actually be prepared.
+ def name
+ "sequel_pgap_#{cache_id}"
+ end
+ end
+
+ # The backbone of the extension, a modified LRU (least recently used) cache
+ # mapping SQL query strings to Statement objects.
+ class StatementCache
+ include Enumerable
+
+ # Set the options for the statement cache. These are generally set at
+ # the database level using the :statement_cache_opts Database option.
+ #
+ # :max_size :: The maximum size (high water mark) for the cache. If
+ # an entry is added when the current size of the cache is
+ # equal to the maximum size, the cache is cleaned up to
+ # reduce the number of entries to the :min_size. Defaults
+ # to 1000.
+ # :min_size :: The minimum size (low water mark) for the cache. On
+ # cleanup, the size of the cache is reduced to this
+ # number. Note that there could be fewer than this
+ # number of entries in the cache. Defaults to :max_size/2.
+ # :prepare_after :: The number of executions to wait for before preparing
+ # the query server-side. If set to 1, prepares all executed
+ # queries server-side. If set to 5, does not attempt to
+ # prepare the query until the 5th execution. Defaults to 2.
+ # :sorter :: A callable object that takes two arguments, the current time
+ # and the related Statement instance, and should return some
+ # Comparable (usually a numeric) such that the lowest values
+ # returned are the first to be removed when it comes time to
+ # clean the pool. The default is basically:
+ #
+ # lambda{|t, stmt| (stmt.last_seen - t)/stmt.num_executes}
+ #
+ # so that it doesn't remove statements that have been executed
+ # many times just because many less-frequently executed statements
+ # have been executed recently.
+ #
+ # The block passed is called with the Statement object's name, only for
+ # statements that have been prepared, and should be used to deallocate the
+ # statements.
+ def initialize(opts={}, &block)
+ @cleanup_proc = block
+ @prepare_after = opts.fetch(:prepare_after, 2)
+ @max_size = opts.fetch(:max_size, 1000)
+ @min_size = opts.fetch(:min_size, @max_size/2)
+ @sorter = opts.fetch(:sorter){method(:default_sorter)}
+ @ids = (1..@max_size).to_a.reverse
+ @hash = {}
+ #
+ # We add one so that when we clean the cache, the entry
+ # about to be added brings us to the min_size.
+ @size_diff = @max_size - @min_size + 1
+ end
+
+ # Completely clear the statement cache, deallocating on
+ # the server side all statements that have been prepared.
+ def clear
+ @hash.keys.each{|k| remove(k)}
+ end
+
+ # Yield each SQL string and Statement instance in the cache
+ # to the block.
+ def each(&block)
+ @hash.each(&block)
+ end
+
+ # Get the related statement name from the cache. If the
+ # entry is already in the cache, just bump its last seen
+ # time and the number of executions. Otherwise, add it
+ # to the cache. If the cache is already full, clean it up
+ # before adding it.
+ #
+ # If the number of executions has reached the threshold, yield
+ # the statement name to the block, which should be used to
+ # prepare the statement on the server side.
+ #
+ # This method should return the prepared statement name if
+ # the statement has been prepared, and nil if the query
+ # has not been prepared and the statement should be executed
+ # normally.
+ def fetch(sql)
+ unless stmt = @hash[sql]
+ # Get the next id from the id pool.
+ unless id = @ids.pop
+ # No id left, cache must be full, so cleanup and then
+ # get the next id from the id pool.
+ cleanup
+ id = @ids.pop
+ end
+ @hash[sql] = stmt = Statement.new(id)
+ end
+
+ stmt.last_seen = Time.now
+ stmt.num_executes += 1
+
+ if stmt.num_executes >= @prepare_after
+ if stmt.num_executes == @prepare_after
+ begin
+ yield(stmt.name)
+ rescue PGError
+ # An error occurred while preparing the statement,
+ # execute it normally (which will probably raise
+ # the error again elsewhere), but decrement the
+ # number of executions so we don't think we've
+ # prepared the statement when we haven't.
+ stmt.num_executes -= 1
+ return nil
+ end
+ end
+ stmt.name
+ end
+ end
+
+ # The current size of the statement cache.
+ def size
+ @hash.length
+ end
+
+ private
+
+ # Sort by time since last execution and number of executions.
+ # We don't want to throw stuff out of the
+ # cache if it has been executed a lot,
+ # but a bunch of queries that were
+ # executed only once came in more recently.
+ def default_sorter(t, stmt)
+ (stmt.last_seen - t)/stmt.num_executes
+ end
+
+ # After sorting the cache appropriately (so that the least important
+ # items are first), reduce the number of entries in the cache to
+ # the low water mark by removing the related query. Should only be
+ # called when the cache is full.
+ def cleanup
+ t = Time.now
+ @hash.sort_by{|k,v| @sorter.call(t, v)}.first(@size_diff).each{|sql, stmt| remove(sql)}
+ end
+
+ # Remove the query from the cache. If it has been prepared,
+ # call the cleanup_proc to deallocate the statement.
+ def remove(sql)
+ stmt = @hash.delete(sql)
+ if stmt.num_executes >= @prepare_after
+ @cleanup_proc.call(stmt.name)
+ end
+
+ # Return id to the pool of ids
+ @ids.push(stmt.cache_id)
+ end
+ end
+
+ module AdapterMethods
+ # A regular expression for the types of queries to cache. Any queries not
+ # matching this regular expression are not cached.
+ DML_RE = /\A(WITH|SELECT|INSERT|UPDATE|DELETE) /
+
+ # The StatementCache instance for this connection. Note that
+ # each connection has a separate StatementCache, because prepared
+ # statements are connection-specific.
+ attr_reader :statement_cache
+
+ # Set the statement_cache for the connection, using the database's
+ # :statement_cache_opts option.
+ def self.extended(c)
+ c.instance_variable_set(:@statement_cache, StatementCache.new(c.sequel_db.opts[:statement_cache_opts] || {}){|name| c.deallocate(name)})
+ end
+
+ # pg seems to already use the db method (but not the @db instance variable),
+ # so use the sequel_db method to access the related Sequel::Database object.
+ def sequel_db
+ @db
+ end
+
+ # Deallocate on the server the prepared statement with the given name.
+ def deallocate(name)
+ begin
+ execute("DEALLOCATE #{name}")
+ rescue PGError
+ # table probably got removed, just ignore it
+ end
+ end
+
+ private
+
+ # If the sql query string is one we should cache, cache it. If the query already
+ # has a related prepared statement, execute the prepared statement instead
+ # of executing the query normally.
+ def execute_query(sql, args=nil)
+ if sql =~ DML_RE
+ if name = statement_cache.fetch(sql){|stmt_name| sequel_db.log_yield("PREPARE #{stmt_name} AS #{sql}"){prepare(stmt_name, sql)}}
+ if args
+ sequel_db.log_yield("EXECUTE #{name}", args){exec_prepared(name, args)}
+ else
+ sequel_db.log_yield("EXECUTE #{name}"){exec_prepared(name)}
+ end
+ else
+ super
+ end
+ else
+ super
+ end
+ end
+ end
+
+ module DatabaseMethods
+ # Setup the after_connect proc for the connection pool to make
+ # sure the connection object is extended with the appropriate
+ # module. This disconnects any existing connections to ensure
+ # that all connections in the pool have been extended appropriately.
+ def self.extended(db)
+ # Respect existing after_connect proc if one is present
+ pr = db.opts[:after_connect]
+
+ # Set the after_connect proc to extend the adapter with
+ # the statement cache support.
+ db.pool.after_connect = db.opts[:after_connect] = proc do |c|
+ pr.call(c) if pr
+ c.extend(AdapterMethods)
+ end
+
+ # Disconnect to make sure all connections get set up with
+ # statement cache.
+ db.disconnect
+ end
+
+ # Clear statement caches for all connections when altering tables.
+ def alter_table(*)
+ clear_statement_caches
+ super
+ end
+
+ # Clear statement caches for all connections when dropping tables.
+ def drop_table(*)
+ clear_statement_caches
+ super
+ end
+
+ private
+
+ # Clear the statement cache for all connections. Note that for
+ # the threaded pools, this will not affect connections currently
+ # allocated to other threads.
+ def clear_statement_caches
+ pool.all_connections{|c| c.statement_cache.clear}
+ end
+ end
+ end
+ end
+end
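The eviction arithmetic in `StatementCache#cleanup` can be checked in isolation. What follows is a minimal standalone sketch, not part of the commit: `Entry`, `score`, and `evict` are hypothetical names, and the code only mirrors the default sorter formula and the `@size_diff` calculation from the diff above, without loading Sequel or talking to PostgreSQL.

```ruby
# Standalone sketch with hypothetical names; no Sequel or pg required.
Entry = Struct.new(:last_seen, :num_executes)

# Same formula as default_sorter in the diff: the value is negative, and
# dividing by the execution count moves frequently used entries toward zero.
def score(now, entry)
  (entry.last_seen - now) / entry.num_executes
end

# Remove the lowest-scoring entries, mirroring cleanup: max - min + 1
# entries go, so the entry about to be added brings the size to min.
def evict(cache, now, max_size, min_size)
  size_diff = max_size - min_size + 1
  victims = cache.sort_by { |_sql, e| score(now, e) }.first(size_diff)
  victims.each { |sql, _e| cache.delete(sql) }
  victims.map(&:first)
end

now = Time.at(1_000)
cache = {
  "hot"   => Entry.new(now - 10, 100), # old but executed often: score -0.1
  "cold1" => Entry.new(now - 1, 1),    # recent, executed once:  score -1.0
  "cold2" => Entry.new(now - 2, 1),    # score -2.0
  "cold3" => Entry.new(now - 3, 1)     # score -3.0
}
removed = evict(cache, now, 4, 2)
```

With a high water mark of 4 and a low water mark of 2, three entries are removed and only the frequently executed one survives, even though it was seen least recently. That is the behavior the default sorter comment describes.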
207 spec/extensions/pg_statement_cache_spec.rb
@@ -0,0 +1,207 @@
+require File.join(File.dirname(File.expand_path(__FILE__)), "spec_helper")
+
+Sequel::Postgres::StatementCache::PGError = Sequel::Error
+
+describe "pg_statement_cache and pg_auto_parameterize extensions" do
+ before do
+ @dbp = proc do |opts|
+ @db = Sequel.connect 'mock://postgres', :quote_identifiers=>false, :statement_cache_opts=>{:max_size=>4}.merge(opts),
+ :after_connect=>(proc do |c|
+ c.extend(Module.new do
+ def execute_query(sql, args)
+ raise Sequel::Postgres::StatementCache::PGError if @db.exec_raise
+ @db.execute(sql, :arguments=>args, :no_eq=>true)
+ end
+ def prepare(name, sql)
+ raise Sequel::Postgres::StatementCache::PGError if sql =~ /prepare_raise/
+ @db._sqls << "PREPARE #{name} AS #{sql}"
+ end
+ def exec_prepared(name, args=nil)
+ @db._sqls << "EXECUTE #{name}#{" -- args: #{args.inspect}" if args}"
+ end
+ end)
+ end)
+ @db.extend Sequel::Postgres::AutoParameterize::DatabaseMethods
+ @db.extend Sequel::Postgres::StatementCache::DatabaseMethods
+ @db.extend(Module.new do
+ attr_accessor :exec_raise
+ def _execute(c, sql, opts={})
+ opts[:no_eq] ? super : c.send(:execute_query, sql, opts[:arguments])
+ end
+ def _sqls
+ @sqls
+ end
+ def statement_cache
+ synchronize{|c| c.statement_cache}
+ end
+ end)
+ @db
+ end
+ @db = @dbp.call({})
+ end
+
+ it "should automatically prepare statements executed multiple times" do
+ @db[:table].filter(:b=>2).all
+ 3.times{|i| @db[:table].filter(:a=>i).all}
+ @db.sqls.should == ["SELECT * FROM table WHERE (b = $1::int4) -- args: [2]",
+ "SELECT * FROM table WHERE (a = $1::int4) -- args: [0]",
+ "PREPARE sequel_pgap_2 AS SELECT * FROM table WHERE (a = $1::int4)",
+ "EXECUTE sequel_pgap_2 -- args: [1]",
+ "EXECUTE sequel_pgap_2 -- args: [2]"]
+ end
+
+ it "should work correctly for queries without parameters" do
+ @db[:table].filter(:b).all
+ 3.times{|i| @db[:table].filter(:a).all}
+ @db.sqls.should == ["SELECT * FROM table WHERE b",
+ "SELECT * FROM table WHERE a",
+ "PREPARE sequel_pgap_2 AS SELECT * FROM table WHERE a",
+ "EXECUTE sequel_pgap_2",
+ "EXECUTE sequel_pgap_2"]
+ end
+
+ it "should correctly return the size of the cache" do
+ sc = @db.statement_cache
+ sc.size.should == 0
+ @db[:table].filter(:b=>2).all
+ sc.size.should == 1
+ 3.times{|i| @db[:table].filter(:a=>i).all}
+ sc.size.should == 2
+ end
+
+ it "should correctly clear the cache" do
+ sc = @db.statement_cache
+ sc.size.should == 0
+ @db[:table].filter(:b=>2).all
+ sc.size.should == 1
+ 3.times{|i| @db[:table].filter(:a=>i).all}
+ sc.size.should == 2
+ sc.clear
+ sc.size.should == 0
+ 3.times{|i| @db[:table].filter(:a=>i).all}
+ sc.size.should == 1
+ end
+
+ it "should correctly yield each entry in the cache" do
+ @db[:table].filter(:b=>2).all
+ 3.times{|i| @db[:table].filter(:a=>i).all}
+ a = []
+ @db.statement_cache.each{|k, v| a << [k, v]}
+ a.sort!
+ a[0][0].should == "SELECT * FROM table WHERE (a = $1::int4)"
+ a[1][0].should == "SELECT * FROM table WHERE (b = $1::int4)"
+ s1 = a[1][1]
+ s1.cache_id.should == 1
+ s1.num_executes.should == 1
+ s1 = a[0][1]
+ s1.cache_id.should == 2
+ s1.num_executes.should == 3
+ end
+
+ it "should automatically cleanup the cache when it goes beyond its maximum size" do
+ sc = @db.statement_cache
+ 4.times{|i| @db[:table].filter(:"a#{i}"=>1).all}
+ sc.size.should == 4
+ @db[:table].filter(:b=>1).all
+ sc.size.should == 2
+ end
+
+ it "should clear statement caches when altering tables" do
+ @db[:table].filter(:b=>2).all
+ sc = @db.statement_cache
+ @db.alter_table(:foo){drop_column :bar}
+ sc.size.should == 0
+ end
+
+ it "should clear statement caches when dropping tables" do
+ @db[:table].filter(:b=>2).all
+ sc = @db.statement_cache
+ @db.drop_table(:foo)
+ sc.size.should == 0
+ end
+
+ it "should deallocate prepared statements when clearing the cache" do
+ 3.times{|i| @db[:table].filter(:a=>i).all}
+ @db.sqls
+ @db.statement_cache.clear
+ @db.sqls.should == ["DEALLOCATE sequel_pgap_1"]
+ end
+
+ it "should deallocate prepared statements when cleaning up the cache" do
+ @db = @dbp.call(:sorter=>proc{|t, s| -s.num_executes})
+ 4.times{|i| @db[:table].filter(:"a#{i}"=>1).all}
+ @db[:table].filter(:a0=>1).all
+ @db.sqls
+ @db[:table].filter(:b=>1).all
+ @db.sqls.should == ["DEALLOCATE sequel_pgap_1", "SELECT * FROM table WHERE (b = $1::int4) -- args: [1]"]
+ end
+
+ it "should not deallocate nonprepared statements when clearing the cache" do
+ 4.times{|i| @db[:table].filter(:"a#{i}"=>1).all}
+ @db.sqls
+ @db.statement_cache.clear
+ @db.sqls.should == []
+ end
+
+ it "should not deallocate nonprepared statements when cleaning up the cache" do
+ @db = @dbp.call(:sorter=>proc{|t, s| -s.num_executes})
+ 4.times{|i| @db[:table].filter(:"a#{i}"=>1).all}
+ @db.sqls
+ @db[:table].filter(:b=>1).all
+ @db.sqls.should == ["SELECT * FROM table WHERE (b = $1::int4) -- args: [1]"]
+ end
+
+ it "should have a configurable max_size and min_size" do
+ @db = @dbp.call(:max_size=>10, :min_size=>2)
+ 10.times{|i| @db[:table].filter(:"a#{i}"=>1).all}
+ sc = @db.statement_cache
+ sc.size.should == 10
+ @db[:table].filter(:b=>1).all
+ sc.size.should == 2
+ end
+
+ it "should have a configurable prepare_after" do
+ @db = @dbp.call(:prepare_after=>3)
+ 4.times{|i| @db[:table].filter(:a=>i).all}
+ @db.sqls.should == ["SELECT * FROM table WHERE (a = $1::int4) -- args: [0]",
+ "SELECT * FROM table WHERE (a = $1::int4) -- args: [1]",
+ "PREPARE sequel_pgap_1 AS SELECT * FROM table WHERE (a = $1::int4)",
+ "EXECUTE sequel_pgap_1 -- args: [2]",
+ "EXECUTE sequel_pgap_1 -- args: [3]"]
+ end
+
+ it "should have a configurable sorter" do
+ @db = @dbp.call(:sorter=>proc{|t, s| s.num_executes})
+ 4.times{|i| (i+1).times{@db[:table].filter(:"a#{i}"=>1).all}}
+ @db[:table].filter(:b=>1).all
+ sc = @db.statement_cache
+ a = []
+ sc.each{|k, v| a << [k, v]}
+ a.sort!
+ a[0][0].should == "SELECT * FROM table WHERE (a3 = $1::int4)"
+ a[1][0].should == "SELECT * FROM table WHERE (b = $1::int4)"
+ s1 = a[1][1]
+ s1.num_executes.should == 1
+ s1 = a[0][1]
+ s1.cache_id.should == 4
+ s1.num_executes.should == 4
+ end
+
+ it "should ignore errors when preparing queries" do
+ 3.times{|i| @db[:table].filter(:prepare_raise=>1).all}
+ @db.sqls.should == ["SELECT * FROM table WHERE (prepare_raise = $1::int4) -- args: [1]",
+ "SELECT * FROM table WHERE (prepare_raise = $1::int4) -- args: [1]",
+ "SELECT * FROM table WHERE (prepare_raise = $1::int4) -- args: [1]"]
+ end
+
+ it "should ignore errors when deallocating queries" do
+ 3.times{|i| @db[:table].filter(:a=>1).all}
+ @db.exec_raise = true
+ @db.statement_cache.clear
+ @db.sqls.should == ["SELECT * FROM table WHERE (a = $1::int4) -- args: [1]",
+ "PREPARE sequel_pgap_1 AS SELECT * FROM table WHERE (a = $1::int4)",
+ "EXECUTE sequel_pgap_1 -- args: [1]",
+ "EXECUTE sequel_pgap_1 -- args: [1]"]
+ end
+
+end
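The statement sequence asserted by the "configurable prepare_after" spec can be reproduced with a much smaller model. The sketch below is an illustration only: `ThresholdCache`, `run`, and the `stmt_N` naming are hypothetical and stand in for the real `StatementCache#fetch` and `execute_query`, assuming nothing beyond the counting rule described in the diff (execute directly until the count reaches the threshold, prepare once at the threshold, then execute the prepared statement).

```ruby
# Standalone sketch with a hypothetical class; no Sequel or pg required.
class ThresholdCache
  attr_reader :log

  def initialize(prepare_after = 2)
    @prepare_after = prepare_after
    @counts = Hash.new(0) # SQL string => number of executions seen
    @ids = {}             # SQL string => numeric id used in the name
    @log = []             # statements that would be sent to the server
  end

  # Record one execution of sql with a single bound argument.
  def run(sql, arg)
    id = (@ids[sql] ||= @ids.size + 1)
    count = (@counts[sql] += 1)
    name = "stmt_#{id}"
    if count < @prepare_after
      # Below the threshold: run the query normally.
      @log << "#{sql} -- args: [#{arg}]"
    else
      # Prepare exactly once, on the execution that reaches the threshold.
      @log << "PREPARE #{name} AS #{sql}" if count == @prepare_after
      @log << "EXECUTE #{name} -- args: [#{arg}]"
    end
  end
end

cache = ThresholdCache.new(3)
sql = "SELECT * FROM table WHERE (a = $1::int4)"
4.times { |i| cache.run(sql, i) }
```

With a threshold of 3, the log holds two plain executions, one PREPARE, and two EXECUTE entries, which is the same shape the spec expects from the real extension.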
2  spec/extensions/spec_helper.rb
@@ -8,7 +8,7 @@
require 'sequel/model'
end
-Sequel.extension(*%w'string_date_time inflector pagination query pretty_table blank migration schema_dumper looser_typecasting sql_expr thread_local_timezones to_dot columns_introspection server_block arbitrary_servers pg_auto_parameterize')
+Sequel.extension(*%w'string_date_time inflector pagination query pretty_table blank migration schema_dumper looser_typecasting sql_expr thread_local_timezones to_dot columns_introspection server_block arbitrary_servers pg_auto_parameterize pg_statement_cache')
{:hook_class_methods=>[], :schema=>[], :validation_class_methods=>[]}.each{|p, opts| Sequel::Model.plugin(p, *opts)}
Sequel::Dataset.introspect_all_columns if ENV['SEQUEL_COLUMNS_INTROSPECTION']
1  www/pages/plugins
@@ -89,6 +89,7 @@
<li><a href="rdoc-plugins/files/lib/sequel/extensions/pretty_table_rb.html">pretty_table</a>: Adds Dataset#print for printing a dataset as a simple plain-text table.</li>
<li><a href="rdoc-plugins/files/lib/sequel/extensions/query_rb.html">query</a>: Adds Dataset#query for a different interface to creating queries that doesn't use method chaining.</li>
<li><a href="rdoc-plugins/files/lib/sequel/extensions/pg_auto_parameterize_rb.html">pg_auto_parameterize</a>: Automatically parameterize queries when using the postgres adapter with the pg driver.</li>
+<li><a href="rdoc-plugins/files/lib/sequel/extensions/pg_statement_cache_rb.html">pg_statement_cache</a>: Automatically use prepared statements for most queries when using the postgres adapter with the pg driver.</li>
<li><a href="rdoc-plugins/files/lib/sequel/extensions/schema_dumper_rb.html">schema_dumper</a>: Adds Database#dump_schema_migration and related methods for dumping the schema of the database as a migration that can be restored on other databases.</li>
<li><a href="rdoc-plugins/files/lib/sequel/extensions/server_block_rb.html">server_block</a>: Adds Database#with_server method that makes access inside the passed block use the specified shard by default.</li>
<li><a href="rdoc-plugins/files/lib/sequel/extensions/sql_expr_rb.html">sql_expr</a>: Adds sql_expr method to all objects, allowing easy use of Sequel's DSL.</li>