diff --git a/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/EndpointSpecificConfigurationDto.java b/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/EndpointSpecificConfigurationDto.java index 0d2fabad81..6b6538d138 100644 --- a/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/EndpointSpecificConfigurationDto.java +++ b/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/EndpointSpecificConfigurationDto.java @@ -26,7 +26,7 @@ public class EndpointSpecificConfigurationDto implements Serializable { private byte[] endpointKeyHash; private Integer configurationSchemaVersion; private String configuration; - private Long specificConfigurationVersion; + private Long version; public EndpointSpecificConfigurationDto() { } @@ -34,11 +34,11 @@ public EndpointSpecificConfigurationDto() { /** * All-args constructor. */ - public EndpointSpecificConfigurationDto(byte[] endpointKeyHash, Integer configurationSchemaVersion, String configuration, Long specificConfigurationVersion) { + public EndpointSpecificConfigurationDto(byte[] endpointKeyHash, Integer configurationSchemaVersion, String configuration, Long version) { this.endpointKeyHash = endpointKeyHash; this.configurationSchemaVersion = configurationSchemaVersion; this.configuration = configuration; - this.specificConfigurationVersion = specificConfigurationVersion; + this.version = version; } public Integer getConfigurationSchemaVersion() { @@ -65,12 +65,12 @@ public void setEndpointKeyHash(byte[] endpointKeyHash) { this.endpointKeyHash = endpointKeyHash; } - public Long getSpecificConfigurationVersion() { - return specificConfigurationVersion; + public Long getVersion() { + return version; } - public void setSpecificConfigurationVersion(Long specificConfigurationVersion) { - this.specificConfigurationVersion = specificConfigurationVersion; + public void setVersion(Long version) { + this.version = version; } @Override diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDao.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDao.java index 4de2ea331b..5190b58228 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDao.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDao.java @@ -89,13 +89,9 @@ public EndpointSpecificConfiguration save(EndpointSpecificConfigurationDto dto) CassandraEndpointSpecificConfiguration configuration = findByEndpointKeyHashAndConfigurationVersion(dto.getEndpointKeyHash(), dto.getConfigurationSchemaVersion()); if (configuration != null) { - configuration.setConfiguration(dto.getConfiguration()); - configuration.setConfigurationVersion(dto.getConfigurationSchemaVersion()); - configuration.setEndpointKeyHash(getByteBuffer(dto.getEndpointKeyHash())); - save(configuration); - } else { - configuration = save(new CassandraEndpointSpecificConfiguration(dto)); + dto.setVersion(configuration.getVersion()); } + configuration = save(new CassandraEndpointSpecificConfiguration(dto)); if (LOG.isTraceEnabled()) { LOG.trace("Saved: {}", configuration); } else { diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointSpecificConfiguration.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointSpecificConfiguration.java index 4585d7be90..e18c3358e5 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointSpecificConfiguration.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointSpecificConfiguration.java @@ -68,7 +68,7 @@ public CassandraEndpointSpecificConfiguration(EndpointSpecificConfigurationDto d this.endpointKeyHash = getByteBuffer(dto.getEndpointKeyHash()); this.configurationVersion = dto.getConfigurationSchemaVersion(); this.configuration = dto.getConfiguration(); - this.version = dto.getSpecificConfigurationVersion(); + this.version = dto.getVersion(); } @Override @@ -77,7 +77,7 @@ public EndpointSpecificConfigurationDto toDto() { dto.setEndpointKeyHash(getBytes(this.getEndpointKeyHash())); dto.setConfiguration(this.getConfiguration()); dto.setConfigurationSchemaVersion(this.getConfigurationVersion()); - dto.setSpecificConfigurationVersion(this.getVersion()); + dto.setVersion(this.getVersion()); return dto; } diff --git a/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDaoTest.java b/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDaoTest.java index 25adba6d5f..354ea0c0af 100644 --- a/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDaoTest.java +++ b/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointSpecificConfigurationCassandraDaoTest.java @@ -22,11 +22,19 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.kaaproject.kaa.common.dto.EndpointSpecificConfigurationDto; +import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException; import org.kaaproject.kaa.server.common.dao.model.EndpointSpecificConfiguration; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "/cassandra-client-test-context.xml") @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) @@ -38,6 +46,7 @@ public class EndpointSpecificConfigurationCassandraDaoTest extends AbstractCassa private EndpointSpecificConfigurationDto saved1; private EndpointSpecificConfigurationDto saved2; private EndpointSpecificConfigurationDto saved3; + private ExecutorService executorService = Executors.newFixedThreadPool(10); @Test public void testRemoveByEndpointKeyHashAndConfigurationVersion() throws Exception { @@ -60,10 +69,21 @@ public void testFindByEndpointKeyHashAndConfigurationVersion() throws Exception Assert.assertNull(found4); } - @Test - public void testLocking() throws Exception { - saved1 = generateEpsConfigurationDto(KEY, 1, BODY, 9L); - saved2 = generateEpsConfigurationDto(KEY, 1, BODY, 9L); + @Test(expected = KaaOptimisticLockingFailureException.class) + public void testLocking() throws Throwable { + List> tasks = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + tasks.add(executorService.submit((Runnable) () -> { + endpointSpecificConfigurationDao.save(saved1); + })); + } + for (Future future : tasks) { + try { + future.get(); + } catch (ExecutionException ex) { + throw ex.getCause(); + } + } } @Before diff --git a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDao.java b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDao.java index 9b4cbf7607..9fcd7b3f89 100644 --- a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDao.java +++ b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDao.java @@ -60,13 +60,9 @@ public EndpointSpecificConfiguration save(EndpointSpecificConfigurationDto dto) MongoEndpointSpecificConfiguration configuration = (MongoEndpointSpecificConfiguration) findByEndpointKeyHashAndConfigurationVersion(dto.getEndpointKeyHash(), dto.getConfigurationSchemaVersion()); if (configuration != null) { - configuration.setConfiguration(dto.getConfiguration()); - configuration.setConfigurationVersion(dto.getConfigurationSchemaVersion()); - configuration.setEndpointKeyHash(dto.getEndpointKeyHash()); - save(configuration); - } else { - configuration = save(new MongoEndpointSpecificConfiguration(dto)); + dto.setVersion(configuration.getVersion()); } + configuration = save(new MongoEndpointSpecificConfiguration(dto)); if (LOG.isTraceEnabled()) { LOG.trace("Saved: {}", configuration); } else { diff --git a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointSpecificConfiguration.java b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointSpecificConfiguration.java index f184fd28a9..b058bcc7f3 100644 --- a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointSpecificConfiguration.java +++ b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointSpecificConfiguration.java @@ -64,7 +64,7 @@ public MongoEndpointSpecificConfiguration(EndpointSpecificConfigurationDto dto) this.endpointKeyHash = dto.getEndpointKeyHash(); this.configurationVersion = dto.getConfigurationSchemaVersion(); this.configuration = dto.getConfiguration(); - this.version = dto.getSpecificConfigurationVersion(); + this.version = dto.getVersion(); generateId(); } @@ -74,7 +74,7 @@ public EndpointSpecificConfigurationDto toDto() { dto.setEndpointKeyHash(this.getEndpointKeyHash()); dto.setConfiguration(this.getConfiguration()); dto.setConfigurationSchemaVersion(this.getConfigurationVersion()); - dto.setSpecificConfigurationVersion(this.getVersion()); + dto.setVersion(this.getVersion()); return dto; } diff --git a/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDaoTest.java b/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDaoTest.java index db8c8171f4..169f670dfa 100644 --- a/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDaoTest.java +++ b/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointSpecificConfigurationMongoDaoTest.java @@ -24,11 +24,19 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.kaaproject.kaa.common.dto.EndpointSpecificConfigurationDto; +import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException; import org.kaaproject.kaa.server.common.dao.model.EndpointSpecificConfiguration; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "/mongo-dao-test-context.xml") @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) @@ -39,6 +47,7 @@ public class EndpointSpecificConfigurationMongoDaoTest extends AbstractMongoTest private EndpointSpecificConfigurationDto saved1; private EndpointSpecificConfigurationDto saved2; private EndpointSpecificConfigurationDto saved3; + private ExecutorService executorService = Executors.newFixedThreadPool(10); @BeforeClass public static void init() throws Exception { @@ -71,10 +80,21 @@ public void testFindByEndpointKeyHashAndConfigurationVersion() throws Exception Assert.assertNull(found4); } - @Test - public void testLocking() throws Exception { - saved1 = generateEndpointSpecificConfigurationDto(KEY, 1, BODY, 8L); - saved2 = generateEndpointSpecificConfigurationDto(KEY, 1, BODY, 8L); + @Test(expected = KaaOptimisticLockingFailureException.class) + public void testLocking() throws Throwable { + List> tasks = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + tasks.add(executorService.submit((Runnable) () -> { + endpointSpecificConfigurationDao.save(saved1); + })); + } + for (Future future : tasks) { + try { + future.get(); + } catch (ExecutionException ex) { + throw ex.getCause(); + } + } } @Before