Skip to content

Commit

Permalink
[Java] Dynamic resource API in Java (#4824)
Browse files Browse the repository at this point in the history
  • Loading branch information
jovany-wang authored and suquark committed May 21, 2019
1 parent 02583a8 commit 081708b
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 0 deletions.
15 changes: 15 additions & 0 deletions java/api/src/main/java/org/ray/api/Ray.java
Expand Up @@ -123,6 +123,21 @@ public static RayRuntime internal() {
return runtime;
}

/**
* Update the resource for the specified client.
* Set the resource for the specific node.
*/
public static void setResource(UniqueId nodeId, String resourceName, double capacity) {
runtime.setResource(resourceName, capacity, nodeId);
}

/**
* Set the resource for local node.
*/
public static void setResource(String resourceName, double capacity) {
runtime.setResource(resourceName, capacity, UniqueId.NIL);
}

/**
* Get the runtime context.
*/
Expand Down
9 changes: 9 additions & 0 deletions java/api/src/main/java/org/ray/api/runtime/RayRuntime.java
Expand Up @@ -65,6 +65,15 @@ public interface RayRuntime {
*/
void free(List<UniqueId> objectIds, boolean localOnly, boolean deleteCreatingTasks);

/**
* Set the resource for the specific node.
*
* @param resourceName The name of resource.
* @param capacity The capacity of the resource.
* @param nodeId The node that we want to set its resource.
*/
void setResource(String resourceName, double capacity, UniqueId nodeId);

/**
* Invoke a remote function.
*
Expand Down
Expand Up @@ -210,6 +210,15 @@ public void free(List<UniqueId> objectIds, boolean localOnly, boolean deleteCrea
rayletClient.freePlasmaObjects(objectIds, localOnly, deleteCreatingTasks);
}

@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
Preconditions.checkArgument(Double.compare(capacity, 0) >= 0);
if (nodeId == null) {
nodeId = UniqueId.NIL;
}
rayletClient.setResource(resourceName, capacity, nodeId);
}

private List<List<UniqueId>> splitIntoBatches(List<UniqueId> objectIds) {
List<List<UniqueId>> batches = new ArrayList<>();
int objectsSize = objectIds.size();
Expand Down
Expand Up @@ -209,6 +209,11 @@ public void notifyActorResumedFromCheckpoint(UniqueId actorId, UniqueId checkpoi
throw new NotImplementedException("Not implemented.");
}

@Override
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
LOGGER.error("Not implemented under SINGLE_PROCESS mode.");
}

@Override
public void destroy() {
exec.shutdown();
Expand Down
Expand Up @@ -30,5 +30,7 @@ <T> WaitResult<T> wait(List<RayObject<T>> waitFor, int numReturns, int

void notifyActorResumedFromCheckpoint(UniqueId actorId, UniqueId checkpointId);

void setResource(String resourceName, double capacity, UniqueId nodeId);

void destroy();
}
Expand Up @@ -308,6 +308,10 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
return buffer;
}

public void setResource(String resourceName, double capacity, UniqueId nodeId) {
nativeSetResource(client, resourceName, capacity, nodeId.getBytes());
}

public void destroy() {
nativeDestroy(client);
}
Expand Down Expand Up @@ -357,4 +361,7 @@ private static native void nativeFreePlasmaObjects(long conn, byte[][] objectIds

private static native void nativeNotifyActorResumedFromCheckpoint(long conn, byte[] actorId,
byte[] checkpointId);

private static native void nativeSetResource(long conn, String resourceName, double capacity,
byte[] nodeId) throws RayException;
}
44 changes: 44 additions & 0 deletions java/test/src/main/java/org/ray/api/test/DynamicResourceTest.java
@@ -0,0 +1,44 @@
package org.ray.api.test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayObject;
import org.ray.api.TestUtils;
import org.ray.api.WaitResult;
import org.ray.api.annotation.RayRemote;
import org.ray.api.options.CallOptions;
import org.ray.api.runtimecontext.NodeInfo;
import org.testng.Assert;
import org.testng.annotations.Test;

public class DynamicResourceTest extends BaseTest {

@RayRemote
public static String sayHi() {
return "hi";
}

@Test
public void testSetResource() {
TestUtils.skipTestUnderSingleProcess();
CallOptions op1 = new CallOptions(ImmutableMap.of("A", 10.0));
RayObject<String> obj = Ray.call(DynamicResourceTest::sayHi, op1);
WaitResult<String> result = Ray.wait(ImmutableList.of(obj), 1, 1000);
Assert.assertEquals(result.getReady().size(), 0);

Ray.setResource("A", 10.0);

// Assert node info.
List<NodeInfo> nodes = Ray.getRuntimeContext().getAllNodeInfo();
Assert.assertEquals(nodes.size(), 1);
Assert.assertEquals(nodes.get(0).resources.get("A"), 10.0);

// Assert ray call result.
result = Ray.wait(ImmutableList.of(obj), 1, 1000);
Assert.assertEquals(result.getReady().size(), 1);
Assert.assertEquals(Ray.get(obj.getId()), "hi");
}

}
18 changes: 18 additions & 0 deletions src/ray/raylet/lib/java/org_ray_runtime_raylet_RayletClientImpl.cc
Expand Up @@ -302,6 +302,24 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyActorResumedFromCheckpo
ThrowRayExceptionIfNotOK(env, status);
}

/*
* Class: org_ray_runtime_raylet_RayletClientImpl
* Method: nativeSetResource
* Signature: (JLjava/lang/String;D[B)V
*/
JNIEXPORT void JNICALL
Java_org_ray_runtime_raylet_RayletClientImpl_nativeSetResource(JNIEnv *env, jclass,
jlong client, jstring resourceName, jdouble capacity, jbyteArray nodeId) {
auto raylet_client = reinterpret_cast<RayletClient *>(client);
UniqueIdFromJByteArray<ClientID> node_id(env, nodeId);
const char *native_resource_name = env->GetStringUTFChars(resourceName, JNI_FALSE);

auto status = raylet_client->SetResource(native_resource_name,
static_cast<double>(capacity), node_id.GetId());
env->ReleaseStringUTFChars(resourceName, native_resource_name);
ThrowRayExceptionIfNotOK(env, status);
}

#ifdef __cplusplus
}
#endif

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 081708b

Please sign in to comment.