Skip to content

Commit

Permalink
add endpoint to list schema versions, needed to check schema agreement (
Browse files Browse the repository at this point in the history
#172)

* add endpoint to list schema versions, needed for checking schema agreement

* fix uri

* add @rpc annotation

* attempt to fix json parsing

* use method available across C* versions

* add api docs
  • Loading branch information
jsanda committed Jan 14, 2022
1 parent f20b469 commit 7acc28c
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -441,6 +442,12 @@ public List<Map<String, List<Map<String, String>>>> getStreamInfo()
return ShimLoader.instance.get().getStreamInfo();
}

@Rpc(name = "getSchemaVersions")
public Map<String, List<String>> getSchemaVersions()
{
return StorageProxy.instance.getSchemaVersions();
}

@Rpc(name = "getKeyspaces")
public List<String> getKeyspaces()
{
Expand Down
16 changes: 16 additions & 0 deletions management-api-server/doc/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,22 @@
},
"summary" : "Rebuild data by streaming data from other nodes. This operation returns immediately with a job id."
}
},
"/api/v1/ops/node/schema/versions" : {
"get" : {
"operationId" : "getSchemaVersions",
"responses" : {
"200" : {
"content" : {
"application/json" : {
"example" : "{2207c2a9-f598-3971-986b-2926e09e239d: [10.244.1.4, 10.244.2.3, 10.244.3.3]}"
}
},
"description" : "Gets the schema versions for each node. Useful for checking schema agreement"
}
},
"summary" : "Get schema versions."
}
}
},
"components" : {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@
import com.datastax.mgmtapi.CqlService;
import com.datastax.mgmtapi.ManagementApplication;
import com.datastax.mgmtapi.resources.helpers.ResponseTools;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.*;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static com.datastax.mgmtapi.resources.NodeOpsResources.handle;

@Path("/api/v1/ops/node")
public class NodeOpsResources {

Expand All @@ -40,7 +46,7 @@ public NodeOpsResources(ManagementApplication application) {
)
public Response decommission(@QueryParam(value="force")boolean force)
{
return com.datastax.mgmtapi.resources.NodeOpsResources.handle(() ->
return handle(() ->
Response.accepted(
ResponseTools.getSingleRowStringResponse(app.dbUnixSocketFile, cqlService,"CALL NodeOps.decommission(?, ?)", force, true)
).build());
Expand All @@ -59,10 +65,35 @@ public Response decommission(@QueryParam(value="force")boolean force)
)
public Response rebuild(@QueryParam("src_dc") String srcDatacenter)
{
return com.datastax.mgmtapi.resources.NodeOpsResources.handle(() ->
return handle(() ->
Response.accepted(
ResponseTools.getSingleRowStringResponse(app.dbUnixSocketFile, cqlService, "CALL NodeOps.rebuild(?)", srcDatacenter)
).build());
}

@GET
@Path("/schema/versions")
@Produces(MediaType.APPLICATION_JSON)
@ApiResponse(
responseCode = "200",
description = "Gets the schema versions for each node. Useful for checking schema agreement",
content = @Content(
mediaType = MediaType.APPLICATION_JSON,
examples = @ExampleObject(value = "{2207c2a9-f598-3971-986b-2926e09e239d: [10.244.1.4, 10.244.2.3, 10.244.3.3]}")))
@Operation(summary = "Get schema versions.", operationId = "getSchemaVersions")
public Response schemaVersions()
{
return handle(() ->
{
Row row = cqlService.executePreparedStatement(app.dbUnixSocketFile, "CALL NodeOps.getSchemaVersions()").one();

Map<String, List> schemaVersions = Collections.emptyMap();
if (row != null)
{
schemaVersions = row.getMap(0, String.class, List.class);
}
return Response.ok(schemaVersions).build();
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.assertj.core.data.MapEntry;
import org.assertj.core.util.Lists;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -539,6 +542,39 @@ public void testGetKeyspaces() throws IOException, URISyntaxException
assertEquals(ks, keyspaces.get(0));
}

@Test
public void testGetSchemaVersions() throws IOException, URISyntaxException
{
assumeTrue(IntegrationTestUtils.shouldRun());
ensureStarted();

NettyHttpClient client = new NettyHttpClient(BASE_URL);

URIBuilder uriBuilder = new URIBuilder("http://localhost:8080/api/v1/ops/node/schema/versions");
URI uri = uriBuilder.build();

Pair<Integer, String> response = client.get(uri.toURL()).thenApply(this::responseAsCodeAndBody).join();
assertThat(response.getLeft()).isEqualTo(HttpStatus.SC_OK);

// The response body should look something like this,
//
// 2207c2a9-f598-3971-986b-2926e09e239d: [10.244.1.4, 10.244.2.3, 10.244.3.3]
//
// The uuid is the schema version and list on the right are the nodes at that version. Because
// are only testing with a single node we should expect the list to contain a single value.

Map<String, List> actual = new JsonMapper().readValue(response.getRight(), new TypeReference<Map<String, List>>(){});
assertThat(actual).hasSizeGreaterThanOrEqualTo(1);

List nodes = Lists.emptyList();
for (Map.Entry<String, List> entry : actual.entrySet()) {
nodes = entry.getValue();
break;
}

assertThat(nodes.size()).isEqualTo(1);
}

@Test
public void testGetSnapshotDetails() throws IOException, URISyntaxException, InterruptedException
{
Expand Down

0 comments on commit 7acc28c

Please sign in to comment.