Skip to content

Commit

Permalink
Small refactoring project structure and added data definition utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Aug 9, 2016
1 parent df8f1e9 commit 1a8f599
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 77 deletions.
Expand Up @@ -3,18 +3,11 @@
import org.apache.avro.Schema;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.kaaproject.data_migration.model.Configuration;
import org.kaaproject.data_migration.model.ConfigurationSchema;
import org.kaaproject.data_migration.model.Ctl;
import org.kaaproject.data_migration.model.CtlMetaInfo;
import org.kaaproject.data_migration.utils.Utils;
import org.kaaproject.kaa.common.dto.Util;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
import org.kaaproject.data_migration.utils.datadefinition.DataDefinition;
import org.kaaproject.kaa.server.common.admin.AdminClient;
import org.kaaproject.kaa.server.common.core.algorithms.generation.ConfigurationGenerationException;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithm;
Expand All @@ -28,51 +21,68 @@
import java.sql.SQLException;
import java.util.*;

import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining;
import static org.kaaproject.data_migration.utils.Constants.HOST;
import static org.kaaproject.data_migration.utils.Constants.PORT;
import static org.kaaproject.data_migration.utils.datadefinition.Constraint.constraint;
import static org.kaaproject.data_migration.utils.datadefinition.ReferenceOptions.CASCADE;

public class CTLConfigurationMigration {
// private static final Logger LOG = LoggerFactory.getLogger(CTLConfigurationMigration.class.getSimpleName());

private static final String UUID_FIELD = "__uuid";
private static final String UUID_VALUE = "org.kaaproject.configuration.uuidT";

private Connection connection;
private final int NUM_OF_BASE_SCHEMA_FIELDS = 8;
private AdminClient client;
private QueryRunner runner;
private DataDefinition dd;

public CTLConfigurationMigration(Connection connection) {
this.connection = connection;
this.client = new AdminClient(HOST, PORT);
runner = new QueryRunner();
dd = new DataDefinition(connection);
}

public CTLConfigurationMigration(Connection connection, String host, int port) {
this.connection = connection;
this.client = new AdminClient(host, port);
runner = new QueryRunner();
dd = new DataDefinition(connection);
}


private void beforeTransform() throws SQLException {
dd.dropUnnamedFK("configuration_schems", "schems");
dd.dropUnnamedFK("configuration", "configuration_schems");
dd.alterTable("configuration")
.add(constraint("FK_configuration_schems_id")
.foreignKey("configuration_schems_id")
.references("configuration_schems", "id")
.onDelete(CASCADE)
.onUpdate(CASCADE)
)
.execute();
}

public void transform() throws SQLException, IOException {
QueryRunner runner = new QueryRunner();
try {
updateUuids();

List<ConfigurationSchema> schemas = runner.query(connection,
public void transform() throws SQLException {
try {
beforeTransform();
List<ConfigurationSchema> schemas = runner.query(connection,
"select conf.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " +
"from configuration_schems conf join schems s on conf.id = s.id", new BeanListHandler<ConfigurationSchema>(ConfigurationSchema.class));
String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));
"from configuration_schems conf join schems s on conf.id = s.id", new BeanListHandler<ConfigurationSchema>(ConfigurationSchema.class));

String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));
runner.update(connection, "delete from schems where id in (" + toDelete + ")");

Long shift = runner.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
runner.update(connection, "update configuration_schems set id = id + " + shift + " order by id desc");
schemas.forEach(s -> s.setId(s.getId() + shift));
Map<Ctl, List<ConfigurationSchema>> confSchemasToCTL = new HashMap<>();

Map<Ctl, ConfigurationSchema> confSchemasToCTL = new HashMap<>();
Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);

// CTL creation
//TODO add check for already existed CTL schema to avoid constrain violation
for (ConfigurationSchema schema : schemas) {
currentCTLMetaId++;
currentCtlId++;
Expand All @@ -86,20 +96,14 @@ public void transform() throws SQLException, IOException {
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, schema.getSchems(), schema.getCreatedTime(),
schema.getCreatedUsername(), defaultRecord, schema.getVersion(), currentCTLMetaId);

// aggregate configuration schemas with same fqn

Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(fqn, schema.getAppId(), tenantId));
if (confSchemasToCTL.containsKey(ctl)) {
List<ConfigurationSchema> list = confSchemasToCTL.get(ctl);
list.add(schema);
confSchemasToCTL.put(ctl, list);
} else {
confSchemasToCTL.put(ctl, asList(schema));
}
confSchemasToCTL.put(ctl, schema);
}

List<Object[]> params = new ArrayList<>();
for (Ctl ctl : confSchemasToCTL.keySet()) {
for (ConfigurationSchema schema : confSchemasToCTL.get(ctl)) {
ConfigurationSchema schema = confSchemasToCTL.get(ctl);
params.add(new Object[]{
schema.getId(),
schema.getCreatedTime(),
Expand All @@ -110,7 +114,6 @@ public void transform() throws SQLException, IOException {
ctl.getMetaInfo().getAppId(),
ctl.getId()
});
}
}

runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[schemas.size()][]));
Expand All @@ -122,37 +125,5 @@ public void transform() throws SQLException, IOException {

}

