fix(csi): fixed volume unpublishing for offlined nexuses
ControllerUnpublishVolume() now successfully unpublishes the volumes
whose nexuses are located on offline nodes.

Resolves: CAS-1236
mtzaurus committed Nov 15, 2021
1 parent 61c9efb commit bd8a01f
Showing 4 changed files with 149 additions and 39 deletions.
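
In short, ControllerUnpublishVolume now decides from the volume's declared target (volume.spec.target), which is still available when the nexus node is down, instead of the runtime share location, and asks the control plane for a forced unpublish. Below is a minimal sketch of that flow; DummyClient, controller_unpublish and the simplified Target/VolumeSpec types are illustrative stand-ins for the real control-plane API, not code from this commit.

// Illustrative sketch only -- DummyClient and the simplified types below stand in
// for the real control-plane client and volume types; not code from the commit.
use uuid::Uuid; // assumes the `uuid` crate, as used by the CSI controller

#[derive(Debug)]
struct ApiError(String);

struct Target {
    node: String,
}

struct VolumeSpec {
    target: Option<Target>,
}

struct DummyClient;

impl DummyClient {
    // Mirrors the new client signature: `force` is now a caller-supplied flag
    // instead of always being `false`.
    async fn unpublish_volume(&self, _volume_id: &Uuid, _force: bool) -> Result<(), ApiError> {
        Ok(())
    }
}

async fn controller_unpublish(
    client: &DummyClient,
    spec: &VolumeSpec,
    volume_uuid: &Uuid,
    node_id: &str,
) -> Result<(), String> {
    match &spec.target {
        // The declared target survives a nexus-node outage, so the node check
        // no longer depends on the (possibly unreachable) nexus itself.
        Some(target) if !node_id.is_empty() && target.node != node_id => Err(format!(
            "volume is published on a different node: {}",
            target.node
        )),
        // Force the unpublish so the target is torn down even though the nexus
        // node is offline; Kubernetes has already detached the volume.
        Some(_) => client
            .unpublish_volume(volume_uuid, true)
            .await
            .map_err(|e| format!("unpublish failed: {:?}", e)),
        // Not published at all: succeed idempotently.
        None => Ok(()),
    }
}

The BDD test added below exercises exactly this path: it takes the nexus node offline, waits for the volume's runtime target to disappear, then expects ControllerUnpublishVolume to succeed and the volume to be republishable on the other node.
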
8 changes: 6 additions & 2 deletions control-plane/csi-controller/src/client.rs
@@ -254,11 +254,15 @@ impl MayastorApiClient {

/// Unpublish volume (i.e. destroy a target which exposes the volume).
#[instrument(fields(volume.uuid = %volume_id), skip(volume_id))]
pub async fn unpublish_volume(&self, volume_id: &uuid::Uuid) -> Result<(), ApiClientError> {
pub async fn unpublish_volume(
&self,
volume_id: &uuid::Uuid,
force: bool,
) -> Result<(), ApiClientError> {
Self::delete_idempotent(
self.rest_client
.volumes_api()
.del_volume_target(volume_id, Some(false))
.del_volume_target(volume_id, Some(force))
.await,
true,
)?;
66 changes: 35 additions & 31 deletions control-plane/csi-controller/src/controller.rs
@@ -431,38 +431,40 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {

let uri =
// Volume is already published, make sure the protocol matches and get URI.
if let Some(target) = &volume.spec.target {
if target.protocol != Some(protocol) {
let m = format!(
"Volume {} already shared via different protocol: {:?}",
volume_id, target.protocol,
);
error!("{}", m);
return Err(Status::failed_precondition(m));
}

if let Some((node, uri)) = get_volume_share_location(&volume) {
// Make sure volume is published at the same node.
if node_id != node {
match &volume.spec.target {
Some(target) => {
if target.protocol != Some(protocol) {
let m = format!(
"Volume {} already published on a different node: {}",
volume_id, node,
"Volume {} already shared via different protocol: {:?}",
volume_id, target.protocol,
);
error!("{}", m);
return Err(Status::failed_precondition(m));
}

debug!("Volume {} already published at {}", volume_id, uri);
uri
} else {
let m = format!(
"Volume {} reports no info about its publishing status",
volume_id
);
error!("{}", m);
return Err(Status::internal(m));
}
} else {
if let Some((node, uri)) = get_volume_share_location(&volume) {
// Make sure volume is published at the same node.
if node_id != node {
let m = format!(
"Volume {} already published on a different node: {}",
volume_id, node,
);
error!("{}", m);
return Err(Status::failed_precondition(m));
}

debug!("Volume {} already published at {}", volume_id, uri);
uri
} else {
let m = format!(
"Volume {} reports no info about its publishing status",
volume_id
);
error!("{}", m);
return Err(Status::internal(m));
}
},
_ => {
// Volume is not published.
let v = MayastorApiClient::get_client()
.publish_volume(&volume_id, &node_id, protocol)
@@ -482,7 +484,8 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
error!("{}", m);
return Err(Status::internal(m));
}
};
}
};

// Prepare the context for the Mayastor Node CSI plugin.
let mut publish_context = HashMap::new();
@@ -524,11 +527,11 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
};

// Check if target volume is published and the node matches.
if let Some((node, _)) = get_volume_share_location(&volume) {
if !args.node_id.is_empty() && node != normalize_hostname(&args.node_id) {
if let Some(target) = &volume.spec.target.as_ref() {
if !args.node_id.is_empty() && target.node != normalize_hostname(&args.node_id) {
return Err(Status::not_found(format!(
"Volume {} is published on a different node: {}",
&args.volume_id, node
&args.volume_id, target.node
)));
}
} else {
@@ -540,8 +543,9 @@ impl rpc::csi::controller_server::Controller for CsiControllerSvc {
return Ok(Response::new(ControllerUnpublishVolumeResponse {}));
}

        // Do forced volume unpublish as Kubernetes has already detached the volume.
MayastorApiClient::get_client()
.unpublish_volume(&volume_uuid)
.unpublish_volume(&volume_uuid, true)
.await
.map_err(|e| {
Status::not_found(format!(
6 changes: 6 additions & 0 deletions tests/bdd/features/csi/controller.feature
@@ -114,3 +114,9 @@ Scenario: list local volume
When a ListVolumesRequest is sent to CSI controller
Then listed local volume must be accessible only from all existing Mayastor nodes
And no topology restrictions should be imposed to non-local volumes

Scenario: unpublish volume when nexus node is offline
Given a volume published on a node
When a node that hosts the nexus becomes offline
Then a ControllerUnpublishVolume request should succeed as if nexus node was online
And volume should be successfully republished on the other node
108 changes: 102 additions & 6 deletions tests/bdd/test_csi_controller.py
@@ -13,6 +13,7 @@
import csi_pb2 as pb
import csi_pb2_grpc as rpc
import grpc
from time import sleep
import subprocess

from urllib.parse import urlparse
@@ -27,17 +28,20 @@
VOLUME1_UUID = "d01b8bfb-0116-47b0-a03a-447fcbdc0e99"
VOLUME2_UUID = "d8aab0f1-82f4-406c-89ee-14f08b004aea"
VOLUME3_UUID = "f29b8e73-67d0-4b32-a8ea-a1277d48ef07"
VOLUME4_UUID = "955a12c4-707e-4040-9c4d-e9682213588f" # 2 replicas
NOT_EXISTING_VOLUME_UUID = "11111111-2222-3333-4444-555555555555"
PVC_VOLUME1_NAME = "pvc-%s" % VOLUME1_UUID
PVC_VOLUME2_NAME = "pvc-%s" % VOLUME2_UUID
PVC_VOLUME3_NAME = "pvc-%s" % VOLUME3_UUID
PVC_VOLUME4_NAME = "pvc-%s" % VOLUME4_UUID
POOL1_UUID = "ec176677-8202-4199-b461-2b68e53a055f"
POOL2_UUID = "bcabda21-9e66-4d81-8c75-bf9f3b687cdc"
NODE1 = "mayastor-1"
NODE2 = "mayastor-2"
VOLUME1_SIZE = 1024 * 1024 * 72
VOLUME2_SIZE = 1024 * 1024 * 32
VOLUME3_SIZE = 1024 * 1024 * 48
VOLUME1_SIZE = 1024 * 1024 * 32
VOLUME2_SIZE = 1024 * 1024 * 22
VOLUME3_SIZE = 1024 * 1024 * 28
VOLUME4_SIZE = 1024 * 1024 * 32
K8S_HOSTNAME = "kubernetes.io/hostname"


@@ -52,16 +56,20 @@ def setup():
pool_api.put_node_pool(
NODE1,
POOL1_UUID,
CreatePoolBody(["malloc:///disk?size_mb=96"], labels=pool_labels),
CreatePoolBody(["malloc:///disk?size_mb=128"], labels=pool_labels),
)
pool_api.put_node_pool(
NODE2,
POOL2_UUID,
CreatePoolBody(["malloc:///disk?size_mb=128"], labels=pool_labels),
)
yield
pool_api.del_pool(POOL1_UUID)
pool_api.del_pool(POOL2_UUID)

try:
pool_api.del_pool(POOL1_UUID)
pool_api.del_pool(POOL2_UUID)
except:
pass
common.deployer_stop()


@@ -176,6 +184,13 @@ def test_list_local_volume(setup):
"""list local volume"""


@scenario(
"features/csi/controller.feature", "unpublish volume when nexus node is offline"
)
def test_unpublish_volume_with_offline_nexus_node(setup):
"""unpublish volume when nexus node is offline"""


@given("a running CSI controller plugin", target_fixture="csi_instance")
def a_csi_instance():
return csi_rpc_handle()
@@ -216,6 +231,66 @@ def populate_published_volume(_create_1_replica_nvmf_volume):
return volume


@pytest.fixture
def _create_2_replica_nvmf_volume():
yield csi_create_2_replica_nvmf_volume4()
csi_delete_2_replica_nvmf_volume4()


@pytest.fixture
def populate_published_2_replica_volume(_create_2_replica_nvmf_volume):
do_publish_volume(VOLUME4_UUID, NODE1)

# Make sure volume is published.
volume = common.get_volumes_api().get_volume(VOLUME4_UUID)
assert (
str(volume.spec.target.protocol) == "nvmf"
), "Protocol mismatches for published volume"
assert (
volume.state.target["protocol"] == "nvmf"
), "Protocol mismatches for published volume"
return volume


@pytest.fixture
def start_stop_ms1():
docker_client = docker.from_env()
try:
node1 = docker_client.containers.list(all=True, filters={"name": NODE1})[0]
    except (docker.errors.NotFound, IndexError):
raise Exception("No Mayastor instance found that hosts the nexus")
# Stop the nexus node and wait till nexus offline status is also reflected in volume target info.
# Wait at most 60 seconds.
node1.stop()
state_synced = False
for i in range(12):
vol = common.get_volumes_api().get_volume(VOLUME4_UUID)
if getattr(vol.state, "target", None) is None:
state_synced = True
break
sleep(5)
assert state_synced, "Nexus failure is not reflected in volume target info"
yield
node1.start()


@when(
"a node that hosts the nexus becomes offline", target_fixture="offline_nexus_node"
)
def offline_nexus_node(populate_published_2_replica_volume, start_stop_ms1):
pass


@then("a ControllerUnpublishVolume request should succeed as if nexus node was online")
def check_unpublish_volume_for_offline_nexus_node(offline_nexus_node):
do_unpublish_volume(VOLUME4_UUID, NODE1)


@then("volume should be successfully republished on the other node")
def check_republish_volume_for_offline_nexus_node(offline_nexus_node):
do_publish_volume(VOLUME4_UUID, NODE2)


@when(
"a CreateVolume request is sent to create a 1 replica local nvmf volume (local=true)",
target_fixture="create_1_replica_local_nvmf_volume",
@@ -584,6 +659,21 @@ def csi_create_1_replica_nvmf_volume1():
return csi_rpc_handle().controller.CreateVolume(req)


def csi_create_2_replica_nvmf_volume4():
capacity = pb.CapacityRange(required_bytes=VOLUME4_SIZE, limit_bytes=0)
parameters = {
"protocol": "nvmf",
"ioTimeout": "30",
"repl": "2",
}

req = pb.CreateVolumeRequest(
name=PVC_VOLUME4_NAME, capacity_range=capacity, parameters=parameters
)

return csi_rpc_handle().controller.CreateVolume(req)


def csi_create_1_replica_local_nvmf_volume():
capacity = pb.CapacityRange(required_bytes=VOLUME3_SIZE, limit_bytes=0)
parameters = {"protocol": "nvmf", "ioTimeout": "30", "repl": "1", "local": "true"}
@@ -645,6 +735,12 @@ def csi_delete_1_replica_nvmf_volume1():
)


def csi_delete_2_replica_nvmf_volume4():
csi_rpc_handle().controller.DeleteVolume(
        pb.DeleteVolumeRequest(volume_id=VOLUME4_UUID)
)


def csi_delete_1_replica_local_nvmf_volume1():
csi_rpc_handle().controller.DeleteVolume(
pb.DeleteVolumeRequest(volume_id=VOLUME3_UUID)
