Permalink
Browse files

r4644@asus: jeremy | 2006-06-16 14:57:03 -0700

 locking
 r4645@asus:  jeremy | 2006-06-17 12:41:30 -0700
 missing reply fixture
 r4646@asus:  jeremy | 2006-06-19 13:05:23 -0700
 Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections.
 r4647@asus:  jeremy | 2006-06-19 13:07:23 -0700
 PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency.
 r4648@asus:  jeremy | 2006-06-19 13:08:40 -0700
 Pass the default allow_concurrency when instantiating new connections.
 r4649@asus:  jeremy | 2006-06-19 13:11:12 -0700
 Break out concurrent transaction tests and run them for PostgreSQLAdapter only (need to fork or system('some_test_script') for the other adapters)
 r4650@asus:  jeremy | 2006-06-19 13:42:48 -0700
 Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE".
 r4661@asus:  jeremy | 2006-06-19 15:36:51 -0700
 excise the junk mutex


git-svn-id: http://svn-commit.rubyonrails.org/rails/trunk@4460 5ecf4fe2-1ee6-0310-87b1-e25e094e27de
  • Loading branch information...
1 parent e5fc5aa commit 15aa6e05528019fbf62e1a5cbd2398a2205af8bb @jeremy jeremy committed Jun 19, 2006
View
@@ -1,5 +1,18 @@
*SVN*
+* Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE". [Shugo Maeda]
+ # Obtain an exclusive lock on person 1 so we can safely increment visits.
+ Person.transaction do
+ # select * from people where id=1 for update
+ person = Person.find(1, :lock => true)
+ person.visits += 1
+ person.save!
+ end
+
+* PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency. [Jeremy Kemper]
+
+* Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections. [Jeremy Kemper]
+
* Change AR::Base#to_param to return a String instead of a Fixnum. Closes #5320. [Nicholas Seckar]
* Use explicit delegation instead of method aliasing for AR::Base.to_param -> AR::Base.id. #5299 (skaes@web.de)
@@ -365,6 +365,8 @@ class << self # Class methods
# * <tt>:select</tt>: By default, this is * as in SELECT * FROM, but can be changed if you for example want to do a join, but not
# include the joined columns.
# * <tt>:readonly</tt>: Mark the returned records read-only so they cannot be saved or updated.
+ # * <tt>:lock</tt>: An SQL fragment like "FOR UPDATE" or "LOCK IN SHARE MODE".
+ # :lock => true gives connection's default exclusive lock, usually "FOR UPDATE".
#
# Examples for find by id:
# Person.find(1) # returns the object for ID = 1
@@ -384,6 +386,17 @@ class << self # Class methods
# Person.find(:all, :offset => 10, :limit => 10)
# Person.find(:all, :include => [ :account, :friends ])
# Person.find(:all, :group => "category")
+ #
+ # Example for find with a lock. Imagine two concurrent transactions:
+ # each will read person.visits == 2, add 1 to it, and save, resulting
+ # in two saves of person.visits = 3. By locking the row, the second
+ # transaction has to wait until the first is finished; we get the
+ # expected person.visits == 4.
+ # Person.transaction do
+ # person = Person.find(1, :lock => true)
+ # person.visits += 1
+ # person.save!
+ # end
def find(*args)
options = extract_options_from_args!(args)
validate_find_options(options)
@@ -850,7 +863,7 @@ def with_scope(method_scoping = {}, action = :merge, &block)
method_scoping.assert_valid_keys([ :find, :create ])
if f = method_scoping[:find]
- f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly ])
+ f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly, :lock ])
f[:readonly] = true if !f[:joins].blank? && !f.has_key?(:readonly)
end
@@ -1028,6 +1041,7 @@ def construct_finder_sql(options)
add_order!(sql, options[:order])
add_limit!(sql, options, scope)
+ add_lock!(sql, options, scope)
sql
end
@@ -1061,14 +1075,19 @@ def add_order!(sql, order)
# The optional scope argument is for the current :find scope.
def add_limit!(sql, options, scope = :auto)
scope = scope(:find) if :auto == scope
- if scope
- options[:limit] ||= scope[:limit]
- options[:offset] ||= scope[:offset]
- end
+ options = options.reverse_merge(:limit => scope[:limit], :offset => scope[:offset]) if scope
connection.add_limit_offset!(sql, options)
end
# The optional scope argument is for the current :find scope.
+ # The :lock option has precedence over a scoped :lock.
+ def add_lock!(sql, options, scope = :auto)
+ scope = scope(:find) if :auto == :scope
+ options = options.reverse_merge(:lock => scope[:lock]) if scope
+ connection.add_lock!(sql, options)
+ end
+
+ # The optional scope argument is for the current :find scope.
def add_joins!(sql, options, scope = :auto)
scope = scope(:find) if :auto == scope
join = (scope && scope[:joins]) || options[:joins]
@@ -1361,12 +1380,12 @@ def extract_options_from_args!(args) #:nodoc:
end
VALID_FIND_OPTIONS = [ :conditions, :include, :joins, :limit, :offset,
- :order, :select, :readonly, :group, :from ]
-
+ :order, :select, :readonly, :group, :from, :lock ]
+
def validate_find_options(options) #:nodoc:
options.assert_valid_keys(VALID_FIND_OPTIONS)
end
-
+
def set_readonly_option!(options) #:nodoc:
# Inherit :readonly from finder scope if set. Otherwise,
# if :joins is not blank then :readonly defaults to true.
@@ -2025,4 +2044,4 @@ def clone_attribute_value(reader_method, attribute_name)
value
end
end
-end
+end
@@ -248,7 +248,8 @@ def self.connection=(spec) #:nodoc:
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
active_connections[name] = spec
elsif spec.kind_of?(ConnectionSpecification)
- self.connection = self.send(spec.adapter_method, spec.config)
+ config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
+ self.connection = self.send(spec.adapter_method, config)
elsif spec.nil?
raise ConnectionNotEstablished
else
@@ -91,6 +91,17 @@ def add_limit_offset!(sql, options)
end
end
+ # Appends a locking clause to a SQL statement. *Modifies the +sql+ parameter*.
+ # # SELECT * FROM suppliers FOR UPDATE
+ # add_lock! 'SELECT * FROM suppliers', :lock => true
+ # add_lock! 'SELECT * FROM suppliers', :lock => ' FOR UPDATE'
+ def add_lock!(sql, options)
+ case lock = options[:lock]
+ when true: sql << ' FOR UPDATE'
+ when String: sql << " #{lock}"
+ end
+ end
+
def default_sequence_name(table, column)
nil
end
@@ -46,6 +46,7 @@ module ConnectionAdapters
# * <tt>:schema_search_path</tt> -- An optional schema search path for the connection given as a string of comma-separated schema names. This is backward-compatible with the :schema_order option.
# * <tt>:encoding</tt> -- An optional client encoding that is using in a SET client_encoding TO <encoding> call on connection.
# * <tt>:min_messages</tt> -- An optional client min messages that is using in a SET client_min_messages TO <min_messages> call on connection.
+ # * <tt>:allow_concurrency</tt> -- If true, use async query methods so Ruby threads don't deadlock; otherwise, use blocking query methods.
class PostgreSQLAdapter < AbstractAdapter
def adapter_name
'PostgreSQL'
@@ -54,6 +55,7 @@ def adapter_name
def initialize(connection, logger, config = {})
super(connection, logger)
@config = config
+ @async = config[:allow_concurrency]
configure_connection
end
@@ -67,7 +69,7 @@ def active?
end
# postgres-pr raises a NoMethodError when querying if no conn is available
rescue PGError, NoMethodError
- false
+ false
end
# Close then reopen the connection.
@@ -78,7 +80,7 @@ def reconnect!
configure_connection
end
end
-
+
def disconnect!
# Both postgres and postgres-pr respond to :close
@connection.close rescue nil
@@ -99,11 +101,11 @@ def native_database_types
:boolean => { :name => "boolean" }
}
end
-
+
def supports_migrations?
true
- end
-
+ end
+
def table_alias_length
63
end
@@ -141,11 +143,23 @@ def insert(sql, name = nil, pk = nil, id_value = nil, sequence_name = nil) #:nod
end
def query(sql, name = nil) #:nodoc:
- log(sql, name) { @connection.query(sql) }
+ log(sql, name) do
+ if @async
+ @connection.async_query(sql)
+ else
+ @connection.query(sql)
+ end
+ end
end
def execute(sql, name = nil) #:nodoc:
- log(sql, name) { @connection.exec(sql) }
+ log(sql, name) do
+ if @async
+ @connection.async_exec(sql)
+ else
+ @connection.exec(sql)
+ end
+ end
end
def update(sql, name = nil) #:nodoc:
@@ -162,7 +176,7 @@ def begin_db_transaction #:nodoc:
def commit_db_transaction #:nodoc:
execute "COMMIT"
end
-
+
def rollback_db_transaction #:nodoc:
execute "ROLLBACK"
end
@@ -261,7 +275,7 @@ def reset_pk_sequence!(table, pk = nil, sequence = nil)
def pk_and_sequence_for(table)
# First try looking for a sequence with a dependency on the
# given table's primary key.
- result = execute(<<-end_sql, 'PK and serial sequence')[0]
+ result = query(<<-end_sql, 'PK and serial sequence')[0]
SELECT attr.attname, name.nspname, seq.relname
FROM pg_class seq,
pg_attribute attr,
@@ -284,7 +298,7 @@ def pk_and_sequence_for(table)
# Support the 7.x and 8.0 nextval('foo'::text) as well as
# the 8.1+ nextval('foo'::regclass).
# TODO: assumes sequence is in same schema as table.
- result = execute(<<-end_sql, 'PK and custom sequence')[0]
+ result = query(<<-end_sql, 'PK and custom sequence')[0]
SELECT attr.attname, name.nspname, split_part(def.adsrc, '\\\'', 2)
FROM pg_class t
JOIN pg_namespace name ON (t.relnamespace = name.oid)
@@ -305,7 +319,7 @@ def pk_and_sequence_for(table)
def rename_table(name, new_name)
execute "ALTER TABLE #{name} RENAME TO #{new_name}"
end
-
+
def add_column(table_name, column_name, type, options = {})
execute("ALTER TABLE #{table_name} ADD #{column_name} #{type_to_sql(type, options[:limit])}")
execute("ALTER TABLE #{table_name} ALTER #{column_name} SET NOT NULL") if options[:null] == false
@@ -325,12 +339,12 @@ def change_column(table_name, column_name, type, options = {}) #:nodoc:
commit_db_transaction
end
change_column_default(table_name, column_name, options[:default]) unless options[:default].nil?
- end
+ end
def change_column_default(table_name, column_name, default) #:nodoc:
execute "ALTER TABLE #{table_name} ALTER COLUMN #{column_name} SET DEFAULT '#{default}'"
end
-
+
def rename_column(table_name, column_name, new_column_name) #:nodoc:
execute "ALTER TABLE #{table_name} RENAME COLUMN #{column_name} TO #{new_column_name}"
end
@@ -379,7 +393,7 @@ def select(sql, name = nil)
hashed_row = {}
row.each_index do |cel_index|
column = row[cel_index]
-
+
case res.type(cel_index)
when BYTEA_COLUMN_TYPE_OID
column = unescape_bytea(column)
@@ -392,6 +406,7 @@ def select(sql, name = nil)
rows << hashed_row
end
end
+ res.clear
return rows
end
@@ -442,7 +457,7 @@ def unescape_bytea(s)
end
unescape_bytea(s)
end
-
+
# Query a table's column names, default values, and types.
#
# The underlying query is roughly:
@@ -482,7 +497,7 @@ def translate_field_type(field_type)
when /^real|^money/i then 'float'
when /^interval/i then 'string'
# geometric types (the line type is currently not implemented in postgresql)
- when /^(?:point|lseg|box|"?path"?|polygon|circle)/i then 'string'
+ when /^(?:point|lseg|box|"?path"?|polygon|circle)/i then 'string'
when /^bytea/i then 'binary'
else field_type # Pass through standard types.
end
@@ -492,16 +507,16 @@ def default_value(value)
# Boolean types
return "t" if value =~ /true/i
return "f" if value =~ /false/i
-
+
# Char/String/Bytea type values
return $1 if value =~ /^'(.*)'::(bpchar|text|character varying|bytea)$/
-
+
# Numeric values
return value if value =~ /^-?[0-9]+(\.[0-9]*)?/
# Fixed dates / times
return $1 if value =~ /^'(.+)'::(date|timestamp)/
-
+
# Anything else is blank, some user type, or some function
# and we can't know the value of that, so return nil.
return nil
@@ -184,6 +184,12 @@ def rollback_db_transaction #:nodoc:
end
+ # SELECT ... FOR UPDATE is redundant since the table is locked.
+ def add_lock!(sql, options) #:nodoc:
+ sql
+ end
+
+
# SCHEMA STATEMENTS ========================================
def tables(name = nil) #:nodoc:
@@ -519,7 +519,7 @@ def setup_with_fixtures
load_fixtures
@@already_loaded_fixtures[self.class] = @loaded_fixtures
end
- ActiveRecord::Base.lock_mutex
+ ActiveRecord::Base.send :increment_open_transactions
ActiveRecord::Base.connection.begin_db_transaction
# Load fixtures for every test.
@@ -538,7 +538,7 @@ def teardown_with_fixtures
# Rollback changes.
if use_transactional_fixtures?
ActiveRecord::Base.connection.rollback_db_transaction
- ActiveRecord::Base.unlock_mutex
+ ActiveRecord::Base.send :decrement_open_transactions
end
ActiveRecord::Base.verify_active_connections!
end
Oops, something went wrong.

0 comments on commit 15aa6e0

Please sign in to comment.