Skip to content

Commit

Permalink
KAA-876: Fixed Cassandra DAO for registration and device mgmt
Browse files Browse the repository at this point in the history
  • Loading branch information
ashvayka committed Apr 22, 2016
1 parent 2b58330 commit 8a4e2b5
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 55 deletions.
Expand Up @@ -16,15 +16,16 @@


package org.kaaproject.kaa.server.common.nosql.cassandra.dao; package org.kaaproject.kaa.server.common.nosql.cassandra.dao;


import static org.apache.commons.lang.StringUtils.isBlank;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;


import org.apache.commons.lang.StringUtils;
import org.kaaproject.kaa.common.dto.credentials.EndpointRegistrationDto; import org.kaaproject.kaa.common.dto.credentials.EndpointRegistrationDto;
import org.kaaproject.kaa.server.common.dao.impl.EndpointRegistrationDao; import org.kaaproject.kaa.server.common.dao.impl.EndpointRegistrationDao;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPRegistrationByCredentialsIDDao; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter.CassandraEPRegistrationByEndpointIDDao;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPRegistrationByCredentialsID; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPRegistrationByEndpointID;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointRegistration; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEndpointRegistration;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -42,13 +43,13 @@
* @since v0.9.0 * @since v0.9.0
*/ */
@Repository("endpointRegistrationDao") @Repository("endpointRegistrationDao")
public class EndpointRegistrationCassandraDao extends AbstractCassandraDao<CassandraEndpointRegistration, String> implements public class EndpointRegistrationCassandraDao extends AbstractCassandraDao<CassandraEndpointRegistration, String>
EndpointRegistrationDao<CassandraEndpointRegistration> { implements EndpointRegistrationDao<CassandraEndpointRegistration> {


private static final Logger LOG = LoggerFactory.getLogger(EndpointRegistrationCassandraDao.class); private static final Logger LOG = LoggerFactory.getLogger(EndpointRegistrationCassandraDao.class);


@Autowired @Autowired
private CassandraEPRegistrationByCredentialsIDDao byCredentialsID; private CassandraEPRegistrationByEndpointIDDao byEndpointID;


@Override @Override
protected Class<CassandraEndpointRegistration> getColumnFamilyClass() { protected Class<CassandraEndpointRegistration> getColumnFamilyClass() {
Expand All @@ -63,19 +64,20 @@ protected String getColumnFamilyName() {
@Override @Override
public CassandraEndpointRegistration save(EndpointRegistrationDto endpointRegistration) { public CassandraEndpointRegistration save(EndpointRegistrationDto endpointRegistration) {
LOG.debug("Saving [{}]", endpointRegistration.toString()); LOG.debug("Saving [{}]", endpointRegistration.toString());
if (StringUtils.isBlank(endpointRegistration.getId())) {
endpointRegistration.setId(endpointRegistration.getEndpointId());
}
return this.save(new CassandraEndpointRegistration(endpointRegistration)); return this.save(new CassandraEndpointRegistration(endpointRegistration));
} }


@Override @Override
public CassandraEndpointRegistration save(CassandraEndpointRegistration endpointRegistration) { public CassandraEndpointRegistration save(CassandraEndpointRegistration endpointRegistration) {
if (isBlank(endpointRegistration.getId())) {
endpointRegistration.generateId();
}
endpointRegistration = super.save(endpointRegistration); endpointRegistration = super.save(endpointRegistration);
List<Statement> statements = new ArrayList<>(); List<Statement> statements = new ArrayList<>();
statements.add(this.getSaveQuery(endpointRegistration)); statements.add(this.getSaveQuery(endpointRegistration));
if (endpointRegistration.getCredentialsId() != null) { if (endpointRegistration.getEndpointId() != null) {
statements.add(this.byCredentialsID.getSaveQuery(CassandraEPRegistrationByCredentialsID.fromEndpointRegistration(endpointRegistration))); statements.add(
this.byEndpointID.getSaveQuery(CassandraEPRegistrationByEndpointID.fromEndpointRegistration(endpointRegistration)));
} }
this.executeBatch(statements.toArray(new Statement[statements.size()])); this.executeBatch(statements.toArray(new Statement[statements.size()]));
return endpointRegistration; return endpointRegistration;
Expand All @@ -84,25 +86,39 @@ public CassandraEndpointRegistration save(CassandraEndpointRegistration endpoint
@Override @Override
public Optional<CassandraEndpointRegistration> findByEndpointId(String endpointId) { public Optional<CassandraEndpointRegistration> findByEndpointId(String endpointId) {
LOG.debug("Searching for endpoint registration by endpoint ID [{}]", endpointId); LOG.debug("Searching for endpoint registration by endpoint ID [{}]", endpointId);
Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_ENDPOINT_ID_PROPERTY, endpointId); Optional<String> credentialsId = this.byEndpointID.getCredentialsIdByEndpointId(endpointId);
Statement statement = QueryBuilder.select().from(this.getColumnFamilyName()).where(clause); if (credentialsId.isPresent()) {
return Optional.ofNullable(this.findOneByStatement(statement)); LOG.debug("[{}] Endpoint credentials ID by endpoint ID: {}", endpointId, credentialsId.get());
Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_CREDENTIALS_ID_PROPERTY, credentialsId.get());
Statement statement = QueryBuilder.select().from(this.getColumnFamilyName()).where(clause);
return Optional.ofNullable(this.findOneByStatement(statement));
} else {
LOG.debug("[{}] No credentials ID found by endpoint ID: {}", endpointId);
return Optional.empty();
}
} }


@Override @Override
public Optional<CassandraEndpointRegistration> findByCredentialsId(String credentialsId) { public Optional<CassandraEndpointRegistration> findByCredentialsId(String credentialsId) {
LOG.debug("Searching for endpoint registration by credentials ID [{}]", credentialsId); LOG.debug("Searching for endpoint registration by credentials ID [{}]", credentialsId);
String endpointId = this.byCredentialsID.getEndpointIdByCredentialsId(credentialsId); Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_CREDENTIALS_ID_PROPERTY, credentialsId);
Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_ENDPOINT_ID_PROPERTY, endpointId);
Statement statement = QueryBuilder.select().from(this.getColumnFamilyName()).where(clause); Statement statement = QueryBuilder.select().from(this.getColumnFamilyName()).where(clause);
return Optional.ofNullable(this.findOneByStatement(statement)); return Optional.ofNullable(this.findOneByStatement(statement));
} }


@Override @Override
public void removeByEndpointId(String endpointId) { public void removeByEndpointId(String endpointId) {
LOG.debug("Removing endpoint registration by endpoint ID", endpointId); LOG.debug("Removing endpoint registration by endpoint ID", endpointId);
Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_ENDPOINT_ID_PROPERTY, endpointId); Optional<String> credentialsId = this.byEndpointID.getCredentialsIdByEndpointId(endpointId);
Statement statement = QueryBuilder.delete().from(this.getColumnFamilyName()).where(clause); if (credentialsId.isPresent()) {
this.execute(statement); Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_BY_ENDPOINT_ID_ENDPOINT_ID_PROPERTY, endpointId);
Statement statement = QueryBuilder.delete().from(this.byEndpointID.getColumnFamilyName()).where(clause);
this.execute(statement);
clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_CREDENTIALS_ID_PROPERTY, credentialsId.get());
statement = QueryBuilder.delete().from(this.getColumnFamilyName()).where(clause);
this.execute(statement);
} else {
LOG.debug("[{}] No credentials ID found by endpoint ID: {}", endpointId);
}
} }
} }
Expand Up @@ -16,8 +16,10 @@


