Skip to content

Commit

Permalink
KAA-1310: Add validation of endpoint state while receiving events
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Sep 22, 2016
1 parent 7488ab7 commit af43446
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 31 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -870,6 +870,12 @@ Copyright 2014-2016 CyberVision, Inc.
<artifactId>akka-slf4j_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions server/node/pom.xml
Expand Up @@ -231,6 +231,11 @@
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
Expand Down
Expand Up @@ -368,7 +368,7 @@ private EventServerSync processEventSyncResponse(String endpointId, int requestH
EventServerSync response = new EventServerSync();
List<EventListenersRequest> requests = request.getEventListenersRequests();
if (requests != null && !requests.isEmpty()) {
LOG.debug("[{}] processing {} endpoint detach requests", endpointId, requests.size());
LOG.debug("[{}] processing {} endpoint listener requests", endpointId, requests.size());
List<EventListenersResponse> responses = new ArrayList<>(requests.size());
for (EventListenersRequest elRequest : requests) {
LOG.debug("[{}] processing event listener request {}", endpointId, request);
Expand Down
Expand Up @@ -119,16 +119,23 @@ public void processEndpointSync(ActorContext context, SyncRequestMessage message

public void processEndpointEventReceiveMessage(ActorContext context, EndpointEventReceiveMessage message) {
EndpointEventDeliveryMessage response;
Set<ChannelMetaData> eventChannels = state.getChannelsByType(TransportType.EVENT);
if (!eventChannels.isEmpty()) {
for (ChannelMetaData eventChannel : eventChannels) {
addEventsAndReply(context, eventChannel, message);
if (state.isValidForEvents()) {
Set<ChannelMetaData> eventChannels = state.getChannelsByType(TransportType.EVENT);
if (!eventChannels.isEmpty()) {
for (ChannelMetaData eventChannel : eventChannels) {
addEventsAndReply(context, eventChannel, message);
}
response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.SUCCESS);
} else {
LOG.debug("[{}] Message ignored due to no channel contexts registered for events", actorKey, message);
response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE);
state.setUserRegistrationPending(false);
}
response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.SUCCESS);
} else {
LOG.debug("[{}] Message ignored due to no channel contexts registered for events", actorKey, message);
LOG.debug(
"[{}][{}] Endpoint profile is not valid for receiving events. Either no assigned user or no event families in sdk",
endpointKey, actorKey);
response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE);
state.setUserRegistrationPending(false);
}
tellParent(context, response);
}
Expand Down
Expand Up @@ -16,8 +16,8 @@

package org.kaaproject.kaa.server.operations.service.akka;

import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
Expand All @@ -34,6 +34,13 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -98,10 +105,13 @@
import org.kaaproject.kaa.server.operations.pojo.SyncContext;
import org.kaaproject.kaa.server.operations.pojo.exceptions.GetDeltaException;
import org.kaaproject.kaa.server.operations.service.OperationsService;
import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.local.LocalEndpointActorMessageProcessor;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.ActorClassifier;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.EndpointAddress;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.RouteOperation;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.ThriftEndpointActorMsg;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.user.EndpointEventDeliveryMessage;
import org.kaaproject.kaa.server.operations.service.akka.messages.core.user.EndpointEventReceiveMessage;
import org.kaaproject.kaa.server.operations.service.cache.AppVersionKey;
import org.kaaproject.kaa.server.operations.service.cache.CacheService;
import org.kaaproject.kaa.server.operations.service.cache.EventClassFqnKey;
Expand Down Expand Up @@ -251,10 +261,10 @@ public void before() throws GeneralSecurityException, CredentialsServiceExceptio
Mockito.when(operationsKeyStoreService.getPublicKey()).thenReturn(serverPair.getPublic());
Mockito.when(operationsKeyStoreService.getPrivateKey()).thenReturn(serverPair.getPrivate());
Mockito.when(metricsService.createMeter(Mockito.anyString(), Mockito.anyString())).thenReturn(Mockito.mock(MeterClient.class));

Mockito.when(credentialsServiceLocator.getCredentialsService(Mockito.anyString())).thenReturn(credentialsService);
Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.ofNullable((CredentialsDto)null));
Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.ofNullable((CredentialsDto) null));

