Skip to content

Commit

Permalink
Added FunctionPod
Browse files Browse the repository at this point in the history
  • Loading branch information
relvaner committed Jan 19, 2021
1 parent 6ddfb63 commit 5198260
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 0 deletions.
53 changes: 53 additions & 0 deletions src/main/java/io/actor4j/core/pods/functions/FunctionPod.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2015-2021, David A. Bauer. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.actor4j.core.pods.functions;

import io.actor4j.core.actors.ActorRef;
import io.actor4j.core.messages.ActorMessage;
import io.actor4j.core.pods.ActorPod;
import io.actor4j.core.pods.PodContext;
import io.actor4j.core.pods.actors.PodActor;

public abstract class FunctionPod extends ActorPod {
@Override
public PodActor create() {
return new PodActor() {
protected PodFunction podFunction;

@Override
public void preStart() {
if (getContext().isShard())
setAlias(domain()+getContext().getShardId());
else
setAlias(domain());

register();
}

@Override
public void receive(ActorMessage<?> message) {
podFunction.handle(message);
}

@Override
public void register() {
podFunction = createFunction(this, getContext());
}
};
}

public abstract PodFunction createFunction(ActorRef host, PodContext context);
}
33 changes: 33 additions & 0 deletions src/main/java/io/actor4j/core/pods/functions/PodFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2015-2020, David A. Bauer. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.actor4j.core.pods.functions;

import io.actor4j.core.actors.ActorRef;
import io.actor4j.core.messages.ActorMessage;
import io.actor4j.core.pods.PodContext;

