Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java] Fix instanceof RayPyActor #6377

Merged
merged 2 commits into from Dec 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -8,40 +8,54 @@
import java.util.List;
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayPyActor;
import org.ray.api.id.ActorId;
import org.ray.api.id.UniqueId;
import org.ray.api.runtime.RayRuntime;
import org.ray.runtime.AbstractRayRuntime;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.RayMultiWorkerNativeRuntime;
import org.ray.runtime.RayNativeRuntime;
import org.ray.runtime.generated.Common.Language;

/**
* RayActor implementation for cluster mode. This is a wrapper class for C++ ActorHandle.
* RayActor abstract language-independent implementation for cluster mode. This is a wrapper class
* for C++ ActorHandle.
*/
public class NativeRayActor implements RayActor, RayPyActor, Externalizable {
public abstract class NativeRayActor implements RayActor, Externalizable {

/**
* Address of core worker.
*/
private long nativeCoreWorkerPointer;
long nativeCoreWorkerPointer;
/**
* ID of the actor.
*/
private byte[] actorId;
byte[] actorId;

public NativeRayActor(long nativeCoreWorkerPointer, byte[] actorId) {
private Language language;

NativeRayActor(long nativeCoreWorkerPointer, byte[] actorId, Language language) {
Preconditions.checkState(nativeCoreWorkerPointer != 0);
Preconditions.checkState(!ActorId.fromBytes(actorId).isNil());
this.nativeCoreWorkerPointer = nativeCoreWorkerPointer;
this.actorId = actorId;
this.language = language;
}

/**
* Required by FST
*/
public NativeRayActor() {
NativeRayActor() {
}

public static NativeRayActor create(long nativeCoreWorkerPointer, byte[] actorId,
Language language) {
Preconditions.checkState(nativeCoreWorkerPointer != 0);
switch (language) {
case JAVA:
return new NativeRayJavaActor(nativeCoreWorkerPointer, actorId);
case PYTHON:
return new NativeRayPyActor(nativeCoreWorkerPointer, actorId);
default:
throw new IllegalStateException("Unknown actor handle language: " + language);
}
}

@Override
Expand All @@ -50,30 +64,17 @@ public ActorId getId() {
}

public Language getLanguage() {
return Language.forNumber(nativeGetLanguage(nativeCoreWorkerPointer, actorId));
return language;
}

public boolean isDirectCallActor() {
return nativeIsDirectCallActor(nativeCoreWorkerPointer, actorId);
}

@Override
public String getModuleName() {
Preconditions.checkState(getLanguage() == Language.PYTHON);
return nativeGetActorCreationTaskFunctionDescriptor(
nativeCoreWorkerPointer, actorId).get(0);
}

@Override
public String getClassName() {
Preconditions.checkState(getLanguage() == Language.PYTHON);
return nativeGetActorCreationTaskFunctionDescriptor(
nativeCoreWorkerPointer, actorId).get(1);
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(nativeSerialize(nativeCoreWorkerPointer, actorId));
out.writeObject(language);
}

@Override
Expand All @@ -82,23 +83,21 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
if (runtime instanceof RayMultiWorkerNativeRuntime) {
runtime = ((RayMultiWorkerNativeRuntime) runtime).getCurrentRuntime();
}

Preconditions.checkState(runtime instanceof RayNativeRuntime);
nativeCoreWorkerPointer = ((RayNativeRuntime)runtime).getNativeCoreWorkerPointer();

nativeCoreWorkerPointer = ((RayNativeRuntime) runtime).getNativeCoreWorkerPointer();
actorId = nativeDeserialize(nativeCoreWorkerPointer, (byte[]) in.readObject());
language = (Language) in.readObject();
}

@Override
protected void finalize() {
// TODO(zhijunfu): do we need to free the ActorHandle in core worker?
}

private static native int nativeGetLanguage(long nativeCoreWorkerPointer, byte[] actorId);

private static native boolean nativeIsDirectCallActor(long nativeCoreWorkerPointer, byte[] actorId);

private static native List<String> nativeGetActorCreationTaskFunctionDescriptor(
static native List<String> nativeGetActorCreationTaskFunctionDescriptor(
long nativeCoreWorkerPointer, byte[] actorId);

private static native byte[] nativeSerialize(long nativeCoreWorkerPointer, byte[] actorId);
Expand Down
@@ -0,0 +1,29 @@
package org.ray.runtime.actor;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.ObjectInput;
import org.ray.runtime.generated.Common.Language;

/**
* RayActor Java implementation for cluster mode.
*/
public class NativeRayJavaActor extends NativeRayActor {

NativeRayJavaActor(long nativeCoreWorkerPointer, byte[] actorId) {
super(nativeCoreWorkerPointer, actorId, Language.JAVA);
}

/**
* Required by FST
*/
public NativeRayJavaActor() {
super();
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
Preconditions.checkState(getLanguage() == Language.JAVA);
}
}
@@ -0,0 +1,40 @@
package org.ray.runtime.actor;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.ObjectInput;
import org.ray.api.RayPyActor;
import org.ray.runtime.generated.Common.Language;

/**
* RayActor Python implementation for cluster mode.
*/
public class NativeRayPyActor extends NativeRayActor implements RayPyActor {

NativeRayPyActor(long nativeCoreWorkerPointer, byte[] actorId) {
super(nativeCoreWorkerPointer, actorId, Language.PYTHON);
}

/**
* Required by FST
*/
public NativeRayPyActor() {
super();
}

@Override
public String getModuleName() {
return nativeGetActorCreationTaskFunctionDescriptor(nativeCoreWorkerPointer, actorId).get(0);
}

@Override
public String getClassName() {
return nativeGetActorCreationTaskFunctionDescriptor(nativeCoreWorkerPointer, actorId).get(1);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
Preconditions.checkState(getLanguage() == Language.PYTHON);
}
}
Expand Up @@ -37,7 +37,7 @@ public RayActor createActor(FunctionDescriptor functionDescriptor, List<Function
ActorCreationOptions options) {
byte[] actorId = nativeCreateActor(nativeCoreWorkerPointer, functionDescriptor, args,
options);
return new NativeRayActor(nativeCoreWorkerPointer, actorId);
return NativeRayActor.create(nativeCoreWorkerPointer, actorId, functionDescriptor.getLanguage());
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions java/test/src/main/java/org/ray/api/test/ActorTest.java
Expand Up @@ -7,6 +7,7 @@
import org.ray.api.Ray;
import org.ray.api.RayActor;
import org.ray.api.RayObject;
import org.ray.api.RayPyActor;
import org.ray.api.TestUtils;
import org.ray.api.TestUtils.LargeObject;
import org.ray.api.annotation.RayRemote;
Expand Down Expand Up @@ -50,6 +51,8 @@ public void testCreateAndCallActor() {
// Test creating an actor from a constructor
RayActor<Counter> actor = Ray.createActor(Counter::new, 1);
Assert.assertNotEquals(actor.getId(), UniqueId.NIL);
// A java actor is not a python actor
Assert.assertFalse(actor instanceof RayPyActor);
// Test calling an actor
Assert.assertEquals(Integer.valueOf(1), Ray.call(Counter::getValue, actor).get());
Ray.call(Counter::increase, actor, 1);
Expand Down
Expand Up @@ -13,16 +13,6 @@ inline ray::CoreWorker &GetCoreWorker(jlong nativeCoreWorkerPointer) {
extern "C" {
#endif

JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLanguage(
JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) {
auto actor_id = JavaByteArrayToId<ray::ActorID>(env, actorId);
ray::ActorHandle *native_actor_handle = nullptr;
auto status = GetCoreWorker(nativeCoreWorkerPointer)
.GetActorHandle(actor_id, &native_actor_handle);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (jint)0);
return (jint)native_actor_handle->ActorLanguage();
}

JNIEXPORT jboolean JNICALL
Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor(
JNIEnv *env, jclass o, jlong nativeCoreWorkerPointer, jbyteArray actorId) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.