2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- `tt cluster failover switch`: minor change in output that displays the corresponding
  `switch-status` command with a quoted URI argument so it can be copy-pasted as-is
  for a subsequent launch.
- `tt rs vshard bootstrap`: make the error message more informative when sharding roles
  are not configured (for example, when launched against a non-vshard cluster).

### Fixed

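As context for the changelog entry above, here is a minimal reproduction sketch of the new behaviour. It assumes a `tt` binary on `PATH` and a hypothetical application `myapp` whose cluster configuration has no `sharding` section; the app name and the direct use of `subprocess` are illustrative and not part of this patch, while the command shape and the error substring come from the integration test added below.

import subprocess

# Bootstrap vshard against a cluster that configures no sharding roles.
# Before this patch the command failed with a less specific message; now
# the Lua body reports the missing sharding configuration explicitly.
p = subprocess.run(
    ["tt", "rs", "vshard", "bootstrap", "myapp"],
    capture_output=True,
    text=True,
)
assert p.returncode != 0
# The integration test checks stdout; checking both streams keeps this
# sketch independent of how the output is captured.
assert "sharding roles are not configured" in (p.stdout + p.stderr)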
3 changes: 2 additions & 1 deletion cli/replicaset/cconfig.go
@@ -26,7 +26,8 @@ var (
	//go:embed lua/cconfig/bootstrap_vshard_body.lua
	cconfigBootstrapVShardBody string

	cconfigGetShardingRolesBody = "return require('config'):get().sharding.roles"
	//go:embed lua/cconfig/get_sharding_roles_body.lua
	cconfigGetShardingRolesBody string
)

// cconfigTopology used to export topology information from a Tarantool
10 changes: 7 additions & 3 deletions cli/replicaset/lua/cconfig/bootstrap_vshard_body.lua
@@ -1,12 +1,15 @@
local sharding = require('config'):get().sharding
if sharding == nil or sharding.roles == nil then
    error("sharding roles are not configured, please make sure managed cluster is sharded")
end

local ok, vshard = pcall(require, 'vshard')
if not ok then
    error("failed to require vshard module")
end
local fiber = require('fiber')
local config = require('config')

local is_router = false
for _, role in ipairs(config:get().sharding.roles) do
for _, role in ipairs(sharding.roles) do
    if role == "router" then
        is_router = true
        break
@@ -19,6 +22,7 @@ end

pcall(vshard.router.master_search_wakeup)

local fiber = require('fiber')
local timeout = ...
local deadline = fiber.time() + timeout
local ok, err
5 changes: 5 additions & 0 deletions cli/replicaset/lua/cconfig/get_sharding_roles_body.lua
@@ -0,0 +1,5 @@
local sharding = require('config'):get().sharding
if sharding == nil or sharding.roles == nil then
error("sharding roles are not configured, please make sure managed cluster is sharded")
end
return sharding.roles
42 changes: 25 additions & 17 deletions test/conftest.py
@@ -1,3 +1,4 @@
import itertools
import os
import platform
import shutil
@@ -277,34 +278,41 @@ def tt_app(tt, tt_path, tt_instances, tt_running_targets, tt_post_start):
    yield app


# Fixture used to avoid port collisions between different Tarantool instances.
@pytest.fixture(scope="session")
def port_factory():
    # Existing tests with hardcoded ports are expected to be adapted to this
    # fixture step by step. New tests are also planned to run in parallel, so
    # for now start far beyond the default port 3301 that is widely used in
    # existing tests, to avoid collisions.
    ports = itertools.count(5501)

    def _port_factory():
        return next(ports)

    return _port_factory


@pytest.fixture
def cluster_params():
    return None
    return tt_helper.make_cluster_params(dict())


@pytest.fixture
def cluster(request, tt, cluster_params):
def cluster(request, tt, cluster_params, port_factory):
    if utils.is_tarantool_less_3():
        pytest.skip("centralized config requires Tarantool v3.x")

    params = dict(
        app_name="cluster_app",
        num_replicasets=1,
        num_replicas=3,
        username="client",
        password="secret",
    )
    if cluster_params is not None:
        params.update(cluster_params)

    input_params = [
        params["num_replicasets"],
        params["num_replicas"],
        params["username"],
        params["password"],
        cluster_params["num_replicasets"],
        cluster_params["num_replicas"],
        cluster_params["username"],
        cluster_params["password"],
    ]
    app = tt_helper.TtCluster(tt, params["app_name"], input_params)
    app = tt_helper.TtCluster(tt, cluster_params["app_name"], input_params)
    request.addfinalizer(lambda: app.stop("--yes"))
    app.update_ports(cluster_params["host"], port_factory)

    return app


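A brief usage sketch for the session-scoped `port_factory` fixture added above (the test function name is hypothetical): each call returns the next value of a shared counter starting at 5501, so instances configured during one test session get distinct ports.

def test_ports_are_unique(port_factory):
    # Two consecutive calls within the same session never return the same
    # value, which is what prevents iproto port collisions between instances.
    first = port_factory()
    second = port_factory()
    assert first != second
    assert second > first >= 5501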
16 changes: 16 additions & 0 deletions test/integration/replicaset/test_replicaset_vshard.py
@@ -352,3 +352,19 @@ def test_vshard_bootstrap_not_enough_timeout(tt_cmd, vshard_cconfig_app_timeout_
    assert rc == 1
    assert "failed to bootstrap vshard" in out
    assert "attempt to index field '_configdata_applied' (a nil value)" in out


@pytest.mark.parametrize("target_type", ["APP", "INST", "URI"])
def test_vshard_bootstrap_non_vshard(tt, cluster, target_type):
    targets = {
        "APP": cluster.app_name,
        "INST": f"{cluster.app_name}:{cluster.instances[0]['name']}",
        "URI": f"client:secret@{cluster.instances[0]['endpoint']}",
    }

    p = cluster.start()
    assert p.returncode == 0
    assert cluster.wait_for_running(5)
    p = tt.run("rs", "vshard", "bootstrap", targets[target_type])
    assert p.returncode != 0
    assert "sharding roles are not configured" in p.stdout
48 changes: 48 additions & 0 deletions test/tt_helper.py
@@ -118,11 +118,27 @@ def __init__(self, tt, app_name, input_params):
input = "".join(["\n" if x is None else f"{x}\n" for x in input_params])
p = self.__tt.run("create", "cluster", "--name", self.__app_name, input=input)
assert p.returncode == 0
self.__instances = None

@property
def app_name(self):
return self.__app_name

@property
def instances(self):
if self.__instances is None:
self.__instances = []
for group in self.config["groups"].values():
for rs in group["replicasets"].values():
for inst_name, inst in rs["instances"].items():
self.__instances.append(
dict(
name=inst_name,
endpoint=inst["iproto"]["listen"][0]["uri"],
),
)
return self.__instances

@property
def config_path(self):
return self.__tt.work_dir / self.__app_name / "config.yaml"
@@ -133,6 +149,7 @@ def config(self):

    @config.setter
    def config(self, config):
        self.__instances = None
        self.config_path.write_text(yaml.dump(config))

    def start(self, *args):
@@ -165,6 +182,37 @@ def update_config_leaves(self, other):
        utils.update_dict_leaves(config, other)
        self.config = config

    def update_instances_config(self, configure_instance_func, *args):
        groups = {}
        for group_name, group in self.config["groups"].items():
            replicasets = {}
            for rs_name, rs in group["replicasets"].items():
                instances = {}
                for inst_name in rs["instances"]:
                    instances[inst_name] = configure_instance_func(*args)
                replicasets[rs_name] = {"instances": instances}
            groups[group_name] = {"replicasets": replicasets}

        self.update_config_leaves({"groups": groups})

    def update_ports(self, host, port_factory):
        def configure_instance_port(host, port_factory):
            return {"iproto": {"listen": [{"uri": f"{host}:{port_factory()}"}]}}

        self.update_instances_config(configure_instance_port, host, port_factory)


def make_cluster_params(params):
    default_params = dict(
        app_name="cluster_app",
        num_replicasets=1,
        num_replicas=3,
        username="client",
        password="secret",
        host="127.0.0.1",
    )
    return default_params | params


def status(tt, *args):
    rc, out = tt.exec("status", *args)
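For illustration, `make_cluster_params` merges caller overrides over the defaults with the dict-union operator `|` (Python 3.9+). A small sketch of the expected behaviour, assuming the defaults defined above; the override values and the app name `tiny_app` are arbitrary:

import tt_helper

params = tt_helper.make_cluster_params({"num_replicas": 1, "app_name": "tiny_app"})
# Keys from the argument win; everything else falls back to the defaults.
assert params["num_replicas"] == 1
assert params["app_name"] == "tiny_app"
assert params["username"] == "client"
assert params["host"] == "127.0.0.1"

# Passing an empty dict, as the cluster_params fixture does by default,
# simply returns the full set of defaults.
assert tt_helper.make_cluster_params(dict())["num_replicasets"] == 1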