public abstract class PodFunction {
protected ActorRef host;
protected PodContext context;

public PodFunction(ActorRef host, PodContext context) {
super();
this.host = host;
this.context = context;
}

public abstract void handle(ActorMessage<?> message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2015-2020, David A. Bauer. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.actor4j.core.pods.functions;

import io.actor4j.core.actors.ActorRef;
import io.actor4j.core.pods.PodContext;
import io.actor4j.core.pods.RemotePodMessage;
import io.actor4j.core.utils.Pair;

public abstract class PodRemoteFunction extends PodFunction {
public PodRemoteFunction(ActorRef host, PodContext context) {
super(host, context);
}

public abstract Pair<Object, Integer> handle(RemotePodMessage remoteMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2015-2021, David A. Bauer. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.actor4j.core.pods.functions;

import io.actor4j.core.actors.ActorRef;
import io.actor4j.core.messages.ActorMessage;
import io.actor4j.core.pods.PodContext;
import io.actor4j.core.pods.RemotePodMessage;
import io.actor4j.core.pods.actors.PodActor;
import io.actor4j.core.pods.actors.RemoteHandlerPodActor;
import io.actor4j.core.utils.Pair;

public abstract class RemoteFunctionPod extends FunctionPod {
@Override
public PodActor create() {
return new PodActor() {
protected PodFunction podFunction;
protected PodRemoteFunction podRemoteFunction;

@Override
public void preStart() {
if (getContext().isShard())
setAlias(domain()+getContext().getShardId());
else
setAlias(domain());

register();
}

@Override
public void receive(ActorMessage<?> message) {
if (message.value!=null && message.value instanceof RemotePodMessage) {
Pair<Object, Integer> result = podRemoteFunction.handle((RemotePodMessage)message.value);
internal_callback((RemotePodMessage)message.value, result);
}
else
podFunction.handle(message);
}

@Override
public void register() {
podFunction = createFunction(this, getContext());
podRemoteFunction = createRemoteFunction(this, getContext());
}
};
}

protected void internal_callback(RemotePodMessage remoteMessage, Pair<Object, Integer> result) {
if (remoteMessage.remotePodMessageDTO.reply && RemoteHandlerPodActor.internal_server_callback!=null)
RemoteHandlerPodActor.internal_server_callback.accept(remoteMessage.replyAddress, result.a, result.b);
}

public abstract PodRemoteFunction createRemoteFunction(ActorRef host, PodContext context);
}
81 changes: 81 additions & 0 deletions src/test/java/io/actor4j/core/features/PodFeature.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.actor4j.core.ActorSystem;
import io.actor4j.core.actors.Actor;
import io.actor4j.core.features.pod.ExampleReplicationWithActorPod;
import io.actor4j.core.features.pod.ExampleReplicationWithFunctionPod;
import io.actor4j.core.messages.ActorMessage;
import io.actor4j.core.pods.PodConfiguration;
import io.actor4j.core.utils.ActorGroupSet;
Expand Down Expand Up @@ -399,4 +400,84 @@ public void receive(ActorMessage<?> message) {

system.shutdownWithActors(true);
}

//----------------------------------------------------------------------------------------------------------------------------------------------

@Test(timeout=5000)
public void test_factory_ExampleReplicationWithFunctionPod() {
CountDownLatch testDone = new CountDownLatch(1);

system.deployPods(
() -> new ExampleReplicationWithFunctionPod(),
new PodConfiguration("ExampleReplicationWithFunctionPod", ExampleReplicationWithFunctionPod.class.getName(), 1, 1));
UUID client = system.addActor(() -> new Actor(){
@Override
public void receive(ActorMessage<?> message) {
logger().debug(String.format("client received a message ('%s') from ExampleReplicationWithFunctionPod", message.value));

assertEquals(42, message.tag);
assertTrue(message.value!=null);
assertTrue(message.value instanceof String);
assertTrue(message.valueAsString().startsWith("Hello Test!"));
testDone.countDown();
}
});
system.start();

system.sendViaAlias(new ActorMessage<>("Test", 0, client, null), "ExampleReplicationWithFunctionPod");

try {
testDone.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
system.shutdownWithActors(true);
}

@Test(timeout=5000)
public void test_factory_ExampleReplicationWithFunctionPod_more() {
CountDownLatch testDone = new CountDownLatch(1);

system.deployPods(
() -> new ExampleReplicationWithFunctionPod(),
new PodConfiguration("ExampleReplicationWithFunctionPod", ExampleReplicationWithFunctionPod.class.getName(), 3, 3));

UUID client = system.addActor(() -> new Actor() {
protected Set<UUID> handlers;

@Override
public void preStart() {
handlers = new HashSet<>();
}

@Override
public void receive(ActorMessage<?> message) {
logger().debug(String.format("client received a message ('%s') from ExampleReplicationWithFunctionPod", message.value));

assertEquals(42, message.tag);
assertTrue(message.value!=null);
assertTrue(message.value instanceof String);
assertTrue(message.valueAsString().startsWith("Hello Test!"));
handlers.add(message.source);
if (handlers.size()==3)
testDone.countDown();
}
});
UUID starter = system.addActor(() -> new Actor() {
@Override
public void receive(ActorMessage<?> message) {
List<UUID> handlers = system.underlyingImpl().getActorsFromAlias("ExampleReplicationWithFunctionPod");
system.broadcast(new ActorMessage<>("Test", 0, client, null), new ActorGroupSet(handlers));
}
});
system.start();
system.send(new ActorMessage<>(null, 0, system.SYSTEM_ID, starter));

try {
testDone.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
system.shutdownWithActors(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2015-2021, David A. Bauer. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.actor4j.core.features.pod;

import static io.actor4j.core.logging.user.ActorLogger.logger;

import io.actor4j.core.actors.ActorRef;
import io.actor4j.core.messages.ActorMessage;
import io.actor4j.core.pods.PodContext;
import io.actor4j.core.pods.functions.FunctionPod;
import io.actor4j.core.pods.functions.PodFunction;

public class ExampleReplicationWithFunctionPod extends FunctionPod {
@Override
public PodFunction createFunction(ActorRef host, PodContext context) {
return new PodFunction(host, context) {
@Override
public void handle(ActorMessage<?> message) {
logger().debug(message.value.toString());
host.tell(String.format("Hello %s! [domain:%s, primaryReplica:%s]",
message.value,
context.getDomain(),
context.isPrimaryReplica())
, 42, message.source, message.interaction);
}
};
}

@Override
public String domain() {
return "ExampleReplicationWithFunctionPod";
}
}

0 comments on commit 5198260

Please sign in to comment.