Skip to content

Commit

Permalink
KAA-749: Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Feb 2, 2016
1 parent 1ec9876 commit 61bdf0d
Show file tree
Hide file tree
Showing 51 changed files with 285 additions and 595 deletions.
Expand Up @@ -16,8 +16,7 @@
package org.kaaproject.kaa.server.common.dao.model; package org.kaaproject.kaa.server.common.dao.model;


import org.kaaproject.kaa.common.dto.EndpointConfigurationDto; import org.kaaproject.kaa.common.dto.EndpointConfigurationDto;
import org.kaaproject.kaa.common.dto.HasVersion;


public interface EndpointConfiguration extends ToDto<EndpointConfigurationDto>, HasVersion { public interface EndpointConfiguration extends ToDto<EndpointConfigurationDto> {


} }
Expand Up @@ -16,8 +16,7 @@
package org.kaaproject.kaa.server.common.dao.model; package org.kaaproject.kaa.server.common.dao.model;


import org.kaaproject.kaa.common.dto.EndpointNotificationDto; import org.kaaproject.kaa.common.dto.EndpointNotificationDto;
import org.kaaproject.kaa.common.dto.HasVersion;


public interface EndpointNotification extends ToDto<EndpointNotificationDto>, HasVersion { public interface EndpointNotification extends ToDto<EndpointNotificationDto> {


} }
Expand Up @@ -16,9 +16,8 @@
package org.kaaproject.kaa.server.common.dao.model; package org.kaaproject.kaa.server.common.dao.model;


import org.kaaproject.kaa.common.dto.EndpointUserConfigurationDto; import org.kaaproject.kaa.common.dto.EndpointUserConfigurationDto;
import org.kaaproject.kaa.common.dto.HasVersion;


