-
Notifications
You must be signed in to change notification settings - Fork 1k
/
helper.rb
161 lines (133 loc) · 3.77 KB
/
helper.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# frozen_string_literal: true
require_relative "../../test/helper"
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))
require "redis-clustering"
require_relative 'support/orchestrator'
module Helper
module Cluster
include Generic
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORTS = (16_380..16_385).freeze
ClusterSlotsRawReply = lambda { |host, port|
# @see https://redis.io/topics/protocol
<<-REPLY.delete(' ')
*1\r
*4\r
:0\r
:16383\r
*3\r
$#{host.size}\r
#{host}\r
:#{port}\r
$40\r
649fa246273043021a05f547a79478597d3f1dc5\r
*3\r
$#{host.size}\r
#{host}\r
:#{port}\r
$40\r
649fa246273043021a05f547a79478597d3f1dc5\r
REPLY
}
ClusterNodesRawReply = lambda { |host, port|
line = "649fa246273043021a05f547a79478597d3f1dc5 #{host}:#{port}@17000 "\
'myself,master - 0 1530797742000 1 connected 0-16383'
"$#{line.size}\r\n#{line}\r\n"
}
def init(redis)
redis.flushall
redis
rescue Redis::CannotConnectError
puts <<-MSG
Cannot connect to Redis Cluster.
Make sure Redis is running on localhost, port #{DEFAULT_PORTS}.
Try this once:
$ make stop_cluster
Then run the build again:
$ make
MSG
exit! 1
end
def build_another_client(options = {})
_new_client(options)
end
def redis_cluster_mock(commands, options = {})
host = DEFAULT_HOST
port = nil
cluster_subcommands = if commands.key?(:cluster)
commands.delete(:cluster).transform_keys { |k| k.to_s.downcase }
else
{}
end
commands[:cluster] = lambda { |subcommand, *args|
subcommand = subcommand.downcase
if cluster_subcommands.key?(subcommand)
cluster_subcommands[subcommand].call(*args)
else
case subcommand.downcase
when 'slots' then ClusterSlotsRawReply.call(host, port)
when 'nodes' then ClusterNodesRawReply.call(host, port)
else '+OK'
end
end
}
commands[:command] = ->(*_) { "*0\r\n" }
RedisMock.start(commands, options) do |po|
port = po
scheme = options[:ssl] ? 'rediss' : 'redis'
nodes = %W[#{scheme}://#{host}:#{port}]
yield _new_client(options.merge(nodes: nodes))
end
end
def redis_cluster_down
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.down
yield
ensure
trib.rebuild
trib.close
end
def redis_cluster_failover
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.failover
yield
ensure
trib.rebuild
trib.close
end
def redis_cluster_fail_master
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.fail_serving_master
yield
ensure
trib.restart_cluster_nodes
trib.rebuild
trib.close
end
# @param slot [Integer]
# @param src [String] <ip>:<port>
# @param dest [String] <ip>:<port>
def redis_cluster_resharding(slot, src:, dest:)
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.start_resharding(slot, src, dest)
yield
trib.finish_resharding(slot, dest)
ensure
trib.rebuild
trib.close
end
private
def _default_nodes(host: DEFAULT_HOST, ports: DEFAULT_PORTS)
ports.map { |port| "redis://#{host}:#{port}" }
end
def _format_options(options)
{
timeout: OPTIONS[:timeout],
nodes: _default_nodes
}.merge(options)
end
def _new_client(options = {})
Redis::Cluster.new(_format_options(options).merge(driver: ENV['DRIVER']))
end
end
end