private void updateUuids() throws SQLException, IOException {
QueryRunner run = new QueryRunner();

ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<Configuration>(Configuration.class);
List<Configuration> configs = run.query(this.connection, "SELECT * FROM configuration", rsHandler);
for (Configuration config : configs) {
JsonNode json = new ObjectMapper().readTree(config.getConfiguration_body());
JsonNode jsonEncoded = encodeUuids(json);
byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();

int updates = run.update(this.connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody,config.getId());
if (updates == 1) {
//TODO
}
}
}

private JsonNode encodeUuids(JsonNode json) throws IOException {
if (json.has(UUID_FIELD)) {
JsonNode j = json.get(UUID_FIELD);
if (j.has(UUID_VALUE)) {
String value = j.get(UUID_VALUE).asText();
String encodedValue = Base64.getEncoder().encodeToString(value.getBytes("ISO-8859-1"));
((ObjectNode)j).put(UUID_VALUE, encodedValue);
}
}

for (JsonNode node : json) {
if (node.isContainerNode()) encodeUuids(node);
}

return json;
}
}
Expand Up @@ -7,7 +7,7 @@
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.EventClass;
import org.kaaproject.data_migration.model.EventSchemaVersion;
import org.kaaproject.data_migration.utils.Utils;
import org.kaaproject.data_migration.utils.datadefinition.DataDefinition;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
import org.kaaproject.kaa.server.common.admin.AdminClient;

Expand All @@ -29,9 +29,11 @@ public class CTLEventsMigration {
private static final String EVENT_CLASS_TABLE_NAME = "events_class";
private static final String BASE_SCHEMA_TABLE_NAME = "base_schems";
private static final String CTL_TABLE_NAME = "ctl";
private DataDefinition dd;

public CTLEventsMigration(Connection connection) {
this.connection = connection;
this.dd = new DataDefinition(connection);
}

public void transform() throws SQLException, IOException {
Expand Down Expand Up @@ -65,7 +67,7 @@ public void transform() throws SQLException, IOException {
//3
run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " RENAME " + EVENT_CLASS_FAMILY_VERSION_TABLE_NAME);
//4
Utils.dropFK(connection, EVENT_CLASS_TABLE_NAME, EVENT_CLASS_FAMILY_TABLE_NAME);
dd.dropUnnamedFK(EVENT_CLASS_TABLE_NAME, EVENT_CLASS_FAMILY_TABLE_NAME);
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)");
//5
ResultSetHandler<List<EventClass>> ecHandler = new BeanListHandler<EventClass>(EventClass.class);
Expand Down
@@ -0,0 +1,49 @@
package org.kaaproject.data_migration;


import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.kaaproject.data_migration.model.Configuration;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import static org.kaaproject.data_migration.utils.Utils.encodeUuids;

public class UpdateUuidsMigration {
private Connection connection;

public UpdateUuidsMigration(Connection connection) {
this.connection = connection;
}

public void transform() throws IOException {
try {
QueryRunner run = new QueryRunner();
ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<Configuration>(Configuration.class);
List<Configuration> configs = run.query(connection, "SELECT * FROM configuration", rsHandler);
for (Configuration config : configs) {
JsonNode json = new ObjectMapper().readTree(config.getConfiguration_body());
JsonNode jsonEncoded = encodeUuids(json);
byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();

int updates = run.update(connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody, config.getId());
if (updates == 1) {
//TODO
}
}

} catch (SQLException e) {
System.err.println("Error: " + e.getMessage());
} finally {
DbUtils.closeQuietly(connection);
}
}

}
Expand Up @@ -3,26 +3,43 @@

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.node.ObjectNode;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Base64;

public class Utils {
final public class Utils {

private static final String QUERY_FIND_FK_NAME = "SELECT CONSTRAINT_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE REFERENCED_TABLE_SCHEMA = 'kaa' AND TABLE_NAME = '%s' and referenced_table_name='%s'";
private static final String UUID_FIELD = "__uuid";
private static final String UUID_VALUE = "org.kaaproject.configuration.uuidT";

public static void runFile(QueryRunner runner, Connection connection, String fileName) throws IOException, SQLException {
String query = IOUtils.toString(Utils.class.getClassLoader().getResourceAsStream(fileName));
runner.update(connection, query);
}

public static void dropFK(Connection connection, String tableName, String referencedTableName) throws SQLException {
QueryRunner runner = new QueryRunner();

String query = String.format(QUERY_FIND_FK_NAME, tableName, referencedTableName);
String fkName = runner.query(connection, query, rs -> rs.next() ? rs.getString(1) : null);
runner.update(connection, "ALTER TABLE " + tableName + " DROP FOREIGN KEY " + fkName);


public static JsonNode encodeUuids(JsonNode json) throws IOException {
if (json.has(UUID_FIELD)) {
JsonNode j = json.get(UUID_FIELD);
if (j.has(UUID_VALUE)) {
String value = j.get(UUID_VALUE).asText();
String encodedValue = Base64.getEncoder().encodeToString(value.getBytes("ISO-8859-1"));
((ObjectNode)j).put(UUID_VALUE, encodedValue);
}
}

for (JsonNode node : json) {
if (node.isContainerNode()) encodeUuids(node);
}

return json;
}






}
@@ -0,0 +1,10 @@
package org.kaaproject.data_migration.utils.datadefinition;


public class BuilderException extends RuntimeException {


public BuilderException(String s) {
super(s);
}
}

0 comments on commit 1a8f599

Please sign in to comment.