Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

partitioning updates working #7584

Closed
wants to merge 1 commit into
from
Jump to file or symbol
Failed to load files and symbols.
+232 −20
Split
@@ -225,10 +225,10 @@ def attribute_method?(attr_name)
# type casted for use in an Arel insert/update method.
def arel_attributes_with_values(attribute_names)
attrs = {}
- arel_table = self.class.arel_table
+ actual_arel_table = arel_table
attribute_names.each do |name|
- attrs[arel_table[name]] = typecasted_attribute_value(name)
+ attrs[actual_arel_table[name]] = typecasted_attribute_value(name)
end
attrs
end
@@ -73,6 +73,14 @@ def initialize(base)
@base = base
end
+ #
+ # Builds a SQL check constraint
+ #
+ # @param [String] constraint a SQL constraint
+ def check_constraint(constraint)
+ @columns << Struct.new(:to_sql).new("CHECK (#{constraint})")
@tenderlove

tenderlove Jan 15, 2013

Owner

We should move this to ARel (I think). I see why we need this, but I don't like the implementation. ;-)

@simonoff

simonoff Jan 15, 2013

But it's a DB-aware setting. MySQL has different syntax for partitioning. I think it can be a stub in base adapter with different implementations for each DB.

@tenderlove

tenderlove Jan 15, 2013

Owner

I'm specifically complaining about the Struct thing. We should add a node to ARel that handles CHECK

