diff --git a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/DaoConstants.java b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/DaoConstants.java index bde3e5ae51..cb31555452 100644 --- a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/DaoConstants.java +++ b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/DaoConstants.java @@ -415,6 +415,8 @@ public class DaoConstants { public static final String LAST_PAGE_MESSAGE = "It is the last page"; public static final String PROFILE = "profile"; + public static final String OPT_LOCK = "opt_lock"; + public static final String APPLIED = "[applied]"; private DaoConstants() { throw new UnsupportedOperationException("Not supported"); diff --git a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/impl/EndpointProfileDao.java b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/impl/EndpointProfileDao.java index c1a6e32b60..dbe8c0c228 100644 --- a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/impl/EndpointProfileDao.java +++ b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/impl/EndpointProfileDao.java @@ -33,8 +33,20 @@ */ public interface EndpointProfileDao extends Dao { + /** + * + * @param dto + * @return + */ T save(EndpointProfileDto dto); + /** + * + * @param endpointKeyHash + * @return + */ + Long findVersionByKey(byte[] endpointKeyHash); + /** * Find endpoint profile by key hash. * @@ -73,7 +85,7 @@ public interface EndpointProfileDao extends Dao extends Dao findByEndpointUserId(String endpointUserId); - /** - * @deprecated The functionality of this method is not yet necessary. - */ - List findBySdkToken(String sdkToken); - /** * Checks whether there are any endpoint profiles with the given SDK token. * diff --git a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/lock/KaaOptimisticLockingFailureException.java b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/lock/KaaOptimisticLockingFailureException.java new file mode 100644 index 0000000000..098ccf7ad8 --- /dev/null +++ b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/lock/KaaOptimisticLockingFailureException.java @@ -0,0 +1,14 @@ +package org.kaaproject.kaa.server.common.dao.lock; + +import org.springframework.dao.OptimisticLockingFailureException; + +public class KaaOptimisticLockingFailureException extends OptimisticLockingFailureException { + + public KaaOptimisticLockingFailureException(String msg, Throwable cause) { + super(msg, cause); + } + + public KaaOptimisticLockingFailureException(String msg) { + super(msg); + } +} diff --git a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/lock/Retry.java b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/lock/Retry.java new file mode 100644 index 0000000000..4bccbf80c1 --- /dev/null +++ b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/lock/Retry.java @@ -0,0 +1,25 @@ +package org.kaaproject.kaa.server.common.dao.lock; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Retry { + + /** + * The number of retry attempts + * + * @return retry attempts + */ + int times() default 1; + + /** + * Declare the exception types the retry will be issued on. + * + * @return exception types causing a retry + */ + Class[] on(); +} diff --git a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/service/EndpointServiceImpl.java b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/service/EndpointServiceImpl.java index 1136afad48..f06a716c13 100644 --- a/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/service/EndpointServiceImpl.java +++ b/server/common/dao/src/main/java/org/kaaproject/kaa/server/common/dao/service/EndpointServiceImpl.java @@ -16,21 +16,6 @@ package org.kaaproject.kaa.server.common.dao.service; -import static org.apache.commons.lang.StringUtils.isBlank; -import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.convertDtoList; -import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.getDto; -import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidId; -import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidObject; -import static org.kaaproject.kaa.server.common.dao.service.Validator.validateHash; -import static org.kaaproject.kaa.server.common.dao.service.Validator.validateObject; -import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlId; -import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlObject; -import static org.kaaproject.kaa.server.common.dao.service.Validator.validateString; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - import org.apache.commons.lang.StringUtils; import org.kaaproject.kaa.common.dto.ChangeDto; import org.kaaproject.kaa.common.dto.ChangeNotificationDto; @@ -59,6 +44,8 @@ import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao; import org.kaaproject.kaa.server.common.dao.impl.EndpointUserDao; import org.kaaproject.kaa.server.common.dao.impl.ProfileFilterDao; +import org.kaaproject.kaa.server.common.dao.lock.KaaOptimisticLockingFailureException; +import org.kaaproject.kaa.server.common.dao.lock.Retry; import org.kaaproject.kaa.server.common.dao.model.EndpointConfiguration; import org.kaaproject.kaa.server.common.dao.model.EndpointProfile; import org.kaaproject.kaa.server.common.dao.model.EndpointUser; @@ -71,6 +58,20 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.commons.lang.StringUtils.isBlank; +import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.convertDtoList; +import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.getDto; +import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidId; +import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidObject; +import static org.kaaproject.kaa.server.common.dao.service.Validator.validateHash; +import static org.kaaproject.kaa.server.common.dao.service.Validator.validateObject; +import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlId; +import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlObject; +import static org.kaaproject.kaa.server.common.dao.service.Validator.validateString; + @Service public class EndpointServiceImpl implements EndpointService { @@ -271,14 +272,13 @@ public void removeEndpointProfileByAppId(String appId) { } @Override - @Transactional - public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfileDto) { - validateObject(endpointProfileDto, "Can't find endpoint profile object. Invalid endpoint profile object" - + endpointProfileDto); + @Retry(times = 5, on = org.springframework.dao.OptimisticLockingFailureException.class) + public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfileDto) throws KaaOptimisticLockingFailureException { + EndpointProfileDto profileDto = null; + validateObject(endpointProfileDto, "Can't find endpoint profile object. Invalid endpoint profile object" + endpointProfileDto); byte[] keyHash = endpointProfileDto.getEndpointKeyHash(); - EndpointProfileDto dto; validateHash(keyHash, "Incorrect key hash for endpoint profile."); - if(endpointProfileDto.getServerProfileBody() == null){ + if (endpointProfileDto.getServerProfileBody() == null) { ServerProfileSchemaDto serverProfileSchemaDto = serverProfileService.findLatestServerProfileSchema(endpointProfileDto.getApplicationId()); CTLSchemaDto schemaDto = ctlService.findCTLSchemaById(serverProfileSchemaDto.getCtlSchemaId()); LOG.debug("Set latest server profile schema [{}] and default record {} for endpoint with key [{}]", serverProfileSchemaDto.getVersion(), schemaDto.getBody(), keyHash); @@ -286,26 +286,14 @@ public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfile endpointProfileDto.setServerProfileBody(schemaDto.getBody()); } if (isBlank(endpointProfileDto.getId())) { - //TODO: Improve this to avoid redundant requests to DB and invalid logic. - if (endpointProfileDao.getCountByKeyHash(keyHash) == 0) { + EndpointProfile storedProfile = endpointProfileDao.findEndpointIdByKeyHash(keyHash); + if (storedProfile != null) { + endpointProfileDto.setId(storedProfile.getId()); LOG.debug("Register new endpoint profile."); - dto = getDto(endpointProfileDao.save(endpointProfileDto)); - } else { - EndpointProfile storedProfile = endpointProfileDao.findByKeyHash(keyHash); - if (Arrays.equals(storedProfile.getEndpointKey(), endpointProfileDto.getEndpointKey())) { - LOG.debug("Got register profile for already existing profile {}. Will overwrite existing profile!", keyHash); - endpointProfileDto.setId(storedProfile.getId()); - dto = getDto(endpointProfileDao.save(endpointProfileDto)); - } else { - LOG.warn("Endpoint profile with key hash {} already exists.", keyHash); - throw new DatabaseProcessingException("Can't save endpoint profile with existing key hash."); - } } - } else { - LOG.debug("Update endpoint profile with id [{}]", endpointProfileDto.getId()); - dto = getDto(endpointProfileDao.save(endpointProfileDto)); } - return dto; + profileDto = getDto(endpointProfileDao.save(endpointProfileDto)); + return profileDto; } @Override diff --git a/server/common/dao/src/test/java/org/kaaproject/kaa/server/common/dao/service/ProfileServiceImplTest.java b/server/common/dao/src/test/java/org/kaaproject/kaa/server/common/dao/service/ProfileServiceImplTest.java index 9c181e95f9..1f66c4d3a5 100644 --- a/server/common/dao/src/test/java/org/kaaproject/kaa/server/common/dao/service/ProfileServiceImplTest.java +++ b/server/common/dao/src/test/java/org/kaaproject/kaa/server/common/dao/service/ProfileServiceImplTest.java @@ -283,6 +283,10 @@ public void updateDefaultProfileFilter() { profileService.activateProfileFilter(filter.getId(), "test"); } + @Test + public void tempTest() { + EndpointGroupDto defaultGroup = endpointService.findDefaultGroup("11"); + } @Test public void findProfileSchemaVersionsByAppIdTest() { String applicationId = generateApplicationDto(null).getId(); diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/AbstractCassandraDao.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/AbstractCassandraDao.java index 31ed1be312..83d08dd855 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/AbstractCassandraDao.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/AbstractCassandraDao.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; +import com.datastax.driver.core.UserType; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.Result; @@ -37,6 +38,7 @@ public abstract class AbstractCassandraDao { private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraDao.class); + private static final String KAA = "kaa"; /** * Cassandra client classes. @@ -57,7 +59,7 @@ public abstract class AbstractCassandraDao { protected abstract String getColumnFamilyName(); - private Session getSession() { + protected Session getSession() { if (session == null) { session = cassandraClient.getSession(); } @@ -85,6 +87,10 @@ protected List findListByStatement(Statement statement) { return list; } + protected UserType getUserType(String userType) { + return getSession().getCluster().getMetadata().getKeyspace(KAA).getUserType(userType); + } + protected T findOneByStatement(Statement statement) { T object = null; if (statement != null) { @@ -116,6 +122,16 @@ public T save(T dto) { return dto; } + protected boolean wasApplied(ResultSet resultSet) { + boolean result = false; + if (resultSet != null) { + if (resultSet.wasApplied()) { + result = true; + } + } + return result; + } + protected void executeBatch(BatchStatement batch) { LOG.debug("Execute cassandra batch {}", batch); batch.setConsistencyLevel(getWriteConsistencyLevel()); diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/CassandraDaoUtil.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/CassandraDaoUtil.java index 6d36e6d4d5..8277c2a01a 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/CassandraDaoUtil.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/CassandraDaoUtil.java @@ -132,6 +132,14 @@ public static String convertKeyHashToString(ByteBuffer endpointKeyHash) { return id; } + public static String convertKeyHashToString(byte[] endpointKeyHash) { + String id = null; + if (endpointKeyHash != null) { + id = Bytes.toHexString(endpointKeyHash); + } + return id; + } + /** * This method convert string representation of endpoint key hash to ByteBuffer object * if id eq null, than return null diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDao.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDao.java index c2fd5ca790..5516ad3f53 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDao.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDao.java @@ -16,17 +16,70 @@ package org.kaaproject.kaa.server.common.nosql.cassandra.dao; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.UserType; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.google.common.base.Predicates; +import com.google.common.collect.Sets; +import org.apache.commons.codec.binary.Base64; +import org.kaaproject.kaa.common.dto.EndpointProfileBodyDto; +import org.kaaproject.kaa.common.dto.EndpointProfileDto; +import org.kaaproject.kaa.common.dto.EndpointProfilesBodyDto; +import org.kaaproject.kaa.common.dto.EndpointProfilesPageDto; +import org.kaaproject.kaa.common.dto.PageLinkDto; +import org.kaaproject.kaa.server.common.dao.DaoConstants; +import org.kaaproject.kaa.server.common.dao.exception.DatabaseProcessingException; +import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao; +import org.kaaproject.kaa.server.common.dao.lock.KaaOptimisticLockingFailureException; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPByAccessTokenDao; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPByAppIdDao; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPByEndpointGroupIdDao; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPBySdkTokenDao; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByAccessToken; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByAppId; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByEndpointGroupId; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPBySdkToken; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointProfile; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointUser; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEndpointGroupState; +import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEventClassFamilyVersionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.in; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.set; import static org.apache.commons.lang.StringUtils.isBlank; +import static org.kaaproject.kaa.server.common.dao.DaoConstants.OPT_LOCK; import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.getDto; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.convertKeyHashToString; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.convertStringToKeyHash; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getBytes; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_GROUP_STATE_CONFIGURATION_ID_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_GROUP_STATE_ENDPOINT_GROUP_ID_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_GROUP_STATE_PROFILE_FILTER_ID_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_GROUP_STATE_USER_TYPE_NAME; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_ACCESS_TOKEN_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_APP_ID_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_BY_APP_ID_APPLICATION_ID_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_BY_APP_ID_COLUMN_FAMILY_NAME; @@ -37,56 +90,42 @@ import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_BY_SDK_TOKEN_COLUMN_FAMILY_NAME; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_BY_SDK_TOKEN_SDK_TOKEN_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_COLUMN_FAMILY_NAME; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_CONFIGURATION_SEQUENCE_NUMBER_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_CONFIGURATION_VERSION_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_CONFIG_GROUP_STATE_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_CONFIG_HASH_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_ECF_VERSION_STATE_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_ENDPOINT_ID_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_EP_KEY_HASH_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_NOTIFICATION_GROUP_STATE_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_NOTIFICATION_HASH_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_NOTIFICATION_SEQUENCE_NUMBER_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_NOTIFICATION_VERSION_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_PROFILE_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_PROFILE_VERSION_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_SERVER_HASH_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_SERVER_PROFILE_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_SERVER_PROFILE_VERSION_PROPERTY; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import org.apache.commons.codec.binary.Base64; -import org.kaaproject.kaa.common.dto.EndpointProfileBodyDto; -import org.kaaproject.kaa.common.dto.EndpointProfileDto; -import org.kaaproject.kaa.common.dto.EndpointProfilesBodyDto; -import org.kaaproject.kaa.common.dto.EndpointProfilesPageDto; -import org.kaaproject.kaa.common.dto.PageLinkDto; -import org.kaaproject.kaa.server.common.dao.DaoConstants; -import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPByAccessTokenDao; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPByAppIdDao; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPByEndpointGroupIdDao; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPBySdkTokenDao; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByAccessToken; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByAppId; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByEndpointGroupId; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPBySdkToken; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointProfile; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointUser; -import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEndpointGroupState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Repository; - -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Statement; -import com.datastax.driver.core.querybuilder.QueryBuilder; -import com.google.common.base.Predicates; -import com.google.common.collect.Sets; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_SUBSCRIPTIONS_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_SYSTEM_NOTIFICATION_VERSION_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_USER_CONFIG_HASH_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_USER_ID_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_USER_NOTIFICATION_VERSION_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EVENT_CLASS_FAMILY_VERSION_STATE_ECF_ID_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EVENT_CLASS_FAMILY_VERSION_STATE_ECF_VERSION_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EVENT_CLASS_FAMILY_VERSION_STATE_USER_TYPE_NAME; @Repository(value = "endpointProfileDao") public class EndpointProfileCassandraDao extends AbstractCassandraDao implements EndpointProfileDao { private static final Logger LOG = LoggerFactory.getLogger(EndpointProfileCassandraDao.class); + private static final String UPDATE_PROFILE = "UPDATE kaa.ep_profile SET ecf_ver_state=?, cf_hash=?, nf_hash=?, cf_group_state=?, cf_seq_num=? , " + + "srv_pf_ver=?, sys_nf_ver=?, nf_seq_num=?, nf_ver=?, pf_ver=?, ucf_hash=?, server_hash=?, user_id=?, user_nf_ver=?, cf_ver=?, access_token=?, " + + "nf_group_state=?, pf=?, srv_pf=?, subscs=? WHERE ep_key_hash=? "; + + private static final String IF_CONDITION = "IF opt_lock = "; + + private ConcurrentHashMap statements = new ConcurrentHashMap(); @Autowired private CassandraEPByAppIdDao cassandraEPByAppIdDao; @@ -118,6 +157,27 @@ public CassandraEndpointProfile save(EndpointProfileDto dto) { } } + @Override + public Long findVersionByKey(byte[] endpointKeyHash) { + Long version = 0L; + CassandraEndpointProfile endpointProfile = findByKeyHash(endpointKeyHash); + if (endpointProfile != null) { + Long newVersion = endpointProfile.getVersion() + 1; + ResultSet resultSet = execute(QueryBuilder.update(getColumnFamilyName()).with(set(OPT_LOCK, newVersion)) + .where(eq(EP_EP_KEY_HASH_PROPERTY, getByteBuffer(endpointKeyHash))).onlyIf(eq(OPT_LOCK, endpointProfile.getVersion()))); + if (wasApplied(resultSet)) { + version = newVersion; + } else { + LOG.error("[{}] Can't update version of endpoint profile.", convertKeyHashToString(endpointKeyHash)); + new KaaOptimisticLockingFailureException("Can't update optimistic lock version of endpoint profile."); + } + return version; + } else { + LOG.error("[{}] Can't find endpoint profile by key hash.", convertKeyHashToString(endpointKeyHash)); + throw new DatabaseProcessingException("Can't find endpoint profile by key hash."); + } + } + @Override public CassandraEndpointProfile save(CassandraEndpointProfile profile) { profile.setId(convertKeyHashToString(profile.getEndpointKeyHash())); @@ -142,34 +202,81 @@ public CassandraEndpointProfile save(CassandraEndpointProfile profile) { return profile; } + private BoundStatement prepareStatement(String q, CassandraEndpointProfile pf) { + StringBuilder sb = new StringBuilder(q); + if (pf.getVersion() == 0L) { + sb.append(";"); + } else { + sb.append(IF_CONDITION).append(pf.getVersion()).append(";"); + } + String query = sb.toString(); + LOG.info("Get version {}", pf.getVersion()); + LOG.debug("Prepared query {}", query); + PreparedStatement ps = statements.get(query); + if (ps == null) { + ps = getSession().prepare(query); + statements.put(query, ps); + } + BoundStatement bs = ps.bind(); + bs.setList(EP_ECF_VERSION_STATE_PROPERTY, convertEventStateToUDT(pf.getEcfVersionStates())); + bs.setBytes(EP_CONFIG_HASH_PROPERTY, pf.getConfigurationHash()); + bs.setBytes(EP_NOTIFICATION_HASH_PROPERTY, pf.getNtHash()); + bs.setList(EP_CONFIG_GROUP_STATE_PROPERTY, convertGroupStateToUDT(pf.getCfGroupState())); + bs.setInt(EP_CONFIGURATION_SEQUENCE_NUMBER_PROPERTY, pf.getCfSequenceNumber()); + bs.setInt(EP_SERVER_PROFILE_VERSION_PROPERTY, pf.getServerProfileVersion()); + bs.setInt(EP_SYSTEM_NOTIFICATION_VERSION_PROPERTY, pf.getSystemNfVersion()); + bs.setInt(EP_NOTIFICATION_SEQUENCE_NUMBER_PROPERTY, pf.getNfSequenceNumber()); + bs.setInt(EP_NOTIFICATION_VERSION_PROPERTY, pf.getNotificationVersion()); + bs.setInt(EP_PROFILE_VERSION_PROPERTY, pf.getProfileVersion()); + bs.setBytes(EP_USER_CONFIG_HASH_PROPERTY, pf.getUserConfigurationHash()); + bs.setString(EP_SERVER_HASH_PROPERTY, pf.getServerHash()); + bs.setString(EP_USER_ID_PROPERTY, pf.getEndpointUserId()); + bs.setInt(EP_USER_NOTIFICATION_VERSION_PROPERTY, pf.getUserNfVersion()); + bs.setInt(EP_CONFIGURATION_VERSION_PROPERTY, pf.getConfigurationVersion()); + bs.setString(EP_ACCESS_TOKEN_PROPERTY, pf.getAccessToken()); + bs.setList(EP_NOTIFICATION_GROUP_STATE_PROPERTY, convertGroupStateToUDT(pf.getNfGroupState())); + bs.setString(EP_PROFILE_PROPERTY, pf.getProfile()); + bs.setString(EP_SERVER_PROFILE_PROPERTY, pf.getServerProfile()); + bs.setList(EP_SUBSCRIPTIONS_PROPERTY, pf.getSubscriptions()); + bs.setBytes(EP_EP_KEY_HASH_PROPERTY, pf.getEndpointKeyHash()); + return bs; + } + private CassandraEndpointProfile update(CassandraEndpointProfile profile) { LOG.debug("Updating endpoint profile with id {}", profile.getId()); ByteBuffer epKeyHash = profile.getEndpointKeyHash(); - CassandraEndpointProfile storedProfile = findByKeyHash(getBytes(epKeyHash)); + byte[] keyHash = getBytes(epKeyHash); + CassandraEndpointProfile storedProfile = findByKeyHash(keyHash); if (storedProfile != null) { List statementList = new ArrayList<>(); - statementList.add(getSaveQuery(profile)); - Set oldEndpointGroupIds = getEndpointProfilesGroupIdSet(storedProfile); - Set newEndpointGroupIds = getEndpointProfilesGroupIdSet(profile); - - Set removeEndpointGroupIds = Sets.filter(oldEndpointGroupIds, Predicates.not(Predicates.in(newEndpointGroupIds))); - Set addEndpointGroupIds = Sets.filter(newEndpointGroupIds, Predicates.not(Predicates.in(oldEndpointGroupIds))); - if (addEndpointGroupIds != null) { - for (String id : addEndpointGroupIds) { - statementList.add(cassandraEPByEndpointGroupIdDao.getSaveQuery(new CassandraEPByEndpointGroupId(id, epKeyHash))); + profile.setVersion(findVersionByKey(keyHash)); + BoundStatement bs = prepareStatement(UPDATE_PROFILE, profile); + if (wasApplied(execute(bs))) { + Set oldEndpointGroupIds = getEndpointProfilesGroupIdSet(storedProfile); + Set newEndpointGroupIds = getEndpointProfilesGroupIdSet(profile); + Set removeEndpointGroupIds = Sets.filter(oldEndpointGroupIds, Predicates.not(Predicates.in(newEndpointGroupIds))); + Set addEndpointGroupIds = Sets.filter(newEndpointGroupIds, Predicates.not(Predicates.in(oldEndpointGroupIds))); + if (addEndpointGroupIds != null) { + for (String id : addEndpointGroupIds) { + statementList.add(cassandraEPByEndpointGroupIdDao.getSaveQuery(new CassandraEPByEndpointGroupId(id, epKeyHash))); + } } - } - if (removeEndpointGroupIds != null) { - for (String id : removeEndpointGroupIds) { - statementList.add(delete().from(EP_BY_ENDPOINT_GROUP_ID_COLUMN_FAMILY_NAME) - .where(eq(EP_BY_ENDPOINT_GROUP_ID_ENDPOINT_GROUP_ID_PROPERTY, id)) - .and(eq(EP_BY_ENDPOINT_GROUP_ID_ENDPOINT_KEY_HASH_PROPERTY, epKeyHash))); + if (removeEndpointGroupIds != null) { + for (String id : removeEndpointGroupIds) { + statementList.add(delete().from(EP_BY_ENDPOINT_GROUP_ID_COLUMN_FAMILY_NAME) + .where(eq(EP_BY_ENDPOINT_GROUP_ID_ENDPOINT_GROUP_ID_PROPERTY, id)) + .and(eq(EP_BY_ENDPOINT_GROUP_ID_ENDPOINT_KEY_HASH_PROPERTY, epKeyHash))); + } } + executeBatch(statementList.toArray(new Statement[statementList.size()])); + } else { + LOG.error("[{}] Can't update endpoint profile with version {}. Endpoint profile already changed!", profile.getId(), profile.getVersion()); + throw new KaaOptimisticLockingFailureException("Can't update endpoint profile with version . Endpoint profile already changed!"); } - executeBatch(statementList.toArray(new Statement[statementList.size()])); LOG.debug("[{}] Endpoint profile updated", profile.getId()); } else { - LOG.warn("Stored profile is null. Can't update endpoint profile"); + LOG.error("[{}] Stored profile is null. Can't update endpoint profile.", profile.getId()); + throw new DatabaseProcessingException("Stored profile is null. Can't update endpoint profile."); } return profile; } @@ -199,17 +306,18 @@ public EndpointProfileBodyDto findBodyByKeyHash(byte[] endpointKeyHash) { } @Override - public long getCountByKeyHash(byte[] endpointKeyHash) { + public CassandraEndpointProfile findEndpointIdByKeyHash(byte[] endpointKeyHash) { LOG.debug("Try to check if endpoint profile exists with key hash [{}]", endpointKeyHash); - long count = 0; - ResultSet resultSet = execute(select().countAll().from(getColumnFamilyName()) + CassandraEndpointProfile profile = null; + ResultSet resultSet = execute(select(EP_ENDPOINT_ID_PROPERTY).from(getColumnFamilyName()) .where(eq(EP_EP_KEY_HASH_PROPERTY, getByteBuffer(endpointKeyHash)))); Row row = resultSet.one(); if (row != null) { - count = row.getLong(0); + profile = new CassandraEndpointProfile(); + profile.setId(row.getString(EP_ENDPOINT_ID_PROPERTY)); } - LOG.debug("{} endpoint profile exists with key hash [{}]", count); - return count; + LOG.debug("{} endpoint profile exists with key hash [{}]", endpointKeyHash, profile); + return profile; } private void removeByKeyHashFromEpByEndpointGroupId(byte[] endpointKeyHash) { @@ -377,27 +485,6 @@ public void removeById(ByteBuffer key) { } } - - /** - * @deprecated This method needs additional testing and thus is not - * recommended to use as of October, 2015. - */ - @Override - public List findBySdkToken(String sdkToken) { - LOG.debug("Trying to find endpoint profiles by SDK token {}", sdkToken); - - Statement query = select().from(EP_BY_SDK_TOKEN_COLUMN_FAMILY_NAME) - .where(eq(EP_BY_SDK_TOKEN_SDK_TOKEN_PROPERTY, sdkToken)); - - LOG.trace("Executing statement {}", query); - List profiles = this.findListByStatement(query); - if (LOG.isTraceEnabled()) { - LOG.trace("Endpoint profiles found: [{}]", Arrays.toString(profiles.toArray())); - } - - return profiles; - } - @Override public boolean checkSdkToken(String sdkToken) { LOG.debug("Checking for endpoint profiles with SDK token {}", sdkToken); @@ -420,6 +507,33 @@ public CassandraEndpointProfile updateServerProfile(byte[] keyHash, int version, return findById(key); } + private List convertGroupStateToUDT(List cfGroupState) { + List utList = null; + if (cfGroupState != null && !cfGroupState.isEmpty()) { + UserType ut = getUserType(ENDPOINT_GROUP_STATE_USER_TYPE_NAME); + utList = new ArrayList<>(); + for (CassandraEndpointGroupState state : cfGroupState) { + ut.newValue().setString(ENDPOINT_GROUP_STATE_ENDPOINT_GROUP_ID_PROPERTY, state.getEndpointGroupId()) + .setString(ENDPOINT_GROUP_STATE_PROFILE_FILTER_ID_PROPERTY, state.getProfileFilterId()) + .setString(ENDPOINT_GROUP_STATE_CONFIGURATION_ID_PROPERTY, state.getConfigurationId()); + } + } + return utList; + } + + private List convertEventStateToUDT(List eventStates) { + List utList = null; + if (eventStates != null && !eventStates.isEmpty()) { + UserType ut = getUserType(EVENT_CLASS_FAMILY_VERSION_STATE_USER_TYPE_NAME); + utList = new ArrayList<>(); + for (CassandraEventClassFamilyVersionState state : eventStates) { + ut.newValue().setString(EVENT_CLASS_FAMILY_VERSION_STATE_ECF_ID_PROPERTY, state.getEcfId()) + .setInt(EVENT_CLASS_FAMILY_VERSION_STATE_ECF_VERSION_PROPERTY, state.getVersion()); + } + } + return utList; + } + private Set getEndpointProfilesGroupIdSet(CassandraEndpointProfile profile) { Set groupIdSet = new HashSet<>(); List groupStateSet = new LinkedList<>(); diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/filter/CassandraEPByAppIdDao.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/filter/CassandraEPByAppIdDao.java index f7df45e11b..e3f03a58b9 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/filter/CassandraEPByAppIdDao.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/filter/CassandraEPByAppIdDao.java @@ -22,6 +22,7 @@ import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPByAppId; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer; import org.slf4j.Logger; @@ -35,7 +36,6 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.gte; -import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_BY_APP_ID_APPLICATION_ID_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.EP_BY_APP_ID_ENDPOINT_KEY_HASH_PROPERTY; diff --git a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointProfile.java b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointProfile.java index 34a634b24f..64d16f8e1b 100644 --- a/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointProfile.java +++ b/server/common/nosql/cassandra-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/model/CassandraEndpointProfile.java @@ -24,15 +24,18 @@ import org.kaaproject.kaa.common.dto.EndpointGroupStateDto; import org.kaaproject.kaa.common.dto.EndpointProfileDto; import org.kaaproject.kaa.common.dto.EventClassFamilyVersionStateDto; +import org.kaaproject.kaa.server.common.dao.DaoConstants; import org.kaaproject.kaa.server.common.dao.impl.DaoUtil; import org.kaaproject.kaa.server.common.dao.model.EndpointProfile; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEndpointGroupState; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEventClassFamilyVersionState; +import javax.persistence.Version; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.List; +import static org.kaaproject.kaa.server.common.dao.DaoConstants.OPT_LOCK; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.convertDtoToModelList; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.convertECFVersionDtoToModelList; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer; @@ -130,6 +133,8 @@ public final class CassandraEndpointProfile implements EndpointProfile, Serializ private String sdkToken; @Column(name = EP_SERVER_PROFILE_PROPERTY) private String serverProfile; + @Column(name = OPT_LOCK) + private long version; public CassandraEndpointProfile() { } @@ -388,6 +393,14 @@ public void setApplicationId(String applicationId) { this.applicationId = applicationId; } + public long getVersion() { + return version; + } + + public void setVersion(long version) { + this.version = version; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/common/nosql/cassandra-dao/src/main/resources/cassandra.cql b/server/common/nosql/cassandra-dao/src/main/resources/cassandra.cql index 660bea1d34..0bd7f3bfcd 100644 --- a/server/common/nosql/cassandra-dao/src/main/resources/cassandra.cql +++ b/server/common/nosql/cassandra-dao/src/main/resources/cassandra.cql @@ -84,7 +84,8 @@ CREATE TABLE IF NOT EXISTS kaa.ep_profile ( ecf_ver_state list < frozen < ecf_ver_state > >, server_hash text, sdk_token text, - srv_pf text + srv_pf text, + opt_lock bigint ); CREATE TABLE IF NOT EXISTS kaa.app_eps ( diff --git a/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDaoTest.java b/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDaoTest.java index 8541a32538..11b766c4c4 100644 --- a/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDaoTest.java +++ b/server/common/nosql/cassandra-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/cassandra/dao/EndpointProfileCassandraDaoTest.java @@ -19,7 +19,6 @@ import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; -import org.kaaproject.kaa.common.dto.CTLDataDto; import org.kaaproject.kaa.common.dto.EndpointGroupDto; import org.kaaproject.kaa.common.dto.EndpointGroupStateDto; import org.kaaproject.kaa.common.dto.EndpointProfileBodyDto; @@ -28,8 +27,11 @@ import org.kaaproject.kaa.common.dto.EndpointProfilesPageDto; import org.kaaproject.kaa.common.dto.EndpointUserDto; import org.kaaproject.kaa.common.dto.PageLinkDto; +import org.kaaproject.kaa.server.common.dao.lock.KaaOptimisticLockingFailureException; import org.kaaproject.kaa.server.common.dao.model.EndpointProfile; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -38,7 +40,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "/cassandra-client-test-context.xml") @@ -50,6 +55,10 @@ public class EndpointProfileCassandraDaoTest extends AbstractCassandraTest { private static final String TEST_OFFSET = "0"; private static final int GENERATED_PROFILES_COUNT = 5; + private static final Logger LOG = LoggerFactory.getLogger(EndpointProfileCassandraDaoTest.class); + + private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(10); + @Test public void testFindByEndpointGroupId() throws Exception { List endpointProfileList = new ArrayList<>(); @@ -132,8 +141,8 @@ public void testFindBodyByKeyHash() throws Exception { @Test public void testUpdate() throws Exception { - List cfGroupStateSave = new ArrayList(); - List cfGroupStateUpdate = new ArrayList(); + List cfGroupStateSave = new ArrayList<>(); + List cfGroupStateUpdate = new ArrayList<>(); PageLinkDto pageLink; EndpointProfilesPageDto found; String endpointProfileId = "11"; @@ -178,10 +187,52 @@ public void testFindByKeyHash() throws Exception { } @Test - public void testGetCountByKeyHash() throws Exception { + public void testFindEndpointIdByKeyHash() throws Exception { + EndpointProfileDto endpointProfile = generateEndpointProfile(null, null, null, null); + EndpointProfile ep = endpointProfileDao.findEndpointIdByKeyHash(endpointProfile.getEndpointKeyHash()); + Assert.assertEquals(endpointProfile.getId(), ep.getId()); + Assert.assertNull(endpointProfile.getEndpointKey()); + Assert.assertNull(ep.getEndpointKey()); + Assert.assertNull(ep.getEndpointUserId()); + Assert.assertNull(ep.getServerProfile()); + Assert.assertNull(ep.getSubscriptions()); + } + + @Test + public void testOptimisticLock() throws Exception { EndpointProfileDto endpointProfile = generateEndpointProfile(null, null, null, null); - long count = endpointProfileDao.getCountByKeyHash(endpointProfile.getEndpointKeyHash()); - Assert.assertEquals(1L, count); + endpointProfile.setAccessToken("Ololo"); + endpointProfileDao.save(endpointProfile); + long version = endpointProfileDao.findVersionByKey(endpointProfile.getEndpointKeyHash()); + Assert.assertEquals(1L, version); + } + + @Test(expected = KaaOptimisticLockingFailureException.class) + public void testOptimisticLockWithConcurrency() throws Exception { + final EndpointProfileDto endpointProfile = generateEndpointProfile(null, null, null, null); + final AtomicInteger errorCount = new AtomicInteger(); + List> tasks = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + final int id = i; + tasks.add(EXECUTOR.submit(new Runnable() { + @Override + public void run() { + try { + CassandraEndpointProfile ep = new CassandraEndpointProfile(endpointProfile); + ep.setEndpointUserId("Ololo " + id); + endpointProfileDao.save(ep.toDto()); + } catch (KaaOptimisticLockingFailureException ex) { + errorCount.incrementAndGet(); + LOG.error("Catch optimistic exception."); + throw ex; + } + } + })); + } + for (Future future : tasks) { + future.get(); + } + Assert.assertTrue(errorCount.get() > 0); } @Test diff --git a/server/common/nosql/cassandra-dao/src/test/resources/logback.xml b/server/common/nosql/cassandra-dao/src/test/resources/logback.xml index 4eef205f64..0bb4b129f4 100644 --- a/server/common/nosql/cassandra-dao/src/test/resources/logback.xml +++ b/server/common/nosql/cassandra-dao/src/test/resources/logback.xml @@ -21,12 +21,24 @@ - + + ${server_log_dir}/kaa-cassandra-dao.log + + kaa-dao.%d{yyyy-MM-dd}.log + 30 + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + - + + \ No newline at end of file diff --git a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDao.java b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDao.java index 4578942b86..f08df50de3 100644 --- a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDao.java +++ b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDao.java @@ -16,6 +16,26 @@ package org.kaaproject.kaa.server.common.nosql.mongo.dao; +import com.mongodb.DBObject; +import org.kaaproject.kaa.common.dto.EndpointProfileBodyDto; +import org.kaaproject.kaa.common.dto.EndpointProfileDto; +import org.kaaproject.kaa.common.dto.EndpointProfilesBodyDto; +import org.kaaproject.kaa.common.dto.EndpointProfilesPageDto; +import org.kaaproject.kaa.common.dto.PageLinkDto; +import org.kaaproject.kaa.server.common.dao.DaoConstants; +import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao; +import org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoEndpointProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.stereotype.Repository; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.kaaproject.kaa.server.common.dao.DaoConstants.OPT_LOCK; import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.convertDtoList; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.ENDPOINT_GROUP_ID; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.ENDPOINT_PROFILE; @@ -28,30 +48,11 @@ import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SERVER_PROFILE_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SERVER_PROFILE_VERSION_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_USER_ID; +import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.ID; import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.query; import static org.springframework.data.mongodb.core.query.Update.update; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.kaaproject.kaa.common.dto.EndpointProfileBodyDto; -import org.kaaproject.kaa.common.dto.EndpointProfileDto; -import org.kaaproject.kaa.common.dto.EndpointProfilesBodyDto; -import org.kaaproject.kaa.common.dto.EndpointProfilesPageDto; -import org.kaaproject.kaa.common.dto.PageLinkDto; -import org.kaaproject.kaa.server.common.dao.DaoConstants; -import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao; -import org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoEndpointProfile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.mongodb.core.query.Criteria; -import org.springframework.data.mongodb.core.query.Query; -import org.springframework.stereotype.Repository; - -import com.mongodb.DBObject; - @Repository public class EndpointProfileMongoDao extends AbstractMongoDao implements EndpointProfileDao { @@ -118,6 +119,15 @@ public EndpointProfilesBodyDto findBodyByEndpointGroupId(PageLinkDto pageLink) { return endpointProfilesBodyDto; } + @Override + public Long findVersionByKey(byte[] endpointKeyHash) { + LOG.debug("Find endpoint profile version by key hash [{}] ", endpointKeyHash); + Query query = query(where(EP_ENDPOINT_KEY_HASH).is(endpointKeyHash)); + query.fields().include(OPT_LOCK); + DBObject result = mongoTemplate.getDb().getCollection(getCollectionName()).findOne(query.getQueryObject()); + return (Long) result.get(OPT_LOCK); + } + @Override public MongoEndpointProfile findByKeyHash(byte[] endpointKeyHash) { LOG.debug("Find endpoint profile by endpoint key hash [{}] ", endpointKeyHash); @@ -140,10 +150,11 @@ public EndpointProfileBodyDto findBodyByKeyHash(byte[] endpointKeyHash) { } @Override - public long getCountByKeyHash(byte[] endpointKeyHash) { + public MongoEndpointProfile findEndpointIdByKeyHash(byte[] endpointKeyHash) { LOG.debug("Get count of endpoint profiles by endpoint key hash [{}] ", endpointKeyHash); - DBObject dbObject = query(where(EP_ENDPOINT_KEY_HASH).is(endpointKeyHash)).getQueryObject(); - return mongoTemplate.getDb().getCollection(getCollectionName()).count(dbObject); + Query query = query(where(EP_ENDPOINT_KEY_HASH).is(endpointKeyHash)); + query.fields().include(ID); + return findOne(query); } @Override @@ -193,12 +204,6 @@ public MongoEndpointProfile save(EndpointProfileDto dto) { return save(new MongoEndpointProfile(dto)); } - @Override - public List findBySdkToken(String sdkToken) { - LOG.debug("Searching for endpoint profiles by SDK token {} ", sdkToken); - return find(query(where(EP_SDK_TOKEN).is(sdkToken))); - } - @Override public boolean checkSdkToken(String sdkToken) { LOG.debug("Checking for endpoint profiles with SDK token {}", sdkToken); diff --git a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointProfile.java b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointProfile.java index 8f84412571..cd4c4fab8a 100644 --- a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointProfile.java +++ b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoEndpointProfile.java @@ -24,6 +24,7 @@ import org.kaaproject.kaa.server.common.dao.impl.DaoUtil; import org.kaaproject.kaa.server.common.dao.model.EndpointProfile; import org.springframework.data.annotation.Id; +import org.springframework.data.annotation.Version; import org.springframework.data.mongodb.core.index.Indexed; import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Field; @@ -32,6 +33,7 @@ import java.util.Arrays; import java.util.List; +import static org.kaaproject.kaa.server.common.dao.DaoConstants.OPT_LOCK; import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.getArrayCopy; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.ENDPOINT_PROFILE; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_ACCESS_TOKEN; @@ -53,8 +55,8 @@ import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_PROFILE_VERSION; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SDK_TOKEN; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SERVER_HASH; -import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SERVER_PROFILE_VERSION_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SERVER_PROFILE_PROPERTY; +import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SERVER_PROFILE_VERSION_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_SYSTEM_NF_VERSION; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_USER_CONFIGURATION_HASH; import static org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoModelConstants.EP_USER_ID; @@ -123,12 +125,19 @@ public final class MongoEndpointProfile implements EndpointProfile, Serializable private String sdkToken; @Field(EP_SERVER_PROFILE_PROPERTY) private String serverProfile; + @Version + @Field(OPT_LOCK) + private Long optVersion; public MongoEndpointProfile() { } public MongoEndpointProfile(EndpointProfileDto dto) { + this(dto, null); + } + + public MongoEndpointProfile(EndpointProfileDto dto, Long version) { this.id = dto.getId(); this.applicationId = dto.getApplicationId(); this.endpointKey = dto.getEndpointKey(); @@ -156,6 +165,9 @@ public MongoEndpointProfile(EndpointProfileDto dto) { this.serverHash = dto.getServerHash(); this.sdkToken = dto.getSdkToken(); this.serverProfile = dto.getServerProfileBody(); + if (version != null) { + this.optVersion = version; + } } @Override @@ -274,7 +286,7 @@ public int getProfileVersion() { public void setProfileVersion(int profileVersion) { this.profileVersion = profileVersion; } - + public int getServerProfileVersion() { return serverProfileVersion; } diff --git a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoModelConstants.java b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoModelConstants.java index 02cbc15979..e205c860fe 100644 --- a/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoModelConstants.java +++ b/server/common/nosql/mongo-dao/src/main/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/model/MongoModelConstants.java @@ -115,7 +115,4 @@ public class MongoModelConstants { public static final String USER_CONF_SCHEMA_VERSION = "schema_version"; public static final String USER_CONF_BODY = BODY; - /** - * - */ } diff --git a/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDaoTest.java b/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDaoTest.java index 0b975027db..ce2c679efc 100644 --- a/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDaoTest.java +++ b/server/common/nosql/mongo-dao/src/test/java/org/kaaproject/kaa/server/common/nosql/mongo/dao/EndpointProfileMongoDaoTest.java @@ -30,6 +30,7 @@ import org.kaaproject.kaa.common.dto.EndpointProfilesBodyDto; import org.kaaproject.kaa.common.dto.EndpointProfilesPageDto; import org.kaaproject.kaa.common.dto.PageLinkDto; +import org.kaaproject.kaa.server.common.dao.model.EndpointProfile; import org.kaaproject.kaa.server.common.nosql.mongo.dao.model.MongoEndpointProfile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,17 +169,24 @@ public void saveEndpointProfileTest() { @Test public void convertToDtoTest() { EndpointProfileDto endpointProfile = generateEndpointProfileDto(null, null); + endpointProfile.setAccessToken("Trololo"); + endpointProfileDao.save(endpointProfile); Assert.assertNotNull(endpointProfile); MongoEndpointProfile converted = new MongoEndpointProfile(endpointProfile); Assert.assertEquals(endpointProfile, converted.toDto()); } @Test - public void getCountByKeyHash() { + public void testFindEndpointIdByKeyHash() { EndpointProfileDto endpointProfile = generateEndpointProfileDto(null, null); Assert.assertNotNull(endpointProfile); - long count = endpointProfileDao.getCountByKeyHash(endpointProfile.getEndpointKeyHash()); - Assert.assertEquals(1, count); + EndpointProfile ep = endpointProfileDao.findEndpointIdByKeyHash(endpointProfile.getEndpointKeyHash()); + Assert.assertEquals(endpointProfile.getId(), ep.getId()); + Assert.assertNull(endpointProfile.getEndpointKey()); + Assert.assertNull(ep.getEndpointKey()); + Assert.assertNull(ep.getEndpointUserId()); + Assert.assertNull(ep.getServerProfile()); + Assert.assertNull(ep.getSubscriptions()); } @Test diff --git a/server/common/nosql/pom.xml b/server/common/nosql/pom.xml index 1f11fc94aa..90417d6321 100644 --- a/server/common/nosql/pom.xml +++ b/server/common/nosql/pom.xml @@ -32,8 +32,8 @@ UTF-8 ${basedir}/../../.. - 2.1.3 - 2.1.2 + 2.1.9 + 2.1.9 2.0.2.2 2.0.2.2 2.1.2