Skip to content

Commit

Permalink
KAA-749: Merge of KAA-450
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Feb 2, 2016
2 parents 556ffb4 + 9a2528e commit 1ec9876
Show file tree
Hide file tree
Showing 76 changed files with 1,421 additions and 586 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -34,6 +34,7 @@ Copyright 2014-2015 CyberVision, Inc.
<spring.version>4.0.2.RELEASE</spring.version>
<spring.security.version>3.2.5.RELEASE</spring.security.version>
<spring.data.version>Fowler-RELEASE</spring.data.version>
<spring.retry.version>1.1.2.RELEASE</spring.retry.version>
<ehcache.version>2.8.1</ehcache.version>
<ehcache-spring.version>1.2.0</ehcache-spring.version>
<hibernate.version>4.3.11.Final</hibernate.version>
Expand Down Expand Up @@ -804,6 +805,11 @@ Copyright 2014-2015 CyberVision, Inc.
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>${spring.retry.version}</version>
</dependency>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions server/common/dao/pom.xml
Expand Up @@ -83,6 +83,10 @@
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
Expand Down
Expand Up @@ -30,6 +30,9 @@
import org.kaaproject.kaa.common.dto.PageLinkDto;
import org.kaaproject.kaa.common.dto.TopicListEntryDto;
import org.kaaproject.kaa.common.dto.UpdateNotificationDto;
import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;

/**
* The interface Endpoint service.
Expand Down Expand Up @@ -196,14 +199,17 @@ public interface EndpointService {
* @param endpointUserId the endpoint user id
* @param endpointAccessToken the endpoint access token
* @return the endpoint profile dto
*/
EndpointProfileDto attachEndpointToUser(String endpointUserId, String endpointAccessToken);
*/

@Retryable(maxAttempts = 10, backoff = @Backoff(delay = 100), value = {KaaOptimisticLockingFailureException.class})
EndpointProfileDto attachEndpointToUser(String endpointUserId, String endpointAccessToken) throws KaaOptimisticLockingFailureException;

/**
* Detach endpoint profile from user.
*
* @param detachEndpoint the detach endpoint
*/
@Retryable(maxAttempts = 10, backoff = @Backoff(delay = 100), value = {KaaOptimisticLockingFailureException.class})
void detachEndpointFromUser(EndpointProfileDto detachEndpoint);

/**
Expand Down
Expand Up @@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kaaproject.kaa.server.common.dao.lock;

package org.kaaproject.kaa.server.common.dao.exception;

import org.springframework.dao.OptimisticLockingFailureException;

public class KaaOptimisticLockingFailureException extends OptimisticLockingFailureException {

private static final long serialVersionUID = -6616568718875823310L;
private static final long serialVersionUID = 1985741897367919195L;

public KaaOptimisticLockingFailureException(String msg, Throwable cause) {
super(msg, cause);
Expand Down
Expand Up @@ -70,4 +70,5 @@ public interface EndpointUserDao<T extends EndpointUser> extends Dao<T, String>
* @return true, if successful
*/
boolean checkAccessToken(String externalUid, String tenantId, String accessToken);

}
Expand Up @@ -16,7 +16,8 @@
package org.kaaproject.kaa.server.common.dao.model;

import org.kaaproject.kaa.common.dto.EndpointConfigurationDto;
import org.kaaproject.kaa.common.dto.HasVersion;

