-
Notifications
You must be signed in to change notification settings - Fork 189
/
invoker.rb
81 lines (67 loc) · 2.51 KB
/
invoker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Copyright (c) 2011 - 2013, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek, Tobias
# Schmidt
require 'lhm/chunker'
require 'lhm/entangler'
require 'lhm/atomic_switcher'
require 'lhm/locked_switcher'
require 'lhm/migrator'
module Lhm
# Copies an origin table to an altered destination table. Live activity is
# synchronized into the destination table using triggers.
#
# Once the origin and destination tables have converged, origin is archived
# and replaced by destination.
class Invoker
  include SqlHelper

  # Offset added to the server's global lock wait timeouts to derive the
  # timeouts used by this session. It is negative, so the session value is
  # smaller than the global one.
  LOCK_WAIT_TIMEOUT_DELTA = -2

  # Lower bound for the session timeouts. MySQL documents 1 as the minimum
  # for both innodb_lock_wait_timeout and lock_wait_timeout, so a small global
  # value combined with the negative delta must not fall below it.
  MIN_LOCK_WAIT_TIMEOUT = 1

  attr_reader :migrator, :connection

  # @param origin the origin table description; passed to Migrator unchanged
  # @param connection the database connection used for every statement
  def initialize(origin, connection)
    @connection = connection
    @migrator = Migrator.new(origin, connection)
  end

  # Sets this session's innodb_lock_wait_timeout and lock_wait_timeout to the
  # server's global value plus LOCK_WAIT_TIMEOUT_DELTA, but never lower than
  # MIN_LOCK_WAIT_TIMEOUT. A variable for which the server returns no row is
  # left untouched.
  def set_session_lock_wait_timeouts
    global_innodb_lock_wait_timeout = @connection.select_one("SHOW GLOBAL VARIABLES LIKE 'innodb_lock_wait_timeout'")
    global_lock_wait_timeout = @connection.select_one("SHOW GLOBAL VARIABLES LIKE 'lock_wait_timeout'")

    if global_innodb_lock_wait_timeout
      @connection.execute("SET SESSION innodb_lock_wait_timeout=#{session_lock_wait_timeout(global_innodb_lock_wait_timeout)}")
    end

    if global_lock_wait_timeout
      @connection.execute("SET SESSION lock_wait_timeout=#{session_lock_wait_timeout(global_lock_wait_timeout)}")
    end
  end

  # Normalizes the options, adjusts the session lock timeouts, runs the
  # migrator and then, inside the Entangler's block, runs the Chunker followed
  # by the selected switcher.
  #
  # @param options [Hash] run options; modified in place by normalize_options.
  #   A truthy :atomic_switch selects AtomicSwitcher, otherwise LockedSwitcher
  #   is used.
  # @return whatever Entangler#run returns
  def run(options = {})
    normalize_options(options)
    set_session_lock_wait_timeouts
    migration = @migrator.run

    Entangler.new(migration, @connection).run do
      Chunker.new(migration, @connection, options).run
      # Read the option only after chunking has finished, as before.
      switcher = options[:atomic_switch] ? AtomicSwitcher : LockedSwitcher
      switcher.new(migration, @connection).run
    end
  end

  private

  # Derives the session timeout from a "SHOW GLOBAL VARIABLES" row.
  #
  # @param global_variable row whose 'Value' entry holds the global timeout
  # @return [Integer] global value plus LOCK_WAIT_TIMEOUT_DELTA, clamped so it
  #   is at least MIN_LOCK_WAIT_TIMEOUT
  def session_lock_wait_timeout(global_variable)
    [global_variable['Value'].to_i + LOCK_WAIT_TIMEOUT_DELTA, MIN_LOCK_WAIT_TIMEOUT].max
  end

  # Logs the start of the run and fills in defaults on +options+ in place:
  #
  # * :atomic_switch - when the key is absent it becomes true if
  #   supports_atomic_switch? says so; otherwise the caller has to choose
  #   explicitly and an Error is raised.
  # * :throttler - a given value is converted through
  #   Throttler::Factory.create_throttler; without one Lhm.throttler is used.
  #
  # Any StandardError raised here is logged and re-raised.
  # NOTE(review): despite the "LHM run failed" wording, this rescue only
  # covers option normalization, not the migration steps in #run.
  #
  # @param options [Hash] run options, mutated
  # @raise [Error] when :atomic_switch is absent and cannot be defaulted
  def normalize_options(options)
    Lhm.logger.info "Starting LHM run on table=#{@migrator.name}"

    unless options.include?(:atomic_switch)
      unless supports_atomic_switch?
        raise Error,
              "Using mysql #{version_string}. You must explicitly set " \
              'options[:atomic_switch] (re SqlHelper#supports_atomic_switch?)'
      end

      options[:atomic_switch] = true
    end

    options[:throttler] =
      if options[:throttler]
        Throttler::Factory.create_throttler(options[:throttler])
      else
        Lhm.throttler
      end
  rescue => e
    Lhm.logger.error "LHM run failed with exception=#{e.class} message=#{e.message}"
    raise
  end
end
end