Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the primary key to use for the migration configurable #73

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -5,6 +5,7 @@
* #93 - Makes the atomic switcher retry on metadata locks (@camilo)
* #63 - Sets the LHM's session lock wait timeout variables (@camilo)
* #75 - Remove DataMapper and ActiveRecord 2.x support (@camilo)
* #73 - Allow configuration of the primary key used in migrations (@airhorns)

# 2.2.0 (Jan 16, 2015)

Expand Down
10 changes: 5 additions & 5 deletions README.md
Expand Up @@ -35,10 +35,10 @@ without locking the table. In contrast to [OAK][0] and the
[facebook tool][1], we only use a copy table and triggers.

The Large Hadron is a test driven Ruby solution which can easily be dropped
into an ActiveRecord or DataMapper migration. It presumes a single auto
incremented numerical primary key called id as per the Rails convention. Unlike
the [twitter solution][2], it does not require the presence of an indexed
`updated_at` column.
into an ActiveRecord or DataMapper migration. It presumes an auto incremented
numerical primary key (such as id by Rails conventions). Unlike the [twitter
solution][2], it does not require the presence of an indexed `updated_at`
column.

## Requirements

Expand All @@ -51,7 +51,7 @@ ActiveRecord 3.2.x and 4.x (mysql and mysql2 adapters).
## Limitations

Due to the Chunker implementation, Lhm requires that the table to migrate has a
a monotonically increasing numeric key column called `id`.
a monotonically increasing numeric column.

## Installation

Expand Down
4 changes: 3 additions & 1 deletion lib/lhm.rb
Expand Up @@ -40,12 +40,14 @@ module Lhm
# Use atomic switch to rename tables (defaults to: true)
# If using a version of mysql affected by atomic switch bug, LHM forces user
# to set this option (see SqlHelper#supports_atomic_switch?)
# @option options [String] :order_column
# Column name to order records by. This column must be unique for every record (defaults to: "id")
# @yield [Migrator] Yielded Migrator object records the changes
# @return [Boolean] Returns true if the migration finishes
# @raise [Error] Raises Lhm::Error in case of a error and aborts the migration
def change_table(table_name, options = {}, &block)
origin = Table.parse(table_name, connection)
invoker = Invoker.new(origin, connection)
invoker = Invoker.new(origin, connection, options)
block.call(invoker.migrator)
invoker.run(options)
true
Expand Down
6 changes: 3 additions & 3 deletions lib/lhm/chunker.rb
Expand Up @@ -52,16 +52,16 @@ def top(stride)
def copy(lowest, highest)
"insert ignore into `#{ destination_name }` (#{ destination_columns }) " \
"select #{ origin_columns } from `#{ origin_name }` " \
"#{ conditions } `#{ origin_name }`.`id` between #{ lowest } and #{ highest }"
"#{ conditions } `#{ origin_name }`.`#{ @migration.order_column }` between #{ lowest } and #{ highest }"
end

def select_start
start = connection.select_value("select min(id) from `#{ origin_name }`")
start = connection.select_value("select min(#{ @migration.order_column }) from `#{ origin_name }`")
start ? start.to_i : nil
end

def select_limit
limit = connection.select_value("select max(id) from `#{ origin_name }`")
limit = connection.select_value("select max(#{ @migration.order_column }) from `#{ origin_name }`")
limit ? limit.to_i : nil
end

Expand Down
3 changes: 2 additions & 1 deletion lib/lhm/entangler.rb
Expand Up @@ -17,6 +17,7 @@ def initialize(migration, connection = nil)
@intersection = migration.intersection
@origin = migration.origin
@destination = migration.destination
@order_column = migration.order_column
@connection = connection
end

Expand Down Expand Up @@ -59,7 +60,7 @@ def create_delete_trigger
create trigger `#{ trigger(:del) }`
after delete on `#{ @origin.name }` for each row
delete ignore from `#{ @destination.name }` #{ SqlHelper.annotation }
where `#{ @destination.name }`.`id` = OLD.`id`
where `#{ @destination.name }`.`#{ @order_column }` = OLD.`#{ @order_column }`
}
end

Expand Down
4 changes: 2 additions & 2 deletions lib/lhm/invoker.rb
Expand Up @@ -19,9 +19,9 @@ class Invoker