+ end
+
def xml(*args)
raise NotImplementedError unless %w{
sqlite mysql mysql2
@@ -222,9 +222,7 @@ def client_min_messages=(level)
# Returns the sequence name for a table's primary key or some other specified key.
def default_sequence_name(table_name, pk = nil) #:nodoc:
- result = serial_sequence(table_name, pk || 'id')
- return nil unless result
- result.split('.').last
+ serial_sequence(table_name, pk || 'id')
rescue ActiveRecord::StatementInvalid
"#{table_name}_#{pk || 'id'}_seq"
end
@@ -306,6 +304,81 @@ def pk_and_sequence_for(table) #:nodoc:
nil
end
+ #
+ # Get the next value in a sequence. Used on INSERT operation for
+ # partitioning like by_id because the ID is required before the insert
+ # so that the specific child table is known ahead of time.
+ #
+ # @param [String] sequence_name the name of the sequence to fetch the next value from
+ # @return [Integer] the value from the sequence
+ def next_sequence_value(sequence_name)
+ return execute("select nextval('#{sequence_name}')").field_values("nextval").first.to_i
+ end
+
+ #
+ # Get the next batch of values from a sequence.
+ #
+ # @param [String] sequence_name the name of the sequence to fetch the next values from
+ # @param [Integer] batch_size count of values.
+ # @return [Array<Integer>] an array of values from the sequence
+ def next_sequence_values(sequence_name, batch_size)
+ result = execute("select nextval('#{sequence_name}') from generate_series(1, #{batch_size})")
+ return result.field_values("nextval").map(&:to_i)
+ end
+
+ #
+ # Causes Active Record to fetch the primary key for the table (using next_sequence_value())
+ # just before an insert. We need the prefetch to happen but we don't have enough information
+ # here to determine if it should happen, so Relation::insert has been modified to request of
+ # the ActiveRecord::Base derived class if it requires a prefetch.
+ #
+ # @param [String] table_name the table name to query
+ # @return [Boolean] returns true if the table should have its primary key prefetched.
+ def prefetch_primary_key?(table_name)
+ return false
+ end
+
+ #
+ # Creates a schema given a name.
+ #
+ # @param [String] name the name of the schema.
+ # @param [Hash] options ({}) options for creating a schema
+ # @option options [Boolean] :unless_exists (false) check if schema exists.
+ # @return [optional] undefined
+ def create_schema(name, options = {})
+ if options[:unless_exists]
+ return if execute("select count(*) from pg_namespace where nspname = '#{name}'").getvalue(0,0).to_i > 0
+ end
+ execute("CREATE SCHEMA #{name}")
+ end
+
+ #
+ # Drop a schema given a name.
+ #
+ # @param [String] name the name of the schema.
+ # @param [Hash] options ({}) options for dropping a schema
+ # @option options [Boolean] :if_exists (false) check if schema exists.
+ # @option options [Boolean] :cascade (false) drop dependent objects
+ # @return [optional] undefined
+ def drop_schema(name, options = {})
+ if options[:if_exists]
+ return if execute("select count(*) from pg_namespace where nspname = '#{name}'").getvalue(0,0).to_i == 0
+ end
+ execute("DROP SCHEMA #{name}#{' cascade' if options[:cascade]}")
+ end
+
+ #
+ # Add foreign key constraint to table.
+ #
+ # @param [String] referencing_table_name the name of the table containing the foreign key
+ # @param [String] referencing_field_name the name of foreign key column
+ # @param [String] referenced_table_name the name of the table referenced by the foreign key
+ # @param [String] referenced_field_name (:id) the name of the column referenced by the foreign key
+ # @return [optional] undefined
+ def add_foreign_key(referencing_table_name, referencing_field_name, referenced_table_name, referenced_field_name = :id)
+ execute("ALTER TABLE #{referencing_table_name} add foreign key (#{referencing_field_name}) references #{referenced_table_name}(#{referenced_field_name})")
+ end
+
# Returns just a table's primary key
def primary_key(table)
row = exec_query(<<-end_sql, 'SCHEMA').rows.first
@@ -133,8 +133,25 @@ def ===(object)
# class Post < ActiveRecord::Base
# scope :published_and_commented, published.and(self.arel_table[:comments_count].gt(0))
# end
- def arel_table
- @arel_table ||= Arel::Table.new(table_name, arel_engine)
+ def arel_table(arel_attribute_values = {})
+ @arel_tables ||= {}
+
+ if arel_attribute_values.blank?
@tenderlove

tenderlove Jan 15, 2013

Owner

Change this to empty?

+ key_values = nil
+ else
+ key_values = self.table_partition_key_values(arel_attribute_values)
+ end
+ new_arel_table = @arel_tables[key_values]
+ if new_arel_table.blank?
+ new_arel_table = Arel::Table.new(table_name(*key_values), arel_engine)
+ @arel_tables[key_values] = new_arel_table
+ end
+ return new_arel_table
+ end
+
+ def reset_arel_table
+   # Drop every cached Arel::Table, not only the nil-keyed default one, so that
+   # tables cached per partition key values are rebuilt from the current
+   # table name as well.
+   @arel_tables = {}
end
# Returns the Arel engine.
@@ -364,6 +381,12 @@ def slice(*methods)
Hash[methods.map { |method| [method, public_send(method)] }].with_indifferent_access
end
+ def arel_table
+ symbolized_attributes = attributes.symbolize_keys
+ table_partition_key_values = Hash[*self.class.table_partition_keys.map{|name| [name,symbolized_attributes[name]]}.flatten]
+ return self.class.arel_table(table_partition_key_values)
+ end
+
private
# Under Ruby 1.9, Array#flatten will call #to_ary (recursively) on each of the elements
@@ -28,8 +28,9 @@ def reset_counters(id, *counters)
reflection = belongs_to.find { |e| e.foreign_key.to_s == foreign_key && e.options[:counter_cache].present? }
counter_name = reflection.counter_cache_column
- stmt = unscoped.where(arel_table[primary_key].eq(object.id)).arel.compile_update({
- arel_table[counter_name] => object.send(association).count
+ actual_arel_table = arel_table
+ stmt = unscoped.where(actual_arel_table[primary_key].eq(object.id)).arel.compile_update({
+ actual_arel_table[counter_name] => object.send(association).count
})
connection.update stmt
end
@@ -123,7 +123,7 @@ def relation_for_destroy
column = self.class.columns_hash[column_name]
substitute = connection.substitute_at(column, relation.bind_values.length)
- relation = relation.where(self.class.arel_table[column_name].eq(substitute))
+ relation = relation.where(arel_table[column_name].eq(substitute))
relation.bind_values << [column, self[column_name].to_i]
end
@@ -50,6 +50,11 @@ module ModelSchema
# If true, the default table name for a Product class will be +products+. If false, it would just be +product+.
# See table_name for the full rules on table/class naming. This is true, by default.
config_attribute :pluralize_table_names
+
+ def table_name
+ symbolized_attributes = attributes.symbolize_keys
+ return self.class.table_name(*self.class.table_partition_keys.map{|attribute_name| symbolized_attributes[attribute_name]})
+ end
end
module ClassMethods
@@ -108,7 +113,7 @@ module ClassMethods
# end
# end
# Post.table_name # => "special_posts"
- def table_name
+ def table_name(*table_partition_key_values)
reset_table_name unless defined?(@table_name)
@table_name
end
@@ -131,7 +136,7 @@ def table_name=(value)
@table_name = value
@quoted_table_name = nil
- @arel_table = nil
+ reset_arel_table
@sequence_name = nil unless defined?(@explicit_sequence_name) && @explicit_sequence_name
@relation = Relation.new(self, arel_table)
end
@@ -167,6 +172,97 @@ def inheritance_column=(value)
@explicit_inheritance_column = true
end
+ #
+ # Returns an array of attribute names (strings) used to fetch the key value(s)
+ # that determine this specific partition table.
+ #
+ # @return [String] the column name used to partition this table
+ # @return [Array<String>] the column names used to partition this table
+ def table_partition_keys
+ return []
+ end
+
+ #
+ # The specific values for a partition of this active record's type which are defined by
+ # {#self.table_partition_keys}
+ #
+ # @param [Hash] values key/value pairs to extract values from
+ # @return [Object] value of partition key
+ # @return [Array<Object>] values of partition keys
+ def table_partition_key_values(values)
+ symbolized_values = values.symbolize_keys
+ return self.table_partition_keys.map{|key| symbolized_values[key.to_sym]}
+ end
+
+ #
+ # This scoping is used to target the
+ # active record find() to a specific child table and alias it to the name of the
+ # parent table (so activerecord can generally work with it)
+ #
+ # Use as:
+ #
+ # Foo.from_partition(KEY).find(:first)
+ #
+ # where KEY is the key value(s) used as the check constraint on Foo's table.
+ #
+ # @param [*Array<Object>] partition_field the field values to partition on
+ # @return [Hash] the scoping
+ def from_partition(*partition_field)
+ table_alias_name = table_alias_name(*partition_field)
+ from("#{table_name(*partition_field)} AS #{table_alias_name}").
+ tap{|relation| relation.table.table_alias = table_alias_name}
+ end
+
+ #
+ # This scope is used to target the
+ # active record find() to a specific child table. Is probably best used in advanced
+ # activerecord queries when a number of tables are involved in the query.
+ #
+ # Use as:
+ #
+ # Foo.from_partitioned_without_alias(KEY).find(:all, :select => "*")
+ #
+ # where KEY is the key value(s) used as the check constraint on Foo's table.
+ #
+ # it's not obvious why :select => "*" is supplied. note activerecord wants
+ # to use the name of parent table for access to any attributes, so without
+ # the :select argument the sql result would be something like:
+ #
+ # SELECT foos.* FROM foos_partitions.pXXX
+ #
+ # which fails because table foos is not referenced. using the form #from_partition
+ # is almost always the correct thing when using activerecord.
+ #
+ # Because the scope is specific to a class (a class method) but unlike
+ # class methods is not inherited, one must use this form (#from_partitioned_without_alias) instead
+ # of #from_partitioned_without_alias_scope to get the most derived classes specific active record scope.
+ #
+ # @param [*Array<Object>] partition_field the field values to partition on
+ # @return [Hash] the scoping
+ def from_partitioned_without_alias(*partition_field)
+ table_alias_name = table_name(*partition_field)
+ from(table_alias_name).
+ tap{|relation| relation.table.table_alias = table_alias_name}
+ end
+
+ def table_alias_name(*partition_field)
+ return table_name(*partition_field)
+ end
+
+ #
+ # Partitioning needs to be able to specify whether
+ # we should prefetch the primary key: to determine
+ # the specific table we will insert into, we
+ # need to know the partition key values.
+ #
+ # this needs to be on the model NOT the connection
+ #
+ # for the simple case we just pass the question on to
+ # the connection
+ def prefetch_primary_key?
+ # Forwards the question to the connection, passing this class's table_name.
+ connection.prefetch_primary_key?(table_name)
+ end
+
def sequence_name
if base_class == self
@sequence_name ||= reset_sequence_name
@@ -113,7 +113,7 @@ def save!(*)
# callbacks, Observer methods, or any <tt>:dependent</tt> association
# options, use <tt>#destroy</tt>.
def delete
- self.class.delete(id) if persisted?
+ self.class.from_partition(*self.class.table_partition_key_values(attributes)).delete(id) if persisted?
@destroyed = true
freeze
end
@@ -303,9 +303,9 @@ def reload(options = nil)
fresh_object =
if options && options[:lock]
- self.class.unscoped { self.class.lock.find(id) }
+ self.class.unscoped { self.class.lock.from_partition(*self.class.table_partition_key_values(attributes)).find(id) }
else
- self.class.unscoped { self.class.find(id) }
+ self.class.unscoped { self.class.from_partition(*self.class.table_partition_key_values(attributes)).find(id) }
end
@attributes.update(fresh_object.instance_variable_get('@attributes'))
@@ -372,7 +372,7 @@ def relation_for_destroy
substitute = connection.substitute_at(column, 0)
relation = self.class.unscoped.where(
- self.class.arel_table[pk].eq(substitute))
+ arel_table[pk].eq(substitute))
relation.bind_values = [[column, id]]
relation
@@ -390,13 +390,24 @@ def update(attribute_names = @attributes.keys)
attributes_with_values = arel_attributes_with_values_for_update(attribute_names)
return 0 if attributes_with_values.empty?
klass = self.class
- stmt = klass.unscoped.where(klass.arel_table[klass.primary_key].eq(id)).arel.compile_update(attributes_with_values)
+ actual_arel_table = arel_table
+ # This is pretty hacky: we adjust the attributes so they are connected to the correct arel_table
+ # we can do this in two places and I've chosen here (which seems less intrusive).
+ # Alternatively we could hook into any attribute change (model.created_at = Time.now.utc) and
+ # adjust all arel_tables in all attributes when any of this model's partition key values change.
+ # That seems like a lot of work.
+ attributes_with_values = Hash[*attributes_with_values.map{|k,v| [actual_arel_table[k.name], v]}.flatten]
+ stmt = klass.unscoped.where(actual_arel_table[klass.primary_key].eq(id)).arel.compile_update(attributes_with_values)
klass.connection.update stmt
end
# Creates a record with values matching those of the instance attributes
# and returns its id.
def create
+ if self.id.nil? && self.class.prefetch_primary_key?
+ self.id = connection.next_sequence_value(self.class.sequence_name)
+ end
+
attributes_values = arel_attributes_with_values_for_create(!id.nil?)
new_id = self.class.unscoped.insert attributes_values
@@ -41,12 +41,12 @@ def insert(values)
if !primary_key_value && connection.prefetch_primary_key?(klass.table_name)
primary_key_value = connection.next_sequence_value(klass.sequence_name)
- values[klass.arel_table[klass.primary_key]] = primary_key_value
+ values[arel_table[klass.primary_key]] = primary_key_value
end
end
im = arel.create_insert
- im.into @table
+ im.into arel_table(Hash[*values.map{|k,v| [k.name,v]}.flatten])
conn = @klass.connection
@@ -57,7 +57,7 @@ def self.build(attribute, value)
array_predicates << values_predicate
array_predicates.inject { |composite, predicate| composite.or(predicate) }
when ActiveRecord::Relation
- value = value.select(value.klass.arel_table[value.klass.primary_key]) if value.select_values.empty?
+ value = value.select(value.arel_table[value.klass.primary_key]) if value.select_values.empty?
attribute.in(value.arel.ast)
when Range
attribute.in(value)