Skip to content

Commit

Permalink
KAA-244: add user verifier actor initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Jan 29, 2015
1 parent f52b886 commit 23a29b6
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 51 deletions.
Expand Up @@ -36,6 +36,7 @@
import org.kaaproject.kaa.server.operations.service.metrics.MetricsService;
import org.kaaproject.kaa.server.operations.service.notification.NotificationDeltaService;
import org.kaaproject.kaa.server.operations.service.security.KeyStoreService;
import org.kaaproject.kaa.server.operations.service.user.EndpointUserService;
import org.kaaproject.kaa.server.sync.platform.PlatformLookup;
import org.kaaproject.kaa.server.transport.message.SessionInitMessage;
import org.kaaproject.kaa.server.transport.session.SessionAware;
Expand Down Expand Up @@ -106,6 +107,9 @@ public class DefaultAkkaService implements AkkaService {

@Autowired
private LogAppenderService logAppenderService;

@Autowired
private EndpointUserService endpointUserService;

private AkkaEventServiceListener listener;

Expand All @@ -121,7 +125,7 @@ public void initActorSystem() {
akka = ActorSystem.create(EPS);
LOG.info("Initializing Akka EPS actor...");
opsActor = akka.actorOf(Props.create(new OperationsServerActor.ActorCreator(cacheService, operationsService,
notificationDeltaService, eventService, applicationService, logAppenderService)), EPS);
notificationDeltaService, eventService, applicationService, logAppenderService, endpointUserService)), EPS);
LOG.info("Lookup platform protocols");
Set<String> platformProtocols = PlatformLookup.lookupPlatformProtocols(PlatformLookup.DEFAULT_PROTOCOL_LOOKUP_PACKAGE_NAME);
LOG.info("Initializing Akka io router...");
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.kaaproject.kaa.server.operations.service.akka.messages.core.user.EndpointUserDisconnectMessage;
import org.kaaproject.kaa.server.operations.service.logs.LogAppenderService;
import org.kaaproject.kaa.server.operations.service.notification.NotificationDeltaService;
import org.kaaproject.kaa.server.operations.service.user.EndpointUserService;
import org.kaaproject.kaa.server.transport.session.SessionAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -77,11 +78,15 @@ public class ApplicationActor extends UntypedActor {

private final Map<String, ActorRef> logsSessions;

private final Map<String, ActorRef> userVerifierSessions;

private final LogAppenderService logAppenderService;

private final ApplicationService applicationService;

private ActorRef applicationLogActor;

private ActorRef userVerifierActor;

/**
* Instantiates a new application actor.
Expand All @@ -92,7 +97,7 @@ public class ApplicationActor extends UntypedActor {
* the notification delta service
*/
private ApplicationActor(OperationsService operationsService, NotificationDeltaService notificationDeltaService,
ApplicationService applicationService, LogAppenderService logAppenderService, String applicationToken) {
ApplicationService applicationService, LogAppenderService logAppenderService, EndpointUserService endpointUserService, String applicationToken) {
this.operationsService = operationsService;
this.applicationService = applicationService;
this.logAppenderService = logAppenderService;
Expand All @@ -102,7 +107,9 @@ private ApplicationActor(OperationsService operationsService, NotificationDeltaS
this.endpointActorMap = new HashMap<>();
this.topicSessions = new HashMap<>();
this.logsSessions = new HashMap<>();
this.userVerifierSessions = new HashMap<>();
this.applicationLogActor = getOrCreateLogActor(null, logAppenderService, applicationService);
this.userVerifierActor = getOrCreateUserVerifierActor(null, endpointUserService, applicationService);
}

/**
Expand All @@ -124,6 +131,9 @@ public static class ActorCreator implements Creator<ApplicationActor> {

/** The log appender service. */
private final LogAppenderService logAppenderService;

/** The endpoint user service. */
private final EndpointUserService endpointUserService;

private final String applicationToken;

Expand All @@ -136,13 +146,14 @@ public static class ActorCreator implements Creator<ApplicationActor> {
* the notification delta service
*/
public ActorCreator(OperationsService operationsService, NotificationDeltaService notificationDeltaService,
ApplicationService applicationService, LogAppenderService logAppenderService, String applicationToken) {
ApplicationService applicationService, LogAppenderService logAppenderService, EndpointUserService endpointUserService, String applicationToken) {
super();
this.operationsService = operationsService;
this.notificationDeltaService = notificationDeltaService;
this.applicationToken = applicationToken;
this.applicationService = applicationService;
this.logAppenderService = logAppenderService;
this.endpointUserService = endpointUserService;
}

/*
Expand All @@ -152,7 +163,7 @@ public ActorCreator(OperationsService operationsService, NotificationDeltaServic
*/
@Override
public ApplicationActor create() throws Exception {
return new ApplicationActor(operationsService, notificationDeltaService, applicationService, logAppenderService,
return new ApplicationActor(operationsService, notificationDeltaService, applicationService, logAppenderService, endpointUserService,
applicationToken);
}
}
Expand Down Expand Up @@ -203,8 +214,16 @@ private void processLogEventPackMessage(LogEventPackMessage message) {
}

private void processLogNotificationMessage(ThriftNotificationMessage message) {
LOG.debug("[{}] Processing thrift notification message", applicationToken);
applicationLogActor.tell(message, self());
processThriftNotificationMessage(applicationLogActor, message);
}

private void processUserVerifierNotificationMessage(ThriftNotificationMessage message) {
processThriftNotificationMessage(userVerifierActor, message);
}

private void processThriftNotificationMessage(ActorRef actor, ThriftNotificationMessage message) {
LOG.debug("[{}] Processing thrift notification message {}", applicationToken, message);
actor.tell(message, self());
}

/**
Expand All @@ -224,6 +243,9 @@ private void processThriftNotification(ThriftNotificationMessage message) {
} else if (notification.isSetAppenderId()) {
LOG.debug("[{}] Forwarding message to application log actor", applicationToken);
processLogNotificationMessage(message);
} else if (notification.isSetUserVerifierId()) {
LOG.debug("[{}] Forwarding message to application log actor", applicationToken);
processUserVerifierNotificationMessage(message);
} else {
LOG.debug("[{}] Broadcasting message to all endpoints", applicationToken);
broadcastToAllEndpoints(message);
Expand Down Expand Up @@ -468,6 +490,18 @@ private ActorRef getOrCreateLogActor(String name, LogAppenderService logAppender
}
return logActor;
}

private ActorRef getOrCreateUserVerifierActor(String name, EndpointUserService endpointUserService, ApplicationService applicationService) {
ActorRef logActor = userVerifierSessions.get(name);
if (logActor == null) {
logActor = context().actorOf(
Props.create(new ApplicationUserVerifierActor.ActorCreator(endpointUserService, applicationService, applicationToken)));
context().watch(logActor);
userVerifierSessions.put(logActor.path().name(), logActor);
}
return logActor;
}


/**
* Builds the topic key.
Expand Down
Expand Up @@ -41,7 +41,7 @@ public class ApplicationUserVerifierActor extends UntypedActor {
/**
* Instantiates a new application log actor.
*
* @param logAppenderService
* @param endpointUserService
*
* the log appender service
*/
Expand All @@ -59,7 +59,7 @@ public static class ActorCreator implements Creator<ApplicationUserVerifierActor
private static final long serialVersionUID = 1L;

/** The log appender service. */
private final EndpointUserService logAppenderService;
private final EndpointUserService endpointUserService;

/** The log application service. */
private final ApplicationService applicationService;
Expand All @@ -72,9 +72,9 @@ public static class ActorCreator implements Creator<ApplicationUserVerifierActor
* @param logAppenderService
* the log appender service
*/
public ActorCreator(EndpointUserService logAppenderService, ApplicationService applicationService, String applicationToken) {
public ActorCreator(EndpointUserService endpointUserService, ApplicationService applicationService, String applicationToken) {
super();
this.logAppenderService = logAppenderService;
this.endpointUserService = endpointUserService;
this.applicationService = applicationService;
this.applicationToken = applicationToken;
}
Expand All @@ -86,7 +86,7 @@ public ActorCreator(EndpointUserService logAppenderService, ApplicationService a
*/
@Override
public ApplicationUserVerifierActor create() throws Exception {
return new ApplicationUserVerifierActor(logAppenderService, applicationService, applicationToken);
return new ApplicationUserVerifierActor(endpointUserService, applicationService, applicationToken);
}
}

Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.kaaproject.kaa.server.operations.service.event.EventService;
import org.kaaproject.kaa.server.operations.service.logs.LogAppenderService;
import org.kaaproject.kaa.server.operations.service.notification.NotificationDeltaService;
import org.kaaproject.kaa.server.operations.service.user.EndpointUserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,13 +59,16 @@ public class OperationsServerActor extends UntypedActor {

/** The tenants id-actor map. */
private final Map<String, ActorRef> tenants;

/** The application service. */
private final ApplicationService applicationService;

/** The log appender service. */
private final LogAppenderService logAppenderService;

/** The endpoint user service. */
private final EndpointUserService endpointUserService;

/**
* Instantiates a new endpoint server actor.
*
Expand All @@ -81,8 +85,9 @@ public class OperationsServerActor extends UntypedActor {
* @param logAppenderService
* the log appender service
*/
private OperationsServerActor(CacheService cacheService, OperationsService operationsService, NotificationDeltaService notificationDeltaService,
EventService eventService, ApplicationService applicationService, LogAppenderService logAppenderService) {
private OperationsServerActor(CacheService cacheService, OperationsService operationsService,
NotificationDeltaService notificationDeltaService, EventService eventService, ApplicationService applicationService,
LogAppenderService logAppenderService, EndpointUserService endpointUserService) {
super();
this.tenants = new HashMap<String, ActorRef>();
this.cacheService = cacheService;
Expand All @@ -91,6 +96,7 @@ private OperationsServerActor(CacheService cacheService, OperationsService opera
this.eventService = eventService;
this.applicationService = applicationService;
this.logAppenderService = logAppenderService;
this.endpointUserService = endpointUserService;
}

/**
Expand All @@ -112,13 +118,16 @@ public static class ActorCreator implements Creator<OperationsServerActor> {

/** The event service. */
private final EventService eventService;

/** The application service. */
private final ApplicationService applicationService;

/** The log appender service. */
private final LogAppenderService logAppenderService;

/** The endpoint user service. */
private final EndpointUserService endpointUserService;

/**
* Instantiates a new actor creator.
*
Expand All @@ -135,31 +144,34 @@ public static class ActorCreator implements Creator<OperationsServerActor> {
* @param logAppenderService
* the log appender service
*/
public ActorCreator(CacheService cacheService, OperationsService endpointService, NotificationDeltaService notificationDeltaService,
EventService eventService, ApplicationService applicationService, LogAppenderService logAppenderService) {
public ActorCreator(CacheService cacheService, OperationsService endpointService,
NotificationDeltaService notificationDeltaService, EventService eventService, ApplicationService applicationService,
LogAppenderService logAppenderService, EndpointUserService endpointUserService) {
super();
this.cacheService = cacheService;
this.operationsService = endpointService;
this.notificationDeltaService = notificationDeltaService;
this.eventService = eventService;
this.applicationService = applicationService;
this.logAppenderService = logAppenderService;
this.endpointUserService = endpointUserService;
}

/*
* (non-Javadoc)
*
*
* @see akka.japi.Creator#create()
*/
@Override
public OperationsServerActor create() throws Exception {
return new OperationsServerActor(cacheService, operationsService, notificationDeltaService, eventService, applicationService, logAppenderService);
return new OperationsServerActor(cacheService, operationsService, notificationDeltaService, eventService, applicationService,
logAppenderService, endpointUserService);
}
}

/*
* (non-Javadoc)
*
*
* @see akka.actor.UntypedActor#onReceive(java.lang.Object)
*/
@Override
Expand Down Expand Up @@ -216,16 +228,17 @@ private ActorRef getOrCreateTenantActorByAppToken(String appToken) {
private ActorRef getOrCreateTenantActorByTokenId(String tenantId) {
ActorRef tenantActor = tenants.get(tenantId);
if (tenantActor == null) {
tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(cacheService, operationsService, notificationDeltaService, eventService,
applicationService, logAppenderService, tenantId)), tenantId);
tenantActor = context().actorOf(
Props.create(new TenantActor.ActorCreator(cacheService, operationsService, notificationDeltaService, eventService,
applicationService, logAppenderService, endpointUserService, tenantId)), tenantId);
tenants.put(tenantId, tenantActor);
}
return tenantActor;
}

/*
* (non-Javadoc)
*
*
* @see akka.actor.UntypedActor#preStart()
*/
@Override
Expand All @@ -235,7 +248,7 @@ public void preStart() {

/*
* (non-Javadoc)
*
*
* @see akka.actor.UntypedActor#postStop()
*/
@Override
Expand Down

0 comments on commit 23a29b6

Please sign in to comment.