Skip to content

Commit

Permalink
Transfer the same migration functionality to base class - AbstractCTLMig…
Browse files Browse the repository at this point in the history
…ration
  • Loading branch information
Kirill380 committed Aug 11, 2016
1 parent 5147bb8 commit 91a53e1
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 103 deletions.
@@ -0,0 +1,71 @@
package org.kaaproject.data_migration;


import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.Schema;
import org.kaaproject.data_migration.utils.datadefinition.DataDefinition;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import static java.util.stream.Collectors.joining;
import static org.kaaproject.data_migration.utils.datadefinition.Constraint.constraint;
import static org.kaaproject.data_migration.utils.datadefinition.ReferenceOptions.CASCADE;

/**
 * Base class for migrating per-feature schema tables (configuration,
 * notification, ...) to CTL-based storage. Subclasses supply the feature
 * table prefix via {@link #getName()} and the schema type tag via
 * {@link #getType()}.
 */
public abstract class AbstractCTLMigration {
    protected Connection connection;
    protected QueryRunner runner;
    protected DataDefinition dd;

    public AbstractCTLMigration(Connection connection) {
        this.connection = connection;
        this.runner = new QueryRunner();
        this.dd = new DataDefinition(connection);
    }

    /**
     * Rewires foreign keys before the data transformation: drops the unnamed
     * FK between {@code <feature>_schems} and {@code schems}, then replaces
     * the unnamed FK from the feature's data table to
     * {@code <feature>_schems} with a named, cascading constraint.
     *
     * @throws SQLException if any DDL statement fails
     */
    public void beforeTransform() throws SQLException {
        // delete relation between <feature>_schems and schems
        dd.dropUnnamedFK(getName() + "_schems", "schems");

        // change FK constraint between the table that contains data and the
        // appropriate <feature>_schems table
        dd.dropUnnamedFK(getName(), getName() + "_schems");
        dd.alterTable(getName())
                .add(constraint("FK_" + getName() + "_schems_id")
                        .foreignKey(getName() + "_schems_id")
                        .references(getName() + "_schems", "id")
                        .onDelete(CASCADE)
                        .onUpdate(CASCADE)
                )
                .execute();
    }

    /**
     * Fetches the feature's schemas, removes their rows from the shared
     * {@code schems} table, and tags each fetched schema with this feature's
     * {@link Schema.SchemaType}.
     *
     * @return the migrated schemas, possibly empty
     * @throws SQLException if a query or the delete fails
     */
    protected List<Schema> transform() throws SQLException {
        // fetch schemas of appropriate feature like configuration
        List<Schema> schemas = runner.query(connection, "select f.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " +
                "from " + getName() + "_schems f join schems s on f.id = s.id", new BeanListHandler<>(Schema.class));

        // delete the fetched ids from schema table; skip when nothing was
        // fetched, because "where id in ()" is a SQL syntax error
        if (!schemas.isEmpty()) {
            // ids come straight from the database as numeric values, so the
            // concatenation below cannot carry arbitrary SQL
            String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));
            runner.update(connection, "delete from schems where id in (" + toDelete + ")");
        }

        // set Type of schema
        schemas.forEach(s -> s.setType(getType()));

        return schemas;
    }

    /**
     * Hook executed after {@link #transform()}; no-op by default,
     * subclasses override when extra clean-up is needed.
     */
    public void afterTransform() throws SQLException {
    }

    /** @return feature table prefix, e.g. "configuration" */
    protected abstract String getName();

    /** @return type tag stamped on each migrated schema */
    protected abstract Schema.SchemaType getType();
}
@@ -1,14 +1,13 @@
package org.kaaproject.data_migration;

import org.apache.avro.Schema;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.ConfigurationSchema;
import org.apache.commons.dbutils.handlers.ScalarHandler;
import org.kaaproject.data_migration.model.Schema;
import org.kaaproject.data_migration.model.Ctl;
import org.kaaproject.data_migration.model.CtlMetaInfo;
import org.kaaproject.data_migration.utils.datadefinition.DataDefinition;
import org.kaaproject.kaa.server.common.admin.AdminClient;
import org.kaaproject.kaa.server.common.core.algorithms.generation.ConfigurationGenerationException;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithm;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithmImpl;
Expand All @@ -22,108 +21,23 @@
import java.util.*;

import static java.util.stream.Collectors.joining;
import static org.kaaproject.data_migration.utils.Constants.HOST;
import static org.kaaproject.data_migration.utils.Constants.PORT;
import static org.kaaproject.data_migration.utils.datadefinition.Constraint.constraint;
import static org.kaaproject.data_migration.utils.datadefinition.ReferenceOptions.CASCADE;

