Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add support for read-only slave/writable master databases and databas…

…e sharding

The support is described in the sharding.rdoc file included with this
commit. This commit makes significant changes to every adapter in
order to support this new functionality.  I only have the ability to
test PostgreSQL, MySQL, and SQLite (both via the native drivers and
via JDBC), so it's possible I could have broken something on other
adapters.  If you use another adapter, please test this and see if it
breaks anything.  I try to be fairly careful whenever I change
something I can't test, but it's always possible I made an error.

This commit makes the following internal changes:

* The Database and Dataset execute and execute_dui methods now take
  an options hash.  The prepared statement support was integrated
  into this hash, resulting in a simpler implementation.
* The connection pool internals were changed significantly to allow
  connections to different servers.  The previous methods all still
  work the same way, but now take an optional server argument
  specifying which server to use.
* Many low_level methods (transaction, test_connection, synchronize,
  tables) take an optional server argument to specify the server to
  use.
* Some adapter database and dataset methods were made private.
* Adapter Dataset #fetch_rows methods that used Database#synchronize
  explicitly were modified to use Dataset#execute with a block.
  Adapter Database #execute methods were modified for these adapters
  to yield inside of #synchronize.
* Database#connect now requires a server argument.  The included
  adapters use this with the new private Database#server_opts method
  that allows overriding the default opts with the server specific
  opts.
* The JDBC and MySQL adapters were significantly refactored.
* The PostgreSQL adapter #execute_insert now takes a hash of options
  instead of table and values arguments.
* Adapters with specific support for named prepared statements now
  consider the use of a symbol as the first argument to execute
  to indicant the call of a prepared statement.  The
  execute_prepared_statement method in these adapters is now private.
* Adapter execute_select statements were removed in place of execute,
  with the original use of execute changed to execute_dui.  This
  follows the convention of using execute for SELECT queries, and
  execute_dui for DELETE/UPDATE/INSERT queries.
* Removes adapter_skeleton adapter.  The existing adapters provide
  better examples of how things should be done compared to this
  example file.
* No longer defines model methods for non-public dataset methods
  specified in plugins.
  • Loading branch information...
commit 7aeea22dd348f55341cb2bb99b462ee6d5ab564d 1 parent 7b2cee2
@jeremyevans authored
Showing with 839 additions and 591 deletions.
  1. +2 −0  CHANGELOG
  2. +113 −0 doc/sharding.rdoc
  3. +0 −54 lib/sequel_core/adapters/adapter_skeleton.rb
  4. +15 −10 lib/sequel_core/adapters/ado.rb
  5. +30 −33 lib/sequel_core/adapters/db2.rb
  6. +15 −13 lib/sequel_core/adapters/dbi.rb
  7. +13 −14 lib/sequel_core/adapters/informix.rb
  8. +105 −103 lib/sequel_core/adapters/jdbc.rb
  9. +2 −2 lib/sequel_core/adapters/jdbc/mysql.rb
  10. +4 −22 lib/sequel_core/adapters/jdbc/postgresql.rb
  11. +82 −90 lib/sequel_core/adapters/mysql.rb
  12. +17 −15 lib/sequel_core/adapters/odbc.rb
  13. +10 −7 lib/sequel_core/adapters/openbase.rb
  14. +17 −16 lib/sequel_core/adapters/oracle.rb
  15. +64 −41 lib/sequel_core/adapters/postgres.rb
  16. +12 −27 lib/sequel_core/adapters/shared/postgres.rb
  17. +9 −8 lib/sequel_core/adapters/shared/sqlite.rb
  18. +28 −35 lib/sequel_core/adapters/sqlite.rb
  19. +85 −63 lib/sequel_core/connection_pool.rb
  20. +32 −12 lib/sequel_core/database.rb
  21. +11 −4 lib/sequel_core/dataset.rb
  22. +1 −1  lib/sequel_model/plugins.rb
  23. +105 −3 spec/sequel_core/connection_pool_spec.rb
  24. +30 −3 spec/sequel_core/database_spec.rb
  25. +34 −12 spec/sequel_core/dataset_spec.rb
  26. +2 −2 spec/sequel_core/spec_helper.rb
  27. +1 −1  spec/sequel_model/spec_helper.rb
