From 25f5dc5d7e38836a9fa018e8e2c5731b3f990263 Mon Sep 17 00:00:00 2001 From: xianjingfeng Date: Fri, 10 Mar 2023 15:58:55 +0800 Subject: [PATCH] [#80][Part-3] feat: add REST API for decommission (#684) ### What changes were proposed in this pull request? Add REST API for decommission ### Why are the changes needed? Support shuffle server decommission. It is a part of #80 ### Does this PR introduce _any_ user-facing change? Env: * Server IP: 127.0.0.1 * HTTP port: 19998 * RPC port: 19999 Decommission example: ```shell curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/decommission" -d '{"serverIds": ["127.0.0.1:19999"]}' ``` Cancel decommission example: ```shell curl -XPOST -H "Content-type:application/json" "http://127.0.0.1:19998/api/server/cancelDecommission" -d '{"serverIds": ["127.0.0.1:19999"]}' ``` Get server list: ```shell # path: /api/server/nodes[?id={serverId}][?status={serverStatus}] curl "http://127.0.0.1:19998/api/server/nodes?status=DECOMMISSIONING" curl "http://127.0.0.1:19998/api/server/nodes?status=ACTIVE" ``` ### How was this patch tested? 
UT --- .../uniffle/common/ServerStatusTest.java | 2 +- .../uniffle/common/metrics/TestUtils.java | 37 ++++- .../uniffle/common/rpc/StatusCodeTest.java | 2 +- coordinator/pom.xml | 1 + .../coordinator/CoordinatorServer.java | 17 ++ .../uniffle/coordinator/web/Response.java | 71 ++++++++ .../request/CancelDecommissionRequest.java | 32 ++++ .../web/request/DecommissionRequest.java | 32 ++++ .../coordinator/web/servlet/BaseServlet.java | 84 ++++++++++ .../servlet/CancelDecommissionServlet.java | 50 ++++++ .../web/servlet/DecommissionServlet.java | 50 ++++++ .../coordinator/web/servlet/NodesServlet.java | 56 +++++++ .../metric/CoordinatorMetricsTest.java | 8 +- docs/coordinator_guide.md | 58 +++++++ .../org/apache/uniffle/test/ServletTest.java | 157 ++++++++++++++++++ .../server/ShuffleServerMetricsTest.java | 8 +- .../uniffle/server/ShuffleServerTest.java | 6 +- 17 files changed, 650 insertions(+), 21 deletions(-) create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java create mode 100644 integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java diff --git a/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java index e449215aad..2546cfa6d1 100644 --- 
a/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java +++ b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java @@ -55,7 +55,7 @@ public void test() throws Exception { fail(e.getMessage()); } } - for (int i = 0; i < serverStatuses.size() - 1; i++) { + for (int i = 0; i < serverStatuses.size(); i++) { assertEquals(protoServerStatuses.get(i), serverStatuses.get(i).toProto()); assertEquals(ServerStatus.fromProto(protoServerStatuses.get(i)), serverStatuses.get(i)); } diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java index 1233f9b535..ea623ea5d4 100644 --- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java +++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java @@ -20,6 +20,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; @@ -28,18 +29,38 @@ public class TestUtils { private TestUtils() { } - public static String httpGetMetrics(String urlString) throws IOException { + public static String httpGet(String urlString) throws IOException { URL url = new URL(urlString); HttpURLConnection con = (HttpURLConnection) url.openConnection(); con.setRequestMethod("GET"); - BufferedReader in = new BufferedReader( - new InputStreamReader(con.getInputStream())); - String inputLine; - StringBuffer content = new StringBuffer(); - while ((inputLine = in.readLine()) != null) { - content.append(inputLine); + StringBuilder content = new StringBuilder(); + try (BufferedReader in = new BufferedReader( + new InputStreamReader(con.getInputStream()));) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } } - in.close(); + return content.toString(); + } + + public static String httpPost(String urlString, String postData) throws IOException { 
+ URL url = new URL(urlString); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setDoOutput(true); + con.setRequestMethod("POST"); + StringBuilder content = new StringBuilder(); + try (OutputStream outputStream = con.getOutputStream();) { + outputStream.write(postData.getBytes()); + try (BufferedReader in = new BufferedReader( + new InputStreamReader(con.getInputStream()));) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + } + } + return content.toString(); } } diff --git a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java index 5956c1d34d..4e9c5b2f82 100644 --- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java +++ b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java @@ -55,7 +55,7 @@ public void test() throws Exception { fail(e.getMessage()); } } - for (int i = 0; i < statusCodes.size() - 1; i++) { + for (int i = 0; i < statusCodes.size(); i++) { assertEquals(protoStatusCode.get(i), statusCodes.get(i).toProto()); assertEquals(StatusCode.fromProto(protoStatusCode.get(i)), statusCodes.get(i)); } diff --git a/coordinator/pom.xml b/coordinator/pom.xml index ac6b2ec5e8..df147d3ffc 100644 --- a/coordinator/pom.xml +++ b/coordinator/pom.xml @@ -107,6 +107,7 @@ com.google.protobuf:protobuf-java-util com.google.guava:guava + com.google.guava:failureaccess com.fasterxml.jackson.core:jackson-databind com.fasterxml.jackson.core:jackson-core diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 00b0a514af..bfec239d12 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -41,6 +41,9 @@ import 
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory; import org.apache.uniffle.coordinator.util.CoordinatorUtils; +import org.apache.uniffle.coordinator.web.servlet.CancelDecommissionServlet; +import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet; +import org.apache.uniffle.coordinator.web.servlet.NodesServlet; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE; @@ -153,6 +156,7 @@ private void initialization() throws Exception { id = ip + "-" + port; LOG.info("Start to initialize coordinator {}", id); jettyServer = new JettyServer(coordinatorConf); + registerRESTAPI(); // register metrics first to avoid NPE problem when add dynamic metrics registerMetrics(); coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id); @@ -185,6 +189,19 @@ private void initialization() throws Exception { server = coordinatorFactory.getServer(); } + private void registerRESTAPI() throws Exception { + LOG.info("Register REST API"); + jettyServer.addServlet( + new NodesServlet(this), + "/api/server/nodes"); + jettyServer.addServlet( + new DecommissionServlet(this), + "/api/server/decommission"); + jettyServer.addServlet( + new CancelDecommissionServlet(this), + "/api/server/cancelDecommission"); + } + private void registerMetrics() throws Exception { LOG.info("Register metrics"); CollectorRegistry coordinatorCollectorRegistry = new CollectorRegistry(true); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java new file mode 100644 index 0000000000..b883d55fde --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/Response.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web; + +public class Response { + private static final int SUCCESS_CODE = 0; + private static final int ERROR_CODE = -1; + private int code; + private T data; + private String errMsg; + + public Response() { + } + + public Response(int code, T data, String errMsg) { + this.code = code; + this.data = data; + this.errMsg = errMsg; + } + + public static Response success(T data) { + return new Response<>(SUCCESS_CODE, data, null); + } + + public static Response fail(String msg) { + return new Response<>(ERROR_CODE, null, msg); + } + + public static Response fail(String msg, int code) { + return new Response<>(code, null, msg); + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } + + public String getErrMsg() { + return errMsg; + } + + public void setErrMsg(String errMsg) { + this.errMsg = errMsg; + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java 
new file mode 100644 index 0000000000..997a135ea4 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/CancelDecommissionRequest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.request; + +import java.util.Set; + +public class CancelDecommissionRequest { + private Set serverIds; + + public Set getServerIds() { + return serverIds; + } + + public void setServerIds(Set serverIds) { + this.serverIds = serverIds; + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java new file mode 100644 index 0000000000..c11a716c43 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/request/DecommissionRequest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.request; + +import java.util.Set; + +public class DecommissionRequest { + private Set serverIds; + + public Set getServerIds() { + return serverIds; + } + + public void setServerIds(Set serverIds) { + this.serverIds = serverIds; + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java new file mode 100644 index 0000000000..a67701f224 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.servlet; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.Callable; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.uniffle.coordinator.web.Response; + +public abstract class BaseServlet extends HttpServlet { + public static final String JSON_MIME_TYPE = "application/json"; + final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { + writeJSON(resp, handlerRequest(() -> handleGet(req, resp))); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + writeJSON(resp, handlerRequest(() -> handlePost(req, resp))); + } + + private Response handlerRequest( + Callable function) { + Response response; + try { + // todo: Do something for authentication + response = function.call(); + } catch (Exception e) { + response = Response.fail(e.getMessage()); + } + return response; + } + + protected Response handleGet( + HttpServletRequest req, + HttpServletResponse resp) throws ServletException, IOException { + throw new IOException("Method not support!"); + } + + protected Response handlePost( + HttpServletRequest req, + HttpServletResponse resp) throws ServletException, IOException { + throw new IOException("Method not support!"); + } + + protected void writeJSON(final HttpServletResponse resp, final Object obj) + throws IOException { + if (obj == null) { + return; + } + resp.setContentType(JSON_MIME_TYPE); + final 
OutputStream stream = resp.getOutputStream(); + mapper.writeValue(stream, obj); + } + + protected T parseParamsFromJson(HttpServletRequest req, Class clazz) throws IOException { + return mapper.readValue(req.getInputStream(), clazz); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java new file mode 100644 index 0000000000..b7411d4e22 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.coordinator.web.servlet; + +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.collections.CollectionUtils; + +import org.apache.uniffle.coordinator.ClusterManager; +import org.apache.uniffle.coordinator.CoordinatorServer; +import org.apache.uniffle.coordinator.web.Response; +import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest; + +public class CancelDecommissionServlet extends BaseServlet { + private final CoordinatorServer coordinator; + + public CancelDecommissionServlet(CoordinatorServer coordinator) { + this.coordinator = coordinator; + } + + @Override + protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + CancelDecommissionRequest params = parseParamsFromJson(req, CancelDecommissionRequest.class); + if (CollectionUtils.isEmpty(params.getServerIds())) { + return Response.fail("Parameter[serverIds] should not be null!"); + } + ClusterManager clusterManager = coordinator.getClusterManager(); + params.getServerIds().forEach((serverId) -> { + clusterManager.cancelDecommission(serverId); + }); + return Response.success(null); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java new file mode 100644 index 0000000000..96f06dd3c2 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.servlet; + +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.collections.CollectionUtils; + +import org.apache.uniffle.coordinator.ClusterManager; +import org.apache.uniffle.coordinator.CoordinatorServer; +import org.apache.uniffle.coordinator.web.Response; +import org.apache.uniffle.coordinator.web.request.DecommissionRequest; + +public class DecommissionServlet extends BaseServlet { + private final CoordinatorServer coordinator; + + public DecommissionServlet(CoordinatorServer coordinator) { + this.coordinator = coordinator; + } + + @Override + protected Response handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException { + DecommissionRequest params = parseParamsFromJson(req, DecommissionRequest.class); + if (CollectionUtils.isEmpty(params.getServerIds())) { + return Response.fail("Parameter[serverIds] should not be null!"); + } + ClusterManager clusterManager = coordinator.getClusterManager(); + params.getServerIds().forEach((serverId) -> { + clusterManager.decommission(serverId); + }); + return Response.success(null); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java 
new file mode 100644 index 0000000000..048502352a --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.servlet; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.uniffle.coordinator.CoordinatorServer; +import org.apache.uniffle.coordinator.ServerNode; +import org.apache.uniffle.coordinator.web.Response; + + +public class NodesServlet extends BaseServlet { + private final CoordinatorServer coordinator; + + public NodesServlet(CoordinatorServer coordinator) { + this.coordinator = coordinator; + } + + @Override + protected Response handleGet(HttpServletRequest req, HttpServletResponse resp) { + List serverList = coordinator.getClusterManager().getServerList(Collections.EMPTY_SET); + String id = req.getParameter("id"); + String status = req.getParameter("status"); + serverList = serverList.stream().filter((server) -> { + if (id != null && !id.equals(server.getId())) { + 
return false; + } + if (status != null && !server.getStatus().toString().equals(status)) { + return false; + } + return true; + }).collect(Collectors.toList()); + Collections.sort(serverList, Comparator.comparing(ServerNode::getId)); + return Response.success(serverList); + } +} diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java index 945cafda2d..454e6bb83d 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java @@ -68,7 +68,7 @@ public static void tearDown() throws Exception { @Test public void testDynamicMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL); + String content = TestUtils.httpGet(SERVER_METRICS_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode metricsNode = mapper.readTree(content).get("metrics"); String remoteStorageMetricsName = CoordinatorMetrics.REMOTE_STORAGE_IN_USED_PREFIX + "path1"; @@ -85,7 +85,7 @@ public void testDynamicMetrics() throws Exception { @Test public void testCoordinatorMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL); + String content = TestUtils.httpGet(SERVER_METRICS_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); @@ -101,7 +101,7 @@ public void testCoordinatorMetrics() throws Exception { @Test public void testJvmMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_JVM_URL); + String content = TestUtils.httpGet(SERVER_JVM_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); @@ -109,7 +109,7 @@ public void testJvmMetrics() throws Exception { @Test public void 
testGrpcMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_GRPC_URL); + String content = TestUtils.httpGet(SERVER_GRPC_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index 09593f55e3..d3815a040b 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -136,3 +136,61 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi |rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). | |rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. | |rss.metrics.prometheus.pushgateway.report.interval.seconds|10| The interval in seconds for the reporter to report metrics. | + +## RESTful API(beta) + +### Fetch Shuffle servers + +
+ GET /api/server/nodes + +##### Parameters + +> |name|type|data type|description| +> |----|----|---------|-----------| +> |id|optional|string|Shuffle server id, eg:127.0.0.1:19999| +> |status|optional|string|Shuffle server status, eg:ACTIVE, DECOMMISSIONING, DECOMMISSIONED| + +##### Example cURL + +> ```bash +> curl -X GET http://localhost:19998/api/server/nodes +> ``` +
+ +### Decommission shuffle servers + +
+ POST /api/server/decommission + +##### Parameters + +> |name|type| data type |description| +> |----|-------------------|---------|-----------| +> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1:19999"]| +> +##### Example cURL + +> ```bash +> curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/decommission -d '{"serverIds": ["127.0.0.1:19999"]}' +> ``` +
+ + +### Cancel decommission shuffle servers + +
+ POST /api/server/cancelDecommission + +##### Parameters + +> |name|type| data type |description| +> |----|-------------------|---------|-----------| +> |serverIds|required| array |Shuffle server array, eg:["127.0.0.1:19999"]| +> +##### Example cURL + +> ```bash +> curl -X POST -H "Content-Type: application/json" http://localhost:19998/api/server/cancelDecommission -d '{"serverIds": ["127.0.0.1:19999"]}' +> ``` +
\ No newline at end of file diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java new file mode 100644 index 0000000000..1963195e24 --- /dev/null +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.metrics.TestUtils;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.web.Response;
+import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
+import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration test for the coordinator REST endpoints {@code server/nodes},
+ * {@code server/decommission} and {@code server/cancelDecommission}.
+ *
+ * <p>{@link #setUp(File)} configures one coordinator and two shuffle servers once for the whole
+ * class; both tests talk to the coordinator over HTTP on {@link #COORDINATOR_HTTP_PORT}.
+ */
+public class ServletTest extends IntegrationTestBase {
+  // Kept as constants so the URLs below and the configuration in setUp cannot drift apart.
+  private static final int COORDINATOR_HTTP_PORT = 12345;
+  private static final int COORDINATOR_RPC_PORT = 12346;
+  private static final String URL_PREFIX = "http://127.0.0.1:" + COORDINATOR_HTTP_PORT + "/api/";
+  private static final String NODES_URL = URL_PREFIX + "server/nodes";
+  private static final String DECOMMISSION_URL = URL_PREFIX + "server/decommission";
+  private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX + "server/cancelDecommission";
+  private static CoordinatorServer coordinatorServer;
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
+  /**
+   * Configures the coordinator and two shuffle servers, starts them, and waits (up to 30 seconds)
+   * until the coordinator's cluster manager lists both shuffle servers.
+   *
+   * @param tmpDir temporary directory that holds the local-file storage paths of both servers
+   * @throws Exception if a server cannot be created or started
+   */
+  @BeforeAll
+  public static void setUp(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, COORDINATOR_HTTP_PORT);
+    coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128);
+    coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, COORDINATOR_RPC_PORT);
+    createCoordinatorServer(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    shuffleServerConf.set(RssBaseConf.RSS_COORDINATOR_QUORUM, "127.0.0.1:" + COORDINATOR_RPC_PORT);
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    List<String> basePath = Lists.newArrayList(dataDir1.getAbsolutePath(), dataDir2.getAbsolutePath());
+    shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+    createShuffleServer(shuffleServerConf);
+    // The second server reuses the same conf object with its own storage paths and ports.
+    File dataDir3 = new File(tmpDir, "data3");
+    File dataDir4 = new File(tmpDir, "data4");
+    basePath = Lists.newArrayList(dataDir3.getAbsolutePath(), dataDir4.getAbsolutePath());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+    shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 1);
+    shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18081);
+    createShuffleServer(shuffleServerConf);
+    startServers();
+    coordinatorServer = coordinators.get(0);
+    Awaitility.await().timeout(30, TimeUnit.SECONDS).until(() ->
+        coordinatorServer.getClusterManager().list().size() == 2);
+  }
+
+  /**
+   * Checks the node listing without a filter, filtered by server id, and filtered by status.
+   */
+  @Test
+  public void testNodesServlet() throws Exception {
+    Response<List<HashMap<String, Object>>> response = fetchNodes(NODES_URL);
+    List<HashMap<String, Object>> serverList = response.getData();
+    assertEquals(0, response.getCode());
+    assertEquals(2, serverList.size());
+    // NOTE(review): relies on the endpoint returning the servers in a stable order with the
+    // SHUFFLE_SERVER_PORT server first - confirm against the servlet implementation.
+    assertEquals(SHUFFLE_SERVER_PORT, Integer.parseInt(serverList.get(0).get("port").toString()));
+    assertEquals(ServerStatus.ACTIVE.toString(), serverList.get(0).get("status"));
+    assertEquals(SHUFFLE_SERVER_PORT + 1, Integer.parseInt(serverList.get(1).get("port").toString()));
+    assertEquals(ServerStatus.ACTIVE.toString(), serverList.get(1).get("status"));
+
+    // Only fetch one server.
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    serverList = fetchNodes(NODES_URL + "?id=" + shuffleServer.getId()).getData();
+    assertEquals(1, serverList.size());
+    assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
+
+    // Filter by status.
+    serverList = fetchNodes(NODES_URL + "?status=DECOMMISSIONED").getData();
+    assertEquals(0, serverList.size());
+    serverList = fetchNodes(NODES_URL + "?status=ACTIVE").getData();
+    assertEquals(2, serverList.size());
+  }
+
+  /**
+   * Decommissions the first shuffle server through the REST API, cancels the decommission, and
+   * checks the status seen by the server and by the coordinator after each step.
+   */
+  @Test
+  public void testDecommissionServlet() throws Exception {
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+    DecommissionRequest decommissionRequest = new DecommissionRequest();
+    decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
+    // NOTE(review): a DecommissionRequest is posted to the cancel endpoint here; DECOMMISSION_URL
+    // was probably intended. Left unchanged because the decommission endpoint's response for an
+    // unknown server id is not visible from this test - confirm before switching.
+    Response<?> response = post(CANCEL_DECOMMISSION_URL, decommissionRequest);
+    assertEquals(-1, response.getCode());
+    assertNotNull(response.getErrMsg());
+    CancelDecommissionRequest cancelDecommissionRequest = new CancelDecommissionRequest();
+    cancelDecommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
+    response = post(CANCEL_DECOMMISSION_URL, cancelDecommissionRequest);
+    assertEquals(0, response.getCode());
+
+    // Register shuffle, avoid server exiting immediately.
+    ShuffleServerGrpcClient shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+    try {
+      shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest("testDecommissionServlet_appId", 0,
+          Lists.newArrayList(new PartitionRange(0, 1)), ""));
+      decommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
+      response = post(DECOMMISSION_URL, decommissionRequest);
+      assertEquals(0, response.getCode());
+      assertEquals(ServerStatus.DECOMMISSIONING, shuffleServer.getServerStatus());
+
+      // Wait until shuffle server send heartbeat to coordinator.
+      awaitCoordinatorStatus(shuffleServer.getId(), ServerStatus.DECOMMISSIONING);
+      // Cancel decommission.
+      response = post(CANCEL_DECOMMISSION_URL, cancelDecommissionRequest);
+      assertEquals(0, response.getCode());
+      assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+      // The servers are shared by all tests in this class and testNodesServlet asserts the
+      // coordinator's view, so wait until the coordinator has seen the ACTIVE status again.
+      awaitCoordinatorStatus(shuffleServer.getId(), ServerStatus.ACTIVE);
+    } finally {
+      // Close the client even when an assertion above fails.
+      shuffleServerClient.close();
+    }
+  }
+
+  /** Issues a GET against {@code url} and parses the body as a response carrying a node list. */
+  private Response<List<HashMap<String, Object>>> fetchNodes(String url) throws Exception {
+    String content = TestUtils.httpGet(url);
+    return objectMapper.readValue(content, new TypeReference<Response<List<HashMap<String, Object>>>>() {
+    });
+  }
+
+  /** Posts {@code request} as JSON to {@code url} and parses the body as a {@link Response}. */
+  private Response<?> post(String url, Object request) throws Exception {
+    String content = TestUtils.httpPost(url, objectMapper.writeValueAsString(request));
+    return objectMapper.readValue(content, Response.class);
+  }
+
+  /** Waits up to 10 seconds until the coordinator reports {@code expected} for {@code serverId}. */
+  private static void awaitCoordinatorStatus(String serverId, ServerStatus expected) {
+    Awaitility.await().timeout(10, TimeUnit.SECONDS).until(() ->
+        expected.equals(coordinatorServer.getClusterManager().getServerNodeById(serverId).getStatus()));
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java index b995a90f3b..99993eb489 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java @@ -77,7 +77,7 @@ public static void tearDown() throws Exception { @Test public void testJvmMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_JVM_URL); + String content = TestUtils.httpGet(SERVER_JVM_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); @@ -85,7 +85,7 @@ public void testJvmMetrics() throws
Exception { @Test public void testServerMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL); + String content = TestUtils.httpGet(SERVER_METRICS_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); @@ -145,7 +145,7 @@ public void testStorageCounter() { @Test public void testGrpcMetrics() throws Exception { - String content = TestUtils.httpGetMetrics(SERVER_GRPC_URL); + String content = TestUtils.httpGet(SERVER_GRPC_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); @@ -187,7 +187,7 @@ public Void call() throws Exception { f.get(); } - String content = TestUtils.httpGetMetrics(SERVER_METRICS_URL); + String content = TestUtils.httpGet(SERVER_METRICS_URL); ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java index 19c9513e91..6dc5e56eb0 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java @@ -50,7 +50,6 @@ public void startTest() { ShuffleServerConf serverConf = createShuffleServerConf(); ShuffleServer ss1 = new ShuffleServer(serverConf); ss1.start(); - ss1.stopServer(); ExitUtils.disableSystemExit(); ShuffleServer ss2 = new ShuffleServer(serverConf); String expectMessage = "Fail to start jetty http server"; @@ -61,7 +60,6 @@ public void startTest() { assertEquals(expectMessage, e.getMessage()); assertEquals(expectStatus, ((ExitException) e).getStatus()); } - ss2.stopServer(); serverConf.setInteger("rss.jetty.http.port", 9529); ss2 = new ShuffleServer(serverConf); @@ -72,7 +70,7 @@ public void startTest() { assertEquals(expectMessage, e.getMessage()); assertEquals(expectStatus, 
((ExitException) e).getStatus()); } - ss2.stopServer(); + ss1.stopServer(); final Thread t = new Thread(null, () -> { throw new AssertionError("TestUncaughtException"); @@ -92,6 +90,8 @@ public void decommissionTest(boolean shutdown) throws Exception { ShuffleServerConf serverConf = createShuffleServerConf(); serverConf.set(SERVER_DECOMMISSION_CHECK_INTERVAL, 1000L); serverConf.set(SERVER_DECOMMISSION_SHUTDOWN, shutdown); + serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 19527); + serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 19528); ShuffleServer shuffleServer = new ShuffleServer(serverConf); shuffleServer.start(); assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());