public class CTLConfigurationMigration {
private Connection connection;
private final int NUM_OF_BASE_SCHEMA_FIELDS = 8;
private AdminClient client;
private QueryRunner runner;
private DataDefinition dd;
public class CTLConfigurationMigration extends AbstractCTLMigration {

public CTLConfigurationMigration(Connection connection) {
this.connection = connection;
this.client = new AdminClient(HOST, PORT);
runner = new QueryRunner();
dd = new DataDefinition(connection);
}

public CTLConfigurationMigration(Connection connection, String host, int port) {
this.connection = connection;
this.client = new AdminClient(host, port);
runner = new QueryRunner();
dd = new DataDefinition(connection);
// Delegates to AbstractCTLMigration, which stores the connection and
// creates the QueryRunner and DataDefinition helpers used by the migration.
public CTLConfigurationMigration(Connection connection) {
super(connection);
}


private void beforeTransform() throws SQLException {
dd.dropUnnamedFK("configuration_schems", "schems");
dd.dropUnnamedFK("configuration", "configuration_schems");
dd.alterTable("configuration")
.add(constraint("FK_configuration_schems_id")
.foreignKey("configuration_schems_id")
.references("configuration_schems", "id")
.onDelete(CASCADE)
.onUpdate(CASCADE)
)
.execute();
// Feature prefix used by the base migration to derive table names
// ("configuration", "configuration_schems") and FK constraint names.
@Override
protected String getName() {
return "configuration";
}


public void transform() throws SQLException {
try {
beforeTransform();
List<ConfigurationSchema> schemas = runner.query(connection,
"select conf.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " +
"from configuration_schems conf join schems s on conf.id = s.id", new BeanListHandler<ConfigurationSchema>(ConfigurationSchema.class));

String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));
runner.update(connection, "delete from schems where id in (" + toDelete + ")");

Long shift = runner.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
runner.update(connection, "update configuration_schems set id = id + " + shift + " order by id desc");
schemas.forEach(s -> s.setId(s.getId() + shift));

Map<Ctl, ConfigurationSchema> confSchemasToCTL = new HashMap<>();
Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);

// CTL creation
//TODO add check for already existed CTL schema to avoid constrain violation
for (ConfigurationSchema schema : schemas) {
currentCTLMetaId++;
currentCtlId++;
Schema schemaBody = new Schema.Parser().parse(schema.getSchems());
String fqn = schemaBody.getFullName();
RawSchema rawSchema = new RawSchema(schemaBody.toString());
DefaultRecordGenerationAlgorithm<RawData> algotithm = new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory());
String defaultRecord = algotithm.getRootData().getRawData();
Long tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null);
runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", rs -> null, currentCTLMetaId, fqn, schema.getAppId(), tenantId);
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, schema.getSchems(), schema.getCreatedTime(),
schema.getCreatedUsername(), defaultRecord, schema.getVersion(), currentCTLMetaId);


Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(fqn, schema.getAppId(), tenantId));
confSchemasToCTL.put(ctl, schema);
}

List<Object[]> params = new ArrayList<>();
for (Ctl ctl : confSchemasToCTL.keySet()) {
ConfigurationSchema schema = confSchemasToCTL.get(ctl);
params.add(new Object[]{
schema.getId(),
schema.getCreatedTime(),
schema.getCreatedUsername(),
schema.getDescription(),
schema.getName(),
schema.getVersion(),
ctl.getMetaInfo().getAppId(),
ctl.getId()
});
}

runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[schemas.size()][]));
} catch (SQLException | ConfigurationGenerationException | IOException e) {
System.err.println("Error: " + e.getMessage());
} finally {
DbUtils.closeQuietly(connection);
}

// Type tag stamped on each schema fetched by the base transform() so
// later steps can tell which feature a Schema row came from.
@Override
protected Schema.SchemaType getType() {
return Schema.SchemaType.CONFIGURATION;
}


}
Expand Up @@ -12,21 +12,23 @@ public CtlMetaInfo(String fqn, Long appId, Long tenatnId) {
this.tenatnId = tenatnId;
}


/**
 * Two CtlMetaInfo instances are equal when their fqn and appId match.
 * fqn is compared directly (assumed non-null — TODO confirm, hashCode()
 * makes the same assumption); appId is compared null-safely.
 */
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    CtlMetaInfo that = (CtlMetaInfo) o;

    if (!fqn.equals(that.fqn)) return false;
    return appId != null ? appId.equals(that.appId) : that.appId == null;
}

/**
 * Hash combining fqn and appId — the same fields equals() compares,
 * keeping the equals/hashCode contract intact. fqn is assumed non-null.
 */
@Override
public int hashCode() {
    int result = fqn.hashCode();
    result = 31 * result + (appId != null ? appId.hashCode() : 0);
    return result;
}

public Long getAppId() {
Expand Down
@@ -1,6 +1,6 @@
package org.kaaproject.data_migration.model;

public class ConfigurationSchema {
public class Schema {

private Long id;

Expand All @@ -18,6 +18,8 @@ public class ConfigurationSchema {

private String schems;

private SchemaType type;


public Long getId() {
return id;
Expand Down Expand Up @@ -82,4 +84,16 @@ public String getSchems() {
public void setSchems(String schems) {
this.schems = schems;
}

// Feature type assigned during migration; null until
// AbstractCTLMigration.transform() has tagged this schema.
public SchemaType getType() {
return type;
}

// Sets the feature type this schema row belongs to.
public void setType(SchemaType type) {
this.type = type;
}

/** Feature a migrated schema belongs to. */
public enum SchemaType {
    CONFIGURATION,
    NOTIFICATION,
    EVENTS,
    LOG
}
}
Expand Up @@ -12,8 +12,6 @@ public enum DataSources {
MARIADB(getMariaDB()), POSTGRES(getPostgreSQL());

private final DataSource ds;
private static final String USER_NAME = "root";
private static final String PASSWORD = "root";
private static final String DB_NAME = "kaa";


Expand Down

0 comments on commit 91a53e1

Please sign in to comment.