Fix code style in datamigration and some fixes with lombok
Kirill380 committed Oct 3, 2016
1 parent c7b5eaa commit 9970924
Showing 19 changed files with 237 additions and 150 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -31,7 +31,7 @@ Copyright 2014-2016 CyberVision, Inc.
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <main.dir>${basedir}</main.dir>
     <avro-ui.version>0.2.1</avro-ui.version>
-    <lombok.version>1.16.10.0</lombok.version>
+    <lombok.version>1.16.10</lombok.version>
     <akka.version>2.4.1</akka.version>
     <spring.version>4.2.5.RELEASE</spring.version>
     <spring.security.version>3.2.9.RELEASE</spring.security.version>
@@ -1334,7 +1334,7 @@ Copyright 2014-2016 CyberVision, Inc.
       </dependency>
       <dependency>
         <groupId>org.projectlombok</groupId>
-        <artifactId>lombok-maven</artifactId>
+        <artifactId>lombok</artifactId>
         <version>${lombok.version}</version>
       </dependency>
       <dependency>
5 changes: 4 additions & 1 deletion server/upgrade/data-migration-0.9.0-0.10.0/pom.xml
@@ -74,8 +74,11 @@
       <artifactId>cassandra-driver-core</artifactId>
       <version>3.0.2</version>
     </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
   </dependencies>
-
   <build>
     <finalName>db-migration</finalName>
     <plugins>
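The root pom's dependencyManagement entry now points at `org.projectlombok:lombok`, the annotation-processor library that ships the annotations themselves (`lombok-maven` is a different artifact that wraps the Maven plugin), which is why this module can add the dependency without a `<version>` tag. A minimal hedged sketch of the boilerplate this buys; the field names follow the `Options` getters used elsewhere in this commit, but the real class body is not shown here:

```java
import lombok.Data;

// @Data generates getters, setters, equals/hashCode and toString at
// compile time, so a holder like this needs no hand-written accessors.
@Data
public class Options {
  private String host;
  private String dbName;
  private String noSql;
}
```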
@@ -36,18 +36,36 @@ public abstract class AbstractCtlMigration {
   protected DataDefinition dd;
   protected Long idShift;

+  /**
+   * Create an entity that is responsible for data migration from old tables to new CTL based
+   * ones.
+   *
+   * @param connection the connection to relational database
+   */
   public AbstractCtlMigration(Connection connection) {
     this.connection = connection;
     runner = new QueryRunner();
     dd = new DataDefinition(connection);
   }

+  /**
+   * Do necessary operations on the database before execution of the
+   * {@link AbstractCtlMigration#transform()} method.
+   *
+   * @throws SQLException the sql exception
+   */
   public void beforeTransform() throws SQLException {
     // delete relation between <table_prefix>_schems to schems
     dd.dropUnnamedFk(getPrefixTableName() + "_schems", "schems");
   }

+  /**
+   * Do the main part of data migration from old tables to new CTL based ones.
+   *
+   * @throws SQLException the sql exception
+   */
   protected List<Schema> transform() throws SQLException {
     // fetch schemas of appropriate feature like configuration
     List<Schema> schemas = runner.query(connection, "select "
@@ -76,6 +94,12 @@ protected List<Schema> transform() throws SQLException {
   }

+  /**
+   * Do necessary operations on the database after execution of the
+   * {@link AbstractCtlMigration#transform()} method.
+   *
+   * @throws SQLException the sql exception
+   */
   public void afterTransform() throws SQLException {
     dd.alterTable(getPrefixTableName() + "_schems")
         .add(Constraint.constraint("FK_" + getPrefixTableName() + "_base_schems_id")
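Together the three hooks form a template-method lifecycle around `transform()`. A hedged sketch of how a driver might sequence them; the loop is illustrative, and since `transform()` is protected the real caller sits in the same package or behind a public wrapper:

```java
import java.util.ArrayList;
import java.util.List;

public class MigrationDriverSketch {
  // Illustrative sequencing only, based on the hook contracts above;
  // assumes same-package access because transform() is protected.
  static List<Schema> runAll(List<AbstractCtlMigration> migrations) throws Exception {
    List<Schema> schemas = new ArrayList<>();
    for (AbstractCtlMigration m : migrations) {
      m.beforeTransform();            // drop unnamed FK <prefix>_schems -> schems
      schemas.addAll(m.transform());  // move schema rows to CTL based tables
    }
    // ... aggregate CTLs and create base_schems records here ...
    for (AbstractCtlMigration m : migrations) {
      m.afterTransform();             // restore FK_<prefix>_base_schems_id
    }
    return schemas;
  }
}
```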
@@ -32,12 +32,23 @@ public class BaseSchemaRecordsCreation {
   protected QueryRunner runner;
   protected DataDefinition dd;

+  /**
+   * Create a new instance of BaseSchemaRecordsCreation.
+   *
+   * @param connection the connection to relational database
+   */
   public BaseSchemaRecordsCreation(Connection connection) {
     this.connection = connection;
     runner = new QueryRunner();
     dd = new DataDefinition(connection);
   }

+  /**
+   * Final phase of the migration: add the created CTL based schemas to the database.
+   *
+   * @param ctlToSchemas mapping of a common type to its schemas
+   * @throws SQLException the sql exception
+   */
   public void create(Map<Ctl, List<Schema>> ctlToSchemas) throws SQLException {
     List<Object[]> params = new ArrayList<>();
     int schemaCounter = 0;
@@ -57,7 +68,8 @@ public void create(Map<Ctl, List<Schema>> ctlToSchemas) throws SQLException {
       }
     }

-    runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[schemaCounter][]));
+    runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)",
+        params.toArray(new Object[schemaCounter][]));
   }

}
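`QueryRunner.batch` from Apache commons-dbutils takes one `Object[]` of placeholder values per row, which is why `create` accumulates them in a list before a single call. A self-contained sketch of the same pattern against a hypothetical `example(id, name)` table:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.dbutils.QueryRunner;

public class BatchInsertSketch {
  // Each Object[] fills the placeholders of one row; the whole list is
  // sent to the driver as a single JDBC batch.
  static void insertAll(Connection connection) throws SQLException {
    List<Object[]> params = new ArrayList<>();
    params.add(new Object[]{1L, "first"});
    params.add(new Object[]{2L, "second"});
    new QueryRunner().batch(connection, "insert into example values(?, ?)",
        params.toArray(new Object[params.size()][]));
  }
}
```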
@@ -56,6 +56,11 @@ public class CtlAggregation {
   private Set<Ctl> ctls;

+  /**
+   * Create a new instance that aggregates identical CTL schemas.
+   *
+   * @param connection the connection to relational database
+   */
   public CtlAggregation(Connection connection) {
     this.connection = connection;
     runner = new QueryRunner();
@@ -65,9 +70,9 @@ public CtlAggregation(Connection connection) {
   }

-  /*
-   * Return schemas and CTLs map that further used for creating new records in base_schema table
-   * */
+  /**
+   * Return the schemas and CTLs map that is further used for creating new records in the
+   * base_schema table.
+   */
   public Map<Ctl, List<Schema>> aggregate(List<Schema> schemas)
       throws SQLException, ConfigurationGenerationException, IOException {
     Long currentCtlMetaId = runner.query(connection,
@@ -102,7 +107,7 @@ public Map<Ctl, List<Schema>> aggregate(List<Schema> schemas)
     DefaultRecordGenerationAlgorithm<RawData> algorithm =
         new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory());
     String defaultRecord = algorithm.getRootData().getRawData();
-    Long tenantId = null;
+    Long tenantId;
     if (schema.getAppId() != null) {
       tenantId = runner.query(
           connection,
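`aggregate` exists because several version-numbered schema rows can share one underlying common type. A hedged sketch of the grouping shape only; the real key derivation, which involves the fqn, tenant and the generated default record, is just partly visible in this hunk:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregationSketch {
  // Illustrative only: fold schemas that resolve to the same CTL into one
  // map entry, matching the Map<Ctl, List<Schema>> return type above.
  static Map<Ctl, List<Schema>> group(List<Schema> schemas) {
    Map<Ctl, List<Schema>> ctlToSchemas = new HashMap<>();
    for (Schema schema : schemas) {
      Ctl ctl = resolveCtl(schema); // hypothetical helper computing the shared key
      ctlToSchemas.computeIfAbsent(ctl, k -> new ArrayList<>()).add(schema);
    }
    return ctlToSchemas;
  }

  private static Ctl resolveCtl(Schema schema) {
    throw new UnsupportedOperationException("illustration only");
  }
}
```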
@@ -91,8 +91,8 @@ protected List<Schema> transform() throws SQLException {
         "SELECT id, schems, created_time, created_username FROM "
             + EVENT_SCHEMA_VERSION_TABLE_NAME,
         new BeanListHandler<EventSchemaVersion>(EventSchemaVersion.class));
-    List<EventClass> oldECs = runner.query(
-        connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME
+    final List<EventClass> oldECs = runner.query(connection,
+        "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME
             + " WHERE schems not like '{\"type\":\"enum\"%'",
         new BeanListHandler<>(EventClass.class));

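`BeanListHandler` maps each result row onto a fresh bean by matching column labels to property names, which is why the entity keeps the historical column spelling `schems`. A hedged standalone sketch; the table name is assumed from the `EVENT_CLASS_TABLE_NAME` constant, and the `EventClass` bean needs matching setters:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanListHandler;

public class EventClassLoader {
  // The LIKE filter skips Avro enum definitions by their raw JSON prefix.
  static List<EventClass> loadNonEnumEventClasses(Connection conn) throws SQLException {
    return new QueryRunner().query(conn,
        "SELECT id, schems, version FROM event_class"
            + " WHERE schems not like '{\"type\":\"enum\"%'",
        new BeanListHandler<>(EventClass.class));
  }
}
```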
@@ -20,6 +20,7 @@
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static java.lang.Long.parseLong;

 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Cluster;
@@ -50,14 +51,21 @@ public class CtlNotificationMigration extends AbstractCtlMigration {
   private String dbName;
   private String nosql;

-  public CtlNotificationMigration(Connection connection, String host, String db, String nosql) {
+  /**
+   * Create an entity that is responsible for data migration from the old notification tables
+   * to the new CTL based ones.
+   *
+   * @param connection the connection to relational database
+   * @param options    the options for configuring NoSQL databases
+   */
+  public CtlNotificationMigration(Connection connection, Options options) {
     super(connection);
-    client = new MongoClient(host);
+    client = new MongoClient(options.getHost());
     cluster = Cluster.builder()
-        .addContactPoint(host)
+        .addContactPoint(options.getHost())
         .build();
-    dbName = db;
-    this.nosql = nosql;
+    dbName = options.getDbName();
+    this.nosql = options.getNoSql();
   }

@@ -73,15 +81,24 @@ protected List<Schema> transform() throws SQLException {
     FindIterable<Document> cursor = notification.find();
     for (Document document : cursor) {
       Object id = document.get("_id");
-      Long schemaId = Long.parseLong((String) document.get("notification_schema_id"));
-      notification.updateMany(Filters.eq("_id", id), Filters.eq("$set", Filters.eq("notification_schema_id", schemaId + idShift)));
+      Long schemaId = parseLong((String) document.get("notification_schema_id"));
+      notification.updateMany(
+          Filters.eq("_id", id),
+          Filters.eq("$set", Filters.eq("notification_schema_id", schemaId + idShift))
+      );
     }

     cursor = enpNotification.find();
     for (Document document : cursor) {
       Object id = document.get("_id");
-      Long schemaId = Long.parseLong((String) document.get("notification.notification_schema_id"));
-      notification.updateMany(Filters.eq("_id", id), Filters.eq("$set", Filters.eq("notification.notification_schema_id", schemaId + idShift)));
+      Long schemaId = parseLong((String) document.get("notification.notification_schema_id"));
+      enpNotification.updateMany(
+          Filters.eq("_id", id),
+          Filters.eq("$set", Filters.eq(
+              "notification.notification_schema_id",
+              schemaId + idShift)
+          )
+      );
     }
   } else {
     Session session = cluster.connect(dbName);
@@ -91,7 +108,7 @@ protected List<Schema> transform() throws SQLException {
     ResultSet results = session.execute(select().from("notification"));
     for (Row row : results) {
       String id = row.getString("nf_id");
-      Long schemaId = Long.parseLong(row.getString("schema_id"));
+      Long schemaId = parseLong(row.getString("schema_id"));
       String[] ids = id.split("::");

       batchStatement.add(
@@ -108,7 +125,7 @@ protected List<Schema> transform() throws SQLException {
     results = session.execute(select().from("ep_nfs"));
     for (Row row : results) {
       String id = row.getString("nf_id");
-      Long schemaId = Long.parseLong(row.getString("schema_id"));
+      Long schemaId = parseLong(row.getString("schema_id"));
       String[] ids = id.split("::");
       ByteBuffer epKeyHash = Bytes.fromHexString(ids[0]);
       Date lastModTime = new Date(Long.valueOf(ids[1]));
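Building the update document with `Filters.eq("$set", ...)` works because the filter helper just emits BSON, but the driver has a dedicated helper for this. A hedged sketch of the same schema-id shift written with `Updates.set` (MongoDB Java driver 3.x):

```java
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.bson.Document;

public class NotificationShiftSketch {
  // Point the stored schema id at the renumbered CTL based schema row.
  static void shiftSchemaId(MongoCollection<Document> notifications,
                            Object id, long schemaId, long idShift) {
    notifications.updateMany(
        Filters.eq("_id", id),
        Updates.set("notification_schema_id", schemaId + idShift));
  }
}
```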
@@ -26,17 +26,23 @@
 import com.mongodb.client.MongoDatabase;

 import org.bson.Document;
+import org.kaaproject.kaa.server.datamigration.utils.Options;

 public class EndpointProfileMigration {
   private String host;
   private String dbName;
   private String nosql;

-  public EndpointProfileMigration(String host, String db, String nosql) {
-    dbName = db;
-    this.host = host;
-    this.nosql = nosql;
+  /**
+   * Create a new instance of the object that is responsible for migrating endpoint profiles.
+   *
+   * @param options the options
+   */
+  public EndpointProfileMigration(Options options) {
+    dbName = options.getDbName();
+    this.host = options.getHost();
+    this.nosql = options.getNoSql();
   }

   /**
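The constructor swap replaces three positional `String` arguments, which are easy to transpose silently, with a single parameter object read by name. A hedged usage sketch; the setters are assumptions, consistent with a lombok-generated `Options`:

```java
// Old signature (for contrast): new EndpointProfileMigration(host, db, nosql)
// left nothing to stop host and db from being swapped at a call site.
Options options = new Options();
options.setHost("localhost");  // hypothetical setters matching the
options.setDbName("kaa");      // getHost()/getDbName()/getNoSql() getters
options.setNoSql("mongo");     // used throughout this commit
EndpointProfileMigration migration = new EndpointProfileMigration(options);
```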
@@ -39,6 +39,11 @@ public class MigrateData {
   private static final Logger LOG = LoggerFactory.getLogger(MigrateData.class);
   private static Connection conn;

+  /**
+   * The entry point of the migrate data application.
+   *
+   * @param args the input options
+   */
   public static void main(String[] args) {
     Options options = new Options();

@@ -83,18 +88,24 @@ public static void main(String[] args) {
     List<Schema> schemas = new ArrayList<>();
     conn = DataSources.getDataSource(options).getConnection();
     QueryRunner runner = new QueryRunner();
-    Long maxId = runner.query(conn, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
+    Long maxId = runner.query(conn, "select max(id) as max_id from base_schems",
+        rs -> rs.next() ? rs.getLong("max_id") : null);

     BaseSchemaIdCounter.setInitValue(maxId);
-    UpdateUuidsMigration updateUuidsMigration = new UpdateUuidsMigration(conn, options.getHost(), options.getDbName(), options.getNoSql());
-    EndpointProfileMigration endpointProfileMigration = new EndpointProfileMigration(options.getHost(), options.getDbName(), options.getNoSql());
+    final UpdateUuidsMigration updateUuidsMigration = new UpdateUuidsMigration(conn, options);
+    final EndpointProfileMigration endpointProfileMigration =
+        new EndpointProfileMigration(options);

     List<AbstractCtlMigration> migrationList = new ArrayList<>();
     migrationList.add(new CtlConfigurationMigration(conn));
     migrationList.add(new CtlEventsMigration(conn));
-    migrationList.add(new CtlNotificationMigration(conn, options.getHost(), options.getDbName(), options.getNoSql()));
+    migrationList.add(new CtlNotificationMigration(conn, options));
     migrationList.add(new CtlLogMigration(conn));

-    CtlAggregation aggregation = new CtlAggregation(conn);
-    BaseSchemaRecordsCreation recordsCreation = new BaseSchemaRecordsCreation(conn);
+    final CtlAggregation aggregation = new CtlAggregation(conn);
+    final BaseSchemaRecordsCreation recordsCreation = new BaseSchemaRecordsCreation(conn);

     // convert uuids from latin1 to base64
     updateUuidsMigration.transform();
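The `max(id)` lookup leans on `ResultSetHandler<T>` being a single-method interface, so a lambda can reduce the result set in place. A hedged standalone sketch; note that `rs.getLong` maps SQL NULL to 0, so an empty `base_schems` table yields 0 rather than the lambda's null branch:

```java
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;

public class MaxIdSketch {
  // Reduce the one-row aggregate query to a single nullable Long.
  static Long maxBaseSchemaId(Connection conn) throws SQLException {
    ResultSetHandler<Long> handler = rs -> rs.next() ? rs.getLong("max_id") : null;
    return new QueryRunner().query(conn,
        "select max(id) as max_id from base_schems", handler);
  }
}
```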
@@ -54,26 +54,35 @@ public class UpdateUuidsMigration {
   private String dbName;
   private String nosql;

-  public UpdateUuidsMigration(Connection connection, String host, String db, String nosql) {
+  /**
+   * Create a new instance of UpdateUuidsMigration.
+   *
+   * @param connection the connection to relational database
+   * @param options    the options for configuring NoSQL databases
+   */
+  public UpdateUuidsMigration(Connection connection, Options options) {
     this.connection = connection;
-    client = new MongoClient(host);
+    client = new MongoClient(options.getHost());
     cluster = Cluster.builder()
-        .addContactPoint(host)
+        .addContactPoint(options.getHost())
         .build();
-    dbName = db;
-    this.nosql = nosql;
+    dbName = options.getDbName();
+    this.nosql = options.getNoSql();
   }

   public void transform() throws IOException, SQLException {
     QueryRunner run = new QueryRunner();
-    ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<Configuration>(Configuration.class);
+    ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<>(Configuration.class);
     List<Configuration> configs = run.query(connection, "SELECT * FROM configuration", rsHandler);
     for (Configuration config : configs) {
       JsonNode json = new ObjectMapper().readTree(config.getConfigurationBody());
       JsonNode jsonEncoded = encodeUuids(json);
       byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();

-      int updates = run.update(connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody, config.getId());
+      int updates = run.update(connection,
+          "UPDATE configuration SET configuration_body=? WHERE id=?",
+          encodedConfigurationBody, config.getId()
+      );
       if (updates != 1) {
         System.err.println("Error: failed to update configuration: " + config);
       }
@@ -87,7 +96,10 @@ public void transform() throws IOException, SQLException {
       String body = (String) d.get("body");
       JsonNode json = new ObjectMapper().readTree(body);
       JsonNode jsonEncoded = encodeUuids(json);
-      userConfiguration.updateOne(Filters.eq("_id", d.get("_id")), Filters.eq("$set", Filters.eq("body", jsonEncoded)));
+      userConfiguration.updateOne(
+          Filters.eq("_id", d.get("_id")),
+          Filters.eq("$set", Filters.eq("body", jsonEncoded))
+      );
     }

   } else {
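MigrateData's comment describes this pass as converting uuids from latin1 to base64 inside the stored JSON bodies. A hedged sketch of what one value's re-encoding might look like; the real `encodeUuids` walks an entire JSON tree, and its field-detection logic is not shown in this diff:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class UuidEncodeSketch {
  // Hypothetical single-value version: reinterpret a uuid stored as a
  // latin1 string and re-emit it in base64.
  static String encodeUuid(String latin1Uuid) {
    byte[] raw = latin1Uuid.getBytes(StandardCharsets.ISO_8859_1);
    return Base64.getEncoder().encodeToString(raw);
  }
}
```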