View
2  CHANGELOG
@@ -1,5 +1,7 @@
=== HEAD
+* Add support for read-only slave/writable master databases and database sharding (jeremyevans)
+
* Remove InvalidExpression, InvalidFilter, InvalidJoinType, and WorkerStop exceptions (jeremyevans)
* Add prepared statement/bound variable support (jeremyevans)
View
113 doc/sharding.rdoc
@@ -0,0 +1,113 @@
+= Read-Only Slaves/Writable Master and Database Sharding
+
+Starting with version 2.4.0, Sequel has support for read only slave databases
+with a writable master database, as well as database sharding (where you can
+pick a database connection to use for a given dataset). Support for both
+features is database independent, and should work for all database adapters
+included with Sequel.
+
+== The :servers Database option
+
+Both features use the new :servers Database option. The :servers option should
+be a hash with symbol keys and values that are either hashes or procs that
+return hashes. Note that all servers should have the same schema, unless you
+really know what you are doing.
+
+== Master and Slave Database Configurations
+
+=== Single Read-Only Slave, Single Master
+
+To use a single, read-only slave that handles SELECT queries, the following
+is the simplest configuration:
+
+ DB=Sequel.connect('postgres://master_server/database', \
+ :servers=>{:read_only=>{:host=>'slave_server'}})
+
+This will use the host slave_server for SELECT queries and master_server for
+other queries.
+
+=== Multiple Read-Only Slaves, Single Master
+
+Let's say you have 4 slave database servers with names slave_server0,
+slave_server1, slave_server2, and slave_server3.
+
+ DB=Sequel.connect('postgres://master_server/database', \
+ :servers=>{:read_only=>proc{|db| :host=>db.get_slave_host}})
+ def DB.get_slave_host
+ @current_host ||= -1
+ "slave_server#{(@current_host+=1)%4}"
+ end
+
+This will use one of the slave servers for SELECT queries and use the
+master_server for other queries. It's also possible to pick a random host
+instead of using the round robin approach presented above, but that can result
+in less optimal resource usage.
+
+=== Multiple Read-Only Slaves, Multiple Masters
+
+This involves the same basic idea as the multiple slaves, single master, but
+it shows that the master database is named :default. So for 4 masters and
+4 slaves:
+
+ DB=Sequel.connect('postgres://master_server/database', \
+ :servers=>{:read_only=>proc{|db| :host=>db.get_slave_host}, \
+ :default=>proc{|db| :host=>db.get_master_host}})
+ def DB.get_slave_host
+ @current_slave_host ||= -1
+ "slave_server#{(@current_slave_host+=1)%4}"
+ end
+ def DB.get_master_host
+ @current_master_host ||= -1
+ "master_server#{(@current_master_host+=1)%4}"
+ end
+
+== Sharding
+
+There is specific support in Sequel for handling master/slave database
+combinations, with the only necessary setup being the database configuration.
+However, since sharding is always going to be implementation dependent, Sequel
+supplies the basic infrastructure, but you have to tell it which server to use
+for each dataset. Let's assume the simple scenario, a distributed rainbow
+table for SHA-1 hashes, sharding based on the first hex character (for a total
+of 16 shards). First, you need to configure the database:
+
+ servers = {}
+ (('0'..'9').to_a + ('a'..'f').to_a).each do |hex|
+ servers[hex.to_sym] = {:host=>"hash_host_#{hex}"}
+ end
+ DB=Sequel.connect('postgres://hash_host/hashes', :servers=>servers)
+
+This configures 17 servers, the 16 shard servers (/hash_host_[0-9a-f]/), and 1
+default server which will be used if no shard is specified ("hash_host"). If
+you want the default server to be one of the shard servers (e.g. hash_host_a),
+it's easiest to do:
+
+ DB=Sequel.connect('postgres://hash_host_a/hashes', :servers=>servers)
+
+That will still set up a second pool of connections for the default server,
+since it considers the default server and shard servers independent. Note that
+if you always set the shard on a dataset before using it in queries, it will
+not attempt to connect to the default server. Sequel may use the default
+server in queries it generates itself, such as to get column names or table
+schemas, so it is always good to have a default server that works.
+
+To set the shard for a given query, you use the Dataset#server method:
+
+ DB[:hashes].server(:a).filter(:hash=>/31337/)
+
+That will return all matching rows on the hash_host_a shard that have a hash
+column that contains 31337.
+
+Rainbow tables are generally used to find specific hashes, so to save some
+work, you might want to add a method to the dataset that automatically sets
+the shard to use. This is fairly easy using a Sequel::Model:
+
+ class Rainbow < Sequel::Model(:hashes)
+ def_dataset_method(:plaintext_for_hash) do |hash|
+ raise(ArgumentError, 'Invalid SHA-1 Hash') unless /\A[0-9a-f]{40}\z/.match(hash)
+ row = self.server(hash[0...1].to_sym).first(:hash=>hash)
+ row[:plaintext] if row
+ end
+ end
+
+ Rainbow.plaintext_for_hash("e580726d31f6e1ad216ffd87279e536d1f74e606")
View
54 lib/sequel_core/adapters/adapter_skeleton.rb
@@ -1,54 +0,0 @@
-module Sequel
- module Adapter
- class Database < Sequel::Database
- set_adapter_scheme :adapter
-
- def connect
- AdapterDB.new(@opts[:database], @opts[:user], @opts[:password])
- end
-
- def disconnect
- @pool.disconnect {|c| c.disconnect}
- end
-
- def dataset(opts = nil)
- Adapter::Dataset.new(self, opts)
- end
-
- def execute(sql)
- log_info(sql)
- @pool.hold {|conn| conn.exec(sql)}
- end
- end
-
- class Dataset < Sequel::Dataset
- def literal(v)
- case v
- when Time
- literal(v.iso8601)
- when Date, DateTime
- literal(v.to_s)
- else
- super
- end
- end
-
- def fetch_rows(sql, &block)
- @db.synchronize do
- cursor = @db.execute sql
- begin
- @columns = cursor.get_col_names.map {|c| c.to_sym}
- while r = cursor.fetch
- row = {}
- r.each_with_index {|v, i| row[@columns[i]] = v}
- yield row
- end
- ensure
- cursor.close
- end
- end
- self
- end
- end
- end
-end
View
25 lib/sequel_core/adapters/ado.rb
@@ -14,14 +14,15 @@ module ADO
class Database < Sequel::Database
set_adapter_scheme :ado
- def connect
- @opts[:driver] ||= 'SQL Server'
- case @opts[:driver]
+ def connect(server)
+ opts = server_opts(server)
+ opts[:driver] ||= 'SQL Server'
+ case opts[:driver]
when 'SQL Server'
require 'sequel_core/adapters/shared/mssql'
extend Sequel::MSSQL::DatabaseMethods
end
- s = "driver=#{@opts[:driver]};server=#{@opts[:host]};database=#{@opts[:database]}#{";uid=#{@opts[:user]};pwd=#{@opts[:password]}" if @opts[:user]}"
+ s = "driver=#{opts[:driver]};server=#{opts[:host]};database=#{opts[:database]}#{";uid=#{opts[:user]};pwd=#{opts[:password]}" if opts[:user]}"
handle = WIN32OLE.new('ADODB.Connection')
handle.Open(s)
handle
@@ -35,9 +36,13 @@ def dataset(opts = nil)
ADO::Dataset.new(self, opts)
end
- def execute(sql)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold {|conn| conn.Execute(sql)}
+ synchronize(opts[:server]) do |conn|
+ r = conn.Execute(sql)
+ yield(r) if block_given?
+ r
+ end
end
alias_method :do, :execute
end
@@ -54,10 +59,8 @@ def literal(v)
end
end
- def fetch_rows(sql, &block)
- @db.synchronize do
- s = @db.execute sql
-
+ def fetch_rows(sql)
+ execute(sql) do |s|
@columns = s.Fields.extend(Enumerable).map do |column|
name = column.Name.empty? ? '(no column name)' : column.Name
name.to_sym
@@ -71,6 +74,8 @@ def fetch_rows(sql, &block)
self
end
+ private
+
def hash_row(row)
@columns.inject({}) do |m, c|
m[c] = row.shift
View
63 lib/sequel_core/adapters/db2.rb
@@ -6,29 +6,15 @@ class Database < Sequel::Database
set_adapter_scheme :db2
include DB2CLI
- # AUTO_INCREMENT = 'IDENTITY(1,1)'.freeze
- #
- # def auto_increment_sql
- # AUTO_INCREMENT
- # end
-
- def check_error(rc, msg)
- case rc
- when SQL_SUCCESS, SQL_SUCCESS_WITH_INFO
- nil
- else
- raise Error, msg
- end
- end
-
rc, @@env = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE)
- check_error(rc, "Could not allocate DB2 environment")
+ #check_error(rc, "Could not allocate DB2 environment")
- def connect
+ def connect(server)
+ opts = server_opts(server)
rc, dbc = SQLAllocHandle(SQL_HANDLE_DBC, @@env)
check_error(rc, "Could not allocate database connection")
- rc = SQLConnect(dbc, @opts[:database], @opts[:user], @opts[:password])
+ rc = SQLConnect(dbc, opts[:database], opts[:user], opts[:password])
check_error(rc, "Could not connect to database")
dbc
@@ -44,8 +30,8 @@ def disconnect
end
end
- def test_connection
- @pool.hold {|conn|}
+ def test_connection(server=nil)
+ synchronize(server){|conn|}
true
end
@@ -53,9 +39,9 @@ def dataset(opts = nil)
DB2::Dataset.new(self, opts)
end
- def execute(sql, &block)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold do |conn|
+ synchronize(opts[:server]) do |conn|
rc, sth = SQLAllocHandle(SQL_HANDLE_STMT, @handle)
check_error(rc, "Could not allocate statement")
@@ -63,7 +49,7 @@ def execute(sql, &block)
rc = SQLExecDirect(sth, sql)
check_error(rc, "Could not execute statement")
- block[sth] if block
+ yield(sth) if block_given?
rc, rpc = SQLRowCount(sth)
check_error(rc, "Could not get RPC")
@@ -75,9 +61,22 @@ def execute(sql, &block)
end
end
alias_method :do, :execute
+
+ private
+
+ def check_error(rc, msg)
+ case rc
+ when SQL_SUCCESS, SQL_SUCCESS_WITH_INFO
+ nil
+ else
+ raise Error, msg
+ end
+ end
end
class Dataset < Sequel::Dataset
+ MAX_COL_SIZE = 256
+
def literal(v)
case v
when Time
@@ -89,21 +88,19 @@ def literal(v)
end
end
- def fetch_rows(sql, &block)
- @db.synchronize do
- @db.execute(sql) do |sth|
- @column_info = get_column_info(sth)
- @columns = @column_info.map {|c| c[:name]}
- while (rc = SQLFetch(@handle)) != SQL_NO_DATA_FOUND
- @db.check_error(rc, "Could not fetch row")
- yield hash_row(sth)
- end
+ def fetch_rows(sql)
+ execute(sql) do |sth|
+ @column_info = get_column_info(sth)
+ @columns = @column_info.map {|c| c[:name]}
+ while (rc = SQLFetch(@handle)) != SQL_NO_DATA_FOUND
+ @db.check_error(rc, "Could not fetch row")
+ yield hash_row(sth)
end
end
self
end
- MAX_COL_SIZE = 256
+ private
def get_column_info(sth)
rc, column_count = SQLNumResultCols(sth)
View
28 lib/sequel_core/adapters/dbi.rb
@@ -41,13 +41,14 @@ def self.uri_to_options(uri) # :nodoc:
private_class_method :uri_to_options
- def connect
- dbname = @opts[:database]
+ def connect(server)
+ opts = server_opts(server)
+ dbname = opts[:database]
if dbname !~ /^DBI:/ then
dbname = "DBI:#{dbname}"
- [:host, :port].each{|sym| dbname += ";#{sym}=#{@opts[sym]}" unless @opts[sym].blank?}
+ [:host, :port].each{|sym| dbname += ";#{sym}=#{opts[sym]}" unless opts[sym].blank?}
end
- ::DBI.connect(dbname, @opts[:user], @opts[:password])
+ ::DBI.connect(dbname, opts[:user], opts[:password])
end
def disconnect
@@ -58,18 +59,18 @@ def dataset(opts = nil)
DBI::Dataset.new(self, opts)
end
- def execute(sql)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold do |conn|
- conn.execute(sql)
+ synchronize(opts[:server]) do |conn|
+ r = conn.execute(sql)
+ yield(r) if block_given?
+ r
end
end
- def do(sql)
+ def do(sql, opts={})
log_info(sql)
- @pool.hold do |conn|
- conn.do(sql)
- end
+ synchronize(opts[:server]){|conn| conn.do(sql)}
end
alias_method :execute_dui, :do
@@ -92,8 +93,7 @@ def literal(v)
end
def fetch_rows(sql, &block)
- @db.synchronize do
- s = @db.execute sql
+ execute(sql) do |s|
begin
@columns = s.column_names.map do |c|
@db.lowercase ? c.downcase.to_sym : c.to_sym
@@ -106,6 +106,8 @@ def fetch_rows(sql, &block)
self
end
+ private
+
def hash_row(stmt, row)
@columns.inject({}) do |m, c|
m[c] = row.shift
View
27 lib/sequel_core/adapters/informix.rb
@@ -11,12 +11,13 @@ class Database < Sequel::Database
# AUTO_INCREMENT
# end
- def connect
- ::Informix.connect(@opts[:database], @opts[:user], @opts[:password])
+ def connect(server)
+ opts = server_opts(server)
+ ::Informix.connect(opts[:database], opts[:user], opts[:password])
end
def disconnect
- @pool.disconnect {|c| c.close}
+ @pool.disconnect{|c| c.close}
end
def dataset(opts = nil)
@@ -24,15 +25,15 @@ def dataset(opts = nil)
end
# Returns number of rows affected
- def execute_dui(sql)
+ def execute_dui(sql, opts={})
log_info(sql)
- @pool.hold {|c| c.immediate(sql)}
+ synchronize(opts[:server]){|c| c.immediate(sql)}
end
alias_method :do, :execute_dui
- def execute(sql, &block)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold {|c| block[c.cursor(sql)]}
+ synchronize(opts[:server]){|c| yield c.cursor(sql)}
end
alias_method :query, :execute
end
@@ -62,13 +63,11 @@ def select_sql(opts = nil)
end
def fetch_rows(sql, &block)
- @db.synchronize do
- @db.execute(sql) do |cursor|
- begin
- cursor.open.each_hash(&block)
- ensure
- cursor.drop
- end
+ execute(sql) do |cursor|
+ begin
+ cursor.open.each_hash(&block)
+ ensure
+ cursor.drop
end
end
self
View
208 lib/sequel_core/adapters/jdbc.rb
@@ -90,8 +90,8 @@ def initialize(opts)
end
# Connect to the database using JavaSQL::DriverManager.getConnection.
- def connect
- setup_connection(JavaSQL::DriverManager.getConnection(uri))
+ def connect(server)
+ setup_connection(JavaSQL::DriverManager.getConnection(uri(server_opts(server))))
end
# Return instances of JDBC::Dataset with the given opts.
@@ -104,85 +104,53 @@ def disconnect
@pool.disconnect {|c| c.close}
end
- # Execute the given SQL, which should be a SELECT statement
- # or something else that returns rows.
- def execute(sql, &block)
- _execute(sql, :type=>:select, &block)
- end
-
- # Execute the given DDL SQL, which should not return any
- # values or rows.
- def execute_ddl(sql)
- _execute(sql, :type=>:ddl)
- end
-
- # Execute the given DELETE, UPDATE, or INSERT SQL, returning
- # the number of rows affected.
- def execute_dui(sql)
- _execute(sql, :type=>:dui)
- end
-
- # Execute the given INSERT SQL, returning the last inserted
- # row id.
- def execute_insert(sql)
- _execute(sql, :type=>:insert)
- end
-
- # Execute the prepared statement. If the provided name is a
- # dataset, use that as the prepared statement, otherwise use
- # it as a key to look it up in the prepared_statements hash.
- # If the connection we are using has already prepared an identical
- # statement, use that statement instead of creating another.
- # Otherwise, prepare a new statement for the connection, bind the
- # variables, and execute it.
- def execute_prepared_statement(name, args=[], opts={})
- if Dataset === name
- ps = name
- name = ps.prepared_statement_name
- else
- ps = prepared_statements[name]
- end
- sql = ps.prepared_sql
- synchronize do |conn|
- if name and cps = conn.prepared_statements[name] and cps[0] == sql
- cps = cps[1]
- else
- if cps
- log_info("Closing #{name}")
- cps[1].close
- end
- log_info("Preparing#{" #{name}:" if name} #{sql}")
- cps = conn.prepareStatement(sql)
- conn.prepared_statements[name] = [sql, cps] if name
- end
- i = 0
- args.each{|arg| set_ps_arg(cps, arg, i+=1)}
- log_info("Executing#{" #{name}" if name}", args)
+ # Execute the given SQL. If a block is given, if should be a SELECT
+ # statement or something else that returns rows.
+ def execute(sql, opts={}, &block)
+ return execute_prepared_statement(sql, opts, &block) if sql.is_one_of?(Symbol, Dataset)
+ log_info(sql)
+ synchronize(opts[:server]) do |conn|
+ stmt = conn.createStatement
begin
- case opts[:type]
- when :select
- yield cps.executeQuery
- when :ddl
- cps.execute
- when :insert
- cps.executeUpdate
- last_insert_id(conn, opts)
+ if block_given?
+ yield stmt.executeQuery(sql)
else
- cps.executeUpdate
+ case opts[:type]
+ when :ddl
+ stmt.execute(sql)
+ when :insert
+ stmt.executeUpdate(sql)
+ last_insert_id(conn, opts)
+ else
+ stmt.executeUpdate(sql)
+ end
end
rescue NativeException, JavaSQL::SQLException => e
raise Error, e.message
ensure
- cps.close unless name
+ stmt.close
end
end
end
+ alias execute_dui execute
+
+ # Execute the given DDL SQL, which should not return any
+ # values or rows.
+ def execute_ddl(sql, opts={})
+ execute(sql, {:type=>:ddl}.merge(opts))
+ end
+
+ # Execute the given INSERT SQL, returning the last inserted
+ # row id.
+ def execute_insert(sql, opts={})
+ execute(sql, {:type=>:insert}.merge(opts))
+ end
# Default transaction method that should work on most JDBC
# databases. Does not use the JDBC transaction methods, uses
# SQL BEGIN/ROLLBACK/COMMIT statements instead.
- def transaction
- synchronize do |conn|
+ def transaction(server=nil)
+ synchronize(server) do |conn|
return yield(conn) if @transactions.include?(Thread.current)
stmt = conn.createStatement
begin
@@ -209,36 +177,64 @@ def transaction
# using the :uri, :url, or :database options. You don't
# need to worry about this if you use Sequel.connect
# with the JDBC connectrion strings.
- def uri
- ur = @opts[:uri] || @opts[:url] || @opts[:database]
+ def uri(opts={})
+ opts = @opts.merge(opts)
+ ur = opts[:uri] || opts[:url] || opts[:database]
ur =~ /^\Ajdbc:/ ? ur : "jdbc:#{ur}"
end
alias url uri
private
- # Execute the SQL. Use the :type option to see which JDBC method
- # to use.
- def _execute(sql, opts)
- log_info(sql)
- synchronize do |conn|
- stmt = conn.createStatement
+ # Execute the prepared statement. If the provided name is a
+ # dataset, use that as the prepared statement, otherwise use
+ # it as a key to look it up in the prepared_statements hash.
+ # If the connection we are using has already prepared an identical
+ # statement, use that statement instead of creating another.
+ # Otherwise, prepare a new statement for the connection, bind the
+ # variables, and execute it.
+ def execute_prepared_statement(name, opts={})
+ args = opts[:arguments]
+ if Dataset === name
+ ps = name
+ name = ps.prepared_statement_name
+ else
+ ps = prepared_statements[name]
+ end
+ sql = ps.prepared_sql
+ synchronize(opts[:server]) do |conn|
+ if name and cps = conn.prepared_statements[name] and cps[0] == sql
+ cps = cps[1]
+ else
+ if cps
+ log_info("Closing #{name}")
+ cps[1].close
+ end
+ log_info("Preparing#{" #{name}:" if name} #{sql}")
+ cps = conn.prepareStatement(sql)
+ conn.prepared_statements[name] = [sql, cps] if name
+ end
+ i = 0
+ args.each{|arg| set_ps_arg(cps, arg, i+=1)}
+ log_info("Executing#{" #{name}" if name}", args)
begin
- case opts[:type]
- when :select
- yield stmt.executeQuery(sql)
- when :ddl
- stmt.execute(sql)
- when :insert
- stmt.executeUpdate(sql)
- last_insert_id(conn, opts)
+ if block_given?
+ yield cps.executeQuery
else
- stmt.executeUpdate(sql)
+ case opts[:type]
+ when :ddl
+ cps.execute
+ when :insert
+ cps.executeUpdate
+ last_insert_id(conn, opts)
+ else
+ cps.executeUpdate
+ end
end
rescue NativeException, JavaSQL::SQLException => e
raise Error, e.message
ensure
- stmt.close
+ cps.close unless name
end
end
end
@@ -298,11 +294,19 @@ module PreparedStatementMethods
# Execute the prepared SQL using the stored type and
# arguments derived from the hash passed to call.
- def execute(sql, &block)
- @db.execute_prepared_statement(self, bind_arguments, :type=>sql_query_type, &block)
+ def execute(sql, opts={}, &block)
+ super(self, {:arguments=>bind_arguments, :type=>sql_query_type}.merge(opts), &block)
+ end
+
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_dui(sql, opts={}, &block)
+ super(self, {:arguments=>bind_arguments, :type=>sql_query_type}.merge(opts), &block)
+ end
+
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_insert(sql, opts={}, &block)
+ super(self, {:arguments=>bind_arguments, :type=>sql_query_type}.merge(opts), &block)
end
- alias execute_dui execute
- alias execute_insert execute
end
# Create an unnamed prepared statement and call it. Allows the
@@ -313,20 +317,18 @@ def call(type, hash, values=nil, &block)
# Correctly return rows from the database and return them as hashes.
def fetch_rows(sql, &block)
- @db.synchronize do
- execute(sql) do |result|
- # get column names
- meta = result.getMetaData
- column_count = meta.getColumnCount
- @columns = []
- column_count.times {|i| @columns << meta.getColumnName(i+1).to_sym}
+ execute(sql) do |result|
+ # get column names
+ meta = result.getMetaData
+ column_count = meta.getColumnCount
+ @columns = []
+ column_count.times {|i| @columns << meta.getColumnName(i+1).to_sym}
- # get rows
- while result.next
- row = {}
- @columns.each_with_index {|v, i| row[v] = result.getObject(i+1)}
- yield row
- end
+ # get rows
+ while result.next
+ row = {}
+ @columns.each_with_index {|v, i| row[v] = result.getObject(i+1)}
+ yield row
end
end
self
View
4 lib/sequel_core/adapters/jdbc/mysql.rb
@@ -54,8 +54,8 @@ def replace(*args)
private
# Call execute_insert on the database.
- def execute_insert(sql)
- @db.execute_insert(sql)
+ def execute_insert(sql, opts={})
+ @db.execute_insert(sql, {:server=>@opts[:server] || :default}.merge(opts))
end
end
end
View
26 lib/sequel_core/adapters/jdbc/postgresql.rb
@@ -14,8 +14,8 @@ module AdapterMethods
# Give the JDBC adapter a direct execute method, which creates
# a statement with the given sql and executes it.
- def execute(sql, method=:execute)
- method = :executeQuery if block_given?
+ def execute(sql, args=nil)
+ method = block_given? ? :executeQuery : :execute
stmt = createStatement
begin
rows = stmt.send(method, sql)
@@ -55,8 +55,8 @@ def dataset(opts=nil)
# Run the INSERT sql on the database and return the primary key
# for the record.
- def execute_insert(sql, table, values)
- _execute(sql, :type=>:insert, :table=>table, :values=>values)
+ def execute_insert(sql, opts={})
+ super(sql, {:type=>:insert}.merge(opts))
end
private
@@ -78,17 +78,6 @@ def last_insert_id(conn, opts)
class Dataset < JDBC::Dataset
include Sequel::Postgres::DatasetMethods
- # Methods to support JDBC PostgreSQL prepared statements
- module PreparedStatementMethods
- private
-
- # Add the table and values to the opts call so they can later
- # be pulled by the DatabaseMethods#last_insert_id
- def execute_insert(sql, table, values)
- @db.execute_prepared_statement(self, bind_arguments, :type=>:insert, :table=>table, :values=>values)
- end
- end
-
# Convert Java::JavaSql::Timestamps correctly, and handle SQL::Blobs
# correctly.
def literal(v)
@@ -101,13 +90,6 @@ def literal(v)
super
end
end
-
- # Extend the prepared statement created with PreparedStatementMethods.
- def prepare(type, name, values=nil)
- ps = super
- ps.extend(PreparedStatementMethods)
- ps
- end
end
end
end
View
172 lib/sequel_core/adapters/mysql.rb
@@ -91,22 +91,23 @@ class Database < Sequel::Database
# * :encoding, :charset - Set all the related character sets for this
# connection (connection, client, database, server, and results).
# * :socket - Use a unix socket file instead of connecting via TCP/IP.
- def connect
+ def connect(server)
+ opts = server_opts(server)
conn = Mysql.init
conn.options(Mysql::OPT_LOCAL_INFILE, "client")
conn.real_connect(
- @opts[:host] || 'localhost',
- @opts[:user],
- @opts[:password],
- @opts[:database],
- @opts[:port],
- @opts[:socket],
+ opts[:host] || 'localhost',
+ opts[:user],
+ opts[:password],
+ opts[:database],
+ opts[:port],
+ opts[:socket],
Mysql::CLIENT_MULTI_RESULTS +
Mysql::CLIENT_MULTI_STATEMENTS +
Mysql::CLIENT_COMPRESS
)
conn.query_with_result = false
- if encoding = @opts[:encoding] || @opts[:charset]
+ if encoding = opts[:encoding] || opts[:charset]
conn.query("set character_set_connection = '#{encoding}'")
conn.query("set character_set_client = '#{encoding}'")
conn.query("set character_set_database = '#{encoding}'")
@@ -131,85 +132,28 @@ def disconnect
# Executes the given SQL using an available connection, yielding the
# connection if the block is given.
- def execute(sql)
+ def execute(sql, opts={}, &block)
+ return execute_prepared_statement(sql, opts, &block) if Symbol === sql
begin
- log_info(sql)
- synchronize do |conn|
- conn.query(sql)
- yield conn if block_given?
- end
+ synchronize(opts[:server]){|conn| _execute(conn, sql, opts, &block)}
rescue Mysql::Error => e
raise Error.new(e.message)
end
end
- # Executes a prepared statement on an available connection. If the
- # prepared statement already exists for the connection and has the same
- # SQL, reuse it, otherwise, prepare the new statement. Because of the
- # usual MySQL stupidity, we are forced to name arguments via separate
- # SET queries. Use @sequel_arg_N (for N starting at 1) for these
- # arguments.
- def execute_prepared_statement(ps_name, args, opts={})
- ps = prepared_statements[ps_name]
- sql = ps.prepared_sql
- synchronize do |conn|
- unless conn.prepared_statements[ps_name] == sql
- conn.prepared_statements[ps_name] = sql
- s = "PREPARE #{ps_name} FROM '#{::Mysql.quote(sql)}'"
- log_info(s)
- conn.query(s)
- end
- i = 0
- args.each do |arg|
- s = "SET @sequel_arg_#{i+=1} = #{literal(arg)}"
- log_info(s)
- conn.query(s)
- end
- s = "EXECUTE #{ps_name}#{" USING #{(1..i).map{|j| "@sequel_arg_#{j}"}.join(', ')}" unless i == 0}"
- log_info(s)
- conn.query(s)
- if opts[:select]
- r = conn.use_result
- begin
- yield r
- ensure
- r.free
- end
- else
- yield conn if block_given?
- end
- end
- end
-
- # Execute the given SQL, yielding the result set. Until the block
- # given returns, no queries can use this connection. For that
- # reason, if you need to use nested queries in MySQL, you must
- # get all records at once for the outer queries (e.g.
- # DB[:i].all{|i| DB[:j].each{|j|}}.
- def execute_select(sql)
- execute(sql) do |c|
- r = c.use_result
- begin
- yield r
- ensure
- r.free
- end
- end
- end
-
# Return the version of the MySQL server two which we are connecting.
- def server_version
- @server_version ||= (synchronize{|conn| conn.server_version if conn.respond_to?(:server_version)} || super)
+ def server_version(server=nil)
+ @server_version ||= (synchronize(server){|conn| conn.server_version if conn.respond_to?(:server_version)} || super)
end
# Return an array of symbols specifying table names in the current database.
- def tables
- synchronize{|conn| conn.list_tables.map {|t| t.to_sym}}
+ def tables(server=nil)
+ synchronize(server){|conn| conn.list_tables.map {|t| t.to_sym}}
end
# Support single level transactions on MySQL.
- def transaction
- synchronize do |conn|
+ def transaction(server=nil)
+ synchronize(server) do |conn|
return yield(conn) if @transactions.include?(Thread.current)
log_info(SQL_BEGIN)
conn.query(SQL_BEGIN)
@@ -232,6 +176,24 @@ def transaction
private
+ # Execute the given SQL on the given connection. If the :type
+ # option is :select, yield the result of the query, otherwise
+ # yield the connection if a block is given.
+ def _execute(conn, sql, opts)
+ log_info(sql)
+ conn.query(sql)
+ if opts[:type] == :select
+ r = conn.use_result
+ begin
+ yield r
+ ensure
+ r.free
+ end
+ else
+ yield conn if block_given?
+ end
+ end
+
# MySQL doesn't need the connection pool to convert exceptions.
def connection_pool_default_options
super.merge(:pool_convert_exceptions=>false)
@@ -242,6 +204,33 @@ def connection_pool_default_options
def database_name
@opts[:database]
end
+
+ # Executes a prepared statement on an available connection. If the
+ # prepared statement already exists for the connection and has the same
+ # SQL, reuse it, otherwise, prepare the new statement. Because of the
+ # usual MySQL stupidity, we are forced to name arguments via separate
+ # SET queries. Use @sequel_arg_N (for N starting at 1) for these
+ # arguments.
+ def execute_prepared_statement(ps_name, opts, &block)
+ args = opts[:arguments]
+ ps = prepared_statements[ps_name]
+ sql = ps.prepared_sql
+ synchronize(opts[:server]) do |conn|
+ unless conn.prepared_statements[ps_name] == sql
+ conn.prepared_statements[ps_name] = sql
+ s = "PREPARE #{ps_name} FROM '#{::Mysql.quote(sql)}'"
+ log_info(s)
+ conn.query(s)
+ end
+ i = 0
+ args.each do |arg|
+ s = "SET @sequel_arg_#{i+=1} = #{literal(arg)}"
+ log_info(s)
+ conn.query(s)
+ end
+ _execute(conn, "EXECUTE #{ps_name}#{" USING #{(1..i).map{|j| "@sequel_arg_#{j}"}.join(', ')}" unless i == 0}", opts, &block)
+ end
+ end
end
# Dataset class for MySQL datasets accessed via the native driver.
@@ -253,27 +242,25 @@ module PreparedStatementMethods
include Sequel::Dataset::UnnumberedArgumentMapper
# Execute the prepared statement with the bind arguments instead of
- # the given SQL, yielding the connection to the block.
- def execute(sql, &block)
- @db.execute_prepared_statement(prepared_statement_name, bind_arguments, &block)
+ # the given SQL.
+ def execute(sql, opts={}, &block)
+ super(prepared_statement_name, {:arguments=>bind_arguments}.merge(opts), &block)
end
- alias execute_dui execute
- # Execute the prepared statement with the bind arguments instead of
- # the given SQL, yielding the rows to the block.
- def execute_select(sql, &block)
- @db.execute_prepared_statement(prepared_statement_name, bind_arguments, :select=>true, &block)
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_dui(sql, opts={}, &block)
+ super(prepared_statement_name, {:arguments=>bind_arguments}.merge(opts), &block)
end
end
# Delete rows matching this dataset
def delete(opts = nil)
- execute(delete_sql(opts)){|c| c.affected_rows}
+ execute_dui(delete_sql(opts)){|c| c.affected_rows}
end
# Yield all rows matching this dataset
def fetch_rows(sql)
- execute_select(sql) do |r|
+ execute(sql) do |r|
@columns = r.columns
r.sequel_each_hash {|row| yield row}
end
@@ -282,7 +269,7 @@ def fetch_rows(sql)
# Insert a new value into this dataset
def insert(*values)
- execute(insert_sql(*values)){|c| c.insert_id}
+ execute_dui(insert_sql(*values)){|c| c.insert_id}
end
# Handle correct quoting of strings using ::MySQL.quote.
@@ -308,19 +295,24 @@ def prepare(type, name, values=nil)
# Replace (update or insert) the matching row.
def replace(*args)
- execute(replace_sql(*args)){|c| c.insert_id}
+ execute_dui(replace_sql(*args)){|c| c.insert_id}
end
# Update the matching rows.
def update(*args)
- execute(update_sql(*args)){|c| c.affected_rows}
+ execute_dui(update_sql(*args)){|c| c.affected_rows}
end
private
- # Run execute_select with the given SQL against the associated database.
- def execute_select(sql, &block)
- @db.execute_select(sql, &block)
+ # Set the :type option to :select if it hasn't been set.
+ def execute(sql, opts={}, &block)
+ super(sql, {:type=>:select}.merge(opts), &block)
+ end
+
+ # Set the :type option to :dui if it hasn't been set.
+ def execute_dui(sql, opts={}, &block)
+ super(sql, {:type=>:dui}.merge(opts), &block)
end
end
end
View
32 lib/sequel_core/adapters/odbc.rb
@@ -8,16 +8,17 @@ class Database < Sequel::Database
GUARDED_DRV_NAME = /^\{.+\}$/.freeze
DRV_NAME_GUARDS = '{%s}'.freeze
- def connect
- case @opts[:db_type]
+ def connect(server)
+ opts = server_opts(server)
+ case opts[:db_type]
when 'mssql'
require 'sequel_core/adapters/shared/mssql'
extend Sequel::MSSQL::DatabaseMethods
end
- if @opts.include? :driver
+ if opts.include? :driver
drv = ::ODBC::Driver.new
drv.name = 'Sequel ODBC Driver130'
- @opts.each do |param, value|
+ opts.each do |param, value|
if :driver == param and not (value =~ GUARDED_DRV_NAME)
value = DRV_NAME_GUARDS % value
end
@@ -26,7 +27,7 @@ def connect
db = ::ODBC::Database.new
conn = db.drvconnect(drv)
else
- conn = ::ODBC::connect(@opts[:database], @opts[:user], @opts[:password])
+ conn = ::ODBC::connect(opts[:database], opts[:user], opts[:password])
end
conn.autocommit = true
conn
@@ -44,20 +45,20 @@ def dataset(opts = nil)
# you call execute manually, or you will get warnings. See the
# fetch_rows method source code for an example of how to drop
# the statements.
- def execute(sql)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold do |conn|
- conn.run(sql)
+ synchronize(opts[:server]) do |conn|
+ r = conn.run(sql)
+ yield(r) if block_given?
+ r
end
end
- def do(sql)
+ def execute_dui(sql, opts={})
log_info(sql)
- @pool.hold do |conn|
- conn.do(sql)
- end
+ synchronize(opts[:server]){|conn| conn.do(sql)}
end
- alias_method :execute_dui, :do
+ alias_method :do, :execute_dui
end
class Dataset < Sequel::Dataset
@@ -89,8 +90,7 @@ def literal(v)
UNTITLED_COLUMN = 'untitled_%d'.freeze
def fetch_rows(sql, &block)
- @db.synchronize do
- s = @db.execute sql
+ execute(sql) do |s|
begin
untitled_count = 0
@columns = s.columns(true).map do |c|
@@ -108,6 +108,8 @@ def fetch_rows(sql, &block)
self
end
+ private
+
def hash_row(row)
hash = {}
row.each_with_index do |v, idx|
View
17 lib/sequel_core/adapters/openbase.rb
@@ -5,7 +5,8 @@ module OpenBase
class Database < Sequel::Database
set_adapter_scheme :openbase
- def connect
+ def connect(server)
+ opts = server_opts(server)
OpenBase.new(
opts[:database],
opts[:host] || 'localhost',
@@ -23,11 +24,14 @@ def dataset(opts = nil)
OpenBase::Dataset.new(self, opts)
end
- def execute(sql)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold {|conn| conn.execute(sql)}
+ synchronize(opts[:server]) do |conn|
+ r = conn.execute(sql)
+ yield(r) if block_given?
+ r
+ end
end
-
alias_method :do, :execute
end
@@ -43,9 +47,8 @@ def literal(v)
end
end
- def fetch_rows(sql, &block)
- @db.synchronize do
- result = @db.execute sql
+ def fetch_rows(sql)
+ execute(sql) do |result|
begin
@columns = result.column_infos.map {|c| c.name.to_sym}
result.each do |r|
View
33 lib/sequel_core/adapters/oracle.rb
@@ -5,14 +5,15 @@ module Oracle
class Database < Sequel::Database
set_adapter_scheme :oracle
- def connect
- if @opts[:database]
- dbname = @opts[:host] ? \
- "//#{@opts[:host]}#{":#{@opts[:port]}" if @opts[:port]}/#{@opts[:database]}" : @opts[:database]
+ def connect(server)
+ opts = server_opts(server)
+ if opts[:database]
+ dbname = opts[:host] ? \
+ "//#{opts[:host]}#{":#{opts[:port]}" if opts[:port]}/#{opts[:database]}" : opts[:database]
else
- dbname = @opts[:host]
+ dbname = opts[:host]
end
- conn = OCI8.new(@opts[:user], @opts[:password], dbname, @opts[:privilege])
+ conn = OCI8.new(opts[:user], opts[:password], dbname, opts[:privilege])
conn.autocommit = true
conn.non_blocking = true
conn
@@ -26,11 +27,14 @@ def dataset(opts = nil)
Oracle::Dataset.new(self, opts)
end
- def execute(sql)
+ def execute(sql, opts={})
log_info(sql)
- @pool.hold {|conn| conn.exec(sql)}
+ synchronize(opts[:server]) do |conn|
+ r = conn.exec(sql)
+ yield(r) if block_given?
+ r
+ end
end
-
alias_method :do, :execute
def tables
@@ -43,11 +47,9 @@ def table_exists?(name)
from(:tab).filter(:tname => name.to_s.upcase, :tabtype => 'TABLE').count > 0
end
- def transaction
- @pool.hold do |conn|
- if @transactions.include? Thread.current
- return yield(conn)
- end
+ def transaction(server=nil)
+ synchronize(server) do |conn|
+ return yield(conn) if @transactions.include?(Thread.current)
conn.autocommit = false
begin
@@ -76,8 +78,7 @@ def literal(v)
end
def fetch_rows(sql, &block)
- @db.synchronize do
- cursor = @db.execute sql
+ execute(sql) do |cursor|
begin
@columns = cursor.get_col_names.map {|c| c.downcase.to_sym}
while r = cursor.fetch
View
105 lib/sequel_core/adapters/postgres.rb
@@ -116,14 +116,14 @@ class Adapter < ::PGconn
# Execute the given SQL with this connection. If a block is given,
# yield the results, otherwise, return the number of changed rows.
- def execute(sql, *args)
+ def execute(sql, args=nil)
q = nil
begin
- q = exec(sql, *args)
+ q = args ? exec(sql, args) : exec(sql)
rescue PGError => e
raise if status == Adapter::CONNECTION_OK
reset
- q = exec(sql, *args)
+ q = args ? exec(sql, args) : exec(sql)
end
begin
block_given? ? yield(q) : q.cmd_tuples
@@ -165,16 +165,17 @@ class Database < Sequel::Database
# Connects to the database. In addition to the standard database
# options, using the :encoding or :charset option changes the
# client encoding for the connection.
- def connect
+ def connect(server)
+ opts = server_opts(server)
conn = Adapter.connect(
- @opts[:host] || 'localhost',
- @opts[:port] || 5432,
+ opts[:host] || 'localhost',
+ opts[:port] || 5432,
'', '',
- @opts[:database],
- @opts[:user],
- @opts[:password]
+ opts[:database],
+ opts[:user],
+ opts[:password]
)
- if encoding = @opts[:encoding] || @opts[:charset]
+ if encoding = opts[:encoding] || opts[:charset]
conn.set_client_encoding(encoding)
end
conn
@@ -191,16 +192,40 @@ def disconnect
end
# Execute the given SQL with the given args on an available connection.
- def execute(sql, *args, &block)
+ def execute(sql, opts={}, &block)
+ return execute_prepared_statement(sql, opts, &block) if Symbol === sql
begin
- log_info(sql, *args)
- synchronize{|conn| conn.execute(sql, *args, &block)}
+ log_info(sql, opts[:arguments])
+ synchronize(opts[:server]){|conn| conn.execute(sql, opts[:arguments], &block)}
rescue => e
log_info(e.message)
raise convert_pgerror(e)
end
end
+ # Insert the values into the table and return the primary key (if
+ # automatically generated).
+ def execute_insert(sql, opts={})
+ return execute(sql, opts) if Symbol === sql
+ begin
+ log_info(sql, opts[:arguments])
+ synchronize(opts[:server]) do |conn|
+ conn.execute(sql, opts[:arguments])
+ insert_result(conn, opts[:table], opts[:values])
+ end
+ rescue => e
+ log_info(e.message)
+ raise convert_pgerror(e)
+ end
+ end
+
+ private
+
+ # PostgreSQL doesn't need the connection pool to convert exceptions.
+ def connection_pool_default_options
+ super.merge(:pool_convert_exceptions=>false)
+ end
+
# Execute the prepared statement with the given name on an available
# connection, using the given args. If the connection has not prepared
# a statement with the given name yet, prepare it. If the connection
@@ -209,11 +234,12 @@ def execute(sql, *args, &block)
# If a block is given, yield the result, otherwise, return the number
# of rows changed. If the :insert option is passed, return the value
# of the primary key for the last inserted row.
- def execute_prepared_statement(name, args, opts={})
+ def execute_prepared_statement(name, opts={})
ps = prepared_statements[name]
sql = ps.prepared_sql
ps_name = name.to_s
- synchronize do |conn|
+ args = opts[:arguments]
+ synchronize(opts[:server]) do |conn|
unless conn.prepared_statements[ps_name] == sql
if conn.prepared_statements.include?(ps_name)
s = "DEALLOCATE #{ps_name}"
@@ -226,8 +252,8 @@ def execute_prepared_statement(name, args, opts={})
end
log_info("EXECUTE #{ps_name}", args)
q = conn.exec_prepared(ps_name, args)
- if opts[:insert]
- insert_result(conn, *opts[:insert])
+ if opts[:table] && opts[:values]
+ insert_result(conn, opts[:table], opts[:values])
else
begin
block_given? ? yield(q) : q.cmd_tuples
@@ -237,13 +263,6 @@ def execute_prepared_statement(name, args, opts={})
end
end
end
-
- private
-
- # PostgreSQL doesn't need the connection pool to convert exceptions.
- def connection_pool_default_options
- super.merge(:pool_convert_exceptions=>false)
- end
end
# Dataset class for PostgreSQL datasets that use the pg, postgres, or
@@ -323,37 +342,42 @@ module BindArgumentMethods
private
# Execute the given SQL with the stored bind arguments.
- def execute(sql, &block)
- @db.execute(sql, bind_arguments, &block)
+ def execute(sql, opts={}, &block)
+ super(sql, {:arguments=>bind_arguments}.merge(opts), &block)
end
- alias execute_dui execute
- # Execute the given SQL with the stored bind arguments, returning
- # the primary key value for the inserted row.
- def execute_insert(sql, table, values)
- @db.execute_insert(sql, table, values, bind_arguments)
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_dui(sql, opts={}, &block)
+ super(sql, {:arguments=>bind_arguments}.merge(opts), &block)
+ end
+
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_insert(sql, opts={}, &block)
+ super(sql, {:arguments=>bind_arguments}.merge(opts), &block)
end
end
# Allow use of server side prepared statements for PostgreSQL using the
# pg driver.
module PreparedStatementMethods
- include ArgumentMapper
+ include BindArgumentMethods
private
# Execute the stored prepared statement name and the stored bind
# arguments instead of the SQL given.
- def execute(sql, &block)
- @db.execute_prepared_statement(prepared_statement_name, bind_arguments, &block)
+ def execute(sql, opts={}, &block)
+ super(prepared_statement_name, opts, &block)
end
- alias execute_dui execute
- # Execute the stored prepared statement name and the stored bind
- # arguments instead of the SQL given, returning the primary key value
- # for the last inserted row.
- def execute_insert(sql, table, values)
- @db.execute_prepared_statement(prepared_statement_name, bind_arguments, :insert=>[table, values])
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_dui(sql, opts={}, &block)
+ super(prepared_statement_name, opts, &block)
+ end
+
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_insert(sql, opts={}, &block)
+ super(prepared_statement_name, opts, &block)
end
end
@@ -384,4 +408,3 @@ def prepared_arg_placeholder
end
end
end
-
View
39 lib/sequel_core/adapters/shared/postgres.rb
@@ -108,21 +108,6 @@ def drop_table_sql(name)
"DROP TABLE #{name} CASCADE"
end
- # Insert the values into the table and return the primary key (if
- # automatically generated).
- def execute_insert(sql, table, values, *bind_arguments)
- begin
- log_info(sql, *bind_arguments)
- synchronize do |conn|
- conn.execute(sql, *bind_arguments)
- insert_result(conn, table, values)
- end
- rescue => e
- log_info(e.message)
- raise convert_pgerror(e)
- end
- end
-
# PostgreSQL specific index SQL.
def index_definition_sql(table_name, index)
index_name = index[:name] || default_index_name(table_name, index[:columns])
@@ -188,9 +173,9 @@ def serial_primary_key_options
end
# The version of the PostgreSQL server, used for determining capability.
- def server_version
+ def server_version(server=nil)
return @server_version if @server_version
- @server_version = pool.hold do |conn|
+ @server_version = synchronize(server) do |conn|
(conn.server_version rescue nil) if conn.respond_to?(:server_version)
end
unless @server_version
@@ -206,8 +191,8 @@ def tables
end
# PostgreSQL supports multi-level transactions using save points.
- def transaction
- synchronize do |conn|
+ def transaction(server=nil)
+ synchronize(server) do |conn|
conn.transaction_depth = 0 if conn.transaction_depth.nil?
if conn.transaction_depth > 0
log_info(SQL_SAVEPOINT % conn.transaction_depth)
@@ -332,8 +317,8 @@ def full_text_search(cols, terms, opts = {})
# Insert given values into the database.
def insert(*values)
- execute_insert(insert_sql(*values), source_list(@opts[:from]),
- values.size == 1 ? values.first : values)
+ execute_insert(insert_sql(*values), :table=>source_list(@opts[:from]),
+ :values=>values.size == 1 ? values.first : values)
end
# Handle microseconds for Time and DateTime values, as well as PostgreSQL
@@ -358,13 +343,13 @@ def literal(v)
end
# Locks the table with the specified mode.
- def lock(mode)
+ def lock(mode, server=nil)
sql = LOCK % [source_list(@opts[:from]), mode]
- @db.synchronize do
+ @db.synchronize(server) do
if block_given? # perform locking inside a transaction and yield to block
- @db.transaction{@db.execute(sql); yield}
+ @db.transaction(server){@db.execute(sql, :server=>server); yield}
else
- @db.execute(sql) # lock without a transaction
+ @db.execute(sql, :server=>server) # lock without a transaction
self
end
end
@@ -402,8 +387,8 @@ def select_sql(opts = nil)
private
# Call execute_insert on the database object with the given values.
- def execute_insert(sql, table, values)
- @db.execute_insert(sql, table, values)
+ def execute_insert(sql, opts={})
+ @db.execute_insert(sql, {:server=>@opts[:server] || :default}.merge(opts))
end
end
end
View
17 lib/sequel_core/adapters/shared/sqlite.rb
@@ -134,22 +134,23 @@ def complex_expression_sql(op, args)
# SQLite performs a TRUNCATE style DELETE if no filter is specified.
# Since we want to always return the count of records, do a specific
# count in the case of no filter.
- def delete(opts = nil)
+ def delete(opts = {})
# check if no filter is specified
- unless (opts && opts[:where]) || @opts[:where]
- @db.transaction do
+ opts = @opts.merge(opts)
+ unless opts[:where]
+ @db.transaction(opts[:server]) do
unfiltered_count = count
- execute_dui delete_sql(opts)
+ execute_dui(delete_sql(opts))
unfiltered_count
end
else
- execute_dui delete_sql(opts)
+ execute_dui(delete_sql(opts))
end
end
# Insert the values into the database.
def insert(*values)
- execute_insert insert_sql(*values)
+ execute_insert(insert_sql(*values))
end
# Allow inserting of values directly from a dataset.
@@ -169,8 +170,8 @@ def quoted_identifier(c)
private
# Call execute_insert on the database with the given SQL.
- def execute_insert(sql)
- @db.execute_insert(sql)
+ def execute_insert(sql, opts={})
+ @db.execute_insert(sql, {:server=>@opts[:server] || :default}.merge(opts))
end
end
end
View
63 lib/sequel_core/adapters/sqlite.rb
@@ -23,11 +23,12 @@ def self.uri_to_options(uri) # :nodoc:
# Connect to the database. Since SQLite is a file based database,
# the only options available are :database (to specify the database
# name), and :timeout, to specify how long to wait for the database to
- # be available if it is locked (default is 5 seconds).
- def connect
- @opts[:database] = ':memory:' if @opts[:database].blank?
- db = ::SQLite3::Database.new(@opts[:database])
- db.busy_timeout(@opts.fetch(:timeout, 5000))
+ # be available if it is locked, given in milliseconds (default is 5000).
+ def connect(server)
+ opts = server_opts(server)
+ opts[:database] = ':memory:' if opts[:database].blank?
+ db = ::SQLite3::Database.new(opts[:database])
+ db.busy_timeout(opts.fetch(:timeout, 5000))
db.type_translation = true
# fix for timestamp translation
db.translator.add_translator("timestamp") do |t, v|
@@ -47,30 +48,30 @@ def disconnect
end
# Run the given SQL with the given arguments and return the number of changed rows.
- def execute(sql, *bind_arguments)
- _execute(sql, *bind_arguments){|conn| conn.execute_batch(sql, *bind_arguments); conn.changes}
+ def execute_dui(sql, opts={})
+ _execute(sql, opts){|conn| conn.execute_batch(sql, opts[:arguments]); conn.changes}
end
# Run the given SQL with the given arguments and return the last inserted row id.
- def execute_insert(sql, *bind_arguments)
- _execute(sql, *bind_arguments){|conn| conn.execute(sql, *bind_arguments); conn.last_insert_row_id}
+ def execute_insert(sql, opts={})
+ _execute(sql, opts){|conn| conn.execute(sql, opts[:arguments]); conn.last_insert_row_id}
end
# Run the given SQL with the given arguments and yield each row.
- def execute_select(sql, *bind_arguments)
- _execute(sql, *bind_arguments){|conn| conn.query(sql, *bind_arguments){|r| yield r}}
+ def execute(sql, opts={}, &block)
+ _execute(sql, opts){|conn| conn.query(sql, opts[:arguments], &block)}
end
# Run the given SQL with the given arguments and return the first value of the first row.
- def single_value(sql, *bind_arguments)
- _execute(sql, *bind_arguments){|conn| conn.get_first_value(sql, *bind_arguments)}
+ def single_value(sql, opts={})
+ _execute(sql, opts){|conn| conn.get_first_value(sql, opts[:arguments])}
end
# Use the native driver transaction method if there isn't already a transaction
# in progress on the connection, always yielding a connection inside a transaction
# transaction.
- def transaction(&block)
- synchronize do |conn|
+ def transaction(server=nil, &block)
+ synchronize(server) do |conn|
return yield(conn) if conn.transaction_active?
begin
result = nil
@@ -86,10 +87,10 @@ def transaction(&block)
# Log the SQL and the arguments, and yield an available connection. Rescue
# any SQLite3::Exceptions and turn the into Error::InvalidStatements.
- def _execute(sql, *bind_arguments)
+ def _execute(sql, opts)
begin
- log_info(sql, *bind_arguments)
- synchronize{|conn| yield conn}
+ log_info(sql, opts[:arguments])
+ synchronize(opts[:server]){|conn| yield conn}
rescue SQLite3::Exception => e
raise Error::InvalidStatement, "#{sql}\r\n#{e.message}"
end
@@ -154,22 +155,19 @@ module PreparedStatementMethods
# Run execute_select on the database with the given SQL and the stored
# bind arguments.
- def execute_select(sql, &block)
- @db.execute_select(sql, bind_arguments, &block)
+ def execute(sql, opts={}, &block)
+ super(sql, {:arguments=>bind_arguments}.merge(opts), &block)
end
- # Run execute_insert on the database with the given SQL and the
- # stored bind arguments.
- def execute_insert(sql)
- @db.execute_insert(sql, bind_arguments)
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_dui(sql, opts={}, &block)
+ super(sql, {:arguments=>bind_arguments}.merge(opts), &block)
end
- # Run execute on the database with the given SQL and the stored bind
- # arguments.
- def execute(sql)
- @db.execute(sql, bind_arguments)
+ # Same as execute, explicit due to intricacies of alias and super.
+ def execute_insert(sql, opts={}, &block)
+ super(sql, {:arguments=>bind_arguments}.merge(opts), &block)
end
- alias execute_dui execute
end
# Prepare an unnamed statement of the given type and call it with the
@@ -188,7 +186,7 @@ def explain
# Yield a hash for each row in the dataset.
def fetch_rows(sql)
- execute_select(sql) do |result|
+ execute(sql) do |result|
@columns = result.columns.map {|c| c.to_sym}
column_count = @columns.size
result.each do |values|
@@ -228,11 +226,6 @@ def prepare(type, name, values=nil)
private
- # Run execute_select on the database with the given SQL.
- def execute_select(sql, &block)
- @db.execute_select(sql, &block)
- end
-
# SQLite uses a : before the name of the argument as a placeholder.
def prepared_arg_placeholder
PREPARED_ARG_PLACEHOLDER
View
148 lib/sequel_core/connection_pool.rb
@@ -2,21 +2,8 @@
# multiple connections and giving threads exclusive access to each
# connection.
class Sequel::ConnectionPool
- # A hash of connections currently being used, key is the Thread,
- # value is the connection.
- attr_reader :allocated
-
- # An array of connections opened but not currently used
- attr_reader :available_connections
-
# The proc used to create a new database connection.
attr_accessor :connection_proc
-
- # The total number of connections opened, should
- # be equal to available_connections.length +
- # allocated.length
- attr_reader :created_count
- alias_method :size, :created_count
# The maximum number of connections.
attr_reader :max_size
@@ -25,7 +12,6 @@ class Sequel::ConnectionPool
# this if you want to manipulate the variables safely.
attr_reader :mutex
-
# Constructs a new pool with a maximum size. If a block is supplied, it
# is used to create new connections as they are needed.
#
@@ -47,21 +33,50 @@ class Sequel::ConnectionPool
# a connection again (default 0.001)
# * :pool_timeout - The amount of seconds to wait to acquire a connection
# before raising a PoolTimeoutError (default 5)
+ # * :servers - A hash of servers to use. Keys should be symbols. If not
+ # present, will use a single :default server. The server name symbol will
+ # be passed to the connection_proc.
def initialize(opts = {}, &block)
@max_size = opts[:max_connections] || 4
@mutex = Mutex.new
@connection_proc = block
-
- @available_connections = []
- @allocated = {}
- @created_count = 0
+ @servers = [:default]
+ @servers += opts[:servers].keys - @servers if opts[:servers]
+ @available_connections = Hash.new{|h,k| h[:default]}
+ @allocated = Hash.new{|h,k| h[:default]}
+ @created_count = Hash.new{|h,k| h[:default]}
+ @servers.each do |s|
+ @available_connections[s] = []
+ @allocated[s] = {}
+ @created_count[s] = 0
+ end
@timeout = opts[:pool_timeout] || 5
@sleep_time = opts[:pool_sleep_time] || 0.001
@convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
end
- # Chooses the first available connection, or if none are available,
- # creates a new connection. Passes the connection to the supplied block:
+ # A hash of connections currently being used for the given server, key is the
+ # Thread, value is the connection.
+ def allocated(server=:default)
+ @allocated[server]
+ end
+
+ # An array of connections opened but not currently used, for the given
+ # server.
+ def available_connections(server=:default)
+ @available_connections[server]
+ end
+
+ # The total number of connections opened for the given server, should
+ # be equal to available_connections.length + allocated.length
+ def created_count(server=:default)
+ @created_count[server]
+ end
+ alias size created_count
+
+ # Chooses the first available connection to the given server, or if none are
+ # available, creates a new connection. Passes the connection to the supplied
+ # block:
#
# pool.hold {|conn| conn.execute('DROP TABLE posts')}
#
@@ -73,78 +88,83 @@ def initialize(opts = {}, &block)
# is available or the timeout expires. If the timeout expires before a
# connection can be acquired, a Sequel::Error::PoolTimeoutError is
# raised.
- def hold
+ def hold(server=:default)
begin
t = Thread.current
time = Time.new
timeout = time + @timeout
sleep_time = @sleep_time
- if conn = owned_connection(t)
+ if conn = owned_connection(t, server)
return yield(conn)
end
- until conn = acquire(t)
+ until conn = acquire(t, server)
raise(::Sequel::Error::PoolTimeoutError) if Time.new > timeout
sleep sleep_time
end
begin
yield conn
ensure
- release(t, conn)
+ release(t, conn, server)
end
rescue Exception => e
raise(@convert_exceptions && !e.is_a?(StandardError) ? RuntimeError.new(e.message) : e)
end
end
- # Removes all connection currently available, optionally yielding each
- # connection to the given block. This method has the effect of
- # disconnecting from the database. Once a connection is requested using
- # #hold, the connection pool creates new connections to the database.
- def disconnect(&block)
+ # Removes all connection currently available on all servers, optionally
+ # yielding each connection to the given block. This method has the effect of
+ # disconnecting from the database, assuming that no connections are currently
+ # being used. Once a connection is requested using #hold, the connection pool
+ # creates new connections to the database.
+ def disconnect
@mutex.synchronize do
- @available_connections.each {|c| block[c]} if block
- @available_connections = []
- @created_count = @allocated.size
+ @available_connections.each do |server, conns|
+ conns.each{|c| yield(c)} if block_given?
+ conns.clear
+ @created_count[server] = allocated(server).length
+ end
end
end
private
- # Returns the connection owned by the supplied thread, if any.
- def owned_connection(thread)
- @mutex.synchronize{@allocated[thread]}
+ # Returns the connection owned by the supplied thread for the given server,
+ # if any.
+ def owned_connection(thread, server)
+ @mutex.synchronize{@allocated[server][thread]}
end
- # Assigns a connection to the supplied thread, if one is available.
- def acquire(thread)
+ # Assigns a connection to the supplied thread for the given server, if one
+ # is available.
+ def acquire(thread, server)
@mutex.synchronize do
- if conn = available
- @allocated[thread] = conn
+ if conn = available(server)
+ allocated(server)[thread] = conn
end
end
end
- # Returns an available connection. If no connection is available,
- # tries to create a new connection.
- def available
- @available_connections.pop || make_new
+ # Returns an available connection to the given server. If no connection is
+ # available, tries to create a new connection.
+ def available(server)
+ available_connections(server).pop || make_new(server)
end
- # Creates a new connection if the size of the pool is less than the
- # maximum size.
- def make_new
- if @created_count < @max_size
- @created_count += 1
- @connection_proc ? @connection_proc.call : \
+ # Creates a new connection to the given server if the size of the pool for
+ # the server is less than the maximum size of the pool.
+ def make_new(server)
+ if @created_count[server] < @max_size
+ @created_count[server] += 1
+ @connection_proc ? @connection_proc.call(server) : \
(raise Error, "No connection proc specified")
end
end
- # Releases the connection assigned to the supplied thread.
- def release(thread, conn)
+ # Releases the connection assigned to the supplied thread and server.
+ def release(thread, conn, server)
@mutex.synchronize do
- @allocated.delete(thread)
- @available_connections << conn
+ allocated(server).delete(thread)
+ available_connections(server) << conn
end
end
end
@@ -156,9 +176,6 @@ def release(thread, conn)
# Note that using a single threaded pool with some adapters can cause
# errors in certain cases, see Sequel.single_threaded=.
class Sequel::SingleThreadedPool
- # The single database connection for the pool
- attr_reader :conn
-
# The proc used to create a new database connection
attr_writer :connection_proc
@@ -170,15 +187,20 @@ class Sequel::SingleThreadedPool
# to RuntimeError exceptions (default true)
def initialize(opts={}, &block)
@connection_proc = block
+ @conns = {}
@convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
end
- # Yields the connection to the supplied block. This method simulates the
- # ConnectionPool#hold API.
- def hold
+ # The connection for the given server.
+ def conn(server=:default)
+ @conns[server]
+ end
+
+ # Yields the connection to the supplied block for the given server.
+ # This method simulates the ConnectionPool#hold API.
+ def hold(server=:default)
begin
- @conn ||= @connection_proc.call
- yield @conn
+ yield(@conns[server] ||= @connection_proc.call(server))
rescue Exception => e
# if the error is not a StandardError it is converted into RuntimeError.
raise(@convert_exceptions && !e.is_a?(StandardError) ? RuntimeError.new(e.message) : e)
@@ -188,7 +210,7 @@ def hold
# Disconnects from the database. Once a connection is requested using
# #hold, the connection is reestablished.
def disconnect(&block)
- block[@conn] if block && @conn
- @conn = nil
+ @conns.values.each{|conn| yield(conn) if block_given?}
+ @conns = {}
end
end
View
44 lib/sequel_core/database.rb
@@ -57,7 +57,7 @@ def initialize(opts = {}, &block)
@prepared_statements = {}
@transactions = []
@pool = (@single_threaded ? SingleThreadedPool : ConnectionPool).new(connection_pool_default_options.merge(opts), &block)
- @pool.connection_proc = proc{connect} unless block
+ @pool.connection_proc = proc{|server| connect(server)} unless block
@loggers = Array(opts[:logger]) + Array(opts[:loggers])
::Sequel::DATABASES.push(self)
@@ -219,20 +219,20 @@ def disconnect
end
# Executes the given SQL. This method should be overridden in descendants.
- def execute(sql)
+ def execute(sql, opts={})
raise NotImplementedError, "#execute should be overridden by adapters"
end
# Method that should be used when submitting any DDL (Data Definition
# Language) SQL. By default, calls execute_dui.
- def execute_ddl(sql)
- execute_dui(sql)
+ def execute_ddl(sql, opts={}, &block)
+ execute_dui(sql, opts, &block)
end
# Method that should be used when issuing a DELETE, UPDATE, or INSERT
# statement. By default, calls execute.
- def execute_dui(sql)
- execute(sql)
+ def execute_dui(sql, opts={}, &block)
+ execute(sql, opts, &block)
end
# Fetches records for an arbitrary SQL statement. If a block is given,
@@ -331,8 +331,8 @@ def single_threaded?
end
# Acquires a database connection, yielding it to the passed block.
- def synchronize(&block)
- @pool.hold(&block)
+ def synchronize(server=nil, &block)
+ @pool.hold(server || :default, &block)
end
# Returns true if a table with the given name exists.
@@ -351,8 +351,8 @@ def table_exists?(name)
# Attempts to acquire a database connection. Returns true if successful.
# Will probably raise an error if unsuccessful.
- def test_connection
- synchronize{|conn|}
+ def test_connection(server=nil)
+ synchronize(server){|conn|}
true
end
@@ -360,8 +360,8 @@ def test_connection
# supported - calling #transaction within a transaction will reuse the
# current transaction. Should be overridden for databases that support nested
# transactions.
- def transaction
- synchronize do |conn|
+ def transaction(server=nil)
+ synchronize(server) do |conn|
return yield(conn) if @transactions.include?(Thread.current)
log_info(SQL_BEGIN)
conn.execute(SQL_BEGIN)
@@ -470,6 +470,26 @@ def uri
alias_method :url, :uri
private
+
+ # Return the options for the given server by merging the generic
+ # options for all server with the specific options for the given
+ # server specified in the :servers option.
+ def server_opts(server)
+ opts = if @opts[:servers] && server_options = @opts[:servers][server]
+ case server_options
+ when Hash
+ @opts.merge(server_options)
+ when Proc
+ @opts.merge(server_options.call(self))
+ else
+ raise Error, 'Server opts should be a hash or proc'
+ end
+ else
+ @opts.dup
+ end
+ opts.delete(:servers)
+ opts
+ end
# The default options for the connection pool.
def connection_pool_default_options
View
15 lib/sequel_core/dataset.rb
@@ -246,6 +246,13 @@ def polymorphic_key
def quote_identifiers?
@quote_identifiers
end
+
+ # Set the server for this dataset to use. Used to pick a specific database
+ # shard to run a query against, or to override the default SELECT uses
+ # :read_only database and all other queries use the :default database.
+ def server(servr)
+ clone(:server=>servr)
+ end
# Alias for set, but not aliased directly so subclasses
# don't have to override both methods.