attr_reader :migrator, :connection

def initialize(origin, connection)
def initialize(origin, connection, options)
@connection = connection
@migrator = Migrator.new(origin, connection)
@migrator = Migrator.new(origin, connection, options)
end

def set_session_lock_wait_timeouts
Expand Down
5 changes: 3 additions & 2 deletions lib/lhm/migration.rb
Expand Up @@ -5,12 +5,13 @@

module Lhm
class Migration
attr_reader :origin, :destination, :conditions, :renames
attr_reader :origin, :destination, :conditions, :renames, :order_column

def initialize(origin, destination, conditions = nil, renames = {}, time = Time.now)
def initialize(origin, destination, order_column, conditions = nil, renames = {}, time = Time.now)
@origin = origin
@destination = destination
@conditions = conditions
@order_column = order_column
@start = time
@renames = renames
end
Expand Down
15 changes: 12 additions & 3 deletions lib/lhm/migrator.rb
Expand Up @@ -15,10 +15,11 @@ class Migrator

attr_reader :name, :statements, :connection, :conditions, :renames

def initialize(table, connection = nil)
def initialize(table, connection = nil, options = nil)
@connection = connection
@origin = table
@name = table.destination_name
@options = options
@statements = []
@renames = {}
end
Expand Down Expand Up @@ -188,7 +189,11 @@ def validate
end

unless @origin.satisfies_id_autoincrement_requirement?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no way for this to not error out when an order_column is specified.

@origin.satisfies_id_autoincrement_requirement? checks the id column, even if an order_column is set. Once that check fails, an error is set no matter what. The code should probably look more like:

      if @options[:order_column]
        if !@origin.can_use_order_column?(@options[:order_column])
          error("origin does not satisfy primary key requirements")
        end
      else
        if !@origin.satisfies_id_autoincrement_requirement?
          error("origin does not satisfy primary key requirements")
        end
      end

Sorry for the comment, not sure what the best way to submit changes to a PR is. Thanks!

error('origin does not satisfy primary key requirements')
if @options[:order_column] && !@origin.can_use_order_column?(@options[:order_column])
error("order column needs to be specified because no satisfactory primary key exists")
else
error("origin does not satisfy primary key requirements")
end
end

dest = @origin.destination_name
Expand All @@ -201,7 +206,7 @@ def validate
def execute
destination_create
@connection.sql(@statements)
Migration.new(@origin, destination_read, conditions, renames)
Migration.new(@origin, destination_read, order_column, conditions, renames)
end

def destination_create
Expand All @@ -225,5 +230,9 @@ def assert_valid_idx_name(index_name)
raise ArgumentError, 'index_name must be a string or symbol'
end
end

def order_column
@options[:order_column] || @origin.pk
end
end
end
20 changes: 16 additions & 4 deletions lib/lhm/table.rb
Expand Up @@ -16,9 +16,11 @@ def initialize(name, pk = 'id', ddl = nil)
end

def satisfies_id_autoincrement_requirement?
!!((id = columns['id']) &&
id[:extra] == 'auto_increment' &&
id[:type] =~ /int\(\d+\)/)
monotonically_increasing_numeric_key?(columns["id"])
end

def can_use_order_column?(column_name)
monotonically_increasing_numeric_key?(columns[column_name])
end

def destination_name
Expand Down Expand Up @@ -109,8 +111,18 @@ def extract_primary_key(schema)
defn[column_name]
end

keys.length == 1 ? keys.first : keys
case keys.length
when 0 then nil
when 1 then keys.first
else keys
end
end
end

