Skip to content

Commit

Permalink
Add #each_server to Database and Dataset, for running operations on a…
Browse files Browse the repository at this point in the history
…ll shards

Sequel has supported sharding for a long time (since 2.4.0), but it
hasn't had built-in support for updating all shards at once until
now.  Database#each_server and Dataset#each_server are designed to
fill those needs.

Database#each_server yields a new Database object for each server,
and is intended for use when you want to run schema modification
(DDL) queries, or other custom SQL against all shards.

Dataset#each_server yields copies of the current dataset, each
of which is tied to a separate server in the connection pool.
It's intended for when you want to select/insert/update/delete
from all shards.

To implement this, ConnectionPool#servers and Database#servers
methods were added.
  • Loading branch information
jeremyevans committed Dec 14, 2009
1 parent 43e862c commit adf6a89
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
@@ -1,5 +1,9 @@
=== HEAD

* Add Database#each_server, which yields a new Database object for each server in the connection pool which is connected to only that server (jeremyevans)

* Add Dataset#each_server, which yields a dataset for each server in the connection pool which will execute on that server (jeremyevans)

* Remove meta_eval and metaclass private methods from Sequel::Metaprogramming (jeremyevans)

* Merge Dataset::FROM_SELF_KEEP_OPTS into Dataset::NON_SQL_OPTIONS (jeremyevans)
Expand Down
5 changes: 5 additions & 0 deletions lib/sequel/connection_pool.rb
Expand Up @@ -178,6 +178,11 @@ def remove_servers(servers)
end
end

# The names of all servers currently in the connection pool, as an array
# of symbols.  The server list is read inside the pool's +sync+ block.
def servers
  sync do
    @servers.keys
  end
end

private

# Assigns a connection to the supplied thread for the given server, if one
Expand Down
24 changes: 19 additions & 5 deletions lib/sequel/database.rb
Expand Up @@ -309,6 +309,15 @@ def disconnect(opts = {})
pool.disconnect(opts)
end

# Yield a new database object for every server in the connection pool.
# Intended for use in sharded environments where there is a need to make schema
# modifications (DDL queries) on each shard.
#
#   DB.each_server{|db| db.create_table(:users){primary_key :id; String :name}}
#
# Each yielded object is created by passing that server's options
# (+server_opts+) together with the block to the class-level +connect+ method.
#
# A block is required.  Without one, +connect+ would be invoked with no block
# for every server, presumably leaving the per-server Database objects
# connected and never cleaned up, so a Sequel::Error is raised before any
# connection is attempted.
#
# Returns the array of servers that was iterated over.
def each_server(&block)
  raise(Sequel::Error, "Database#each_server must be passed a block") unless block
  servers.each{|s| self.class.connect(server_opts(s), &block)}
end

# Executes the given SQL on the database. This method should be overridden in descendants.
# This method should not be called directly by user code.
def execute(sql, opts={})
Expand Down Expand Up @@ -475,11 +484,6 @@ def run(sql, opts={})
nil
end

# Returns a new dataset with the select method invoked.
# Shortcut for dataset.select: every argument and the block are forwarded
# unchanged to the select method of the object returned by #dataset.
def select(*args, &block)
dataset.select(*args, &block)
end

# Parse the schema from the database.
# Returns the schema for the given table as an array with all members being arrays of length 2,
# the first member being the column name, and the second member being a hash of column information.
Expand Down Expand Up @@ -508,6 +512,16 @@ def schema(table, opts={})
@schemas[quoted_name] = cols
end

# Returns a new dataset with the select method invoked.
# Shortcut for dataset.select: every argument and the block are forwarded
# unchanged to the select method of the object returned by #dataset.
def select(*args, &block)
dataset.select(*args, &block)
end

# An array of servers/shards for this Database object.
# The list is not stored on the Database itself; it is whatever the
# connection pool's +servers+ method reports.
def servers
pool.servers
end

# Returns true if the database is using a single-threaded connection pool.
def single_threaded?
@single_threaded
Expand Down
9 changes: 9 additions & 0 deletions lib/sequel/dataset.rb
Expand Up @@ -120,6 +120,15 @@ def def_mutation_method(*meths)
end
end

# Run the block once per server in the database's connection pool, passing
# it the result of calling +server+ with that server, i.e. a dataset tied to
# that server.  Useful in sharded setups where the same change has to be
# applied on every server:
#
#   DB[:configs].where(:key=>'setting').each_server{|ds| ds.update(:value=>'new_value')}
#
# Returns the array of servers that was iterated over.
def each_server
  db.servers.each do |shard|
    yield server(shard)
  end
