Skip to content

Commit

Permalink
Refactoring of EndpointActors
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Dec 29, 2015
1 parent 4df8ef1 commit b1b6e30
Show file tree
Hide file tree
Showing 17 changed files with 273 additions and 139 deletions.
Expand Up @@ -35,7 +35,9 @@
@Component
public class AkkaContext {

private static final String ENDPOINT_ACTOR_TIMEOUT = "endpoint_actor_timeout";
private static final String GLOBAL_ENDPOINT_ACTOR_TIMEOUT = "global_endpoint_actor_timeout";

private static final String LOCAL_ENDPOINT_ACTOR_TIMEOUT = "local_endpoint_actor_timeout";

private static final String ENDPOINT_EVENT_TIMEOUT = "endpoint_event_timeout";

Expand Down Expand Up @@ -94,8 +96,12 @@ public int getIOWorkerCount(){
return config.getInt(IO_WORKER_COUNT_PROP_NAME);
}

public long getInactivityTimeout() {
return config.getLong(ENDPOINT_ACTOR_TIMEOUT);
public long getGlobalEndpointTimeout() {
return config.getLong(GLOBAL_ENDPOINT_ACTOR_TIMEOUT);
}

public long getLocalEndpointTimeout() {
return config.getLong(LOCAL_ENDPOINT_ACTOR_TIMEOUT);
}

public long getEventTimeout() {
Expand Down
Expand Up @@ -23,12 +23,12 @@

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.common.thrift.gen.operations.Notification;
import org.kaaproject.kaa.server.common.thrift.gen.operations.Operation;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;
import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.local.LocalEndpointActorCreator;
import org.kaaproject.kaa.server.operations.service.akka.actors.supervision.SupervisionStrategyFactory;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.endpoint.EndpointAwareMessage;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.endpoint.EndpointStopMessage;
Expand Down Expand Up @@ -410,11 +410,10 @@ private void processEndpointUserDeregistration(EndpointUserDisconnectMessage mes
private void processEndpointRequest(EndpointAwareMessage message) {
ActorMetaData endpointMetaData = endpointSessions.get(message.getKey());
if (endpointMetaData == null) {
UUID uuid = UUID.randomUUID();
String endpointActorId = uuid.toString().replaceAll("-", "");
String endpointActorId = LocalEndpointActorCreator.generateActorKey();
LOG.debug("[{}] Creating actor with endpointKey: {}", applicationToken, endpointActorId);
endpointMetaData = new ActorMetaData(context().actorOf(
Props.create(new EndpointActor.ActorCreator(context, endpointActorId, message.getAppToken(), message.getKey()))
Props.create(new LocalEndpointActorCreator(context, endpointActorId, message.getAppToken(), message.getKey()))
.withDispatcher(ENDPOINT_DISPATCHER_NAME), endpointActorId), endpointActorId);
endpointSessions.put(message.getKey(), endpointMetaData);
endpointActorMap.put(endpointActorId, message.getKey());
Expand Down
@@ -0,0 +1,68 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.operations.service.OperationsService;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.endpoint.EndpointStopMessage;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.session.ActorTimeoutMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorContext;
import akka.actor.ActorRef;

public abstract class AbstractEndpointActorMessageProcessor<T extends AbstractEndpointActorState> {

/** The Constant LOG. */
private static final Logger LOG = LoggerFactory.getLogger(AbstractEndpointActorMessageProcessor.class);

protected final T state;

/** The operations service. */
protected final OperationsService operationsService;

/** The app token. */
protected final String appToken;

/** The key. */
protected final EndpointObjectHash key;

/** The actor key. */
protected final String actorKey;

/** The endpoint key. */
protected final String endpointKey;

private final long inactivityTimeout;

public AbstractEndpointActorMessageProcessor(T state, OperationsService operationsService, String appToken, EndpointObjectHash key,
String actorKey, String endpointKey, long inactivityTimeout) {
super();
this.state = state;
this.operationsService = operationsService;
this.inactivityTimeout = inactivityTimeout;
this.appToken = appToken;
this.key = key;
this.actorKey = actorKey;
this.endpointKey = endpointKey;
}

public long getInactivityTimeout() {
return inactivityTimeout;
}

public void processActorTimeoutMessage(ActorContext context, ActorTimeoutMessage message) {
if (state.getLastActivityTime() <= message.getLastActivityTime()) {
LOG.debug("[{}][{}] Request stop of endpoint actor due to inactivity timeout", endpointKey, actorKey);
tellParent(context, new EndpointStopMessage(key, actorKey, context.self()));
}
}

protected void tellParent(ActorContext context, Object response) {
context.parent().tell(response, context.self());
}

protected void tellActor(ActorContext context, ActorRef target, Object message) {
target.tell(message, context.self());
}

}
@@ -0,0 +1,23 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint;

public abstract class AbstractEndpointActorState {

protected final String endpointKey;
protected final String actorKey;
private long lastActivityTime;

public AbstractEndpointActorState(String endpointKey, String actorKey) {
super();
this.endpointKey = endpointKey;
this.actorKey = actorKey;
}

public long getLastActivityTime() {
return lastActivityTime;
}

public void setLastActivityTime(long time) {
this.lastActivityTime = time;
}

}
@@ -0,0 +1,53 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint;

import java.util.UUID;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;

import akka.japi.Creator;

/**
* The Class ActorCreator.
*/
public abstract class EndpointActorCreator<T> implements Creator<T> {

/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;

/** The Akka service context */
protected final AkkaContext context;

/** The actor key */
protected final String actorKey;

/** The app token. */
protected final String appToken;

/** The endpoint key. */
protected final EndpointObjectHash endpointKey;

/**
* Instantiates a new actor creator.
*
* @param context
* the context
* @param endpointActorKey
* the endpoint actor key
* @param appToken
* the app token
* @param endpointKey
* the endpoint key
*/
public EndpointActorCreator(AkkaContext context, String endpointActorKey, String appToken, EndpointObjectHash endpointKey) {
super();
this.context = context;
this.actorKey = endpointActorKey;
this.appToken = appToken;
this.endpointKey = endpointKey;
}

public static String generateActorKey(){
return UUID.randomUUID().toString().replaceAll("-", "");
}
}
@@ -0,0 +1,13 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.global;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;

public class GlobalEndpointActor {

GlobalEndpointActor(AkkaContext context, String actorKey, String appToken, EndpointObjectHash endpointKey) {
// TODO Auto-generated constructor stub
}


}
@@ -0,0 +1,20 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.global;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;
import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.EndpointActorCreator;

public class GlobalEndpointActorCreator extends EndpointActorCreator<GlobalEndpointActor> {

private static final long serialVersionUID = 9080174513879065821L;

public GlobalEndpointActorCreator(AkkaContext context, String endpointActorKey, String appToken, EndpointObjectHash key) {
super(context, endpointActorKey, appToken, key);
}

@Override
public GlobalEndpointActor create() throws Exception {
return new GlobalEndpointActor(context, actorKey, appToken, endpointKey);
}

}
@@ -0,0 +1,19 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.global;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.common.Base64Util;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;
import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.AbstractEndpointActorMessageProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GlobalEndpointActorMessageProcessor extends AbstractEndpointActorMessageProcessor<GlobalEndpointActorState> {

/** The Constant LOG. */
private static final Logger LOG = LoggerFactory.getLogger(GlobalEndpointActorMessageProcessor.class);

public GlobalEndpointActorMessageProcessor(AkkaContext context, String appToken, EndpointObjectHash key, String actorKey) {
super(new GlobalEndpointActorState(Base64Util.encode(key.getData()), actorKey), context.getOperationsService(), appToken, key,
actorKey, Base64Util.encode(key.getData()), context.getGlobalEndpointTimeout());
}
}
@@ -0,0 +1,11 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.global;

import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.AbstractEndpointActorState;

public class GlobalEndpointActorState extends AbstractEndpointActorState {

public GlobalEndpointActorState(String endpointKey, String actorKey) {
super(endpointKey, actorKey);
}

}
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kaaproject.kaa.server.operations.service.akka.actors.core;
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.local;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.kaaproject.kaa.server.operations.service.akka.actors.core;
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.local;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;
Expand All @@ -38,19 +38,18 @@
import org.slf4j.LoggerFactory;

import akka.actor.UntypedActor;
import akka.japi.Creator;

/**
* The Class EndpointActor.
*/
public class EndpointActor extends UntypedActor {
public class LocalEndpointActor extends UntypedActor {

/** The Constant LOG. */
private static final Logger LOG = LoggerFactory.getLogger(EndpointActor.class);
private static final Logger LOG = LoggerFactory.getLogger(LocalEndpointActor.class);

private final String actorKey;

private final EndpointActorMessageProcessor messageProcessor;
private final LocalEndpointActorMessageProcessor messageProcessor;

/**
* Instantiates a new endpoint actor.
Expand All @@ -60,57 +59,11 @@ public class EndpointActor extends UntypedActor {
* @param appToken the app token
* @param key the key
*/
public EndpointActor(AkkaContext context, String endpointActorKey, String appToken, EndpointObjectHash key) {
this.messageProcessor = new EndpointActorMessageProcessor(context, appToken, key, endpointActorKey);
LocalEndpointActor(AkkaContext context, String endpointActorKey, String appToken, EndpointObjectHash key) {
this.messageProcessor = new LocalEndpointActorMessageProcessor(context, appToken, key, endpointActorKey);
this.actorKey = endpointActorKey;
}

/**
* The Class ActorCreator.
*/
public static class ActorCreator implements Creator<EndpointActor> {

/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;

/** The Akka service context */
private final AkkaContext context;

private final String actorKey;

/** The app token. */
private final String appToken;

/** The key. */
private final EndpointObjectHash key;

/**
* Instantiates a new actor creator.
*
* @param context the context
* @param endpointActorKey the endpoint actor key
* @param appToken the app token
* @param key the key
*/
public ActorCreator(AkkaContext context, String endpointActorKey, String appToken, EndpointObjectHash key) {
super();
this.context = context;
this.actorKey = endpointActorKey;
this.appToken = appToken;
this.key = key;
}

/*
* (non-Javadoc)
*
* @see akka.japi.Creator#create()
*/
@Override
public EndpointActor create() throws Exception {
return new EndpointActor(context, actorKey, appToken, key);
}
}

/*
* (non-Javadoc)
*
Expand Down
@@ -0,0 +1,20 @@
package org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.local;

import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.operations.service.akka.AkkaContext;
import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.EndpointActorCreator;

public class LocalEndpointActorCreator extends EndpointActorCreator<LocalEndpointActor> {

private static final long serialVersionUID = 9080174513879065821L;

public LocalEndpointActorCreator(AkkaContext context, String endpointActorKey, String appToken, EndpointObjectHash key) {
super(context, endpointActorKey, appToken, key);
}

@Override
public LocalEndpointActor create() throws Exception {
return new LocalEndpointActor(context, actorKey, appToken, endpointKey);
}

}

0 comments on commit b1b6e30

Please sign in to comment.