Skip to content

Commit

Permalink
Cached raw data during merging conf schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Jul 20, 2016
1 parent be1b436 commit 9e07dd7
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 38 deletions.
Expand Up @@ -35,6 +35,8 @@
import org.kaaproject.kaa.common.dto.event.ApplicationEventFamilyMapDto;
import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.common.core.configuration.BaseData;
import org.kaaproject.kaa.server.common.core.configuration.RawData;
import org.kaaproject.kaa.server.common.core.structure.Pair;
import org.kaaproject.kaa.server.common.dao.ApplicationEventMapService;
import org.kaaproject.kaa.server.common.dao.ApplicationService;
import org.kaaproject.kaa.server.common.dao.ConfigurationService;
Expand Down Expand Up @@ -159,7 +161,7 @@ public interface CacheService {
* @param worker the worker
* @return the merged configuration
*/
BaseData getMergedConfiguration(List<EndpointGroupStateDto> egsList, Computable<List<EndpointGroupStateDto>, BaseData> worker);
Pair<BaseData, RawData> getMergedConfiguration(List<EndpointGroupStateDto> egsList, Computable<List<EndpointGroupStateDto>, Pair<BaseData, RawData>> worker);

/**
* Sets the merged configuration.
Expand Down
Expand Up @@ -52,6 +52,8 @@
import org.kaaproject.kaa.common.dto.event.EventClassFamilyDto;
import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.common.core.configuration.BaseData;
import org.kaaproject.kaa.server.common.core.configuration.RawData;
import org.kaaproject.kaa.server.common.core.structure.Pair;
import org.kaaproject.kaa.server.common.dao.ApplicationEventMapService;
import org.kaaproject.kaa.server.common.dao.ApplicationService;
import org.kaaproject.kaa.server.common.dao.CTLService;
Expand Down Expand Up @@ -181,7 +183,7 @@ public class ConcurrentCacheService implements CacheService {
private final CacheTemporaryMemorizer<EndpointObjectHash, PublicKey> endpointKeyMemorizer = new CacheTemporaryMemorizer<>();

/** The merged configuration memorizer. */
private final CacheTemporaryMemorizer<List<EndpointGroupStateDto>, BaseData> mergedConfigurationMemorizer = new CacheTemporaryMemorizer<>();
private final CacheTemporaryMemorizer<List<EndpointGroupStateDto>, Pair<BaseData, RawData>> mergedConfigurationMemorizer = new CacheTemporaryMemorizer<>();

/** The delta memorizer. */
private final CacheTemporaryMemorizer<DeltaCacheKey, ConfigurationCacheEntry> deltaMemorizer = new CacheTemporaryMemorizer<>();
Expand Down Expand Up @@ -835,14 +837,13 @@ public void resetEndpointKey(EndpointObjectHash hash, PublicKey endpointKey){
*/
@Override
@Cacheable(value = "mergedConfigurations", key = "#key")
public BaseData getMergedConfiguration(final List<EndpointGroupStateDto> key,
final Computable<List<EndpointGroupStateDto>, BaseData> worker) {
return mergedConfigurationMemorizer.compute(key, new Computable<List<EndpointGroupStateDto>, BaseData>() {

public Pair<BaseData, RawData> getMergedConfiguration(final List<EndpointGroupStateDto> key,
final Computable<List<EndpointGroupStateDto>, Pair<BaseData, RawData>> worker) {
return mergedConfigurationMemorizer.compute(key, new Computable<List<EndpointGroupStateDto>, Pair<BaseData, RawData>>() {
@Override
public BaseData compute(List<EndpointGroupStateDto> key) {
public Pair<BaseData, RawData> compute(List<EndpointGroupStateDto> key) {
LOG.debug("Fetching result for getMergedConfiguration");
BaseData result = worker.compute(key);
Pair<BaseData, RawData> result = worker.compute(key);
return result;
}
});
Expand Down
Expand Up @@ -17,11 +17,7 @@
package org.kaaproject.kaa.server.operations.service.delta;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.*;

import org.apache.commons.lang.StringUtils;
import org.codehaus.jackson.JsonNode;
Expand Down Expand Up @@ -50,6 +46,7 @@
import org.kaaproject.kaa.server.common.core.schema.BaseSchema;
import org.kaaproject.kaa.server.common.core.schema.OverrideSchema;
import org.kaaproject.kaa.server.common.core.schema.RawSchema;
import org.kaaproject.kaa.server.common.core.structure.Pair;
import org.kaaproject.kaa.server.common.dao.ConfigurationService;
import org.kaaproject.kaa.server.common.dao.EndpointService;
import org.kaaproject.kaa.server.common.dao.UserConfigurationService;
Expand Down Expand Up @@ -203,8 +200,7 @@ public ConfigurationCacheEntry compute(DeltaCacheKey deltaKey) {
try {
LOG.debug("[{}] Calculating delta for {}", endpointId, deltaKey);
ConfigurationCacheEntry deltaCache;
String jsonData;
String schema;
AbstractKaaData<?> data;
ConfigurationSchemaDto latestConfigurationSchema = cacheService.getConfSchemaByAppAndVersion(deltaKey.getAppConfigVersionKey());

EndpointUserConfigurationDto userConfiguration = findLatestUserConfiguration(userId, deltaKey);
Expand All @@ -214,26 +210,16 @@ public ConfigurationCacheEntry compute(DeltaCacheKey deltaKey) {
userConfigurationHash = EndpointObjectHash.fromString(userConfiguration.getBody());
}

BaseData mergedConfiguration = getMergedConfiguration(endpointId, userConfiguration, deltaKey, latestConfigurationSchema);
Pair<BaseData, RawData> mergedConfiguration = getMergedConfiguration(endpointId, userConfiguration, deltaKey, latestConfigurationSchema);

if(useConfigurationRawSchema) {
// converting merged base schema to raw schema
String ctlSchema = cacheService.getFlatCtlSchemaById(latestConfigurationSchema.getCtlSchemaId());
AbstractKaaData<?> kaaData = new RawData(new RawSchema(ctlSchema), mergedConfiguration.getRawData());
JsonNode json = new ObjectMapper().readTree(kaaData.getRawData());
AvroUtils.removeUuids(json);
jsonData = json.toString();
schema = kaaData.getSchema().getRawSchema();

data = mergedConfiguration.getV2();
} else {
AbstractKaaData<?> kaaData = mergedConfiguration;
jsonData = kaaData.getRawData();
schema = kaaData.getSchema().getRawSchema();

data = mergedConfiguration.getV1();
}

LOG.trace("[{}] Merged configuration {}", endpointId, jsonData);
deltaCache = buildBaseResyncDelta(endpointId, jsonData, schema, userConfigurationHash);
LOG.trace("[{}] Merged configuration {}", endpointId, data.getRawData());
deltaCache = buildBaseResyncDelta(endpointId, data.getRawData(), data.getSchema().getRawSchema(), userConfigurationHash);

if (cacheService.getConfByHash(deltaCache.getHash()) == null) {
EndpointConfigurationDto newConfiguration = new EndpointConfigurationDto();
Expand Down Expand Up @@ -310,14 +296,16 @@ private BaseData processEndpointGroups(List<EndpointGroupDto> endpointGroups, Li
* @return the latest conf from cache
* @throws GetDeltaException
*/
private BaseData getMergedConfiguration(final String endpointId, final EndpointUserConfigurationDto userConfiguration,
final DeltaCacheKey cacheKey, ConfigurationSchemaDto latestConfigurationSchema) throws GetDeltaException {
private Pair<BaseData, RawData> getMergedConfiguration(final String endpointId, final EndpointUserConfigurationDto userConfiguration,
final DeltaCacheKey cacheKey, ConfigurationSchemaDto latestConfigurationSchema) throws GetDeltaException {

final List<EndpointGroupStateDto> egsList = cacheKey.getEndpointGroups();
BaseData mergedConfiguration = cacheService.getMergedConfiguration(egsList,
new Computable<List<EndpointGroupStateDto>, BaseData>() {
// return Pair in order to cache both calculated configuration and optimize performance
Pair<BaseData, RawData> mergedConfiguration = cacheService.getMergedConfiguration(egsList,
new Computable<List<EndpointGroupStateDto>, Pair<BaseData, RawData>>() {

@Override
public BaseData compute(List<EndpointGroupStateDto> key) {
public Pair<BaseData, RawData> compute(List<EndpointGroupStateDto> key) {
LOG.trace("[{}] getMergedConfiguration.compute begin", endpointId);
try {
List<EndpointGroupDto> endpointGroups = new ArrayList<>();
Expand All @@ -344,7 +332,15 @@ public BaseData compute(List<EndpointGroupStateDto> key) {
configurationSchema = configurationService.findConfSchemaById(configuration.getSchemaId());
}
}
return processEndpointGroups(endpointGroups, configurations, configurationSchema);
BaseData baseData = processEndpointGroups(endpointGroups, configurations, configurationSchema);

// converting merged base schema to raw schema
String ctlSchema = cacheService.getFlatCtlSchemaById(latestConfigurationSchema.getCtlSchemaId());
JsonNode json = new ObjectMapper().readTree(baseData.getRawData());
AvroUtils.removeUuids(json);
RawData rawData = new RawData(new RawSchema(ctlSchema), json.toString());

return new Pair<>(baseData, rawData);
} catch (OverrideException | IOException oe) {
LOG.error("[{}] Unexpected exception occurred while merging configuration: ", endpointId, oe);
throw new RuntimeException(oe); // NOSONAR
Expand All @@ -359,8 +355,14 @@ public BaseData compute(List<EndpointGroupStateDto> key) {
OverrideSchema overrideSchema = new OverrideSchema(latestConfigurationSchema.getOverrideSchema());
try {
LOG.trace("Merging group configuration with user configuration: {}", userConfiguration.getBody());
mergedConfiguration = configurationMerger.override(mergedConfiguration,
BaseData baseData = configurationMerger.override(mergedConfiguration.getV1(),
Collections.singletonList(new OverrideData(overrideSchema, userConfiguration.getBody())));

JsonNode json = new ObjectMapper().readTree(baseData.getRawData());
AvroUtils.removeUuids(json);
RawData rawData = new RawData(new RawSchema(mergedConfiguration.getV2().getSchema().getRawSchema()), baseData.getRawData());

mergedConfiguration = new Pair<>(baseData, rawData);
} catch (OverrideException | IOException oe) {
LOG.error("[{}] Unexpected exception occurred while merging configuration: ", endpointId, oe);
throw new GetDeltaException(oe);
Expand Down
Expand Up @@ -35,6 +35,8 @@
import org.kaaproject.kaa.common.dto.event.ApplicationEventFamilyMapDto;
import org.kaaproject.kaa.common.hash.EndpointObjectHash;
import org.kaaproject.kaa.server.common.core.configuration.BaseData;
import org.kaaproject.kaa.server.common.core.configuration.RawData;
import org.kaaproject.kaa.server.common.core.structure.Pair;
import org.kaaproject.kaa.server.common.dao.ApplicationEventMapService;
import org.kaaproject.kaa.server.common.dao.ApplicationService;
import org.kaaproject.kaa.server.common.dao.ConfigurationService;
Expand Down Expand Up @@ -346,7 +348,7 @@ public void setSdkProfileService(SdkProfileService sdkKeyService) {
}

@Override
public BaseData getMergedConfiguration(List<EndpointGroupStateDto> egsList, Computable<List<EndpointGroupStateDto>, BaseData> worker) {
public Pair<BaseData, RawData> getMergedConfiguration(List<EndpointGroupStateDto> egsList, Computable<List<EndpointGroupStateDto>, Pair<BaseData, RawData>> worker) {
// TODO Auto-generated method stub
return null;
}
Expand Down

0 comments on commit 9e07dd7

Please sign in to comment.