-
Notifications
You must be signed in to change notification settings - Fork 2k
/
workflow_horizontal_resharding.py
133 lines (109 loc) · 4.38 KB
/
workflow_horizontal_resharding.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright 2016 Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
"""End-to-end test for horizontal resharding workflow."""
# pylint: disable=unused-import
import re
import unittest
import utils
import worker
def setUpModule():
    """Run the worker module's setup, then start a vtctld instance.

    If either step raises, tearDownModule() is invoked before the
    original exception is re-raised.
    """
    try:
        worker.setUpModule()
        vtctld = utils.Vtctld()
        vtctld.start()
    except:
        # Intentionally catches everything: clean up first, then let
        # the original error propagate unchanged.
        tearDownModule()
        raise
def tearDownModule():
    """Delegate module-level teardown to worker.tearDownModule()."""
    worker.tearDownModule()
class TestWorkflowHorizontalResharding(worker.TestBaseSplitClone):
    """End-to-end test for horizontal resharding workflow.

    This test reuses worker.py, which covers the happy path
    of the horizontal resharding code.
    """

    KEYSPACE = 'test_keyspace'

    def test_successful_resharding(self):
        """Normal horizontal resharding, "happy path test".

        Starts a background vtworker, creates a 'horizontal_resharding'
        workflow via vtctl that splits shard '0' into '-80' and '80-',
        waits for the workflow and then calls verify(). The vtworker
        process is stopped even if one of these steps fails.

        Raises:
          AssertionError: if the WorkflowCreate output does not contain
            a 'uuid: ...' line from which the workflow id can be read.
        """
        worker_proc, _, worker_rpc_port = utils.run_vtworker_bg(
            ['--cell', 'test_nj'], auto_log=True)
        try:
            vtworker_endpoint = 'localhost:' + str(worker_rpc_port)
            self._error = None

            source_shard_list = '0'
            dest_shard_list = '-80,80-'

            # To inspect the vtctld UI before the workflow exists, add a
            # utils.pause(...) call here (utils.vtctld.port is the port).
            vtctl_res = utils.run_vtctl(
                [
                    'WorkflowCreate', 'horizontal_resharding',
                    '-keyspace=%s' % self.KEYSPACE,
                    '-source_shard_list=%s' % source_shard_list,
                    '-destination_shard_list=%s' % dest_shard_list,
                    '-vtworker_server_address=%s' % vtworker_endpoint,
                ],
                auto_log=True)

            # Fail with a readable message instead of an AttributeError
            # on None when the command output has an unexpected shape.
            uuid_match = re.match(r'^uuid: (.*)$', vtctl_res[0])
            if uuid_match is None:
                raise AssertionError(
                    'WorkflowCreate output has no workflow uuid: %r'
                    % (vtctl_res[0],))
            hw_uuid = uuid_match.group(1)

            utils.pause('Now is a good time to look at vtctld UI at: '
                        '%s, workflow uuid=%s' % (utils.vtctld.port, hw_uuid))

            utils.run_vtctl(['WorkflowWait', hw_uuid])
            self.verify()
        finally:
            # Always stop the vtworker so a failing run does not leak it.
            utils.kill_sub_process(worker_proc, soft=True)

    def verify(self):
        """Check the state expected after a finished resharding workflow.

        Compares the data of both destination shards against the source
        master, checks the serving partitions of the keyspace and the
        query service state of the source and destination tablets.
        """
        self.assert_shard_data_equal(0, worker.shard_master,
                                     worker.shard_0_tablets.replica)
        self.assert_shard_data_equal(1, worker.shard_master,
                                     worker.shard_1_tablets.replica)

        # Verify effect of MigrateServedTypes. Dest shards are serving now.
        utils.check_srv_keyspace('test_nj', self.KEYSPACE,
                                 'Partitions(master): -80 80-\n'
                                 'Partitions(rdonly): -80 80-\n'
                                 'Partitions(replica): -80 80-\n')

        # Source shard: query service must be disabled after
        # MigrateServedTypes.
        utils.run_vtctl(
            ['RunHealthCheck', worker.shard_rdonly1.tablet_alias],
            auto_log=True)
        source_tablets = (
            worker.shard_rdonly1,
            worker.shard_replica,
            worker.shard_master,
        )
        for tablet in source_tablets:
            utils.check_tablet_query_service(
                self, tablet, serving=False, tablet_control_disabled=True)

        # Dest shards -80 and 80-: query service must be enabled after
        # MigrateServedTypes. Each group lists the rdonly tablet first.
        dest_shards = (
            (worker.shard_0_rdonly1,
             worker.shard_0_replica,
             worker.shard_0_master),
            (worker.shard_1_rdonly1,
             worker.shard_1_replica,
             worker.shard_1_master),
        )
        for shard_tablets in dest_shards:
            # Run explicit healthcheck because the 'rdonly' tablet may
            # still be 'spare'.
            utils.run_vtctl(
                ['RunHealthCheck', shard_tablets[0].tablet_alias],
                auto_log=True)
            for tablet in shard_tablets:
                utils.check_tablet_query_service(
                    self, tablet, serving=True,
                    tablet_control_disabled=False)
# Script entry point: hand control to utils.main, passing the worker
# module's add_test_options hook as test_options.
if __name__ == '__main__':
    utils.main(test_options=worker.add_test_options)