package org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter; package org.kaaproject.kaa.server.common.nosql.cassandra.dao.filter;


import java.util.Optional;

import org.kaaproject.kaa.server.common.nosql.cassandra.dao.AbstractCassandraDao; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.AbstractCassandraDao;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPRegistrationByCredentialsID; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPRegistrationByEndpointID;
import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants; import org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraModelConstants;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;


Expand All @@ -31,21 +33,26 @@
* @since v0.9.0 * @since v0.9.0
*/ */
@Repository @Repository
public class CassandraEPRegistrationByCredentialsIDDao extends AbstractCassandraDao<CassandraEPRegistrationByCredentialsID, String> { public class CassandraEPRegistrationByEndpointIDDao extends AbstractCassandraDao<CassandraEPRegistrationByEndpointID, String> {


@Override @Override
protected Class<CassandraEPRegistrationByCredentialsID> getColumnFamilyClass() { protected Class<CassandraEPRegistrationByEndpointID> getColumnFamilyClass() {
return CassandraEPRegistrationByCredentialsID.class; return CassandraEPRegistrationByEndpointID.class;
} }


@Override @Override
protected String getColumnFamilyName() { public String getColumnFamilyName() {
return CassandraModelConstants.EP_REGISTRATIONS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME; return CassandraModelConstants.EP_REGISTRATIONS_BY_ENDPOINT_ID_COLUMN_FAMILY_NAME;
} }


public String getEndpointIdByCredentialsId(String credentialsId) { public Optional<String> getCredentialsIdByEndpointId(String endpointId) {
Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_BY_CREDENTIALS_ID_CREDENTIALS_ID_PROPERTY, credentialsId); Clause clause = QueryBuilder.eq(CassandraModelConstants.EP_REGISTRATION_BY_ENDPOINT_ID_ENDPOINT_ID_PROPERTY, endpointId);
Statement statement = QueryBuilder.select().from(this.getColumnFamilyName()).where(clause); Statement statement = QueryBuilder.select().from(this.getColumnFamilyName()).where(clause);
return this.findOneByStatement(statement).getEndpointId(); CassandraEPRegistrationByEndpointID result = this.findOneByStatement(statement);
if (result != null) {
return Optional.of(result.getCredentialsId());
} else {
return Optional.empty();
}
} }
} }
Expand Up @@ -24,7 +24,6 @@
import org.apache.commons.lang.builder.ToStringStyle; import org.apache.commons.lang.builder.ToStringStyle;
import org.kaaproject.kaa.server.common.dao.model.EndpointRegistration; import org.kaaproject.kaa.server.common.dao.model.EndpointRegistration;


