Cassandra notification upgrade
nocs00 committed Aug 26, 2016
1 parent 6e268b7 commit be37a7c
Showing 4 changed files with 73 additions and 7 deletions.
4 changes: 4 additions & 0 deletions server/upgrade/data-migration-0.9.0-0.10.0/pom.xml
@@ -52,6 +52,10 @@
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
</dependency>
</dependencies>

<build>
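The new dependency carries no explicit version, so it is presumably pinned by the parent POM's dependencyManagement (not shown in this hunk). A minimal sketch of the connect/query/close lifecycle this driver enables; the contact point and query are placeholders, not part of the commit:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class CassandraSmoke {
        public static void main(String[] args) {
            Cluster cluster = Cluster.builder()
                    .addContactPoint("127.0.0.1")  // placeholder host
                    .build();
            try (Session session = cluster.connect()) {
                // system.local always exists, so this doubles as a connectivity check
                Row row = session.execute("SELECT release_version FROM system.local").one();
                System.out.println("Cassandra " + row.getString("release_version"));
            } finally {
                cluster.close();
            }
        }
    }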
@@ -16,7 +16,6 @@

package org.kaaproject.data_migration;

import com.mongodb.MongoClient;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.Schema;
@@ -58,7 +57,10 @@ protected List<Schema> transform() throws SQLException {

// delete the fetched ids from schema table
String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));
runner.update(connection, "delete from schems where id in (" + toDelete + ")");
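// only issue the delete when at least one id was fetched; "in ()" is invalid SQL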
String notEmptyIdSet = "\\s*\\d+(,\\s\\d+)*";
if (toDelete.matches(notEmptyIdSet)) {
runner.update(connection, "delete from schems where id in (" + toDelete + ")");
}

// shift ids in order to avoid PK constraint violation during adding record to base_schema
Long shift = runner.query(connection, "select max(id) as max_id from "+ getPrefixTableName() + "_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
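The guard above exists because an empty id list would produce "delete from schems where id in ()", which is a SQL syntax error. An equivalent guard, sketched here rather than taken from the commit, tests the fetched list directly:

    // skip the delete entirely when nothing was fetched
    if (!schemas.isEmpty()) {
        String toDelete = schemas.stream()
                .map(s -> s.getId().toString())
                .collect(joining(", "));
        runner.update(connection, "delete from schems where id in (" + toDelete + ")");
    }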
@@ -88,7 +88,12 @@ public Map<Ctl, List<Schema>> aggregate(List<Schema> schemas) throws SQLException {
RawSchema rawSchema = new RawSchema(schemaBody.toString());
DefaultRecordGenerationAlgorithm<RawData> algorithm = new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory());
String defaultRecord = algorithm.getRootData().getRawData();
Long tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null);
Long tenantId = null;
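// schemas without an application id resolve their tenant through events_class instead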
if (schema.getAppId() != null) {
tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null);
} else {
tenantId = runner.query(connection, "select tenant_id from events_class where id = " + schema.getId() , rs -> rs.next() ? rs.getLong("tenant_id") : null);
}

Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(currentCTLMetaId, fqn, schema.getAppId(), tenantId), defaultRecord);

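Both tenant lookups splice the id straight into the SQL string. The ids come from the migration's own database, so this works, but QueryRunner also accepts bound parameters; a sketch of the same branch with placeholders:

    // same lookup with a bound parameter instead of string concatenation
    Long tenantId = schema.getAppId() != null
            ? runner.query(connection, "select tenant_id from application where id = ?",
                    rs -> rs.next() ? rs.getLong("tenant_id") : null, schema.getAppId())
            : runner.query(connection, "select tenant_id from events_class where id = ?",
                    rs -> rs.next() ? rs.getLong("tenant_id") : null, schema.getId());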
@@ -1,29 +1,46 @@
package org.kaaproject.data_migration;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.Bytes;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.kaaproject.data_migration.model.Schema;
import org.kaaproject.data_migration.utils.Options;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

import static com.mongodb.client.model.Filters.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
import static com.datastax.driver.core.querybuilder.QueryBuilder.update;



public class CTLNotificationMigration extends AbstractCTLMigration {

private MongoClient client;
private Cluster cluster;
private String dbName;
private String nosql;

public CTLNotificationMigration(Connection connection, String host, String db, String nosql) {
super(connection);
client = new MongoClient(host);
cluster = Cluster.builder()
.addContactPoint(host)
.build();
dbName = db;
this.nosql = nosql;
}
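The diff does not show how the class is constructed; a hypothetical caller, assuming the nosql flag is what selects between the Mongo and Cassandra branches of transform() below:

    // hypothetical wiring; every argument is a placeholder
    CTLNotificationMigration migration = new CTLNotificationMigration(
            jdbcConnection,  // SQL connection used by the base migration
            "127.0.0.1",     // doubles as the MongoClient host and the Cluster contact point
            "kaa",           // Mongo database / Cassandra keyspace name
            "cassandra");    // presumably "mongo" or "cassandra"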
@@ -42,18 +59,56 @@ protected List<Schema> transform() throws SQLException {
for (Document document : cursor) {
Object id = document.get("_id");
Long schemaId = Long.parseLong((String) document.get("notification_schema_id"));
notification.updateMany(eq("_id", id), eq("$set", eq("notification_schema_id", schemaId + idShift)));
notification.updateMany(Filters.eq("_id", id), Filters.eq("$set", Filters.eq("notification_schema_id", schemaId + idShift)));
}

cursor = enpNotification.find();
for (Document document : cursor) {
Object id = document.get("_id");
// Document.get does not traverse dotted paths, so read the embedded notification document
Document nfDoc = (Document) document.get("notification");
Long schemaId = Long.parseLong((String) nfDoc.get("notification_schema_id"));
notification.updateMany(eq("_id", id), eq("$set", eq("notification.notification_schema_id", schemaId + idShift)));
enpNotification.updateMany(Filters.eq("_id", id), Filters.eq("$set", Filters.eq("notification.notification_schema_id", schemaId + idShift)));
}
} else {
Session session = cluster.connect(dbName);
BatchStatement batchStatement = new BatchStatement();
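// every schema_id rewrite is queued here and applied in one batch at the end of this branch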

//notification
ResultSet results = session.execute(select().from("notification"));
for (Row row : results) {
String id = row.getString("nf_id");
Long schemaId = Long.parseLong(row.getString("schema_id"));
String[] ids = id.split("::");
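// composite key: nf_id = topic_id::nf_type::nf_version::seq_num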

batchStatement.add(
update("notification")
.with(set("schema_id", String.valueOf(schemaId + idShift)))
.where(eq("topic_id", ids[0]))
.and(eq("nf_type", ids[1]))
.and(eq("nf_version", Integer.valueOf(ids[2])))
.and(eq("seq_num", Integer.valueOf(ids[3])))
);
}

//endpoint notification
results = session.execute(select().from("ep_nfs"));
for (Row row : results) {
String id = row.getString("nf_id");
Long schemaId = Long.parseLong(row.getString("schema_id"));
String[] ids = id.split("::");
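// composite key: nf_id = ep_key_hash(hex)::last_mod_time(epoch millis)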
ByteBuffer epKeyHash = Bytes.fromHexString(ids[0]);
Date lastModTime = new Date(Long.valueOf(ids[1]));

batchStatement.add(
update("ep_nfs")
.with(set("schema_id", String.valueOf(schemaId + idShift)))
.where(eq("ep_key_hash", epKeyHash))
.and(eq("last_mod_time", lastModTime))
);
}

//TODO update for cassandra
session.execute(batchStatement);
session.close();
cluster.close();
}
return res;
}
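A single BatchStatement accumulates one update per row of both tables before the lone execute above, which can exceed Cassandra's configured batch size limits on large installations. A sketch of chunked flushing; the threshold and helper are hypothetical, while BatchStatement.size() and clear() are driver API:

    // hypothetical helper: execute and reset the batch once it grows past a threshold
    private static final int FLUSH_THRESHOLD = 100;

    private void addAndMaybeFlush(Session session, BatchStatement batch, Statement update) {
        batch.add(update);
        if (batch.size() >= FLUSH_THRESHOLD) {
            session.execute(batch);
            batch.clear();
        }
    }

(Statement here is com.datastax.driver.core.Statement.)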