registerPublicKey(clientPair.getPublic());
registerPublicKey(targetPair.getPublic());

Expand Down Expand Up @@ -359,14 +369,14 @@ public void after() {
}

private SessionInitMessage toSignedRequest(final UUID uuid, final ChannelType channelType, final ChannelContext ctx,
SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder) throws Exception {
SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder) throws Exception {
MessageEncoderDecoder crypt = new MessageEncoderDecoder(clientPair.getPrivate(), clientPair.getPublic(), serverPair.getPublic());
return toSignedRequest(uuid, channelType, ctx, request, responseBuilder, errorBuilder, crypt);
}

private SessionInitMessage toSignedRequest(final UUID uuid, final ChannelType channelType, final ChannelContext ctx,
SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder, MessageEncoderDecoder crypt)
throws Exception {
SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder, MessageEncoderDecoder crypt)
throws Exception {
AvroByteArrayConverter<SyncRequest> requestConverter = new AvroByteArrayConverter<>(SyncRequest.class);
byte[] data = requestConverter.toByteArray(request);

Expand Down Expand Up @@ -710,7 +720,7 @@ public void testLongSyncNotification() throws Exception {
request.setSyncRequestMetaData(md);

ConfigurationSyncRequest csRequest = new ConfigurationSyncRequest();
csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[] {}));
csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[]{}));
csRequest.setResyncOnly(true);
request.setConfigurationSyncRequest(csRequest);

Expand Down Expand Up @@ -770,7 +780,7 @@ public void testLongSyncUnicastNotification() throws Exception {

EndpointAddress address = new EndpointAddress(applicationDto.getTenantId(), applicationDto.getApplicationToken(),
EndpointObjectHash.fromBytes(clientPublicKeyHash.array()));
ActorClassifier classifier = ActorClassifier.GLOBAL;
ActorClassifier classifier = ActorClassifier.GLOBAL;
// TODO: replace nulls with values
ThriftUnicastNotificationMessage msg = new ThriftUnicastNotificationMessage(null, null, UNICAST_NOTIFICATION_ID);
clusterServiceListener.onEndpointActorMsg(new ThriftEndpointActorMsg<ThriftUnicastNotificationMessage>(address, classifier, msg));
Expand Down Expand Up @@ -1114,6 +1124,47 @@ public void testEndpointEventBasic() throws Exception {
Mockito.verify(responseBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(encodedData, true);
}

@Test
public void testEndpointNotAttachedEvent() throws Exception {
AkkaContext context = mock(AkkaContext.class);
ActorContext actorCtxMock = mock(ActorContext.class);
ActorRef actorMock = spy(ActorRef.class);

ActorSystem system = ActorSystem.create();
try {
final Props props = Props.create(TestActor.class);
final TestActorRef<TestActor> parentMock = TestActorRef.create(system, props, "testA");

when(actorCtxMock.self()).thenReturn(actorMock);
when(actorCtxMock.parent()).thenReturn(parentMock);
EndpointEventReceiveMessage msg = mock(EndpointEventReceiveMessage.class);
LocalEndpointActorMessageProcessor processor = spy(new LocalEndpointActorMessageProcessor(
context, APP_TOKEN, EndpointObjectHash.fromSHA1(clientPublicKeyHash.array()), "ACTOR_TOKEN"
));
processor.processEndpointEventReceiveMessage(actorCtxMock, msg);
Assert.assertEquals(
EndpointEventDeliveryMessage.EventDeliveryStatus.FAILURE,
((EndpointEventDeliveryMessage) parentMock.underlyingActor().getMsg()).getStatus());

} finally {
JavaTestKit.shutdownActorSystem(system);
}
}

private static class TestActor extends UntypedActor {
private Object msg;

@Override
public void onReceive(Object msg) throws Exception {
this.msg = (Object) msg;
}

public Object getMsg() {
return msg;
}
}


@Test
public void testEndpointEventSeqNumberBasic() throws Exception {
ChannelContext channelContextMock = Mockito.mock(ChannelContext.class);
Expand Down Expand Up @@ -1708,7 +1759,7 @@ public void testEndpointDetach() throws Exception {
// Mockito.timeout(TIMEOUT*100).atLeastOnce())
// .refreshServerEndpointProfile(EndpointObjectHash.fromBytes(clientPublicKeyHash.array()));
// }

//TODO: Implement tests that cover endpoint verification logic.

@Test
Expand All @@ -1731,8 +1782,8 @@ public void testNoEndpointCredentialsSyncRequest() throws Exception {

MessageBuilder responseBuilder = Mockito.mock(MessageBuilder.class);
ErrorBuilder errorBuilder = Mockito.mock(ErrorBuilder.class);
Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto)null));

Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto) null));

