Skip to content
Browse files

implement simple queries across partitioned data

  • Loading branch information...
1 parent ba20ec4 commit 6306c6fcb87be36e15b864c376ef515d9851cab4 @rcarver committed Mar 21, 2012
View
3 features/sharding.feature
@@ -51,6 +51,9 @@ Feature: Automatically shard data into multiple tables
20120101, counts_201201, 2
20120102, counts_201201, 2
20120201, counts_201202, 1
+ 20120101, ideas, 10
+ 20120102, ideas, 10
+ 20120201, ideas, 10
"""
And my database looks like:
"""
View
4 lib/orel.rb
@@ -49,6 +49,7 @@
require 'orel/sharding'
require 'orel/sharding/namer'
require 'orel/sharding/partitioned_table'
+require 'orel/sharding/partitioned_query'
require 'orel/sharding/partitioner'
module Orel
@@ -73,6 +74,9 @@ def self.finalize!
end
AR = ActiveRecord::Base
+ def AR.inspect
+ "[AR]"
+ end
def self.logger=(logger)
AR.logger = logger
View
3 lib/orel/connection.rb
@@ -39,8 +39,9 @@ def arel_table(target)
name = case target
when Orel::Relation::Heading then target.namer.table_name
when Orel::Relation::Namer then target.table_name
+ when Orel::Table then target.name
when Symbol then target
- else raise ArgumentError, "Cannot convert #{target.inspect} to table name"
+ else raise ArgumentError, "Cannot convert #{target.class} to table name"
end
@arel_tables[name] ||= Arel::Table.new(name, @active_record)
end
View
10 lib/orel/sharding.rb
@@ -3,9 +3,11 @@ module Orel
# data from multiple database tables for one logical heading.
module Sharding
- def self.included(base)
- #raise ArgumentError, "Orel::Sharding requires Orel::Relation"
+ def self.extended(base)
#raise ArgumentError, "Orel::Sharding is not supported for Orel::Object"
+ base.class_eval do
+ extend Orel::Relation
+ end
end
# Public: Partition records in this heading into multiple underlying tables.
@@ -36,7 +38,7 @@ def shard_table_on(attribute, &block)
# Returns an Orel::Sharding::ParitionedTable.
def table(child_name=nil)
raise ArgumentError, "Child table is not supported" if child_name
- Sharding::ParitionedTable.new(@shard_partitioner)
+ Sharding::PartitionedTable.new(@shard_partitioner)
end
# Public: Get access to a single table partition.
@@ -49,5 +51,7 @@ def partition_for(attributes)
@shard_partitioner.get_partition_for_attributes(attributes, false)
end
+ attr_reader :shard_partitioner
+
end
end
View
115 lib/orel/sharding/partitioned_query.rb
@@ -0,0 +1,115 @@
+module Orel
+ module Sharding
+ # A set of classes that wrap Arel syntax in order to capture
+ # information about the query in order to perform it against
+ # the relevant partitions.
+ module PartitionedQuery
+
+ # Accumulates information about the partitioned attributes
+ # that we're querying against and then returns the relevant
+ # partitions.
+ class PartitionAccumulator
+
+ def initialize(partitioner)
+ @partitioner = partitioner
+ @values = []
+ end
+
+ def attr?(name)
+ @partitioner.partitioned_attribute == name
+ end
+
+ def add_value(name, values)
+ @values.concat values
+ end
+
+ def get_partitions
+ if @values.empty?
+ @partitioner.get_all_partitions
+ else
+ partitions = {}
+ @values.each { |value|
+ partition = @partitioner.get_partition_for_attributes(@partitioner.partitioned_attribute => value)
+ partitions[partition.name] = partition
+ }
+ partitions.values
+ end
+ end
+ end
+
+ # Quacks like an Arel::SelectManager.
+ class SelectManagerProxy
+
+ def initialize
+ @commands = []
+ end
+
+ def project(*attribute_proxies)
+ @commands << ([:project] << attribute_proxies)
+ end
+
+ def where(attribute_proxy)
+ @commands << [:where, attribute_proxy]
+ end
+
+ def get_arel_select_manager(arel_table)
+ manager = Arel::SelectManager.new(arel_table.engine)
+ @commands.each { |command|
+ manager.send(*dereference(arel_table, *command))
+ }
+ manager
+ end
+
+ def dereference(arel_table, *args)
+ result = args.map { |arg|
+ if arg.is_a?(Array)
+ dereference(arel_table, *arg)
+ elsif arg.is_a?(AttributeProxy)
+ arg.dereference(arel_table)
+ else
+ arg
+ end
+ }
+ end
+ end
+
+ # Quacks like an Arel::Table.
+ class TableProxy
+
+ def initialize(partition_accumulator)
+ @partition_accumulator = partition_accumulator
+ end
+
+ def [](name)
+ AttributeProxy.new(@partition_accumulator, name)
+ end
+ end
+
+ # Quacks like an Arel::Attribute.
+ class AttributeProxy
+
+ def initialize(partition_accumulator, name)
+ @partition_accumulator = partition_accumulator
+ @name = name
+ @commands = []
+ end
+
+ def method_missing(message, *args, &block)
+ if @partition_accumulator.attr?(@name)
+ # TODO: only allow sane predicates like eq and in
+ # TODO: and ensure that we pass good values here.
+ @partition_accumulator.add_value(@name, args.first)
+ end
+ @commands << ([message] << args)
+ self
+ end
+
+ def dereference(arel_table)
+ attr = Arel::Attribute.new(arel_table, @name)
+ @commands.inject(attr) { |node, c| node.send(*c) }
+ end
+ end
+
+ end
+ end
+end
View
27 lib/orel/sharding/partitioned_table.rb
@@ -1,7 +1,7 @@
module Orel
module Sharding
# Decorates a Table so that all operations act on the appropriate shard.
- class ParitionedTable
+ class PartitionedTable
def initialize(partitioner)
@partitioner = partitioner
@@ -12,9 +12,30 @@ def insert(attributes)
table.insert(attributes)
end
- def query(&block)
- []
+ def upsert(options)
+ table = @partitioner.get_partition_for_attributes(options[:insert], true)
+ table.upsert(options)
end
+
+ def query(description=nil, &block)
+ accumulator = PartitionedQuery::PartitionAccumulator.new(@partitioner)
+ table_proxy = PartitionedQuery::TableProxy.new(accumulator)
+ manager_proxy = PartitionedQuery::SelectManagerProxy.new
+
+ yield manager_proxy, table_proxy
+
+ results = []
+
+ accumulator.get_partitions.each { |table|
+ a_table = @partitioner.connection.arel_table(table)
+ a_manager = manager_proxy.get_arel_select_manager(a_table)
+ a_manager.from a_table
+ results.concat @partitioner.connection.execute(a_manager.to_sql, description || "#{self.class} Query #{table.name}").each(:as => :hash, :symbolize_keys => true)
+ }
+
+ results
+ end
+
end
end
end
View
20 lib/orel/sharding/partitioner.rb
@@ -2,10 +2,10 @@ module Orel
module Sharding
class Partitioner
- def initialize(heading, connection, sharded_attribute)
+ def initialize(heading, connection, partitioned_attribute)
@heading = heading
@connection = connection
- @sharded_attribute = sharded_attribute
+ @partitioned_attribute = partitioned_attribute
end
def suffix_heading(suffix)
@@ -17,17 +17,29 @@ def partition_with(&block)
@shard_block = block
end
+ def get_all_partitions
+ @connection.query("SHOW TABLES LIKE '#{@table_namer.table_name}_%'").flatten.map { |row|
+ name = row.to_sym
+ unless name == @heading.namer.table_name
+ Table.new(name, @heading, @connection)
+ end
+ }.compact
+ end
+
def get_partition_for_attributes(attributes, create=false)
namer = get_namer_for_attributes(attributes)
create_table(namer) if create
Table.new(namer.table_name, @heading, @connection)
end
+ attr_reader :partitioned_attribute
+ attr_reader :connection
+
protected
def get_namer_for_attributes(attributes)
- value = attributes[@sharded_attribute]
- raise ArgumentError, "Missing value for #{@sharded_attribute}" unless value
+ value = attributes[@partitioned_attribute]
+ raise ArgumentError, "Missing value for #{@partitioned_attribute}" unless value
suffix = @shard_block.call(value)
Orel::Sharding::Namer.new(@table_namer, suffix)
end
View
12 spec/fixtures/daily_accumulation.rb
@@ -0,0 +1,12 @@
+class DailyAccumulation
+ extend Orel::Sharding
+ heading do
+ key { day / thing }
+ att :day, Orel::Domains::String
+ att :thing, Orel::Domains::String
+ att :count, Orel::Domains::Integer
+ end
+ shard_table_on(:day) do |day|
+ day[0, 6]
+ end
+end
View
3 spec/helper.rb
@@ -15,13 +15,14 @@
)
require 'fixtures/users_and_things'
+require 'fixtures/daily_accumulation'
RSpec.configure do |config|
config.before(:suite) do
Orel.finalize!
Orel.recreate_database!
Orel.create_tables!
- DatabaseCleaner.strategy = :transaction
+ DatabaseCleaner.strategy = :truncation
end
config.before(:each) do
DatabaseCleaner.start
View
76 spec/sharding/partitioned_table_spec.rb
@@ -0,0 +1,76 @@
+require 'helper'
+
+describe Orel::Sharding::PartitionedTable do
+
+ let(:klass) { DailyAccumulation }
+ let(:connection) { klass.connection }
+ let(:partitioner) { klass.shard_partitioner }
+
+ subject { described_class.new(partitioner) }
+
+ def database_tables
+ connection.query("show tables").first
+ end
+
+ def table_named(name)
+ Orel::Table.new(name, klass.get_heading, connection)
+ end
+
+ it "inserts into the appropriate partition" do
+ subject.insert(:day => "20120101", :thing => "this", :count => 1)
+
+ table_named(:daily_accumulations_201201).row_count.should == 1
+ table_named(:daily_accumulations_201201).row_list.should == [
+ { :day => "20120101", :thing => "this", :count => 1 }
+ ]
+ end
+
+ it "upserts into the appropriate partition" do
+ subject.insert(:day => "20120101", :thing => "this", :count => 1)
+
+ subject.upsert(
+ :insert => { :day => "20120101", :thing => "this", :count => 1 },
+ :update => { :values => [:count], :with => :increment }
+ )
+
+ table_named(:daily_accumulations_201201).row_count.should == 1
+ table_named(:daily_accumulations_201201).row_list.should == [
+ { :day => "20120101", :thing => "this", :count => 2 }
+ ]
+ end
+
+ describe "#query" do
+ it "queries against appropriate partitions" do
+ subject.insert(:day => "20120101", :thing => "this", :count => 1)
+ subject.insert(:day => "20120201", :thing => "that", :count => 2)
+ subject.insert(:day => "20120202", :thing => "more", :count => 3)
+ subject.insert(:day => "20120203", :thing => "than", :count => 4)
+
+ result = subject.query { |q, table|
+ q.project table[:day], table[:thing], table[:count]
+ q.where table[:day].in(["20120101", "20120201", "20120202"])
+ }
+
+ result.should =~ [
+ { :day => "20120101", :thing => "this", :count => 1 },
+ { :day => "20120201", :thing => "that", :count => 2 },
+ { :day => "20120202", :thing => "more", :count => 3 }
+ ]
+ end
+ it "queries against all known partitions" do
+ subject.insert(:day => "20120101", :thing => "this", :count => 1)
+ subject.insert(:day => "20120201", :thing => "that", :count => 1)
+ subject.insert(:day => "20120202", :thing => "that", :count => 3)
+
+ result = subject.query { |q, table|
+ q.project table[:day], table[:thing], table[:count]
+ q.where table[:count].lt(3)
+ }
+
+ result.should =~ [
+ { :day => "20120101", :thing => "this", :count => 1 },
+ { :day => "20120201", :thing => "that", :count => 1 }
+ ]
+ end
+ end
+end

0 comments on commit 6306c6f

Please sign in to comment.
Something went wrong with that request. Please try again.