Skip to content
Browse files

Adding support for DataMapper

Basically we had to change every usage of

ActiveRecord::Base.connection
to use the new abstraction we created.

This led to more problems than expected when dealing with threading issues and connection availability to each thread.
  • Loading branch information...
1 parent 798168b commit ba34c750090b98e00730d7abdf05d441a6c23238 @filipesabella filipesabella committed
View
1 .travis.yml
@@ -8,3 +8,4 @@ gemfile:
- gemfiles/ar-2.3_mysql.gemfile
- gemfiles/ar-3.2_mysql.gemfile
- gemfiles/ar-3.2_mysql2.gemfile
+ - gemfiles/dm_mysql.gemfile
View
48 README.md
@@ -22,7 +22,7 @@ is great if you are using this engine, but only solves half the problem.
At SoundCloud we started having migration pains quite a while ago, and after
looking around for third party solutions, we decided to create our
own. We called it Large Hadron Migrator, and it is a gem for online
-ActiveRecord migrations.
+ActiveRecord and DataMapper migrations.
![LHC](http://farm4.static.flickr.com/3093/2844971993_17f2ddf2a8_z.jpg)
@@ -35,18 +35,22 @@ without locking the table. In contrast to [OAK][0] and the
[facebook tool][1], we only use a copy table and triggers.
The Large Hadron is a test driven Ruby solution which can easily be dropped
-into an ActiveRecord migration. It presumes a single auto incremented
-numerical primary key called id as per the Rails convention. Unlike the
-[twitter solution][2], it does not require the presence of an indexed
+into an ActiveRecord or DataMapper migration. It presumes a single auto
+incremented numerical primary key called id as per the Rails convention. Unlike
+the [twitter solution][2], it does not require the presence of an indexed
`updated_at` column.
## Requirements
Lhm currently only works with MySQL databases and requires an established
-ActiveRecord connection.
+ActiveRecord or DataMapper connection.
It is compatible and [continuously tested][4] with Ruby 1.8.7 and Ruby 1.9.x,
-ActiveRecord 2.3.x and 3.x as well as mysql and mysql2 adapters.
+ActiveRecord 2.3.x and 3.x (mysql and mysql2 adapters), as well as DataMapper
+1.2 (dm-mysql-adapter).
+
+Lhm also works with dm-master-slave-adapter, it'll bind to the master before
+running the migrations.
## Installation
@@ -66,6 +70,10 @@ ActiveRecord::Base.establish_connection(
:database => 'lhm'
)
+# or with DataMapper
+Lhm.setup(DataMapper.setup(:default, 'mysql://127.0.0.1/lhm'))
+
+# and migrate
Lhm.change_table :users do |m|
m.add_column :arbitrary, "INT(12)"
m.add_index [:arbitrary_id, :created_at]
@@ -97,7 +105,33 @@ class MigrateUsers < ActiveRecord::Migration
end
```
-**Note:** LHM won't delete the old, leftover table. This is on purpose, in order to prevent accidental data loss.
+Using dm-migrations, you'd define all your migrations as follows, and then call
+`migrate_up!` or `migrate_down!` as normal.
+
+```ruby
+require 'dm-migrations/migration_runner'
+require 'lhm'
+
+migration 1, :migrate_users do
+ up do
+ Lhm.change_table :users do |m|
+ m.add_column :arbitrary, "INT(12)"
+ m.add_index [:arbitrary_id, :created_at]
+ m.ddl("alter table %s add column flag tinyint(1)" % m.name)
+ end
+ end
+
+ down do
+ Lhm.change_table :users do |m|
+ m.remove_index [:arbitrary_id, :created_at]
+ m.remove_column :arbitrary
+ end
+ end
+end
+```
+
+**Note:** Lhm won't delete the old, leftover table. This is on purpose, in order
+to prevent accidental data loss.
## Table rename strategies
View
1 Rakefile
@@ -17,4 +17,3 @@ end
task :specs => [:unit, :integration]
task :default => :specs
-
View
6 bin/lhm-spec-clobber.sh
@@ -6,15 +6,15 @@ set -u
source ~/.lhm
lhmkill() {
- ps -ef | gsed -n "/[m]ysqld.*lhm-cluster/p" | awk '{ print $2 }' | xargs kill
- sleep 5
+ echo killing lhm-cluster
+ ps -ef | sed -n "/[m]ysqld.*lhm-cluster/p" | awk '{ print $2 }' | xargs kill
+ sleep 2
}
echo stopping other running mysql instance
launchctl remove com.mysql.mysqld || { echo launchctl did not remove mysqld; }
"$mysqldir"/bin/mysqladmin shutdown || { echo mysqladmin did not shut down anything; }
-echo killing lhm-cluster
lhmkill
echo removing $basedir
View
5 gemfiles/dm_mysql.gemfile
@@ -0,0 +1,5 @@
+source :rubygems
+
+gem 'dm-core'
+gem 'dm-mysql-adapter'
+gemspec :path=>"../"
View
2 lhm.gemspec
@@ -21,7 +21,5 @@ Gem::Specification.new do |s|
s.add_development_dependency "minitest", "= 2.10.0"
s.add_development_dependency "rake"
-
- s.add_dependency "activerecord"
end
View
17 lib/lhm.rb
@@ -1,9 +1,9 @@
# Copyright (c) 2011, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias
# Schmidt
-require 'active_record'
require 'lhm/table'
require 'lhm/invoker'
+require 'lhm/connection'
require 'lhm/version'
# Large hadron migrator - online schema change tool
@@ -34,7 +34,8 @@ module Lhm
# @return [Boolean] Returns true if the migration finishes
# @raise [Error] Raises Lhm::Error in case of a error and aborts the migration
def self.change_table(table_name, options = {}, &block)
- connection = ActiveRecord::Base.connection
+ connection = Connection.new(adapter)
+
origin = Table.parse(table_name, connection)
invoker = Invoker.new(origin, connection)
block.call(invoker.migrator)
@@ -42,4 +43,16 @@ def self.change_table(table_name, options = {}, &block)
true
end
+
+ def self.setup(adapter)
+ @@adapter = adapter
+ end
+
+ def self.adapter
+ @@adapter ||=
+ begin
+ raise 'Please call Lhm.setup' unless defined?(ActiveRecord)
+ ActiveRecord::Base.connection
+ end
+ end
end
View
6 lib/lhm/atomic_switcher.rb
@@ -13,7 +13,6 @@ module Lhm
# Lhm::SqlHelper.supports_atomic_switch?.
class AtomicSwitcher
include Command
- include SqlHelper
attr_reader :connection
@@ -36,14 +35,15 @@ def atomic_switch
end
def validate
- unless table?(@origin.name) && table?(@destination.name)
+ unless @connection.table_exists?(@origin.name) &&
+ @connection.table_exists?(@destination.name)
error "`#{ @origin.name }` and `#{ @destination.name }` must exist"
end
end
private
def execute
- sql statements
+ @connection.sql(statements)
end
end
end
View
2 lib/lhm/chunker.rb
@@ -83,7 +83,7 @@ def validate
def execute
up_to do |lowest, highest|
- affected_rows = update(copy(lowest, highest))
+ affected_rows = @connection.update(copy(lowest, highest))
if affected_rows > 0
sleep(throttle_seconds)
View
143 lib/lhm/connection.rb
@@ -0,0 +1,143 @@
+module Lhm
+ require 'lhm/sql_helper'
+
+ class Connection
+ def self.new(adapter)
+ if defined?(DataMapper) && adapter.is_a?(DataMapper::Adapters::AbstractAdapter)
+ DataMapperConnection.new(adapter)
+ elsif defined?(ActiveRecord)
+ ActiveRecordConnection.new(adapter)
+ else
+ raise 'Neither DataMapper nor ActiveRecord found.'
+ end
+ end
+
+ class DataMapperConnection
+ include SqlHelper
+
+ def initialize(adapter)
+ @adapter = adapter
+ @database_name = adapter.options['path'][1..-1]
+ end
+
+ def sql(statements)
+ [statements].flatten.each do |statement|
+ execute(tagged(statement))
+ end
+ end
+
+ def show_create(table_name)
+ sql = "show create table `#{ table_name }`"
+ select_values(sql).last
+ end
+
+ def current_database
+ @database_name
+ end
+
+ def update(statements)
+ [statements].flatten.inject(0) do |memo, statement|
+ result = @adapter.execute(tagged(statement))
+ memo += result.affected_rows
+ end
+ end
+
+ def select_all(sql)
+ @adapter.select(sql).to_a
+ end
+
+ def select_one(sql)
+ select_all(sql).first
+ end
+
+ def select_values(sql)
+ select_one(sql).values
+ end
+
+ def select_value(sql)
+ select_one(sql)
+ end
+
+ def destination_create(origin)
+ original = %{CREATE TABLE "#{ origin.name }"}
+ replacement = %{CREATE TABLE "#{ origin.destination_name }"}
+
+ sql(origin.ddl.gsub(original, replacement))
+ end
+
+ def execute(sql)
+ @adapter.execute(sql)
+ end
+
+ def table_exists?(table_name)
+ !!select_one(%Q{
+ select *
+ from information_schema.tables
+ where table_schema = '#{ @database_name }'
+ and table_name = '#{ table_name }'
+ })
+ end
+ end
+
+ class ActiveRecordConnection
+ include SqlHelper
+
+ def initialize(adapter)
+ @adapter = adapter
+ @database_name = @adapter.current_database
+ end
+
+ def sql(statements)
+ [statements].flatten.each do |statement|
+ execute(tagged(statement))
+ end
+ end
+
+ def show_create(table_name)
+ sql = "show create table `#{ table_name }`"
+ specification = nil
+ execute(sql).each { |row| specification = row.last }
+ specification
+ end
+
+ def current_database
+ @database_name
+ end
+
+ def update(sql)
+ @adapter.update(sql)
+ end
+
+ def select_all(sql)
+ @adapter.select_all(sql)
+ end
+
+ def select_one(sql)
+ @adapter.select_one(sql)
+ end
+
+ def select_values(sql)
+ @adapter.select_values(sql)
+ end
+
+ def select_value(sql)
+ @adapter.select_value(sql)
+ end
+
+ def destination_create(origin)
+ original = %{CREATE TABLE `#{ origin.name }`}
+ replacement = %{CREATE TABLE `#{ origin.destination_name }`}
+
+ sql(origin.ddl.gsub(original, replacement))
+ end
+
+ def execute(sql)
+ @adapter.execute(sql)
+ end
+
+ def table_exists?(table_name)
+ @adapter.table_exists?(table_name)
+ end
+ end
+ end
+end
View
8 lib/lhm/entangler.rb
@@ -68,21 +68,21 @@ def trigger(type)
end
def validate
- unless table?(@origin.name)
+ unless @connection.table_exists?(@origin.name)
error("#{ @origin.name } does not exist")
end
- unless table?(@destination.name)
+ unless @connection.table_exists?(@destination.name)
error("#{ @destination.name } does not exist")
end
end
def before
- sql(entangle)
+ @connection.sql(entangle)
end
def after
- sql(untangle)
+ @connection.sql(untangle)
end
def revert
View
7 lib/lhm/locked_switcher.rb
@@ -53,7 +53,8 @@ def uncommitted(&block)
end
def validate
- unless table?(@origin.name) && table?(@destination.name)
+ unless @connection.table_exists?(@origin.name) &&
+ @connection.table_exists?(@destination.name)
error "`#{ @origin.name }` and `#{ @destination.name }` must exist"
end
end
@@ -61,11 +62,11 @@ def validate
private
def revert
- sql "unlock tables"
+ @connection.sql("unlock tables")
end
def execute
- sql statements
+ @connection.sql(statements)
end
end
end
View
11 lib/lhm/migrator.rb
@@ -142,7 +142,7 @@ def remove_index(columns, index_name = nil)
private
def validate
- unless table?(@origin.name)
+ unless @connection.table_exists?(@origin.name)
error("could not find origin table #{ @origin.name }")
end
@@ -152,22 +152,19 @@ def validate
dest = @origin.destination_name
- if table?(dest)
+ if @connection.table_exists?(dest)
error("#{ dest } should not exist; not cleaned up from previous run?")
end
end
def execute
destination_create
- sql(@statements)
+ @connection.sql(@statements)
Migration.new(@origin, destination_read)
end
def destination_create
- original = "CREATE TABLE `#{ @origin.name }`"
- replacement = "CREATE TABLE `#{ @origin.destination_name }`"
-
- sql(@origin.ddl.gsub(original, replacement))
+ @connection.destination_create(@origin)
end
def destination_read
View
34 lib/lhm/sql_helper.rb
@@ -20,28 +20,10 @@ def idx_spec(cols)
end.join(', ')
end
- def table?(table_name)
- connection.table_exists?(table_name)
- end
-
- def sql(statements)
- [statements].flatten.each do |statement|
- connection.execute(tagged(statement))
- end
- rescue ActiveRecord::StatementInvalid => e
- error e.message
- end
-
- def update(statements)
- [statements].flatten.inject(0) do |memo, statement|
- memo += connection.update(tagged(statement))
- end
- rescue ActiveRecord::StatementInvalid => e
- error e.message
- end
-
def version_string
- connection.select_one("show variables like 'version'")["Value"]
+ row = connection.select_one("show variables like 'version'")
+ value = struct_key(row, "Value")
+ row[value]
end
private
@@ -81,5 +63,15 @@ def supports_atomic_switch?
end
return true
end
+
+ def struct_key(struct, key)
+ keys = if struct.is_a? Hash
+ struct.keys
+ else
+ struct.members
+ end
+
+ keys.find {|k| k.to_s.downcase == key.to_s.downcase }
+ end
end
end
View
42 lib/lhm/table.rb
@@ -37,10 +37,7 @@ def initialize(table_name, connection)
end
def ddl
- sql = "show create table `#{ @table_name }`"
- specification = nil
- @connection.execute(sql).each { |row| specification = row.last }
- specification
+ @connection.show_create(@table_name)
end
def parse
@@ -48,10 +45,14 @@ def parse
Table.new(@table_name, extract_primary_key(schema), ddl).tap do |table|
schema.each do |defn|
- table.columns[defn["COLUMN_NAME"]] = {
- :type => defn["COLUMN_TYPE"],
- :is_nullable => defn["IS_NULLABLE"],
- :column_default => defn["COLUMN_DEFAULT"]
+ column_name = struct_key(defn, "COLUMN_NAME")
+ column_type = struct_key(defn, "COLUMN_TYPE")
+ is_nullable = struct_key(defn, "IS_NULLABLE")
+ column_default = struct_key(defn, "COLUMN_DEFAULT")
+ table.columns[defn[column_name]] = {
+ :type => defn[column_type],
+ :is_nullable => defn[is_nullable],
+ :column_default => defn[column_default]
}
end
@@ -67,20 +68,25 @@ def read_information_schema
@connection.select_all %Q{
select *
from information_schema.columns
- where table_name = "#{ @table_name }"
- and table_schema = "#{ @schema_name }"
+ where table_name = '#{ @table_name }'
+ and table_schema = '#{ @schema_name }'
}
end
def read_indices
@connection.select_all %Q{
show indexes from `#{ @schema_name }`.`#{ @table_name }`
- where key_name != "PRIMARY"
+ where key_name != 'PRIMARY'
}
end
def extract_indices(indices)
- indices.map { |row| [row["Key_name"], row["Column_name"]] }.
+ indices.
+ map do |row|
+ key_name = struct_key(row, "Key_name")
+ column_name = struct_key(row, "COLUMN_NAME")
+ [row[key_name], row[column_name]]
+ end.
inject(Hash.new { |h, k| h[k] = []}) do |memo, (idx, column)|
memo[idx] << column
memo
@@ -88,8 +94,16 @@ def extract_indices(indices)
end
def extract_primary_key(schema)
- cols = schema.select { |defn| defn["COLUMN_KEY"] == "PRI" }
- keys = cols.map { |defn| defn["COLUMN_NAME"] }
+ cols = schema.select do |defn|
+ column_key = struct_key(defn, "COLUMN_KEY")
+ defn[column_key] == "PRI"
+ end
+
+ keys = cols.map do |defn|
+ column_name = struct_key(defn, "COLUMN_NAME")
+ defn[column_name]
+ end
+
keys.length == 1 ? keys.first : keys
end
end
View
4 spec/integration/atomic_switcher_spec.rb
@@ -14,9 +14,9 @@
describe "switching" do
before(:each) do
- @origin = table_create("origin")
+ @origin = table_create("origin")
@destination = table_create("destination")
- @migration = Lhm::Migration.new(@origin, @destination)
+ @migration = Lhm::Migration.new(@origin, @destination)
end
it "rename origin to archive" do
View
51 spec/integration/integration_helper.rb
@@ -3,22 +3,27 @@
require File.expand_path(File.dirname(__FILE__)) + "/../bootstrap"
-require 'active_record'
begin
- require 'mysql2'
+ require 'active_record'
+ begin
+ require 'mysql2'
+ rescue LoadError
+ require 'mysql'
+ end
rescue LoadError
- require 'mysql'
+ require 'dm-core'
+ require 'dm-mysql-adapter'
end
require 'lhm/table'
require 'lhm/sql_helper'
+require 'lhm/connection'
module IntegrationHelper
#
# Connectivity
#
-
def connection
- ActiveRecord::Base.connection
+ @connection
end
def connect_master!
@@ -30,28 +35,37 @@ def connect_slave!
end
def connect!(port)
- ActiveRecord::Base.establish_connection(
- :adapter => defined?(Mysql2) ? 'mysql2' : 'mysql',
- :host => '127.0.0.1',
- :database => 'lhm',
- :username => '',
- :port => port
- )
+ adapter = nil
+ if defined?(ActiveRecord)
+ ActiveRecord::Base.establish_connection(
+ :adapter => defined?(Mysql2) ? 'mysql2' : 'mysql',
+ :host => '127.0.0.1',
+ :database => 'lhm',
+ :username => 'root',
+ :port => port
+ )
+ adapter = ActiveRecord::Base.connection
+ elsif defined?(DataMapper)
+ adapter = DataMapper.setup(:default, "mysql://root@localhost:#{port}/lhm")
+ end
+
+ Lhm.setup(adapter)
+ @connection = Lhm::Connection.new(adapter)
end
def select_one(*args)
- connection.select_one(*args)
+ @connection.select_one(*args)
end
def select_value(*args)
- connection.select_value(*args)
+ @connection.select_value(*args)
end
def execute(*args)
retries = 10
begin
- connection.execute(*args)
- rescue ActiveRecord::StatementInvalid => e
+ @connection.execute(*args)
+ rescue => e
if (retries -= 1) > 0 && e.message =~ /Table '.*?' doesn't exist/
sleep 0.1
retry
@@ -69,8 +83,11 @@ def slave(&block)
# check the master binlog position and wait for the slave to catch up
# to that position.
sleep 1
+ elsif
+ connect_master!
end
+
yield block
if master_slave_mode?
@@ -93,7 +110,7 @@ def table_create(fixture_name)
end
def table_read(fixture_name)
- Lhm::Table.parse(fixture_name, connection)
+ Lhm::Table.parse(fixture_name, @connection)
end
def table_exists?(table)
View
3 spec/integration/lhm_spec.rb
@@ -161,10 +161,12 @@
insert = Thread.new do
10.times do |n|
+ connect_master!
execute("insert into users set reference = '#{ 100 + n }'")
sleep(0.17)
end
end
+ sleep 2
options = { :stride => 10, :throttle => 97, :atomic_switch => false }
Lhm.change_table(:users, options) do |t|
@@ -187,6 +189,7 @@
sleep(0.17)
end
end
+ sleep 2
options = { :stride => 10, :throttle => 97, :atomic_switch => false }
Lhm.change_table(:users, options) do |t|
View
40 spec/unit/active_record_connection_spec.rb
@@ -0,0 +1,40 @@
+# Copyright (c) 2011, SoundCloud Ltd.
+
+require File.expand_path(File.dirname(__FILE__)) + '/unit_helper'
+require 'lhm/connection'
+
+if defined?(ActiveRecord)
+ describe Lhm::Connection::ActiveRecordConnection do
+ let(:active_record) { MiniTest::Mock.new }
+
+ before do
+ active_record.expect :current_database, 'the db'
+ end
+
+ after do
+ active_record.verify
+ end
+
+ it 'creates an ActiveRecord connection when the DM classes are not there' do
+ connection.must_be_instance_of(Lhm::Connection::ActiveRecordConnection)
+ end
+
+ it 'initializes the db name from the connection' do
+ connection.current_database.must_equal('the db')
+ end
+
+ it 'backticks the table names' do
+ table_name = 'my_table'
+
+ active_record.expect :execute,
+ [['returned sql']],
+ ["show create table `#{table_name}`"]
+
+ connection.show_create(table_name)
+ end
+
+ def connection
+ Lhm::Connection.new(active_record)
+ end
+ end
+end
View
11 spec/unit/connection_spec.rb
@@ -0,0 +1,11 @@
+# Copyright (c) 2011, SoundCloud Ltd.
+
+require 'lhm/connection'
+
+describe Lhm::Connection do
+ it 'raises an exception when it cannot find the dependencies' do
+ assert_raises(NameError) do
+ Connection.new({})
+ end
+ end
+end
View
43 spec/unit/datamapper_connection_spec.rb
@@ -0,0 +1,43 @@
+# Copyright (c) 2011, SoundCloud Ltd.
+
+require File.expand_path(File.dirname(__FILE__)) + '/unit_helper'
+require 'lhm/connection'
+
+if defined?(DataMapper)
+ describe Lhm::Connection::DataMapperConnection do
+ let(:data_mapper) { MiniTest::Mock.new }
+ let(:options) { { 'path' => '/the db' } }
+
+ before do
+ data_mapper.expect :is_a?, true, [DataMapper::Adapters::AbstractAdapter]
+ data_mapper.expect :options, options
+ end
+
+ after do
+ data_mapper.verify
+ end
+
+ it 'creates a DataMapperConnection when the adapter is from DM' do
+ connection.must_be_instance_of(Lhm::Connection::DataMapperConnection)
+ end
+
+
+ it 'initializes the db name from the options' do
+ connection.current_database.must_equal('the db')
+ end
+
+ it 'backticks the table names' do
+ table_name = 'my_table'
+
+ data_mapper.expect :select,
+ [{ :sql => 'returned sql' }],
+ ["show create table `#{table_name}`"]
+
+ connection.show_create(table_name)
+ end
+
+ def connection
+ Lhm::Connection.new(data_mapper)
+ end
+ end
+end
View
11 spec/unit/unit_helper.rb
@@ -3,6 +3,17 @@
require File.expand_path(File.dirname(__FILE__)) + "/../bootstrap"
+begin
+ require 'active_record'
+ begin
+ require 'mysql2'
+ rescue LoadError
+ require 'mysql'
+ end
+rescue LoadError
+ require 'dm-core'
+end
+
module UnitHelper
def fixture(name)
File.read $fixtures.join(name)

0 comments on commit ba34c75

Please sign in to comment.
Something went wrong with that request. Please try again.