import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column; import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey; import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table; import com.datastax.driver.mapping.annotations.Table;
Expand All @@ -35,32 +34,32 @@
* *
* @since v0.9.0 * @since v0.9.0
*/ */
@Table(name = CassandraModelConstants.EP_REGISTRATIONS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME) @Table(name = CassandraModelConstants.EP_REGISTRATIONS_BY_ENDPOINT_ID_COLUMN_FAMILY_NAME)
public final class CassandraEPRegistrationByCredentialsID implements Serializable { public final class CassandraEPRegistrationByEndpointID implements Serializable {


@Transient @Transient
private static final long serialVersionUID = 1000L; private static final long serialVersionUID = 1000L;


@PartitionKey @PartitionKey
@Column(name = CassandraModelConstants.EP_REGISTRATION_BY_CREDENTIALS_ID_CREDENTIALS_ID_PROPERTY) @Column(name = CassandraModelConstants.EP_REGISTRATION_BY_ENDPOINT_ID_ENDPOINT_ID_PROPERTY)
private String endpointId;

@Column(name = CassandraModelConstants.EP_REGISTRATION_BY_ENDPOINT_ID_CREDENTIALS_ID_PROPERTY)
private String credentialsId; private String credentialsId;


@ClusteringColumn
@Column(name = CassandraModelConstants.EP_REGISTRATION_BY_CREDENTIALS_ID_ENDPOINT_ID_PROPERTY)
private String endpointId;


public static CassandraEPRegistrationByCredentialsID fromEndpointRegistration(EndpointRegistration endpointRegistration) { public static CassandraEPRegistrationByEndpointID fromEndpointRegistration(EndpointRegistration endpointRegistration) {
String credentialsId = endpointRegistration.getCredentialsId();
String endpointId = endpointRegistration.getEndpointId(); String endpointId = endpointRegistration.getEndpointId();
return new CassandraEPRegistrationByCredentialsID(credentialsId, endpointId); String credentialsId = endpointRegistration.getCredentialsId();
return new CassandraEPRegistrationByEndpointID(endpointId, credentialsId);
} }


public CassandraEPRegistrationByCredentialsID() { public CassandraEPRegistrationByEndpointID() {
} }


public CassandraEPRegistrationByCredentialsID(String credentialsId, String endpointId) { public CassandraEPRegistrationByEndpointID(String endpointId, String credentialsId) {
this.credentialsId = credentialsId;
this.endpointId = endpointId; this.endpointId = endpointId;
this.credentialsId = credentialsId;
} }


public String getCredentialsId() { public String getCredentialsId() {
Expand Down
Expand Up @@ -16,6 +16,8 @@


package org.kaaproject.kaa.server.common.nosql.cassandra.dao.model; package org.kaaproject.kaa.server.common.nosql.cassandra.dao.model;


import static org.apache.commons.lang.StringUtils.isBlank;

import java.io.Serializable; import java.io.Serializable;


import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.EqualsBuilder;
Expand Down Expand Up @@ -51,11 +53,11 @@ public final class CassandraEndpointRegistration implements EndpointRegistration
private String applicationId; private String applicationId;


@PartitionKey @PartitionKey
@Column(name = CassandraModelConstants.EP_REGISTRATION_ENDPOINT_ID_PROPERTY)
private String endpointId;

@Column(name = CassandraModelConstants.EP_REGISTRATION_CREDENTIALS_ID_PROPERTY) @Column(name = CassandraModelConstants.EP_REGISTRATION_CREDENTIALS_ID_PROPERTY)
private String credentialsId; private String credentialsId;

@Column(name = CassandraModelConstants.EP_REGISTRATION_ENDPOINT_ID_PROPERTY)
private String endpointId;


@Column(name = CassandraModelConstants.EP_REGISTRATION_SERVER_PROFILE_VERSION_PROPERTY) @Column(name = CassandraModelConstants.EP_REGISTRATION_SERVER_PROFILE_VERSION_PROPERTY)
private Integer serverProfileVersion; private Integer serverProfileVersion;
Expand Down Expand Up @@ -155,4 +157,11 @@ public boolean equals(Object other) {
public String toString() { public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
} }

public String generateId() {
if (isBlank(id)) {
id = credentialsId;
}
return id;
}
} }
Expand Up @@ -208,11 +208,11 @@ private CassandraModelConstants() {
public static final String EP_REGISTRATION_SERVER_PROFILE_BODY_PROPERTY = EP_SERVER_PROFILE_PROPERTY; public static final String EP_REGISTRATION_SERVER_PROFILE_BODY_PROPERTY = EP_SERVER_PROFILE_PROPERTY;


/** /**
* {@link org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPRegistrationByCredentialsID} * {@link org.kaaproject.kaa.server.common.nosql.cassandra.dao.model.CassandraEPRegistrationByEndpointID}
*/ */
public static final String EP_REGISTRATIONS_BY_CREDENTIALS_ID_COLUMN_FAMILY_NAME = "creds_id_ep_registration"; public static final String EP_REGISTRATIONS_BY_ENDPOINT_ID_COLUMN_FAMILY_NAME = "creds_id_ep_registration";
public static final String EP_REGISTRATION_BY_CREDENTIALS_ID_CREDENTIALS_ID_PROPERTY = EP_REGISTRATION_CREDENTIALS_ID_PROPERTY; public static final String EP_REGISTRATION_BY_ENDPOINT_ID_CREDENTIALS_ID_PROPERTY = EP_REGISTRATION_CREDENTIALS_ID_PROPERTY;
public static final String EP_REGISTRATION_BY_CREDENTIALS_ID_ENDPOINT_ID_PROPERTY = EP_REGISTRATION_ENDPOINT_ID_PROPERTY; public static final String EP_REGISTRATION_BY_ENDPOINT_ID_ENDPOINT_ID_PROPERTY = EP_REGISTRATION_ENDPOINT_ID_PROPERTY;


/** /**
* Cassandra Credentials constants * Cassandra Credentials constants
Expand Down
Expand Up @@ -133,15 +133,15 @@ CREATE TABLE IF NOT EXISTS kaa.tl_entry (
CREATE TABLE IF NOT EXISTS kaa.ep_registration ( CREATE TABLE IF NOT EXISTS kaa.ep_registration (
id text, id text,
app_id text, app_id text,
ep_id text PRIMARY KEY, ep_id text,
creds_id text, creds_id text PRIMARY KEY,
srv_pf_ver int, srv_pf_ver int,
srv_pf text srv_pf text
); );


CREATE TABLE IF NOT EXISTS kaa.creds_id_ep_registration ( CREATE TABLE IF NOT EXISTS kaa.creds_id_ep_registration (
creds_id text PRIMARY KEY, ep_id text PRIMARY KEY,
ep_id text creds_id text
); );


CREATE TABLE IF NOT EXISTS kaa.credentials ( CREATE TABLE IF NOT EXISTS kaa.credentials (
Expand Down
Expand Up @@ -2,7 +2,7 @@ cluster_name=Kaa Cluster


keyspace_name=kaa keyspace_name=kaa


node_list=127.0.0.1:9042 node_list=127.0.0.1:9142


use_ssl=false use_ssl=false


Expand Down
Expand Up @@ -282,7 +282,7 @@ listen_address: 127.0.0.1


start_native_transport: true start_native_transport: true
# port for the CQL native transport to listen for clients on # port for the CQL native transport to listen for clients on
native_transport_port: 9042 native_transport_port: 9142


# Whether to start the thrift rpc server. # Whether to start the thrift rpc server.
start_rpc: true start_rpc: true
Expand Down Expand Up @@ -599,4 +599,4 @@ encryption_options:
# protocol: TLS # protocol: TLS
# algorithm: SunX509 # algorithm: SunX509
# store_type: JKS # store_type: JKS
# cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA] # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]

0 comments on commit 8a4e2b5

Please sign in to comment.