Skip to content

Commit

Permalink
KAA-25: Added optimistic lock for update endpoint profile.
Browse files Browse the repository at this point in the history
  • Loading branch information
Igor Khanenko committed Dec 16, 2015
1 parent 62859fa commit 62bf615
Show file tree
Hide file tree
Showing 19 changed files with 458 additions and 181 deletions.
Expand Up @@ -415,6 +415,8 @@ public class DaoConstants {


public static final String LAST_PAGE_MESSAGE = "It is the last page"; public static final String LAST_PAGE_MESSAGE = "It is the last page";
public static final String PROFILE = "profile"; public static final String PROFILE = "profile";
public static final String OPT_LOCK = "opt_lock";
public static final String APPLIED = "[applied]";


private DaoConstants() { private DaoConstants() {
throw new UnsupportedOperationException("Not supported"); throw new UnsupportedOperationException("Not supported");
Expand Down
Expand Up @@ -33,8 +33,20 @@
*/ */
public interface EndpointProfileDao<T extends EndpointProfile> extends Dao<T, ByteBuffer> { public interface EndpointProfileDao<T extends EndpointProfile> extends Dao<T, ByteBuffer> {


/**
*
* @param dto
* @return
*/
T save(EndpointProfileDto dto); T save(EndpointProfileDto dto);


/**
*
* @param endpointKeyHash
* @return
*/
Long findVersionByKey(byte[] endpointKeyHash);

/** /**
* Find endpoint profile by key hash. * Find endpoint profile by key hash.
* *
Expand Down Expand Up @@ -73,7 +85,7 @@ public interface EndpointProfileDao<T extends EndpointProfile> extends Dao<T, By
* @param endpointKeyHash the endpoint key hash * @param endpointKeyHash the endpoint key hash
* @return the count of endpoint profile * @return the count of endpoint profile
*/ */
long getCountByKeyHash(byte[] endpointKeyHash); T findEndpointIdByKeyHash(byte[] endpointKeyHash);


/** /**
* Remove endpoint profile by key hash. * Remove endpoint profile by key hash.
Expand Down Expand Up @@ -106,11 +118,6 @@ public interface EndpointProfileDao<T extends EndpointProfile> extends Dao<T, By
*/ */
List<T> findByEndpointUserId(String endpointUserId); List<T> findByEndpointUserId(String endpointUserId);


/**
* @deprecated The functionality of this method is not yet necessary.
*/
List<T> findBySdkToken(String sdkToken);

/** /**
* Checks whether there are any endpoint profiles with the given SDK token. * Checks whether there are any endpoint profiles with the given SDK token.
* *
Expand Down
@@ -0,0 +1,14 @@
package org.kaaproject.kaa.server.common.dao.lock;

import org.springframework.dao.OptimisticLockingFailureException;

public class KaaOptimisticLockingFailureException extends OptimisticLockingFailureException {

public KaaOptimisticLockingFailureException(String msg, Throwable cause) {
super(msg, cause);
}

public KaaOptimisticLockingFailureException(String msg) {
super(msg);
}
}
@@ -0,0 +1,25 @@
package org.kaaproject.kaa.server.common.dao.lock;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Retry {

/**
* The number of retry attempts
*
* @return retry attempts
*/
int times() default 1;

/**
* Declare the exception types the retry will be issued on.
*
* @return exception types causing a retry
*/
Class<? extends Exception>[] on();
}
Expand Up @@ -16,21 +16,6 @@


package org.kaaproject.kaa.server.common.dao.service; package org.kaaproject.kaa.server.common.dao.service;


import static org.apache.commons.lang.StringUtils.isBlank;
import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.convertDtoList;
import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.getDto;
import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidId;
import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidObject;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateHash;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateObject;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlId;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlObject;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateString;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.kaaproject.kaa.common.dto.ChangeDto; import org.kaaproject.kaa.common.dto.ChangeDto;
import org.kaaproject.kaa.common.dto.ChangeNotificationDto; import org.kaaproject.kaa.common.dto.ChangeNotificationDto;
Expand Down Expand Up @@ -59,6 +44,8 @@
import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao; import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao;
import org.kaaproject.kaa.server.common.dao.impl.EndpointUserDao; import org.kaaproject.kaa.server.common.dao.impl.EndpointUserDao;
import org.kaaproject.kaa.server.common.dao.impl.ProfileFilterDao; import org.kaaproject.kaa.server.common.dao.impl.ProfileFilterDao;
import org.kaaproject.kaa.server.common.dao.lock.KaaOptimisticLockingFailureException;
import org.kaaproject.kaa.server.common.dao.lock.Retry;
import org.kaaproject.kaa.server.common.dao.model.EndpointConfiguration; import org.kaaproject.kaa.server.common.dao.model.EndpointConfiguration;
import org.kaaproject.kaa.server.common.dao.model.EndpointProfile; import org.kaaproject.kaa.server.common.dao.model.EndpointProfile;
import org.kaaproject.kaa.server.common.dao.model.EndpointUser; import org.kaaproject.kaa.server.common.dao.model.EndpointUser;
Expand All @@ -71,6 +58,20 @@
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;


import java.util.ArrayList;
import java.util.List;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.convertDtoList;
import static org.kaaproject.kaa.server.common.dao.impl.DaoUtil.getDto;
import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidId;
import static org.kaaproject.kaa.server.common.dao.service.Validator.isValidObject;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateHash;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateObject;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlId;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateSqlObject;
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateString;

@Service @Service
public class EndpointServiceImpl implements EndpointService { public class EndpointServiceImpl implements EndpointService {


Expand Down Expand Up @@ -271,41 +272,28 @@ public void removeEndpointProfileByAppId(String appId) {
} }


@Override @Override
@Transactional @Retry(times = 5, on = org.springframework.dao.OptimisticLockingFailureException.class)
public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfileDto) { public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfileDto) throws KaaOptimisticLockingFailureException {
validateObject(endpointProfileDto, "Can't find endpoint profile object. Invalid endpoint profile object" EndpointProfileDto profileDto = null;
+ endpointProfileDto); validateObject(endpointProfileDto, "Can't find endpoint profile object. Invalid endpoint profile object" + endpointProfileDto);
byte[] keyHash = endpointProfileDto.getEndpointKeyHash(); byte[] keyHash = endpointProfileDto.getEndpointKeyHash();
EndpointProfileDto dto;
validateHash(keyHash, "Incorrect key hash for endpoint profile."); validateHash(keyHash, "Incorrect key hash for endpoint profile.");
if(endpointProfileDto.getServerProfileBody() == null){ if (endpointProfileDto.getServerProfileBody() == null) {
ServerProfileSchemaDto serverProfileSchemaDto = serverProfileService.findLatestServerProfileSchema(endpointProfileDto.getApplicationId()); ServerProfileSchemaDto serverProfileSchemaDto = serverProfileService.findLatestServerProfileSchema(endpointProfileDto.getApplicationId());
CTLSchemaDto schemaDto = ctlService.findCTLSchemaById(serverProfileSchemaDto.getCtlSchemaId()); CTLSchemaDto schemaDto = ctlService.findCTLSchemaById(serverProfileSchemaDto.getCtlSchemaId());
LOG.debug("Set latest server profile schema [{}] and default record {} for endpoint with key [{}]", serverProfileSchemaDto.getVersion(), schemaDto.getBody(), keyHash); LOG.debug("Set latest server profile schema [{}] and default record {} for endpoint with key [{}]", serverProfileSchemaDto.getVersion(), schemaDto.getBody(), keyHash);
endpointProfileDto.setServerProfileVersion(serverProfileSchemaDto.getVersion()); endpointProfileDto.setServerProfileVersion(serverProfileSchemaDto.getVersion());
endpointProfileDto.setServerProfileBody(schemaDto.getBody()); endpointProfileDto.setServerProfileBody(schemaDto.getBody());
} }
if (isBlank(endpointProfileDto.getId())) { if (isBlank(endpointProfileDto.getId())) {
//TODO: Improve this to avoid redundant requests to DB and invalid logic. EndpointProfile storedProfile = endpointProfileDao.findEndpointIdByKeyHash(keyHash);
if (endpointProfileDao.getCountByKeyHash(keyHash) == 0) { if (storedProfile != null) {
endpointProfileDto.setId(storedProfile.getId());
LOG.debug("Register new endpoint profile."); LOG.debug("Register new endpoint profile.");
dto = getDto(endpointProfileDao.save(endpointProfileDto));
} else {
EndpointProfile storedProfile = endpointProfileDao.findByKeyHash(keyHash);
if (Arrays.equals(storedProfile.getEndpointKey(), endpointProfileDto.getEndpointKey())) {
LOG.debug("Got register profile for already existing profile {}. Will overwrite existing profile!", keyHash);
endpointProfileDto.setId(storedProfile.getId());
dto = getDto(endpointProfileDao.save(endpointProfileDto));
} else {
LOG.warn("Endpoint profile with key hash {} already exists.", keyHash);
throw new DatabaseProcessingException("Can't save endpoint profile with existing key hash.");
}
} }
} else {
LOG.debug("Update endpoint profile with id [{}]", endpointProfileDto.getId());
dto = getDto(endpointProfileDao.save(endpointProfileDto));
} }
return dto; profileDto = getDto(endpointProfileDao.save(endpointProfileDto));
return profileDto;
} }


@Override @Override
Expand Down
Expand Up @@ -283,6 +283,10 @@ public void updateDefaultProfileFilter() {
profileService.activateProfileFilter(filter.getId(), "test"); profileService.activateProfileFilter(filter.getId(), "test");
} }


@Test
public void tempTest() {
EndpointGroupDto defaultGroup = endpointService.findDefaultGroup("11");
}
@Test @Test
public void findProfileSchemaVersionsByAppIdTest() { public void findProfileSchemaVersionsByAppIdTest() {
String applicationId = generateApplicationDto(null).getId(); String applicationId = generateApplicationDto(null).getId();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session; import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement; import com.datastax.driver.core.Statement;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.Result; import com.datastax.driver.mapping.Result;
Expand All @@ -37,6 +38,7 @@
public abstract class AbstractCassandraDao<T, K> { public abstract class AbstractCassandraDao<T, K> {


private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraDao.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraDao.class);
private static final String KAA = "kaa";


/** /**
* Cassandra client classes. * Cassandra client classes.
Expand All @@ -57,7 +59,7 @@ public abstract class AbstractCassandraDao<T, K> {


protected abstract String getColumnFamilyName(); protected abstract String getColumnFamilyName();


private Session getSession() { protected Session getSession() {
if (session == null) { if (session == null) {
session = cassandraClient.getSession(); session = cassandraClient.getSession();
} }
Expand Down Expand Up @@ -85,6 +87,10 @@ protected List<T> findListByStatement(Statement statement) {
return list; return list;
} }


protected UserType getUserType(String userType) {
return getSession().getCluster().getMetadata().getKeyspace(KAA).getUserType(userType);
}

protected T findOneByStatement(Statement statement) { protected T findOneByStatement(Statement statement) {
T object = null; T object = null;
if (statement != null) { if (statement != null) {
Expand Down Expand Up @@ -116,6 +122,16 @@ public T save(T dto) {
return dto; return dto;
} }


protected boolean wasApplied(ResultSet resultSet) {
boolean result = false;
if (resultSet != null) {
if (resultSet.wasApplied()) {
result = true;
}
}
return result;
}

protected void executeBatch(BatchStatement batch) { protected void executeBatch(BatchStatement batch) {
LOG.debug("Execute cassandra batch {}", batch); LOG.debug("Execute cassandra batch {}", batch);
batch.setConsistencyLevel(getWriteConsistencyLevel()); batch.setConsistencyLevel(getWriteConsistencyLevel());
Expand Down
Expand Up @@ -132,6 +132,14 @@ public static String convertKeyHashToString(ByteBuffer endpointKeyHash) {
return id; return id;
} }


public static String convertKeyHashToString(byte[] endpointKeyHash) {
String id = null;
if (endpointKeyHash != null) {
id = Bytes.toHexString(endpointKeyHash);
}
return id;
}

/** /**
* This method convert string representation of endpoint key hash to ByteBuffer object * This method convert string representation of endpoint key hash to ByteBuffer object
* if id eq null, than return null * if id eq null, than return null
Expand Down

0 comments on commit 62bf615

Please sign in to comment.