From af43446e280c6f6001e28adcc83a234f3b943ba9 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Thu, 22 Sep 2016 15:00:14 +0300 Subject: [PATCH] KAA-1310: Add validation of endpoint state while receiving events --- pom.xml | 6 ++ server/node/pom.xml | 5 + .../service/DefaultOperationsService.java | 2 +- .../LocalEndpointActorMessageProcessor.java | 21 ++-- .../service/akka/DefaultAkkaServiceTest.java | 97 ++++++++++++++----- 5 files changed, 100 insertions(+), 31 deletions(-) diff --git a/pom.xml b/pom.xml index e35fe02df9..0835ceee0f 100644 --- a/pom.xml +++ b/pom.xml @@ -870,6 +870,12 @@ Copyright 2014-2016 CyberVision, Inc. akka-slf4j_2.11 ${akka.version} + + com.typesafe.akka + akka-testkit_2.11 + ${akka.version} + test + net.sf.ehcache ehcache diff --git a/server/node/pom.xml b/server/node/pom.xml index 3ed06a9421..c4102a8b06 100644 --- a/server/node/pom.xml +++ b/server/node/pom.xml @@ -231,6 +231,11 @@ com.typesafe.akka akka-slf4j_2.11 + + com.typesafe.akka + akka-testkit_2.11 + test + de.flapdoodle.embed de.flapdoodle.embed.mongo diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/DefaultOperationsService.java b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/DefaultOperationsService.java index daff3835e1..40fe591ed5 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/DefaultOperationsService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/DefaultOperationsService.java @@ -368,7 +368,7 @@ private EventServerSync processEventSyncResponse(String endpointId, int requestH EventServerSync response = new EventServerSync(); List requests = request.getEventListenersRequests(); if (requests != null && !requests.isEmpty()) { - LOG.debug("[{}] processing {} endpoint detach requests", endpointId, requests.size()); + LOG.debug("[{}] processing {} endpoint listener requests", endpointId, requests.size()); List responses = new ArrayList<>(requests.size()); for (EventListenersRequest elRequest : requests) { LOG.debug("[{}] processing event listener request {}", endpointId, request); diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/akka/actors/core/endpoint/local/LocalEndpointActorMessageProcessor.java b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/akka/actors/core/endpoint/local/LocalEndpointActorMessageProcessor.java index ab083736df..508c142e6a 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/akka/actors/core/endpoint/local/LocalEndpointActorMessageProcessor.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/akka/actors/core/endpoint/local/LocalEndpointActorMessageProcessor.java @@ -119,16 +119,23 @@ public void processEndpointSync(ActorContext context, SyncRequestMessage message public void processEndpointEventReceiveMessage(ActorContext context, EndpointEventReceiveMessage message) { EndpointEventDeliveryMessage response; - Set eventChannels = state.getChannelsByType(TransportType.EVENT); - if (!eventChannels.isEmpty()) { - for (ChannelMetaData eventChannel : eventChannels) { - addEventsAndReply(context, eventChannel, message); + if (state.isValidForEvents()) { + Set eventChannels = state.getChannelsByType(TransportType.EVENT); + if (!eventChannels.isEmpty()) { + for (ChannelMetaData eventChannel : eventChannels) { + addEventsAndReply(context, eventChannel, message); + } + response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.SUCCESS); + } else { + LOG.debug("[{}] Message ignored due to no channel contexts registered for events", actorKey, message); + response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE); + state.setUserRegistrationPending(false); } - response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.SUCCESS); } else { - LOG.debug("[{}] Message ignored due to no channel contexts registered for events", actorKey, message); + LOG.debug( + "[{}][{}] Endpoint profile is not valid for receiving events. Either no assigned user or no event families in sdk", + endpointKey, actorKey); response = new EndpointEventDeliveryMessage(message, EventDeliveryStatus.FAILURE); - state.setUserRegistrationPending(false); } tellParent(context, response); } diff --git a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/akka/DefaultAkkaServiceTest.java b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/akka/DefaultAkkaServiceTest.java index 207d8bff16..6a487373d2 100644 --- a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/akka/DefaultAkkaServiceTest.java +++ b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/akka/DefaultAkkaServiceTest.java @@ -16,8 +16,8 @@ package org.kaaproject.kaa.server.operations.service.akka; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.nio.ByteBuffer; @@ -34,6 +34,13 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -98,10 +105,13 @@ import org.kaaproject.kaa.server.operations.pojo.SyncContext; import org.kaaproject.kaa.server.operations.pojo.exceptions.GetDeltaException; import org.kaaproject.kaa.server.operations.service.OperationsService; +import org.kaaproject.kaa.server.operations.service.akka.actors.core.endpoint.local.LocalEndpointActorMessageProcessor; import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.ActorClassifier; import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.EndpointAddress; import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.RouteOperation; import org.kaaproject.kaa.server.operations.service.akka.messages.core.route.ThriftEndpointActorMsg; +import org.kaaproject.kaa.server.operations.service.akka.messages.core.user.EndpointEventDeliveryMessage; +import org.kaaproject.kaa.server.operations.service.akka.messages.core.user.EndpointEventReceiveMessage; import org.kaaproject.kaa.server.operations.service.cache.AppVersionKey; import org.kaaproject.kaa.server.operations.service.cache.CacheService; import org.kaaproject.kaa.server.operations.service.cache.EventClassFqnKey; @@ -251,10 +261,10 @@ public void before() throws GeneralSecurityException, CredentialsServiceExceptio Mockito.when(operationsKeyStoreService.getPublicKey()).thenReturn(serverPair.getPublic()); Mockito.when(operationsKeyStoreService.getPrivateKey()).thenReturn(serverPair.getPrivate()); Mockito.when(metricsService.createMeter(Mockito.anyString(), Mockito.anyString())).thenReturn(Mockito.mock(MeterClient.class)); - + Mockito.when(credentialsServiceLocator.getCredentialsService(Mockito.anyString())).thenReturn(credentialsService); - Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.ofNullable((CredentialsDto)null)); - + Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.ofNullable((CredentialsDto) null)); + registerPublicKey(clientPair.getPublic()); registerPublicKey(targetPair.getPublic()); @@ -359,14 +369,14 @@ public void after() { } private SessionInitMessage toSignedRequest(final UUID uuid, final ChannelType channelType, final ChannelContext ctx, - SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder) throws Exception { + SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder) throws Exception { MessageEncoderDecoder crypt = new MessageEncoderDecoder(clientPair.getPrivate(), clientPair.getPublic(), serverPair.getPublic()); return toSignedRequest(uuid, channelType, ctx, request, responseBuilder, errorBuilder, crypt); } private SessionInitMessage toSignedRequest(final UUID uuid, final ChannelType channelType, final ChannelContext ctx, - SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder, MessageEncoderDecoder crypt) - throws Exception { + SyncRequest request, final MessageBuilder responseBuilder, final ErrorBuilder errorBuilder, MessageEncoderDecoder crypt) + throws Exception { AvroByteArrayConverter requestConverter = new AvroByteArrayConverter<>(SyncRequest.class); byte[] data = requestConverter.toByteArray(request); @@ -710,7 +720,7 @@ public void testLongSyncNotification() throws Exception { request.setSyncRequestMetaData(md); ConfigurationSyncRequest csRequest = new ConfigurationSyncRequest(); - csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[] {})); + csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[]{})); csRequest.setResyncOnly(true); request.setConfigurationSyncRequest(csRequest); @@ -770,7 +780,7 @@ public void testLongSyncUnicastNotification() throws Exception { EndpointAddress address = new EndpointAddress(applicationDto.getTenantId(), applicationDto.getApplicationToken(), EndpointObjectHash.fromBytes(clientPublicKeyHash.array())); - ActorClassifier classifier = ActorClassifier.GLOBAL; + ActorClassifier classifier = ActorClassifier.GLOBAL; // TODO: replace nulls with values ThriftUnicastNotificationMessage msg = new ThriftUnicastNotificationMessage(null, null, UNICAST_NOTIFICATION_ID); clusterServiceListener.onEndpointActorMsg(new ThriftEndpointActorMsg(address, classifier, msg)); @@ -1114,6 +1124,47 @@ public void testEndpointEventBasic() throws Exception { Mockito.verify(responseBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(encodedData, true); } + @Test + public void testEndpointNotAttachedEvent() throws Exception { + AkkaContext context = mock(AkkaContext.class); + ActorContext actorCtxMock = mock(ActorContext.class); + ActorRef actorMock = spy(ActorRef.class); + + ActorSystem system = ActorSystem.create(); + try { + final Props props = Props.create(TestActor.class); + final TestActorRef parentMock = TestActorRef.create(system, props, "testA"); + + when(actorCtxMock.self()).thenReturn(actorMock); + when(actorCtxMock.parent()).thenReturn(parentMock); + EndpointEventReceiveMessage msg = mock(EndpointEventReceiveMessage.class); + LocalEndpointActorMessageProcessor processor = spy(new LocalEndpointActorMessageProcessor( + context, APP_TOKEN, EndpointObjectHash.fromSHA1(clientPublicKeyHash.array()), "ACTOR_TOKEN" + )); + processor.processEndpointEventReceiveMessage(actorCtxMock, msg); + Assert.assertEquals( + EndpointEventDeliveryMessage.EventDeliveryStatus.FAILURE, + ((EndpointEventDeliveryMessage) parentMock.underlyingActor().getMsg()).getStatus()); + + } finally { + JavaTestKit.shutdownActorSystem(system); + } + } + + private static class TestActor extends UntypedActor { + private Object msg; + + @Override + public void onReceive(Object msg) throws Exception { + this.msg = (Object) msg; + } + + public Object getMsg() { + return msg; + } + } + + @Test public void testEndpointEventSeqNumberBasic() throws Exception { ChannelContext channelContextMock = Mockito.mock(ChannelContext.class); @@ -1708,7 +1759,7 @@ public void testEndpointDetach() throws Exception { // Mockito.timeout(TIMEOUT*100).atLeastOnce()) // .refreshServerEndpointProfile(EndpointObjectHash.fromBytes(clientPublicKeyHash.array())); // } - + //TODO: Implement tests that cover endpoint verification logic. @Test @@ -1731,8 +1782,8 @@ public void testNoEndpointCredentialsSyncRequest() throws Exception { MessageBuilder responseBuilder = Mockito.mock(MessageBuilder.class); ErrorBuilder errorBuilder = Mockito.mock(ErrorBuilder.class); - - Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto)null)); + + Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto) null)); SessionInitMessage message = toSignedRequest(UUID.randomUUID(), ChannelType.SYNC, channelContextMock, request, responseBuilder, errorBuilder); @@ -1741,7 +1792,7 @@ public void testNoEndpointCredentialsSyncRequest() throws Exception { Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointVerificationException.class)); } - + @Test public void testRevokedEndpointCredentialsSyncRequest() throws Exception { ChannelContext channelContextMock = Mockito.mock(ChannelContext.class); @@ -1762,8 +1813,8 @@ public void testRevokedEndpointCredentialsSyncRequest() throws Exception { MessageBuilder responseBuilder = Mockito.mock(MessageBuilder.class); ErrorBuilder errorBuilder = Mockito.mock(ErrorBuilder.class); - - Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto)null)); + + Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto) null)); Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.of(new CredentialsDto(new byte[]{}, CredentialsStatus.REVOKED))); SessionInitMessage message = toSignedRequest(UUID.randomUUID(), ChannelType.SYNC, channelContextMock, request, responseBuilder, @@ -1773,7 +1824,7 @@ public void testRevokedEndpointCredentialsSyncRequest() throws Exception { Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointVerificationException.class)); } - + @Test public void testInUseEndpointCredentialsSyncRequest() throws Exception { ChannelContext channelContextMock = Mockito.mock(ChannelContext.class); @@ -1794,8 +1845,8 @@ public void testInUseEndpointCredentialsSyncRequest() throws Exception { MessageBuilder responseBuilder = Mockito.mock(MessageBuilder.class); ErrorBuilder errorBuilder = Mockito.mock(ErrorBuilder.class); - - Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto)null)); + + Mockito.when(registrationService.findEndpointRegistrationByCredentialsId(Mockito.anyString())).thenReturn(Optional.ofNullable((EndpointRegistrationDto) null)); Mockito.when(credentialsService.lookupCredentials(Mockito.anyString())).thenReturn(Optional.of(new CredentialsDto(new byte[]{}, CredentialsStatus.IN_USE))); SessionInitMessage message = toSignedRequest(UUID.randomUUID(), ChannelType.SYNC, channelContextMock, request, responseBuilder, @@ -1805,7 +1856,7 @@ public void testInUseEndpointCredentialsSyncRequest() throws Exception { Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointVerificationException.class)); } - + @Test public void testLongSyncRevocation() throws Exception { ChannelContext channelContextMock = Mockito.mock(ChannelContext.class); @@ -1816,7 +1867,7 @@ public void testLongSyncRevocation() throws Exception { request.setSyncRequestMetaData(md); ConfigurationSyncRequest csRequest = new ConfigurationSyncRequest(); - csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[] {})); + csRequest.setConfigurationHash(ByteBuffer.wrap(new byte[]{})); csRequest.setResyncOnly(true); request.setConfigurationSyncRequest(csRequest); @@ -1840,14 +1891,14 @@ public void testLongSyncRevocation() throws Exception { EndpointAddress address = new EndpointAddress(applicationDto.getTenantId(), applicationDto.getApplicationToken(), EndpointObjectHash.fromBytes(clientPublicKeyHash.array())); - ActorClassifier classifier = ActorClassifier.APPLICATION; - + ActorClassifier classifier = ActorClassifier.APPLICATION; + clusterServiceListener.onEndpointActorMsg(new ThriftEndpointActorMsg( address, classifier, new ThriftEndpointDeregistrationMessage())); Mockito.verify(errorBuilder, Mockito.timeout(TIMEOUT).atLeastOnce()).build(Mockito.any(EndpointRevocationException.class)); } - + private SyncRequestMetaData buildSyncRequestMetaData() { return buildSyncRequestMetaData(clientPublicKeyHash); }