SessionInitMessage message = toSignedRequest(UUID.randomUUID(), ChannelType.SYNC, channelContextMock, request, responseBuilder,
errorBuilder);
Expand All @@ -1741,7 +1792,7 @@ public void testNoEndpointCredentialsSyncRequest() throws Exception {

Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointVerificationException.class));
}

@Test
public void testRevokedEndpointCredentialsSyncRequest() throws Exception {
ChannelContext channelContextMock = Mockito.mock(ChannelContext.class);
Expand All @@ -1762,8 +1813,8 @@ public void testRevokedEndpointCredentialsSyncRequest() throws Exception {

MessageBuilder responseBuilder = Mockito.mock(MessageBuilder.class);
ErrorBuilder errorBuilder = Mockito.mock(ErrorBuilder.class);
Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto)null));

Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto) null));
Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.of(new CredentialsDto(new byte[]{}, CredentialsStatus.REVOKED)));

SessionInitMessage message = toSignedRequest(UUID.randomUUID(), ChannelType.SYNC, channelContextMock, request, responseBuilder,
Expand All @@ -1773,7 +1824,7 @@ public void testRevokedEndpointCredentialsSyncRequest() throws Exception {

Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointVerificationException.class));
}

@Test
public void testInUseEndpointCredentialsSyncRequest() throws Exception {
ChannelContext channelContextMock = Mockito.mock(ChannelContext.class);
Expand All @@ -1794,8 +1845,8 @@ public void testInUseEndpointCredentialsSyncRequest() throws Exception {

MessageBuilder responseBuilder = Mockito.mock(MessageBuilder.class);
ErrorBuilder errorBuilder = Mockito.mock(ErrorBuilder.class);
Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto)null));

Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto) null));
Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.of(new CredentialsDto(new byte[]{}, CredentialsStatus.IN_USE)));

SessionInitMessage message = toSignedRequest(UUID.randomUUID(), ChannelType.SYNC, channelContextMock, request, responseBuilder,
Expand All @@ -1805,7 +1856,7 @@ public void testInUseEndpointCredentialsSyncRequest() throws Exception {

Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointVerificationException.class));
}

@Test
public void testLongSyncRevocation() throws Exception {
ChannelContext channelContextMock = Mockito.mock(ChannelContext.class);
Expand All @@ -1816,7 +1867,7 @@ public void testLongSyncRevocation() throws Exception {
request.setSyncRequestMetaData(md);

ConfigurationSyncRequest csRequest = new ConfigurationSyncRequest();
csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[] {}));
csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[]{}));
csRequest.setResyncOnly(true);
request.setConfigurationSyncRequest(csRequest);

Expand All @@ -1840,14 +1891,14 @@ public void testLongSyncRevocation() throws Exception {

EndpointAddress address = new EndpointAddress(applicationDto.getTenantId(), applicationDto.getApplicationToken(),
EndpointObjectHash.fromBytes(clientPublicKeyHash.array()));
ActorClassifier classifier = ActorClassifier.APPLICATION;
ActorClassifier classifier = ActorClassifier.APPLICATION;

clusterServiceListener.onEndpointActorMsg(new ThriftEndpointActorMsg<ThriftEndpointDeregistrationMessage>(
address, classifier, new ThriftEndpointDeregistrationMessage()));

Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointRevocationException.class));
}

private SyncRequestMetaData buildSyncRequestMetaData() {
return buildSyncRequestMetaData(clientPublicKeyHash);
}
Expand Down

0 comments on commit af43446

Please sign in to comment.