def monotonically_increasing_numeric_key?(key)
!!(key &&
key[:extra] == 'auto_increment' &&
key[:type] =~ /int\(\d+\)/)
end
end
end
4 changes: 4 additions & 0 deletions spec/fixtures/no_primary_key.ddl
@@ -0,0 +1,4 @@
CREATE TABLE `no_primary_key` (
`foreign_id` int(11) NOT NULL,
`value` int(11) NOT NULL DEFAULT '0'
) ENGINE=InnoDB DEFAULT CHARSET=utf8
6 changes: 6 additions & 0 deletions spec/fixtures/primary_keys.ddl
@@ -0,0 +1,6 @@
CREATE TABLE `primary_keys` (
`weird_id` int(11) NOT NULL AUTO_INCREMENT,
`origin` int(11) DEFAULT NULL,
`common` varchar(255) DEFAULT NULL,
PRIMARY KEY (`weird_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2 changes: 1 addition & 1 deletion spec/integration/atomic_switcher_spec.rb
Expand Up @@ -17,7 +17,7 @@
Thread.abort_on_exception = true
@origin = table_create("origin")
@destination = table_create("destination")
@migration = Lhm::Migration.new(@origin, @destination)
@migration = Lhm::Migration.new(@origin, @destination, "id")
Lhm.logger = Logger.new('/dev/null')
@connection.execute("SET GLOBAL innodb_lock_wait_timeout=3")
@connection.execute("SET GLOBAL lock_wait_timeout=3")
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/chunker_spec.rb
Expand Up @@ -14,7 +14,7 @@
before(:each) do
@origin = table_create(:origin)
@destination = table_create(:destination)
@migration = Lhm::Migration.new(@origin, @destination)
@migration = Lhm::Migration.new(@origin, @destination, "id")
end

it 'should copy 23 rows from origin to destination with time based throttler' do
Expand Down
4 changes: 2 additions & 2 deletions spec/integration/cleanup_spec.rb
Expand Up @@ -32,7 +32,7 @@

it 'should show temporary tables within range' do
table = OpenStruct.new(:name => 'users')
table_name = Lhm::Migration.new(table, nil, nil, {}, Time.now - 172800).archive_name
table_name = Lhm::Migration.new(table, nil, nil, nil, Time.now - 172800).archive_name
table_rename(:users, table_name)

output = capture_stdout do
Expand All @@ -44,7 +44,7 @@

it 'should exclude temporary tables outside range' do
table = OpenStruct.new(:name => 'users')
table_name = Lhm::Migration.new(table, nil, nil, {}, Time.now).archive_name
table_name = Lhm::Migration.new(table, nil, nil, nil, Time.now).archive_name
table_rename(:users, table_name)

output = capture_stdout do
Expand Down
6 changes: 3 additions & 3 deletions spec/integration/entangler_spec.rb
Expand Up @@ -14,9 +14,9 @@

describe 'entanglement' do
before(:each) do
@origin = table_create('origin')
@destination = table_create('destination')
@migration = Lhm::Migration.new(@origin, @destination)
@origin = table_create("origin")
@destination = table_create("destination")
@migration = Lhm::Migration.new(@origin, @destination, "id")
@entangler = Lhm::Entangler.new(@migration, connection)
end

Expand Down
38 changes: 36 additions & 2 deletions spec/integration/lhm_spec.rb
Expand Up @@ -323,8 +323,42 @@
end
end

describe 'parallel' do
it 'should perserve inserts during migration' do
it "should change a table with a primary key other than id" do
table_create(:primary_keys)

Lhm.change_table(:primary_keys, :atomic_switch => false, :order_column => 'weird_id') do |t|
t.change_column(:weird_id, "int(5)")
end

slave do
table_read(:primary_keys).columns["weird_id"].must_equal({
:type => "int(5)",
:is_nullable => "NO",
:column_default => "0",
:extra => ""
})
end
end

it "should change a table with a no primary key" do
table_create(:no_primary_key)

Lhm.change_table(:no_primary_key, :atomic_switch => false, :order_column => 'foreign_id') do |t|
t.change_column(:value, "text")
end

slave do
table_read(:no_primary_key).columns["value"].must_equal({
:type => "text",
:is_nullable => "YES",
:column_default => nil,
:extra => ""
})
end
end

describe "parallel" do
it "should perserve inserts during migration" do
50.times { |n| execute("insert into users set reference = '#{ n }'") }

insert = Thread.new do
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/lock_wait_timeout_spec.rb
Expand Up @@ -16,7 +16,7 @@
global_innodb_lock_wait_timeout = connection.execute("SHOW GLOBAL VARIABLES LIKE 'innodb_lock_wait_timeout'").first.last.to_i
global_lock_wait_timeout = connection.execute("SHOW GLOBAL VARIABLES LIKE 'lock_wait_timeout'").first.last.to_i

invoker = Lhm::Invoker.new(Lhm::Table.parse(:users, connection), connection)
invoker = Lhm::Invoker.new(Lhm::Table.parse(:users, connection), connection, nil)
invoker.set_session_lock_wait_timeouts

session_innodb_lock_wait_timeout = connection.execute("SHOW SESSION VARIABLES LIKE 'innodb_lock_wait_timeout'").first.last.to_i
Expand Down
6 changes: 3 additions & 3 deletions spec/integration/locked_switcher_spec.rb
Expand Up @@ -22,9 +22,9 @@

describe 'switching' do
before(:each) do
@origin = table_create('origin')
@destination = table_create('destination')
@migration = Lhm::Migration.new(@origin, @destination)
@origin = table_create("origin")
@destination = table_create("destination")
@migration = Lhm::Migration.new(@origin, @destination, "id")
end

it 'rename origin to archive' do
Expand Down
27 changes: 24 additions & 3 deletions spec/unit/chunker_spec.rb
Expand Up @@ -12,9 +12,9 @@
include UnitHelper

before(:each) do
@origin = Lhm::Table.new('foo')
@destination = Lhm::Table.new('bar')
@migration = Lhm::Migration.new(@origin, @destination)
@origin = Lhm::Table.new("foo")
@destination = Lhm::Table.new("bar")
@migration = Lhm::Migration.new(@origin, @destination, "id")
@connection = MiniTest::Mock.new
# This is a poor man's stub
@throttler = Object.new
Expand Down Expand Up @@ -141,4 +141,25 @@ def @migration.conditions
@connection.verify
end
end

describe "copy into with a different column to order by" do
before(:each) do
@migration = Lhm::Migration.new(@origin, @destination, "weird_id")
@origin.columns["secret"] = { :metadata => "VARCHAR(255)"}
@destination.columns["secret"] = { :metadata => "VARCHAR(255)"}
@chunker = Lhm::Chunker.new(@migration, @connection, :throttler => @throttler,
:start => 1,
:limit => 2)
end

it "should copy the correct range and column" do
@connection.expect(:update, 1) do |stmt|
stmt = stmt.first if stmt.is_a?(Array)
stmt =~ /where `foo`.`weird_id` between 1 and 1/
end

@chunker.run
@connection.verify
end
end
end
6 changes: 3 additions & 3 deletions spec/unit/entangler_spec.rb
Expand Up @@ -11,9 +11,9 @@
include UnitHelper

before(:each) do
@origin = Lhm::Table.new('origin')
@destination = Lhm::Table.new('destination')
@migration = Lhm::Migration.new(@origin, @destination)
@origin = Lhm::Table.new("origin")
@destination = Lhm::Table.new("destination")
@migration = Lhm::Migration.new(@origin, @destination, "id")
@entangler = Lhm::Entangler.new(@migration)
end

Expand Down
10 changes: 5 additions & 5 deletions spec/unit/migration_spec.rb
Expand Up @@ -11,18 +11,18 @@

before(:each) do
@start = Time.now
@origin = Lhm::Table.new('origin')
@destination = Lhm::Table.new('destination')
@migration = Lhm::Migration.new(@origin, @destination, nil, @start)
@origin = Lhm::Table.new("origin")
@destination = Lhm::Table.new("destination")
@migration = Lhm::Migration.new(@origin, @destination, "id", nil, @start)
end

it 'should name archive' do
stamp = "%Y_%m_%d_%H_%M_%S_#{ '%03d' % (@start.usec / 1000) }"
@migration.archive_name.must_equal "lhma_#{ @start.strftime(stamp) }_origin"
end

it 'should limit table name to 64 characters' do
migration = Lhm::Migration.new(OpenStruct.new(:name => 'a_very_very_long_table_name_that_should_make_the_LHMA_table_go_over_64_chars'), nil)
it "should limit table name to 64 characters" do
migration = Lhm::Migration.new(OpenStruct.new(:name => "a_very_very_long_table_name_that_should_make_the_LHMA_table_go_over_64_chars"), nil, "id")
migration.archive_name.size == 64
end
end