From 9e07dd77385d590391f5e2f543553145370991a1 Mon Sep 17 00:00:00 2001 From: Kirill Liubun Date: Wed, 20 Jul 2016 20:00:38 +0300 Subject: [PATCH] Cached raw data during merging conf schema --- .../service/cache/CacheService.java | 4 +- .../concurrent/ConcurrentCacheService.java | 15 ++--- .../service/delta/DefaultDeltaService.java | 60 ++++++++++--------- .../service/event/ESTestCacheService.java | 4 +- 4 files changed, 45 insertions(+), 38 deletions(-) diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/CacheService.java b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/CacheService.java index 32ebf7dfff..d29689914a 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/CacheService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/CacheService.java @@ -35,6 +35,8 @@ import org.kaaproject.kaa.common.dto.event.ApplicationEventFamilyMapDto; import org.kaaproject.kaa.common.hash.EndpointObjectHash; import org.kaaproject.kaa.server.common.core.configuration.BaseData; +import org.kaaproject.kaa.server.common.core.configuration.RawData; +import org.kaaproject.kaa.server.common.core.structure.Pair; import org.kaaproject.kaa.server.common.dao.ApplicationEventMapService; import org.kaaproject.kaa.server.common.dao.ApplicationService; import org.kaaproject.kaa.server.common.dao.ConfigurationService; @@ -159,7 +161,7 @@ public interface CacheService { * @param worker the worker * @return the merged configuration */ - BaseData getMergedConfiguration(List egsList, Computable, BaseData> worker); + Pair getMergedConfiguration(List egsList, Computable, Pair> worker); /** * Sets the merged configuration. diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/concurrent/ConcurrentCacheService.java b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/concurrent/ConcurrentCacheService.java index 3012d7202e..a6a9983ffd 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/concurrent/ConcurrentCacheService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/cache/concurrent/ConcurrentCacheService.java @@ -52,6 +52,8 @@ import org.kaaproject.kaa.common.dto.event.EventClassFamilyDto; import org.kaaproject.kaa.common.hash.EndpointObjectHash; import org.kaaproject.kaa.server.common.core.configuration.BaseData; +import org.kaaproject.kaa.server.common.core.configuration.RawData; +import org.kaaproject.kaa.server.common.core.structure.Pair; import org.kaaproject.kaa.server.common.dao.ApplicationEventMapService; import org.kaaproject.kaa.server.common.dao.ApplicationService; import org.kaaproject.kaa.server.common.dao.CTLService; @@ -181,7 +183,7 @@ public class ConcurrentCacheService implements CacheService { private final CacheTemporaryMemorizer endpointKeyMemorizer = new CacheTemporaryMemorizer<>(); /** The merged configuration memorizer. */ - private final CacheTemporaryMemorizer, BaseData> mergedConfigurationMemorizer = new CacheTemporaryMemorizer<>(); + private final CacheTemporaryMemorizer, Pair> mergedConfigurationMemorizer = new CacheTemporaryMemorizer<>(); /** The delta memorizer. */ private final CacheTemporaryMemorizer deltaMemorizer = new CacheTemporaryMemorizer<>(); @@ -835,14 +837,13 @@ public void resetEndpointKey(EndpointObjectHash hash, PublicKey endpointKey){ */ @Override @Cacheable(value = "mergedConfigurations", key = "#key") - public BaseData getMergedConfiguration(final List key, - final Computable, BaseData> worker) { - return mergedConfigurationMemorizer.compute(key, new Computable, BaseData>() { - + public Pair getMergedConfiguration(final List key, + final Computable, Pair> worker) { + return mergedConfigurationMemorizer.compute(key, new Computable, Pair>() { @Override - public BaseData compute(List key) { + public Pair compute(List key) { LOG.debug("Fetching result for getMergedConfiguration"); - BaseData result = worker.compute(key); + Pair result = worker.compute(key); return result; } }); diff --git a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/delta/DefaultDeltaService.java b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/delta/DefaultDeltaService.java index f86b51af86..f9647156b1 100644 --- a/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/delta/DefaultDeltaService.java +++ b/server/node/src/main/java/org/kaaproject/kaa/server/operations/service/delta/DefaultDeltaService.java @@ -17,11 +17,7 @@ package org.kaaproject.kaa.server.operations.service.delta; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.JsonNode; @@ -50,6 +46,7 @@ import org.kaaproject.kaa.server.common.core.schema.BaseSchema; import org.kaaproject.kaa.server.common.core.schema.OverrideSchema; import org.kaaproject.kaa.server.common.core.schema.RawSchema; +import org.kaaproject.kaa.server.common.core.structure.Pair; import org.kaaproject.kaa.server.common.dao.ConfigurationService; import org.kaaproject.kaa.server.common.dao.EndpointService; import org.kaaproject.kaa.server.common.dao.UserConfigurationService; @@ -203,8 +200,7 @@ public ConfigurationCacheEntry compute(DeltaCacheKey deltaKey) { try { LOG.debug("[{}] Calculating delta for {}", endpointId, deltaKey); ConfigurationCacheEntry deltaCache; - String jsonData; - String schema; + AbstractKaaData data; ConfigurationSchemaDto latestConfigurationSchema = cacheService.getConfSchemaByAppAndVersion(deltaKey.getAppConfigVersionKey()); EndpointUserConfigurationDto userConfiguration = findLatestUserConfiguration(userId, deltaKey); @@ -214,26 +210,16 @@ public ConfigurationCacheEntry compute(DeltaCacheKey deltaKey) { userConfigurationHash = EndpointObjectHash.fromString(userConfiguration.getBody()); } - BaseData mergedConfiguration = getMergedConfiguration(endpointId, userConfiguration, deltaKey, latestConfigurationSchema); + Pair mergedConfiguration = getMergedConfiguration(endpointId, userConfiguration, deltaKey, latestConfigurationSchema); if(useConfigurationRawSchema) { - // converting merged base schema to raw schema - String ctlSchema = cacheService.getFlatCtlSchemaById(latestConfigurationSchema.getCtlSchemaId()); - AbstractKaaData kaaData = new RawData(new RawSchema(ctlSchema), mergedConfiguration.getRawData()); - JsonNode json = new ObjectMapper().readTree(kaaData.getRawData()); - AvroUtils.removeUuids(json); - jsonData = json.toString(); - schema = kaaData.getSchema().getRawSchema(); - + data = mergedConfiguration.getV2(); } else { - AbstractKaaData kaaData = mergedConfiguration; - jsonData = kaaData.getRawData(); - schema = kaaData.getSchema().getRawSchema(); - + data = mergedConfiguration.getV1(); } - LOG.trace("[{}] Merged configuration {}", endpointId, jsonData); - deltaCache = buildBaseResyncDelta(endpointId, jsonData, schema, userConfigurationHash); + LOG.trace("[{}] Merged configuration {}", endpointId, data.getRawData()); + deltaCache = buildBaseResyncDelta(endpointId, data.getRawData(), data.getSchema().getRawSchema(), userConfigurationHash); if (cacheService.getConfByHash(deltaCache.getHash()) == null) { EndpointConfigurationDto newConfiguration = new EndpointConfigurationDto(); @@ -310,14 +296,16 @@ private BaseData processEndpointGroups(List endpointGroups, Li * @return the latest conf from cache * @throws GetDeltaException */ - private BaseData getMergedConfiguration(final String endpointId, final EndpointUserConfigurationDto userConfiguration, - final DeltaCacheKey cacheKey, ConfigurationSchemaDto latestConfigurationSchema) throws GetDeltaException { + private Pair getMergedConfiguration(final String endpointId, final EndpointUserConfigurationDto userConfiguration, + final DeltaCacheKey cacheKey, ConfigurationSchemaDto latestConfigurationSchema) throws GetDeltaException { + final List egsList = cacheKey.getEndpointGroups(); - BaseData mergedConfiguration = cacheService.getMergedConfiguration(egsList, - new Computable, BaseData>() { + // return Pair in order to cache both calculated configuration and optimize performance + Pair mergedConfiguration = cacheService.getMergedConfiguration(egsList, + new Computable, Pair>() { @Override - public BaseData compute(List key) { + public Pair compute(List key) { LOG.trace("[{}] getMergedConfiguration.compute begin", endpointId); try { List endpointGroups = new ArrayList<>(); @@ -344,7 +332,15 @@ public BaseData compute(List key) { configurationSchema = configurationService.findConfSchemaById(configuration.getSchemaId()); } } - return processEndpointGroups(endpointGroups, configurations, configurationSchema); + BaseData baseData = processEndpointGroups(endpointGroups, configurations, configurationSchema); + + // converting merged base schema to raw schema + String ctlSchema = cacheService.getFlatCtlSchemaById(latestConfigurationSchema.getCtlSchemaId()); + JsonNode json = new ObjectMapper().readTree(baseData.getRawData()); + AvroUtils.removeUuids(json); + RawData rawData = new RawData(new RawSchema(ctlSchema), json.toString()); + + return new Pair<>(baseData, rawData); } catch (OverrideException | IOException oe) { LOG.error("[{}] Unexpected exception occurred while merging configuration: ", endpointId, oe); throw new RuntimeException(oe); // NOSONAR @@ -359,8 +355,14 @@ public BaseData compute(List key) { OverrideSchema overrideSchema = new OverrideSchema(latestConfigurationSchema.getOverrideSchema()); try { LOG.trace("Merging group configuration with user configuration: {}", userConfiguration.getBody()); - mergedConfiguration = configurationMerger.override(mergedConfiguration, + BaseData baseData = configurationMerger.override(mergedConfiguration.getV1(), Collections.singletonList(new OverrideData(overrideSchema, userConfiguration.getBody()))); + + JsonNode json = new ObjectMapper().readTree(baseData.getRawData()); + AvroUtils.removeUuids(json); + RawData rawData = new RawData(new RawSchema(mergedConfiguration.getV2().getSchema().getRawSchema()), baseData.getRawData()); + + mergedConfiguration = new Pair<>(baseData, rawData); } catch (OverrideException | IOException oe) { LOG.error("[{}] Unexpected exception occurred while merging configuration: ", endpointId, oe); throw new GetDeltaException(oe); diff --git a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/ESTestCacheService.java b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/ESTestCacheService.java index ee04459fb4..44ea9e17f6 100644 --- a/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/ESTestCacheService.java +++ b/server/node/src/test/java/org/kaaproject/kaa/server/operations/service/event/ESTestCacheService.java @@ -35,6 +35,8 @@ import org.kaaproject.kaa.common.dto.event.ApplicationEventFamilyMapDto; import org.kaaproject.kaa.common.hash.EndpointObjectHash; import org.kaaproject.kaa.server.common.core.configuration.BaseData; +import org.kaaproject.kaa.server.common.core.configuration.RawData; +import org.kaaproject.kaa.server.common.core.structure.Pair; import org.kaaproject.kaa.server.common.dao.ApplicationEventMapService; import org.kaaproject.kaa.server.common.dao.ApplicationService; import org.kaaproject.kaa.server.common.dao.ConfigurationService; @@ -346,7 +348,7 @@ public void setSdkProfileService(SdkProfileService sdkKeyService) { } @Override - public BaseData getMergedConfiguration(List egsList, Computable, BaseData> worker) { + public Pair getMergedConfiguration(List egsList, Computable, Pair> worker) { // TODO Auto-generated method stub return null; }