Skip to content

Commit

Permalink
[Core][Observability]Java worker adds job_id and state filters for "g…
Browse files Browse the repository at this point in the history
…et all actor info" api (#39574)

Signed-off-by: LarryLian <554538252@qq.com>
  • Loading branch information
larrylian committed Sep 15, 2023
1 parent 9c2fb41 commit 515fbe8
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public interface RuntimeContext {
*/
List<ActorInfo> getAllActorInfo();

/** Get all actor information of Ray cluster filtered by job id or actor state. */
public List<ActorInfo> getAllActorInfo(JobId jobId, ActorState actorState);

/**
* Get the handle to the current actor itself. Note that this method must be invoked in an actor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.ActorInfo;
import io.ray.api.runtimecontext.ActorState;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.api.runtimecontext.ResourceValue;
import io.ray.api.runtimecontext.RuntimeContext;
Expand Down Expand Up @@ -68,7 +69,12 @@ public List<NodeInfo> getAllNodeInfo() {

@Override
public List<ActorInfo> getAllActorInfo() {
return runtime.getGcsClient().getAllActorInfo();
return runtime.getGcsClient().getAllActorInfo(null, null);
}

@Override
public List<ActorInfo> getAllActorInfo(JobId jobId, ActorState actorState) {
return runtime.getGcsClient().getAllActorInfo(jobId, actorState);
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ public List<NodeInfo> getAllNodeInfo() {
return new ArrayList<>(nodes.values());
}

public List<ActorInfo> getAllActorInfo() {
public List<ActorInfo> getAllActorInfo(JobId jobId, ActorState actorState) {
List<ActorInfo> actorInfos = new ArrayList<>();
List<byte[]> results = globalStateAccessor.getAllActorInfo();
List<byte[]> results = globalStateAccessor.getAllActorInfo(jobId, actorState);
results.forEach(
result -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.google.common.base.Preconditions;
import io.ray.api.id.ActorId;
import io.ray.api.id.JobId;
import io.ray.api.id.PlacementGroupId;
import io.ray.api.runtimecontext.ActorState;
import java.util.List;

/** `GlobalStateAccessor` is used for accessing information from GCS. */
Expand Down Expand Up @@ -101,11 +103,20 @@ public byte[] getInternalKV(String n, String k) {
}

/** Returns A list of actor info with ActorInfo protobuf schema. */
public List<byte[]> getAllActorInfo() {
public List<byte[]> getAllActorInfo(JobId jobId, ActorState actorState) {
// Fetch a actor list with protobuf bytes format from GCS.
synchronized (GlobalStateAccessor.class) {
validateGlobalStateAccessorPointer();
return this.nativeGetAllActorInfo(globalStateAccessorNativePointer);
byte[] jobIdBytes = null;
String actorStateName = null;
if (jobId != null) {
jobIdBytes = jobId.getBytes();
}
if (actorState != null) {
actorStateName = actorState.getName();
}
return this.nativeGetAllActorInfo(
globalStateAccessorNativePointer, jobIdBytes, actorStateName);
}
}

Expand Down Expand Up @@ -149,7 +160,8 @@ private void destroyGlobalStateAccessor() {

private native List<byte[]> nativeGetAllNodeInfo(long nativePtr);

private native List<byte[]> nativeGetAllActorInfo(long nativePtr);
private native List<byte[]> nativeGetAllActorInfo(
long nativePtr, byte[] jobId, String actor_state_name);

private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId);

Expand Down
59 changes: 59 additions & 0 deletions java/test/src/main/java/io/ray/test/GetActorInfoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,31 @@
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.id.JobId;
import io.ray.api.runtimecontext.ActorInfo;
import io.ray.api.runtimecontext.ActorState;
import io.ray.api.runtimecontext.Address;
import io.ray.api.runtimecontext.NodeInfo;
import java.util.ArrayList;
import java.util.List;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"cluster"})
public class GetActorInfoTest extends BaseTest {

@BeforeClass
public void setUp() {
System.setProperty("ray.head-args.0", "--num-cpus=5");
}

@AfterClass
public void tearDown() {
System.clearProperty("ray.head-args.0");
}

private static class Echo {

public String echo(String str) {
Expand Down Expand Up @@ -52,4 +66,49 @@ public void testGetAllActorInfo() {
Assert.assertEquals(info.address.ip, thisAddress.ip);
});
}

public void testGetActorsByJobIdAndState() {
final int numActors = 5;
ArrayList<ActorHandle<Echo>> echos = new ArrayList<>(numActors);
for (int i = 0; i < numActors; ++i) {
ActorHandle<Echo> echo = Ray.actor(Echo::new).setResource("CPU", 1.0).remote();
echos.add(echo);
}
ArrayList<ObjectRef<String>> objs = new ArrayList<>();
echos.forEach(echo -> objs.add(echo.task(Echo::echo, "hello").remote()));
Ray.get(objs, 10000);

// get all actors
List<ActorInfo> actorInfo = Ray.getRuntimeContext().getAllActorInfo(null, null);
Assert.assertEquals(actorInfo.size(), 5);

// Filtered by job id
JobId jobId = Ray.getRuntimeContext().getCurrentJobId();
actorInfo = Ray.getRuntimeContext().getAllActorInfo(jobId, null);
Assert.assertEquals(actorInfo.size(), 5);

// Filtered by actor sate
actorInfo = Ray.getRuntimeContext().getAllActorInfo(null, ActorState.ALIVE);
Assert.assertEquals(actorInfo.size(), 5);

actorInfo = Ray.getRuntimeContext().getAllActorInfo(null, ActorState.DEPENDENCIES_UNREADY);
Assert.assertEquals(actorInfo.size(), 0);

actorInfo = Ray.getRuntimeContext().getAllActorInfo(null, ActorState.RESTARTING);
Assert.assertEquals(actorInfo.size(), 0);

ActorHandle<Echo> actor = Ray.actor(Echo::new).setResource("CPU", 1.0).remote();
Assert.assertTrue(
TestUtils.waitForCondition(
() ->
Ray.getRuntimeContext().getAllActorInfo(null, ActorState.PENDING_CREATION).size()
== 1,
5000));

actor.kill(true);
Assert.assertTrue(
TestUtils.waitForCondition(
() -> Ray.getRuntimeContext().getAllActorInfo(null, ActorState.DEAD).size() == 1,
5000));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,23 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env,

JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
JNIEnv *env,
jobject o,
jlong gcs_accessor_ptr,
jbyteArray j_job_id,
jstring j_actor_state_name) {
std::optional<JobID> job_id = std::nullopt;
std::optional<std::string> actor_state_name = std::nullopt;
if (j_job_id != NULL) {
job_id = std::make_optional<JobID>(JavaByteArrayToId<JobID>(env, j_job_id));
}
if (j_actor_state_name != NULL) {
actor_state_name = std::make_optional<std::string>(
JavaStringToNativeString(env, j_actor_state_name));
}
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto actor_info_list = gcs_accessor->GetAllActorInfo();
auto actor_info_list =
gcs_accessor->GetAllActorInfo(std::nullopt, job_id, actor_state_name);
return NativeVectorToJavaList<std::string>(
env, actor_info_list, [](JNIEnv *env, const std::string &str) {
return NativeStringToJavaByteArray(env, str);
Expand All @@ -95,8 +109,8 @@ JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetActorInfo(JNIEnv *env,
jobject o,
jlong gcs_accessor_ptr,
jbyteArray actorId) {
const auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
jbyteArray j_actor_id) {
const auto actor_id = JavaByteArrayToId<ActorID>(env, j_actor_id);
auto *gcs_accessor = reinterpret_cast<gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
auto actor_info = gcs_accessor->GetActorInfo(actor_id);
if (actor_info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,11 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *,
/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetAllActorInfo
* Signature: (J)Ljava/util/List;
* Signature: (J[BLjava/lang/String;)Ljava/util/List;
*/
JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo(JNIEnv *,
jobject,
jlong);
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo(
JNIEnv *, jobject, jlong, jbyteArray, jstring);

/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
Expand Down

0 comments on commit 515fbe8

Please sign in to comment.