public interface EndpointUserConfiguration extends ToDto<EndpointUserConfigurationDto>, HasVersion { public interface EndpointUserConfiguration extends ToDto<EndpointUserConfigurationDto> {


String getUserId(); String getUserId();


Expand Down
Expand Up @@ -15,12 +15,11 @@
*/ */
package org.kaaproject.kaa.server.common.dao.model; package org.kaaproject.kaa.server.common.dao.model;


import org.kaaproject.kaa.common.dto.HasVersion;
import org.kaaproject.kaa.common.dto.TopicListEntryDto;

import java.util.List; import java.util.List;


public interface TopicListEntry extends ToDto<TopicListEntryDto>, HasVersion { import org.kaaproject.kaa.common.dto.TopicListEntryDto;

public interface TopicListEntry extends ToDto<TopicListEntryDto> {


List<String> getTopicIds(); List<String> getTopicIds();
} }
Expand Up @@ -21,13 +21,12 @@


import static org.kaaproject.kaa.common.dto.Util.getArrayCopy; import static org.kaaproject.kaa.common.dto.Util.getArrayCopy;


public class EndpointConfigurationDto implements HasVersion, Serializable { public class EndpointConfigurationDto implements Serializable {


private static final long serialVersionUID = 5662111748223086520L; private static final long serialVersionUID = 5662111748223086520L;


private byte[] configurationHash; private byte[] configurationHash;
private byte[] configuration; private byte[] configuration;
private Long version;


public byte[] getConfigurationHash() { public byte[] getConfigurationHash() {
return configurationHash; return configurationHash;
Expand Down Expand Up @@ -76,14 +75,4 @@ public String toString() {
", configuration=" + Arrays.toString(configuration) + ", configuration=" + Arrays.toString(configuration) +
'}'; '}';
} }

@Override
public Long getVersion() {
return version;
}

@Override
public void setVersion(Long version) {
this.version = version;
}
} }
Expand Up @@ -23,14 +23,13 @@
import static org.kaaproject.kaa.common.dto.Util.getArrayCopy; import static org.kaaproject.kaa.common.dto.Util.getArrayCopy;




public class EndpointNotificationDto implements HasId, HasVersion, Serializable { public class EndpointNotificationDto implements HasId, Serializable {


private static final long serialVersionUID = -5548269571722364843L; private static final long serialVersionUID = -5548269571722364843L;


private String id; private String id;
private byte[] endpointKeyHash; private byte[] endpointKeyHash;
private NotificationDto notificationDto; private NotificationDto notificationDto;
private Long version;


public String getId() { public String getId() {
return id; return id;
Expand All @@ -55,16 +54,6 @@ public byte[] getEndpointKeyHash() {
public void setEndpointKeyHash(byte[] endpointKeyHash) { public void setEndpointKeyHash(byte[] endpointKeyHash) {
this.endpointKeyHash = getArrayCopy(endpointKeyHash); this.endpointKeyHash = getArrayCopy(endpointKeyHash);
} }

@Override
public Long getVersion() {
return version;
}

@Override
public void setVersion(Long version) {
this.version = version;
}


@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
Expand Down
Expand Up @@ -17,15 +17,14 @@


import java.io.Serializable; import java.io.Serializable;


public class EndpointUserConfigurationDto implements HasVersion, Serializable { public class EndpointUserConfigurationDto implements Serializable {


private static final long serialVersionUID = -1463982688020241482L; private static final long serialVersionUID = -1463982688020241482L;


private String userId; private String userId;
private String appToken; private String appToken;
private Integer schemaVersion; private Integer schemaVersion;
private String body; private String body;
private Long version;


public String getUserId() { public String getUserId() {
return userId; return userId;
Expand Down Expand Up @@ -59,16 +58,6 @@ public void setBody(String body) {
this.body = body; this.body = body;
} }


@Override
public Long getVersion() {
return version;
}

@Override
public void setVersion(Long version) {
this.version = version;
}

@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) if (this == o)
Expand Down
Expand Up @@ -20,7 +20,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;


public final class TopicListEntryDto implements HasVersion, Serializable { public final class TopicListEntryDto implements Serializable {


private static final long serialVersionUID = 2771583997490244417L; private static final long serialVersionUID = 2771583997490244417L;


Expand All @@ -30,8 +30,6 @@ public final class TopicListEntryDto implements HasVersion, Serializable {


private List<TopicDto> topics; private List<TopicDto> topics;


private Long version;

public TopicListEntryDto(int simpleHash, byte[] hash, List<TopicDto> topics) { public TopicListEntryDto(int simpleHash, byte[] hash, List<TopicDto> topics) {
this.simpleHash = simpleHash; this.simpleHash = simpleHash;
this.hash = hash; this.hash = hash;
Expand Down Expand Up @@ -62,16 +60,6 @@ public void setTopics(List<TopicDto> topics) {
this.topics = topics; this.topics = topics;
} }


@Override
public Long getVersion() {
return version;
}

@Override
public void setVersion(Long version) {
this.version = version;
}

@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) {
Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;


import org.kaaproject.kaa.common.dto.HasVersion;
import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException; import org.kaaproject.kaa.server.common.dao.exception.KaaOptimisticLockingFailureException;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.client.CassandraClient; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.client.CassandraClient;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -49,7 +48,7 @@
import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.Result; import com.datastax.driver.mapping.Result;


public abstract class AbstractCassandraDao<T extends HasVersion, K> { public abstract class AbstractCassandraDao<T, K> {


private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraDao.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraDao.class);
private static final String KAA = "kaa"; private static final String KAA = "kaa";
Expand Down Expand Up @@ -129,21 +128,13 @@ protected Statement getSaveQuery(T dto) {
} }


public T save(T entity) { public T save(T entity) {
if (entity.getVersion() == null) { LOG.debug("Save entity {}", entity);
entity.setVersion(0l); Statement saveStatement = getSaveQuery(entity);
LOG.debug("Save entity {}", entity); saveStatement.setConsistencyLevel(getWriteConsistencyLevel());
Statement saveStatement = getSaveQuery(entity); execute(saveStatement);
saveStatement.setConsistencyLevel(getWriteConsistencyLevel()); return entity;
execute(saveStatement);
return entity;
} else {
LOG.debug("Update entity {}", entity);
return updateLocked(entity);
}
} }


protected abstract T updateLocked(T entity);

protected T updateLockedImpl(Long version, Assignment[] assignments, Clause... whereClauses) { protected T updateLockedImpl(Long version, Assignment[] assignments, Clause... whereClauses) {
version = (version == null) ? 0l : version; version = (version == null) ? 0l : version;
Assignments assigns = update(getColumnFamilyName()).onlyIf(eq(OPT_LOCK, version)).with(set(OPT_LOCK, version + 1)); Assignments assigns = update(getColumnFamilyName()).onlyIf(eq(OPT_LOCK, version)).with(set(OPT_LOCK, version + 1));
Expand Down
@@ -0,0 +1,28 @@
package org.kaaproject.kaa.server.common.nosql.cassandra.dao;

import org.kaaproject.kaa.common.dto.HasVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.driver.core.Statement;

public abstract class AbstractVersionableCassandraDao<T extends HasVersion, K> extends AbstractCassandraDao<T, K> {

private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionableCassandraDao.class);

public T save(T entity) {
if (entity.getVersion() == null) {
entity.setVersion(0l);
LOG.debug("Save entity {}", entity);
Statement saveStatement = getSaveQuery(entity);
saveStatement.setConsistencyLevel(getWriteConsistencyLevel());
execute(saveStatement);
return entity;
} else {
LOG.debug("Update entity {}", entity);
return updateLocked(entity);
}
}

protected abstract T updateLocked(T entity);
}
Expand Up @@ -16,18 +16,19 @@


package org.kaaproject.kaa.server.common.nosql.cassandra.dao; package org.kaaproject.kaa.server.common.nosql.cassandra.dao;


import com.datastax.driver.core.utils.Bytes; import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.kaaproject.kaa.common.dto.EndpointGroupStateDto; import org.kaaproject.kaa.common.dto.EndpointGroupStateDto;
import org.kaaproject.kaa.common.dto.EventClassFamilyVersionStateDto; import org.kaaproject.kaa.common.dto.EventClassFamilyVersionStateDto;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEndpointGroupState; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEndpointGroupState;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEventClassFamilyVersionState; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.type.CassandraEventClassFamilyVersionState;


import java.nio.ByteBuffer; import com.datastax.driver.core.utils.Bytes;
import java.util.ArrayList;
import java.util.List;

import static org.apache.commons.lang.StringUtils.isNotBlank;


public class CassandraDaoUtil { public class CassandraDaoUtil {


Expand Down
Expand Up @@ -16,14 +16,9 @@


package org.kaaproject.kaa.server.common.nosql.cassandra.dao; package org.kaaproject.kaa.server.common.nosql.cassandra.dao;


import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getBytes; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getBytes;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_CONFIGURATION_COLUMN_FAMILY_NAME; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_CONFIGURATION_COLUMN_FAMILY_NAME;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_CONFIGURATION_CONF_HASH_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_CONFIGURATION_CONF_ID_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ENDPOINT_CONFIGURATION_CONF_PROPERTY;


import java.nio.ByteBuffer; import java.nio.ByteBuffer;


Expand All @@ -34,10 +29,9 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;


import com.datastax.driver.core.querybuilder.Assignment;

@Repository(value = "endpointConfigurationDao") @Repository(value = "endpointConfigurationDao")
public class EndpointConfigurationCassandraDao extends AbstractCassandraDao<CassandraEndpointConfiguration, ByteBuffer> implements EndpointConfigurationDao<CassandraEndpointConfiguration> { public class EndpointConfigurationCassandraDao extends AbstractCassandraDao<CassandraEndpointConfiguration, ByteBuffer>
implements EndpointConfigurationDao<CassandraEndpointConfiguration> {


private static final Logger LOG = LoggerFactory.getLogger(EndpointConfigurationCassandraDao.class); private static final Logger LOG = LoggerFactory.getLogger(EndpointConfigurationCassandraDao.class);


Expand All @@ -50,16 +44,6 @@ protected Class<?> getColumnFamilyClass() {
protected String getColumnFamilyName() { protected String getColumnFamilyName() {
return ENDPOINT_CONFIGURATION_COLUMN_FAMILY_NAME; return ENDPOINT_CONFIGURATION_COLUMN_FAMILY_NAME;
} }

@Override
protected CassandraEndpointConfiguration updateLocked(
CassandraEndpointConfiguration entity) {
return updateLockedImpl(entity.getVersion(),
new Assignment[]{set(ENDPOINT_CONFIGURATION_CONF_PROPERTY, entity.getConfiguration()),
set(ENDPOINT_CONFIGURATION_CONF_ID_PROPERTY, entity.getId())},
eq(ENDPOINT_CONFIGURATION_CONF_HASH_PROPERTY, getByteBuffer(getBytes(entity.getConfigurationHash())))
);
}


@Override @Override
public CassandraEndpointConfiguration findByHash(final byte[] hash) { public CassandraEndpointConfiguration findByHash(final byte[] hash) {
Expand Down Expand Up @@ -98,5 +82,4 @@ public void removeById(ByteBuffer key) {
} }
} }



} }
Expand Up @@ -19,20 +19,10 @@
import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; import static com.datastax.driver.core.querybuilder.QueryBuilder.delete;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.CassandraDaoUtil.getByteBuffer;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_APPLICATION_ID_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_BODY_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_COLUMN_FAMILY_NAME; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_COLUMN_FAMILY_NAME;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_ENDPOINT_KEY_HASH_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_ENDPOINT_KEY_HASH_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_EXPIRED_AT_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_ID_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_LAST_MOD_TIME_PROPERTY; import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_LAST_MOD_TIME_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_NOTIFICATION_TYPE_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_SCHEMA_ID_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_SEQ_NUM_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_TOPIC_ID_PROPERTY;
import static org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants.ET_NF_VERSION_PROPERTY;


import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
Expand All @@ -47,7 +37,6 @@
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;


import com.datastax.driver.core.Statement; import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Assignment;
import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.querybuilder.Select;


Expand All @@ -69,27 +58,6 @@ protected String getColumnFamilyName() {
return ET_NF_COLUMN_FAMILY_NAME; return ET_NF_COLUMN_FAMILY_NAME;
} }


@Override
protected CassandraEndpointNotification updateLocked(
CassandraEndpointNotification entity) {
return updateLockedImpl(
entity.getVersion(),
new Assignment[] {
set(ET_NF_ID_PROPERTY, entity.getId()),
set(ET_NF_SEQ_NUM_PROPERTY, entity.getSeqNum()),
set(ET_NF_NOTIFICATION_TYPE_PROPERTY, entity.getType().name()),
set(ET_NF_APPLICATION_ID_PROPERTY,
entity.getApplicationId()),
set(ET_NF_SCHEMA_ID_PROPERTY, entity.getSchemaId()),
set(ET_NF_VERSION_PROPERTY, entity.getNfVersion()),
set(ET_NF_BODY_PROPERTY, entity.getBody()),
set(ET_NF_EXPIRED_AT_PROPERTY, entity.getExpiredAt()),
set(ET_NF_TOPIC_ID_PROPERTY, entity.getTopicId()) },
eq(ET_NF_ENDPOINT_KEY_HASH_PROPERTY,
entity.getEndpointKeyHash()),
eq(ET_NF_LAST_MOD_TIME_PROPERTY, entity.getLastModifyTime()));
}

@Override @Override
public List<CassandraEndpointNotification> findNotificationsByKeyHash(byte[] keyHash) { public List<CassandraEndpointNotification> findNotificationsByKeyHash(byte[] keyHash) {
LOG.debug("Try to find endpoint notifications by endpoint key hash {}", keyHash); LOG.debug("Try to find endpoint notifications by endpoint key hash {}", keyHash);
Expand Down
Expand Up @@ -118,7 +118,7 @@




@Repository(value = "endpointProfileDao") @Repository(value = "endpointProfileDao")
public class EndpointProfileCassandraDao extends AbstractCassandraDao<CassandraEndpointProfile, ByteBuffer> implements EndpointProfileDao<CassandraEndpointProfile> { public class EndpointProfileCassandraDao extends AbstractVersionableCassandraDao<CassandraEndpointProfile, ByteBuffer> implements EndpointProfileDao<CassandraEndpointProfile> {


private static final Logger LOG = LoggerFactory.getLogger(EndpointProfileCassandraDao.class); private static final Logger LOG = LoggerFactory.getLogger(EndpointProfileCassandraDao.class);


Expand Down

0 comments on commit 61bdf0d

Please sign in to comment.