diff --git a/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/ctl/CTLSchemaDto.java b/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/ctl/CTLSchemaDto.java index 2fe2b30b62..cc6cd9e30f 100644 --- a/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/ctl/CTLSchemaDto.java +++ b/server/common/dto/src/main/java/org/kaaproject/kaa/common/dto/ctl/CTLSchemaDto.java @@ -127,21 +127,31 @@ public boolean equals(Object obj) { if (getClass() != obj.getClass()) return false; CTLSchemaDto other = (CTLSchemaDto) obj; + + if (id == null) { + if (other.id != null) + return false; + } else if (!id.equals(other.id)) + return false; + if (dependencySet == null) { if (other.dependencySet != null) return false; } else if (!dependencySet.equals(other.dependencySet)) return false; + if (metaInfo == null) { if (other.metaInfo != null) return false; } else if (!metaInfo.equals(other.metaInfo)) return false; + if (version == null) { if (other.version != null) return false; } else if (!version.equals(other.version)) return false; + return true; } diff --git a/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLConfigurationMigration.java b/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLConfigurationMigration.java index 332ead73f0..3394f147ab 100644 --- a/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLConfigurationMigration.java +++ b/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLConfigurationMigration.java @@ -133,18 +133,20 @@ private void updateUuids() throws SQLException, IOException { byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes(); int updates = run.update(this.connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody,config.getId()); - if (updates == 1) { - //TODO + if (updates != 1) { + System.err.println("Error: failed to update configuration: " + config); } } } private JsonNode encodeUuids(JsonNode json) throws IOException { + final String LATIN_1 = "ISO-8859-1"; + if (json.has(UUID_FIELD)) { JsonNode j = json.get(UUID_FIELD); if (j.has(UUID_VALUE)) { String value = j.get(UUID_VALUE).asText(); - String encodedValue = Base64.getEncoder().encodeToString(value.getBytes("ISO-8859-1")); + String encodedValue = Base64.getEncoder().encodeToString(value.getBytes(LATIN_1)); ((ObjectNode)j).put(UUID_VALUE, encodedValue); } } diff --git a/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLEventsMigration.java b/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLEventsMigration.java index ce080ef167..7971de2942 100644 --- a/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLEventsMigration.java +++ b/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/CTLEventsMigration.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; import org.apache.commons.dbutils.QueryRunner; import org.apache.commons.dbutils.ResultSetHandler; import org.apache.commons.dbutils.handlers.BeanListHandler; @@ -10,6 +11,11 @@ import org.kaaproject.data_migration.utils.Utils; import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto; import org.kaaproject.kaa.server.common.admin.AdminClient; +import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithm; +import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithmImpl; +import org.kaaproject.kaa.server.common.core.configuration.RawData; +import org.kaaproject.kaa.server.common.core.configuration.RawDataFactory; +import org.kaaproject.kaa.server.common.core.schema.RawSchema; import java.io.IOException; import java.sql.Connection; @@ -19,9 +25,13 @@ import java.util.List; import java.util.Map; +import static java.util.Arrays.asList; +import static org.kaaproject.data_migration.utils.Constants.HOST; +import static org.kaaproject.data_migration.utils.Constants.PORT; + public class CTLEventsMigration { private Connection connection; - private AdminClient client = new AdminClient("localhost", 8080); + private AdminClient client; private static final String EVENT_SCHEMA_VERSION_TABLE_NAME = "event_schems_versions"; private static final String EVENT_CLASS_FAMILY_TABLE_NAME = "events_class_family"; @@ -32,9 +42,15 @@ public class CTLEventsMigration { public CTLEventsMigration(Connection connection) { this.connection = connection; + this.client = new AdminClient(HOST, PORT); } - public void transform() throws SQLException, IOException { + public CTLEventsMigration(Connection connection, String host, int port) { + this.connection = connection; + this.client = new AdminClient(host, port); + } + + public void transform() throws SQLException, IOException, Exception { /** Steps to migrate * * 1. remember (id,schems) :event_schems_versions @@ -55,74 +71,117 @@ public void transform() throws SQLException, IOException { * 12. profit */ - QueryRunner run = new QueryRunner(); + QueryRunner runner = new QueryRunner(); //1 ResultSetHandler> esvHandler = new BeanListHandler(EventSchemaVersion.class); - List oldESVs = run.query(connection, "SELECT id, schems FROM " + EVENT_SCHEMA_VERSION_TABLE_NAME, esvHandler); + List oldESVs = runner.query(connection, + "SELECT id, schems, created_time, created_username FROM " + EVENT_SCHEMA_VERSION_TABLE_NAME, + esvHandler); //2 - run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " DROP COLUMN schems"); + runner.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " DROP COLUMN schems"); //3 - run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " RENAME " + EVENT_CLASS_FAMILY_VERSION_TABLE_NAME); + runner.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " RENAME " + EVENT_CLASS_FAMILY_VERSION_TABLE_NAME); //4 Utils.dropFK(connection, EVENT_CLASS_TABLE_NAME, EVENT_CLASS_FAMILY_TABLE_NAME); - run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)"); + runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)"); //5 ResultSetHandler> ecHandler = new BeanListHandler(EventClass.class); - List oldECs = run.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME, ecHandler); + List oldECs = runner.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME, ecHandler); //6 - run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN schems"); - run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN version"); + runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN schems"); + runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN version"); //7 for (EventClass ec : oldECs) { - updateFamilyVersionId(ec, oldESVs, run); + updateFamilyVersionId(ec, oldESVs, runner); } //8 - Map eventClassCTLMap = new HashMap<>(); + Map> ecsToCTL = new HashMap<>(); + + Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null); + Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null); for (EventClass ec : oldECs) { - String tenantId = run.query(connection, "select tenant_id from " + EVENT_CLASS_TABLE_NAME + " where id=?", + currentCTLMetaId++; + currentCtlId++; + + Schema schemaBody = new Schema.Parser().parse(ec.getSchems()); + String fqn = schemaBody.getFullName(); + + RawSchema rawSchema = new RawSchema(schemaBody.toString()); + DefaultRecordGenerationAlgorithm algotithm = new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory()); + String defaultRecord = algotithm.getRootData().getRawData(); + + String tenantId = runner.query(connection, "select tenant_id from " + EVENT_CLASS_TABLE_NAME + " where id=?", rs -> rs.next() ? rs.getString("tenant_id") : null, ec.getId()); - CTLSchemaDto ctlDto = client.saveCTLSchemaWithAppToken(ec.getSchems(), tenantId, null); + EventSchemaVersion esv = findParent(ec, oldESVs); + Long createdTime = esv.getCreatedTime(); + String createUsername = esv.getCreatedUsername(); + runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", rs -> null, currentCTLMetaId, fqn, null, tenantId); + runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, ec.getSchems(), createdTime, + createUsername, defaultRecord, ec.getVersion(), currentCTLMetaId); + + CTLSchemaDto ctlDto = new CTLSchemaDto(); + ctlDto.setId(String.valueOf(currentCtlId)); + ctlDto.setVersion(ec.getVersion()); + ctlDto.setBody(ec.getSchems()); + ctlDto.setCreatedTime(createdTime); + ctlDto.setCreatedUsername(createUsername); + //9 - eventClassCTLMap.put(ec, ctlDto); + // aggregate configuration schemas with same fqn + if (ecsToCTL.containsKey(ctlDto)) { + List list = ecsToCTL.get(ctlDto); + list.add(ec); + ecsToCTL.put(ctlDto, list); + } else { + ecsToCTL.put(ctlDto, asList(ec)); + } } + //10 - Long maxId = run.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null); + Long maxId = runner.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null); //11 List params = new ArrayList<>(); - for (Map.Entry entry : eventClassCTLMap.entrySet()) { - EventClass ec = entry.getKey(); - CTLSchemaDto ctl = entry.getValue(); - - run.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId()); - ec.setId(ec.getId()+maxId); - - String ctlBody = run.query(connection, "select body from " + CTL_TABLE_NAME + " where id = ?", - rs -> rs.next() ? rs.getString("tenant_id") : null, ctl.getId()); - - params.add(new Object[]{ - ec.getId(), - ctl.getCreatedTime(), - ctl.getCreatedUsername(), - null, - parseName(ctlBody), - ctl.getVersion(), - null, - ctl.getId() - }); + + for (CTLSchemaDto ctl : ecsToCTL.keySet()) { + for (EventClass ec : ecsToCTL.get(ctl)) { + runner.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId()); + ec.setId(ec.getId() + maxId); + String ctlBody = ctl.getBody(); + + params.add(new Object[]{ + ec.getId(), + ctl.getCreatedTime(), + ctl.getCreatedUsername(), + null, + parseName(ctlBody), + ctl.getVersion(), + null, + ctl.getId() + }); + } + } + runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[params.size()][])); + } + + private EventSchemaVersion findParent(EventClass ec, List versions) throws IOException { + for (EventSchemaVersion esv : versions) { + if (ecBelongToThisFamilyVersion(ec, esv)) return esv; } - run.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[params.size()][])); + return null; } - private void updateFamilyVersionId(EventClass ec, List versions, QueryRunner run) throws SQLException, IOException { + private void updateFamilyVersionId(EventClass ec, List versions, QueryRunner runner) throws SQLException, IOException { for (EventSchemaVersion esv : versions) { int updateCount = 0; if (ecBelongToThisFamilyVersion(ec, esv)) { - updateCount = run.update(this.connection, + updateCount = runner.update(this.connection, "UPDATE " + EVENT_CLASS_TABLE_NAME + " SET events_class_family_versions_id=? WHERE id=?", esv.getId(), ec.getId()); - if (updateCount > 0) ; //success + if (updateCount != 1) { + System.err.println("Error: failed to update event class's reference to ECFV: " + ec); + } break; } diff --git a/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/model/EventSchemaVersion.java b/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/model/EventSchemaVersion.java index cb535404fe..52bb64dbc4 100644 --- a/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/model/EventSchemaVersion.java +++ b/server/upgrade/data-migration-0.9.0-0.10.0/src/main/java/org/kaaproject/data_migration/model/EventSchemaVersion.java @@ -3,6 +3,8 @@ public class EventSchemaVersion { private Long id; private String schems; + private Long created_time; + private String created_username; public Long getId() { return id; @@ -27,4 +29,28 @@ public String toString() { ", schems=" + schems + '}'; } + + public Long getCreatedTime() { + return created_time; + } + + public Long getCreated_time() { + return created_time; + } + + public void setCreated_time(Long created_time) { + this.created_time = created_time; + } + + public String getCreatedUsername() { + return created_username; + } + + public String getCreated_username() { + return created_username; + } + + public void setCreated_username(String created_username) { + this.created_username = created_username; + } }