end

# Returns a string representation of the dataset including the class name
# and the corresponding SQL select statement.
def inspect
Expand Down
4 changes: 4 additions & 0 deletions spec/core/connection_pool_spec.rb
Expand Up @@ -414,6 +414,10 @@ def value
@pool = Sequel::ConnectionPool.new(CONNECTION_POOL_DEFAULTS.merge(:servers=>{:read_only=>{}})){|server| "#{server}#{@invoked_counts[server] += 1}"}
end

specify "#servers should return symbols for all servers" do
  # Sort by name so the comparison does not depend on the order in which
  # the pool reports its servers.
  names = @pool.servers.sort_by{|name| name.to_s}
  names.should == [:default, :read_only]
end

specify "should use the :default server by default" do
@pool.size.should == 0
@pool.hold do |c|
Expand Down
36 changes: 36 additions & 0 deletions spec/core/database_spec.rb
Expand Up @@ -1330,6 +1330,42 @@ def @db.disconnect_connection(c)
end
end

context "Database#each_server" do
  # Mock-adapter database with a top-level host plus two named servers;
  # only the :host option differs between them.
  before do
    shards = {:server1=>{:host=>3}, :server2=>{:host=>4}}
    @db = Sequel.connect(:adapter=>:mock, :host=>1, :database=>2, :servers=>shards)
    # Stand-in connection: just the option hash for the requested server.
    def @db.connect(server)
      server_opts(server)
    end
    # Nothing to tear down for the stand-in connections.
    def @db.disconnect_connection(c)
    end
  end

  specify "should yield a separate database object for each server" do
    seen_hosts = []
    @db.each_server do |shard_db|
      shard_db.should be_a_kind_of(Sequel::Database)
      shard_db.should_not == @db
      # :database is not overridden per server, so every object reports 2.
      shard_db.opts[:database].should == 2
      seen_hosts.push(shard_db.opts[:host])
    end
    # 1 is the top-level host; 3 and 4 come from the :servers entries.
    seen_hosts.sort.should == [1, 3, 4]
  end

  specify "should disconnect and remove entry from Sequel::DATABASES after use" do
    yielded = []
    disconnected = []
    @db.each_server do |shard_db|
      yielded << shard_db
      Sequel::DATABASES.should include(shard_db)
      # Record disconnect calls on the yielded object instead of performing them.
      shard_db.meta_def(:disconnect){disconnected << shard_db}
    end
    yielded.each{|shard_db| Sequel::DATABASES.should_not include(shard_db)}
    yielded.should == disconnected
  end
end

context "Database#raise_error" do
specify "should reraise if the exception class is not in opts[:classes]" do
e = Class.new(StandardError)
Expand Down
24 changes: 24 additions & 0 deletions spec/core/dataset_spec.rb
Expand Up @@ -3069,6 +3069,30 @@ def @ds.fetch_rows(sql, &block)
end
end

context "Sequel::Dataset#each_server" do
  before do
    @db = Sequel::Database.new(:servers=>{:s=>{}, :i=>{}})
    @ds = @db[:items]
    @sqls = []
    # The execute stub appends to the log through a closed-over local rather
    # than through @sqls directly (presumably because the block runs with
    # @db as self).
    log = @sqls
    @db.meta_def(:execute) do |sql, opts|
      log << [sql, opts[:server]]
    end
    # Fetching rows only issues the SQL via execute; no rows are yielded.
    def @ds.fetch_rows(sql, &block)
      execute(sql)
    end
  end

  specify "should yield a dataset for each server" do
    @ds.each_server do |shard_ds|
      shard_ds.should be_a_kind_of(Sequel::Dataset)
      shard_ds.should_not == @ds
      shard_ds.sql.should == @ds.sql
      shard_ds.all
    end
    # One identical SELECT per server, compared in server-name order.
    expected = [:default, :i, :s].map{|name| ['SELECT * FROM items', name]}
    @sqls.sort_by{|_sql, name| name.to_s}.should == expected
  end
end

context "Sequel::Dataset #set_defaults" do
before do
@ds = Sequel::Dataset.new(nil).from(:items).set_defaults(:x=>1)
Expand Down
1 change: 1 addition & 0 deletions spec/core/spec_helper.rb
Expand Up @@ -24,6 +24,7 @@ def quoted_identifier(c)
end

class MockDatabase < Sequel::Database
set_adapter_scheme :mock
@@quote_identifiers = false
self.identifier_input_method = nil
self.identifier_output_method = nil
Expand Down

0 comments on commit adf6a89

Please sign in to comment.