public interface EndpointConfiguration extends ToDto<EndpointConfigurationDto>{
public interface EndpointConfiguration extends ToDto<EndpointConfigurationDto>, HasVersion {

}
Expand Up @@ -16,7 +16,8 @@
package org.kaaproject.kaa.server.common.dao.model;

import org.kaaproject.kaa.common.dto.EndpointNotificationDto;
import org.kaaproject.kaa.common.dto.HasVersion;

public interface EndpointNotification extends ToDto<EndpointNotificationDto>{
public interface EndpointNotification extends ToDto<EndpointNotificationDto>, HasVersion {

}
Expand Up @@ -18,8 +18,9 @@
import java.util.List;

import org.kaaproject.kaa.common.dto.EndpointProfileDto;
import org.kaaproject.kaa.common.dto.HasVersion;

public interface EndpointProfile extends ToDto<EndpointProfileDto>{
public interface EndpointProfile extends ToDto<EndpointProfileDto>, HasVersion {

byte[] getEndpointKey();

Expand Down
Expand Up @@ -18,8 +18,9 @@
import java.util.List;

import org.kaaproject.kaa.common.dto.EndpointUserDto;
import org.kaaproject.kaa.common.dto.HasVersion;

public interface EndpointUser extends ToDto<EndpointUserDto>{
public interface EndpointUser extends ToDto<EndpointUserDto>, HasVersion {

List<String> getEndpointIds();

Expand Down
Expand Up @@ -16,8 +16,9 @@
package org.kaaproject.kaa.server.common.dao.model;

import org.kaaproject.kaa.common.dto.EndpointUserConfigurationDto;
import org.kaaproject.kaa.common.dto.HasVersion;

public interface EndpointUserConfiguration extends ToDto<EndpointUserConfigurationDto> {
public interface EndpointUserConfiguration extends ToDto<EndpointUserConfigurationDto>, HasVersion {

String getUserId();

Expand Down
Expand Up @@ -15,8 +15,9 @@
*/
package org.kaaproject.kaa.server.common.dao.model;

import org.kaaproject.kaa.common.dto.HasVersion;
import org.kaaproject.kaa.common.dto.NotificationDto;

public interface Notification extends ToDto<NotificationDto>{
public interface Notification extends ToDto<NotificationDto>, HasVersion {

}
Expand Up @@ -15,11 +15,12 @@
*/
package org.kaaproject.kaa.server.common.dao.model;

import org.kaaproject.kaa.common.dto.HasVersion;
import org.kaaproject.kaa.common.dto.TopicListEntryDto;

import java.util.List;

public interface TopicListEntry extends ToDto<TopicListEntryDto> {
public interface TopicListEntry extends ToDto<TopicListEntryDto>, HasVersion {

List<String> getTopicIds();
}
Expand Up @@ -28,6 +28,7 @@
import static org.kaaproject.kaa.server.common.dao.service.Validator.validateString;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;
Expand All @@ -54,23 +55,19 @@
import org.kaaproject.kaa.server.common.dao.ServerProfileService;
import org.kaaproject.kaa.server.common.dao.exception.DatabaseProcessingException;
import org.kaaproject.kaa.server.common.dao.exception.IncorrectParameterException;
import org.kaaproject.kaa.server.common.dao.impl.ConfigurationDao;
import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException;
import org.kaaproject.kaa.server.common.dao.impl.EndpointConfigurationDao;
import org.kaaproject.kaa.server.common.dao.impl.EndpointGroupDao;
import org.kaaproject.kaa.server.common.dao.impl.EndpointProfileDao;
import org.kaaproject.kaa.server.common.dao.impl.EndpointUserDao;
import org.kaaproject.kaa.server.common.dao.impl.ProfileFilterDao;
import org.kaaproject.kaa.server.common.dao.impl.TopicDao;
import org.kaaproject.kaa.server.common.dao.impl.TopicListEntryDao;
import org.kaaproject.kaa.server.common.dao.lock.KaaOptimisticLockingFailureException;
import org.kaaproject.kaa.server.common.dao.model.EndpointConfiguration;
import org.kaaproject.kaa.server.common.dao.model.EndpointProfile;
import org.kaaproject.kaa.server.common.dao.model.EndpointUser;
import org.kaaproject.kaa.server.common.dao.model.TopicListEntry;
import org.kaaproject.kaa.server.common.dao.model.sql.Configuration;
import org.kaaproject.kaa.server.common.dao.model.sql.EndpointGroup;
import org.kaaproject.kaa.server.common.dao.model.sql.ModelUtils;
import org.kaaproject.kaa.server.common.dao.model.sql.ProfileFilter;
import org.kaaproject.kaa.server.common.dao.model.sql.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -87,12 +84,6 @@ public class EndpointServiceImpl implements EndpointService {
@Autowired
private EndpointGroupDao<EndpointGroup> endpointGroupDao;
@Autowired
private ConfigurationDao<Configuration> configurationDao;
@Autowired
private ProfileFilterDao<ProfileFilter> profileFilterDao;
@Autowired
private ProfileFilterDao<ProfileFilter> verifierDao;
@Autowired
private HistoryService historyService;
@Autowired
private ServerProfileService serverProfileService;
Expand Down Expand Up @@ -282,7 +273,6 @@ public void removeEndpointProfileByAppId(String appId) {

@Override
public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfileDto) throws KaaOptimisticLockingFailureException {
EndpointProfileDto profileDto = null;
validateObject(endpointProfileDto, "Can't save endpoint profile object. Invalid endpoint profile object " + endpointProfileDto);
byte[] keyHash = endpointProfileDto.getEndpointKeyHash();
validateHash(keyHash, "Incorrect key hash for endpoint profile.");
Expand All @@ -293,15 +283,27 @@ public EndpointProfileDto saveEndpointProfile(EndpointProfileDto endpointProfile
endpointProfileDto.setServerProfileVersion(serverProfileSchemaDto.getVersion());
endpointProfileDto.setServerProfileBody(schemaDto.getDefaultRecord());
}
EndpointProfileDto dto;
if (isBlank(endpointProfileDto.getId())) {
EndpointProfile storedProfile = endpointProfileDao.findEndpointIdByKeyHash(keyHash);
if (storedProfile != null) {
endpointProfileDto.setId(storedProfile.getId());
LOG.debug("Register new endpoint profile.");
EndpointProfile storedProfile = endpointProfileDao.findByKeyHash(keyHash);
if (storedProfile == null) {
dto = getDto(endpointProfileDao.save(endpointProfileDto));
} else {
if (Arrays.equals(storedProfile.getEndpointKey(), endpointProfileDto.getEndpointKey())) {
LOG.debug("Got register profile for already existing profile {}. Will overwrite existing profile!", keyHash);
endpointProfileDto.setId(storedProfile.getId());
endpointProfileDto.setVersion(storedProfile.getVersion());
dto = getDto(endpointProfileDao.save(endpointProfileDto));
} else {
LOG.warn("Endpoint profile with key hash {} already exists.", keyHash);
throw new DatabaseProcessingException("Can't save endpoint profile with existing key hash.");
}
}
} else {
LOG.debug("Update endpoint profile with id [{}]", endpointProfileDto.getId());
dto = getDto(endpointProfileDao.save(endpointProfileDto));
}
profileDto = getDto(endpointProfileDao.save(endpointProfileDto));
return profileDto;
return dto;
}

@Override
Expand All @@ -322,14 +324,14 @@ public EndpointProfileDto attachEndpointToUser(String userExternalId, String ten
endpointUser.setEndpointIds(endpointIds);
}
endpointIds.add(profile.getId());
endpointUserDao.save(endpointUser);
endpointUser = endpointUserDao.save(endpointUser);
profile.setEndpointUserId(endpointUser.getId());
LOG.trace("Save endpoint user {} and endpoint profile {}", endpointUser, profile);
return saveEndpointProfile(profile);
}

@Override
public EndpointProfileDto attachEndpointToUser(String endpointUserId, String endpointAccessToken) {
@Override
public EndpointProfileDto attachEndpointToUser(String endpointUserId, String endpointAccessToken) throws KaaOptimisticLockingFailureException {
LOG.info("Try to attach endpoint with access token {} to user with {}", endpointAccessToken, endpointUserId);
validateString(endpointUserId, "Incorrect endpointUserId " + endpointUserId);
EndpointUser endpointUser = endpointUserDao.findById(endpointUserId);
Expand All @@ -339,14 +341,18 @@ public EndpointProfileDto attachEndpointToUser(String endpointUserId, String end
LOG.trace("[{}] Found endpoint profile by with access token {} ", endpointAccessToken, endpoint);
if (endpoint != null) {
if (endpoint.getEndpointUserId() == null || endpointUserId.equals(endpoint.getEndpointUserId())) {
LOG.debug("Attach endpoint profile with id {} to endpoint user with id {} ", endpoint.getId(), endpointUser.getId());
List<String> endpointIds = endpointUser.getEndpointIds();
if (endpointIds != null && endpointIds.contains(endpoint.getId())) {
LOG.warn("Endpoint is already assigned to current user {}. Unassign it first!.", endpoint.getEndpointUserId());
throw new DatabaseProcessingException("Endpoint is already assigned to current user.");
}
if (endpointIds == null) {
endpointIds = new ArrayList<>();
endpointUser.setEndpointIds(endpointIds);
}
LOG.debug("Attach endpoint profile with id {} to endpoint user with id {} ", endpoint.getId(), endpointUser.getId());
endpointIds.add(endpoint.getId());
endpointUserDao.save(endpointUser);
endpointUser = endpointUserDao.save(endpointUser);
endpoint.setEndpointUserId(endpointUser.getId());
endpoint = endpointProfileDao.save(endpoint);
return getDto(endpoint);
Expand All @@ -371,8 +377,11 @@ public void detachEndpointFromUser(EndpointProfileDto detachEndpoint) {
EndpointUser endpointUser = endpointUserDao.findById(endpointUserId);
if (endpointUser != null) {
List<String> endpointIds = endpointUser.getEndpointIds();
if (endpointIds != null) {
if (endpointIds != null && endpointIds.contains(detachEndpoint.getId())) {
endpointIds.remove(detachEndpoint.getId());
} else {
LOG.warn("Endpoint is not assigned to current user {}!", endpointUserId);
throw new DatabaseProcessingException("Endpoint is not assigned to current user.");
}
endpointUserDao.save(endpointUser);
detachEndpoint.setEndpointUserId(null);
Expand Down
Expand Up @@ -126,7 +126,7 @@ public UpdateNotificationDto<NotificationDto> saveNotification(NotificationDto d
if (isNotBlank(schemaId) && isNotBlank(topicId)) {
NotificationSchema schema = notificationSchemaDao.findById(schemaId);
if (schema != null) {
dto.setVersion(schema.getVersion());
dto.setNfVersion(schema.getVersion());
dto.setApplicationId(schema.getApplicationId());
dto.setType(schema.getType());
} else {
Expand Down Expand Up @@ -290,7 +290,7 @@ public UpdateNotificationDto<EndpointNotificationDto> saveUnicastNotification(En
notificationDto.setSecNum(-1);
NotificationSchema schema = notificationSchemaDao.findById(schemaId);
if (schema != null) {
notificationDto.setVersion(schema.getVersion());
notificationDto.setNfVersion(schema.getVersion());
notificationDto.setApplicationId(schema.getApplicationId());
notificationDto.setType(schema.getType());
try {
Expand Down
4 changes: 3 additions & 1 deletion server/common/dao/src/main/resources/common-dao-context.xml
Expand Up @@ -29,7 +29,9 @@
http://www.springframework.org/schema/util/spring-util-3.0.xsd">

<context:annotation-config/>

<aop:aspectj-autoproxy />
<bean class="org.springframework.retry.annotation.RetryConfiguration" />

<context:component-scan base-package="org.kaaproject.kaa.server.common.dao"/>

<util:properties id="dao" location="classpath:dao.properties"/>
Expand Down

0 comments on commit 1ec9876

Please sign in to comment.