Skip to content

Commit

Permalink
KAA-1423: Add Endpoint Specific Configuration Support. Process of upd…
Browse files Browse the repository at this point in the history
…ating configuration fixed.
  • Loading branch information
vchizhevsky committed Dec 15, 2016
1 parent 700c731 commit 3097964
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 35 deletions.
Expand Up @@ -23,7 +23,7 @@ public interface EndpointSpecificConfigurationDao<T extends EndpointSpecificConf

void removeByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHash, Integer confSchemaVersion);

EndpointSpecificConfiguration findByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHash, int configurationVersion);
EndpointSpecificConfiguration findByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHash, int configurationSchemaVersion);

EndpointSpecificConfiguration save(EndpointSpecificConfigurationDto endpointSpecificConfigurationDto);

Expand Down
Expand Up @@ -85,8 +85,8 @@ public Optional<EndpointSpecificConfigurationDto> findActiveConfigurationByEndpo
public Optional<EndpointSpecificConfigurationDto> deleteActiveConfigurationByEndpointKeyHash(byte[] endpointKeyHash) {
Optional<EndpointSpecificConfigurationDto> configuration = findActiveConfigurationByEndpointKeyHash(endpointKeyHash);
if (configuration.isPresent()) {
int confSchemaVersion = configuration.get().getConfigurationSchemaVersion();
endpointSpecificConfigurationDao.removeByEndpointKeyHashAndConfigurationVersion(endpointKeyHash, confSchemaVersion);
int profileSchemaVersion = configuration.get().getConfigurationSchemaVersion();
endpointSpecificConfigurationDao.removeByEndpointKeyHashAndConfigurationVersion(endpointKeyHash, profileSchemaVersion);
}
return configuration;
}
Expand Down
Expand Up @@ -26,19 +26,19 @@ public class EndpointSpecificConfigurationDto implements Serializable {
private byte[] endpointKeyHash;
private Integer configurationSchemaVersion;
private String configuration;
private Long version;
private Long specificConfigurationVersion;

public EndpointSpecificConfigurationDto() {
}

/**
* All-args constructor.
*/
public EndpointSpecificConfigurationDto(byte[] endpointKeyHash, Integer configurationSchemaVersion, String configuration, Long version) {
public EndpointSpecificConfigurationDto(byte[] endpointKeyHash, Integer configurationSchemaVersion, String configuration, Long specificConfigurationVersion) {
this.endpointKeyHash = endpointKeyHash;
this.configurationSchemaVersion = configurationSchemaVersion;
this.configuration = configuration;
this.version = version;
this.specificConfigurationVersion = specificConfigurationVersion;
}

public Integer getConfigurationSchemaVersion() {
Expand All @@ -65,12 +65,12 @@ public void setEndpointKeyHash(byte[] endpointKeyHash) {
this.endpointKeyHash = endpointKeyHash;
}

public Long getVersion() {
return version;
public Long getSpecificConfigurationVersion() {
return specificConfigurationVersion;
}

public void setVersion(Long version) {
this.version = version;
public void setSpecificConfigurationVersion(Long specificConfigurationVersion) {
this.specificConfigurationVersion = specificConfigurationVersion;
}

@Override
Expand Down
Expand Up @@ -21,8 +21,8 @@
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EPS_CONFIGURATION_COLUMN_FAMILY_NAME;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EPS_CONFIGURATION_CONFIGURATION_VERSION_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EPS_CONFIGURATION_KEY_HASH_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_CONFIGURATION_VERSION_PROPERTY;

import com.datastax.driver.core.querybuilder.Delete;
import com.datastax.driver.core.querybuilder.Select;
Expand Down Expand Up @@ -66,7 +66,7 @@ public void removeByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHas
LOG.debug("Remove endpoint specific configuration by endpointKeyHash {} and confSchemaVersion {}", endpointKeyHash, confSchemaVersion);
Delete.Where deleteQuery = delete().from(getColumnFamilyName())
.where(eq(EPS_CONFIGURATION_KEY_HASH_PROPERTY, getByteBuffer(endpointKeyHash)))
.and(eq(EPS_CONFIGURATION_CONFIGURATION_VERSION_PROPERTY, confSchemaVersion));
.and(eq(EP_CONFIGURATION_VERSION_PROPERTY, confSchemaVersion));
LOG.trace("Remove endpoint specific configuration by endpointKeyHash and confSchemaVersion query {}", deleteQuery);
execute(deleteQuery);
}
Expand All @@ -76,7 +76,7 @@ public CassandraEndpointSpecificConfiguration findByEndpointKeyHashAndConfigurat
LOG.debug("Try to find endpoint specific configuration by endpointKeyHash {} and configurationVersion {}", endpointKeyHash, configurationVersion);
Select.Where where = select().from(getColumnFamilyName())
.where(eq(EPS_CONFIGURATION_KEY_HASH_PROPERTY, getByteBuffer(endpointKeyHash)))
.and(eq(EPS_CONFIGURATION_CONFIGURATION_VERSION_PROPERTY, configurationVersion));
.and(eq(EP_CONFIGURATION_VERSION_PROPERTY, configurationVersion));
LOG.trace("Try to find endpoint specific configuration by cql select {}", where);
CassandraEndpointSpecificConfiguration configuration = findOneByStatement(where);
LOG.trace("Found {} endpoint specific configuration", configuration);
Expand All @@ -86,7 +86,16 @@ public CassandraEndpointSpecificConfiguration findByEndpointKeyHashAndConfigurat
@Override
public EndpointSpecificConfiguration save(EndpointSpecificConfigurationDto dto) {
LOG.debug("Saving endpoint specific configuration {}", dto);
EndpointSpecificConfiguration configuration = save(new CassandraEndpointSpecificConfiguration(dto));
CassandraEndpointSpecificConfiguration configuration =
findByEndpointKeyHashAndConfigurationVersion(dto.getEndpointKeyHash(), dto.getConfigurationSchemaVersion());
if (configuration != null) {
configuration.setConfiguration(dto.getConfiguration());
configuration.setConfigurationVersion(dto.getConfigurationSchemaVersion());
configuration.setEndpointKeyHash(getByteBuffer(dto.getEndpointKeyHash()));
save(configuration);
} else {
configuration = save(new CassandraEndpointSpecificConfiguration(dto));
}
if (LOG.isTraceEnabled()) {
LOG.trace("Saved: {}", configuration);
} else {
Expand Down
Expand Up @@ -20,8 +20,8 @@
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getBytes;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EPS_CONFIGURATION_CONFIGURATION_BODY_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EPS_CONFIGURATION_CONFIGURATION_VERSION_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EPS_CONFIGURATION_KEY_HASH_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_CONFIGURATION_VERSION_PROPERTY;

import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
Expand All @@ -48,7 +48,7 @@ public final class CassandraEndpointSpecificConfiguration implements EndpointSpe
@Column(name = EPS_CONFIGURATION_KEY_HASH_PROPERTY)
private ByteBuffer endpointKeyHash;
@ClusteringColumn
@Column(name = EPS_CONFIGURATION_CONFIGURATION_VERSION_PROPERTY)
@Column(name = EP_CONFIGURATION_VERSION_PROPERTY)
private Integer configurationVersion;
@Column(name = EPS_CONFIGURATION_CONFIGURATION_BODY_PROPERTY)
private String configuration;
Expand All @@ -68,7 +68,7 @@ public CassandraEndpointSpecificConfiguration(EndpointSpecificConfigurationDto d
this.endpointKeyHash = getByteBuffer(dto.getEndpointKeyHash());
this.configurationVersion = dto.getConfigurationSchemaVersion();
this.configuration = dto.getConfiguration();
this.version = dto.getVersion();
this.version = dto.getSpecificConfigurationVersion();
}

@Override
Expand All @@ -77,7 +77,7 @@ public EndpointSpecificConfigurationDto toDto() {
dto.setEndpointKeyHash(getBytes(this.getEndpointKeyHash()));
dto.setConfiguration(this.getConfiguration());
dto.setConfigurationSchemaVersion(this.getConfigurationVersion());
dto.setVersion(this.getVersion());
dto.setSpecificConfigurationVersion(this.getVersion());
return dto;
}

Expand Down
Expand Up @@ -213,7 +213,6 @@ public class CassandraModelConstants {
*/
public static final String EPS_CONFIGURATION_COLUMN_FAMILY_NAME = "ep_specific_conf";
public static final String EPS_CONFIGURATION_KEY_HASH_PROPERTY = ENDPOINT_KEY_HASH_PROPERTY;
public static final String EPS_CONFIGURATION_CONFIGURATION_VERSION_PROPERTY = EP_CONFIGURATION_VERSION_PROPERTY;
public static final String EPS_CONFIGURATION_CONFIGURATION_BODY_PROPERTY = BODY_PROPERTY;

/**
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kaaproject.kaa.common.dto.EndpointSpecificConfigurationDto;
import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException;
import org.kaaproject.kaa.server.common.dao.model.EndpointSpecificConfiguration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -61,7 +60,7 @@ public void testFindByEndpointKeyHashAndConfigurationVersion() throws Exception
Assert.assertNull(found4);
}

@Test(expected = KaaOptimisticLockingFailureException.class)
@Test
public void testLocking() throws Exception {
saved1 = generateEpsConfigurationDto(KEY, 1, BODY, 9L);
saved2 = generateEpsConfigurationDto(KEY, 1, BODY, 9L);
Expand Down
Expand Up @@ -16,8 +16,8 @@

package org.kaaproject.kaa.server.common.nosql.mongo.dao;

import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_CONFIGURATION_VERSION;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SPECIFIC_CONFIGURATION;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SPECIFIC_CONFIGURATION_KEY_HASH;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;
Expand All @@ -42,22 +42,31 @@ public void removeByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHas
LOG.debug("Remove endpoint specific configuration by endpoint key hash [{}] ", endpointKeyHash);
mongoTemplate.remove(
query(where(EP_SPECIFIC_CONFIGURATION_KEY_HASH).is(endpointKeyHash)
.and(EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION).is(confSchemaVersion)), getCollectionName());
.and(EP_CONFIGURATION_VERSION).is(confSchemaVersion)), getCollectionName());
}

@Override
public EndpointSpecificConfiguration findByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHash, int configurationVersion) {
LOG.debug("Try to find endpoint specific configuration by endpointKeyHash {} and configurationVersion {}", endpointKeyHash, configurationVersion);
public EndpointSpecificConfiguration findByEndpointKeyHashAndConfigurationVersion(byte[] endpointKeyHash, int configurationSchemaVersion) {
LOG.debug("Try to find endpoint specific configuration by endpointKeyHash {} and configurationVersion {}", endpointKeyHash, configurationSchemaVersion);
EndpointSpecificConfiguration configuration = findOne(query(where(EP_SPECIFIC_CONFIGURATION_KEY_HASH).is(endpointKeyHash)
.and(EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION).is(configurationVersion)));
.and(EP_CONFIGURATION_VERSION).is(configurationSchemaVersion)));
LOG.trace("Found {} endpoint specific configuration", configuration);
return configuration;
}

@Override
public EndpointSpecificConfiguration save(EndpointSpecificConfigurationDto dto) {
LOG.debug("Saving endpoint specific configuration {}", dto);
MongoEndpointSpecificConfiguration configuration = save(new MongoEndpointSpecificConfiguration(dto));
MongoEndpointSpecificConfiguration configuration =
(MongoEndpointSpecificConfiguration) findByEndpointKeyHashAndConfigurationVersion(dto.getEndpointKeyHash(), dto.getConfigurationSchemaVersion());
if (configuration != null) {
configuration.setConfiguration(dto.getConfiguration());
configuration.setConfigurationVersion(dto.getConfigurationSchemaVersion());
configuration.setEndpointKeyHash(dto.getEndpointKeyHash());
save(configuration);
} else {
configuration = save(new MongoEndpointSpecificConfiguration(dto));
}
if (LOG.isTraceEnabled()) {
LOG.trace("Saved: {}", configuration);
} else {
Expand Down
Expand Up @@ -16,7 +16,7 @@

package org.kaaproject.kaa.server.common.nosql.mongo.dao.model;

import static org.kaaproject.kaa.server.common.dao.DaoConstants.OPT_LOCK;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_CONFIGURATION_VERSION;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SPECIFIC_CONFIGURATION;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SPECIFIC_CONFIGURATION_CONFIGURATION;
import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION;
Expand Down Expand Up @@ -46,12 +46,12 @@ public class MongoEndpointSpecificConfiguration implements EndpointSpecificConfi
@Indexed
@Field(EP_SPECIFIC_CONFIGURATION_KEY_HASH)
private byte[] endpointKeyHash;
@Field(EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION)
@Field(EP_CONFIGURATION_VERSION)
private Integer configurationVersion;
@Field(EP_SPECIFIC_CONFIGURATION_CONFIGURATION)
private String configuration;
@Version
@Field(OPT_LOCK)
@Field(EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION)
private Long version;

public MongoEndpointSpecificConfiguration() {
Expand All @@ -64,7 +64,7 @@ public MongoEndpointSpecificConfiguration(EndpointSpecificConfigurationDto dto)
this.endpointKeyHash = dto.getEndpointKeyHash();
this.configurationVersion = dto.getConfigurationSchemaVersion();
this.configuration = dto.getConfiguration();
this.version = dto.getVersion();
this.version = dto.getSpecificConfigurationVersion();
generateId();
}

Expand All @@ -74,7 +74,7 @@ public EndpointSpecificConfigurationDto toDto() {
dto.setEndpointKeyHash(this.getEndpointKeyHash());
dto.setConfiguration(this.getConfiguration());
dto.setConfigurationSchemaVersion(this.getConfigurationVersion());
dto.setVersion(this.getVersion());
dto.setSpecificConfigurationVersion(this.getVersion());
return dto;
}

Expand Down
Expand Up @@ -135,7 +135,7 @@ public class MongoModelConstants {
*/
public static final String EP_SPECIFIC_CONFIGURATION = "endpoint_specific_configuration";
public static final String EP_SPECIFIC_CONFIGURATION_KEY_HASH = EP_ENDPOINT_KEY_HASH;
public static final String EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION = EP_CONFIGURATION_VERSION;
public static final String EP_SPECIFIC_CONFIGURATION_CONFIGURATION_VERSION = "endpoint_specific_configuration_version";
public static final String EP_SPECIFIC_CONFIGURATION_CONFIGURATION = BODY;

private MongoModelConstants() {
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kaaproject.kaa.common.dto.EndpointSpecificConfigurationDto;
import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException;
import org.kaaproject.kaa.server.common.dao.model.EndpointSpecificConfiguration;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
Expand Down Expand Up @@ -72,7 +71,7 @@ public void testFindByEndpointKeyHashAndConfigurationVersion() throws Exception
Assert.assertNull(found4);
}

@Test(expected = KaaOptimisticLockingFailureException.class)
@Test
public void testLocking() throws Exception {
saved1 = generateEndpointSpecificConfigurationDto(KEY, 1, BODY, 8L);
saved2 = generateEndpointSpecificConfigurationDto(KEY, 1, BODY, 8L);
Expand Down

0 comments on commit 3097964

Please sign in to comment.