Skip to content

Commit

Permalink
New flow of upgrade: prepare MigrationEntity list before CTLSchema an…
Browse files Browse the repository at this point in the history
…d BaseSchema inserts
  • Loading branch information
nocs00 committed Aug 9, 2016
1 parent 6f0094f commit 0c3c8bb
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 57 deletions.
Expand Up @@ -8,8 +8,11 @@
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.EventClass;
import org.kaaproject.data_migration.model.EventSchemaVersion;
import org.kaaproject.data_migration.model.MigrationEntity;
import org.kaaproject.data_migration.utils.datadefinition.DataDefinition;
import org.kaaproject.kaa.common.dto.BaseSchemaDto;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaMetaInfoDto;
import org.kaaproject.kaa.server.common.admin.AdminClient;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithm;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithmImpl;
Expand All @@ -25,13 +28,15 @@
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.kaaproject.data_migration.utils.Constants.HOST;
import static org.kaaproject.data_migration.utils.Constants.PORT;
import static org.kaaproject.data_migration.utils.datadefinition.Constraint.constraint;
import static org.kaaproject.data_migration.utils.datadefinition.ReferenceOptions.CASCADE;

public class CTLEventsMigration {
private Connection connection;
private AdminClient client;
private QueryRunner runner;

private static final String EVENT_SCHEMA_VERSION_TABLE_NAME = "event_schems_versions";
private static final String EVENT_CLASS_FAMILY_TABLE_NAME = "events_class_family";
Expand All @@ -45,36 +50,37 @@ public CTLEventsMigration(Connection connection) {
this.connection = connection;
this.dd = new DataDefinition(connection);
this.client = new AdminClient(HOST, PORT);
runner = new QueryRunner();
}

public CTLEventsMigration(Connection connection, String host, int port) {
this.connection = connection;
this.dd = new DataDefinition(connection);
this.client = new AdminClient(host, port);
runner = new QueryRunner();
}

public void transform() throws SQLException, IOException, Exception {
/** Steps to migrate
*
* 1. remember (id,schems) :event_schems_versions
* 2. drop column (schems) :event_schems_versions
* 3. rename table event_schems_versions to events_class_family_versions
* 4. rename column (events_class_family_id) to (events_class_family_versions_id) :events_class
* 5. remember (id,schems,version) : events_class
* 6. drop column (schems,version) : events_class
* 7. (p.6 schems) search them as substring in (p.1 schems) and update (p.1 id) with (p.4 events_class_family_versions_id)
* 8. AdminClient: save ctl schems based on bodies (p.5 schems) , application_id = null, tenant_id as select :events_class by (p.5 id)
* 9. remember (p.5 EC, p.8 CTLDto)
* 10. find max(id) :base_schems
* 11. save base_schems:
* id = (p.9 id + p.10) and update (id):events_class with this
* created_time, created_username, version, ctl_id from (p.9 CTLDto)
* name = parse "name" in (body) :ctl
* description = null, application_id = null
* 12. profit
*/

QueryRunner runner = new QueryRunner();

/** Steps to migrate
*
* 1. remember (id,schems) :event_schems_versions
* 2. drop column (schems) :event_schems_versions
* 3. rename table event_schems_versions to events_class_family_versions
* 4. rename column (events_class_family_id) to (events_class_family_versions_id) :events_class
* 5. remember (id,schems,version) : events_class
* 6. drop column (schems,version) : events_class
* 7. (p.6 schems) search them as substring in (p.1 schems) and update (p.1 id) with (p.4 events_class_family_versions_id)
* 8. AdminClient: save ctl schems based on bodies (p.5 schems) , application_id = null, tenant_id as select :events_class by (p.5 id)
* 9. remember (p.5 EC, p.8 CTLDto)
* 10. find max(id) :base_schems
* 11. save base_schems:
* id = (p.9 id + p.10) and update (id):events_class with this
* created_time, created_username, version, ctl_id from (p.9 CTLDto)
* name = parse "name" in (body) :ctl
* description = null, application_id = null
* 12. profit
*/
private List<MigrationEntity> beforeTransform() throws Exception {
List<MigrationEntity> entityList = new ArrayList<>();
//1
ResultSetHandler<List<EventSchemaVersion>> esvHandler = new BeanListHandler<EventSchemaVersion>(EventSchemaVersion.class);
List<EventSchemaVersion> oldESVs = runner.query(connection,
Expand All @@ -89,16 +95,25 @@ public void transform() throws SQLException, IOException, Exception {
runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)");
//5
ResultSetHandler<List<EventClass>> ecHandler = new BeanListHandler<EventClass>(EventClass.class);
List<EventClass> oldECs = runner.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME, ecHandler);
List<EventClass> oldECs = runner.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME +
" WHERE type='OBJECT' and schems not like '%\"type\":\"enum\"%'", ecHandler);
//6
runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN schems");
runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN version");
//7
for (EventClass ec : oldECs) {
updateFamilyVersionId(ec, oldESVs, runner);
}
dd.alterTable(EVENT_CLASS_TABLE_NAME)
.add(constraint("FK_events_class_family_versions_id")
.foreignKey("events_class_family_versions_id")
.references("events_class_family_versions", "id")
.onDelete(CASCADE)
.onUpdate(CASCADE)
)
.execute();
//8
Map<CTLSchemaDto, List<EventClass>> ecsToCTL = new HashMap<>();
Map<EventClass, CTLSchemaDto> ecsToCTL = new HashMap<>();

Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);
Expand All @@ -119,52 +134,52 @@ public void transform() throws SQLException, IOException, Exception {
EventSchemaVersion esv = findParent(ec, oldESVs);
Long createdTime = esv.getCreatedTime();
String createUsername = esv.getCreatedUsername();
runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", rs -> null, currentCTLMetaId, fqn, null, tenantId);
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, ec.getSchems(), createdTime,
createUsername, defaultRecord, ec.getVersion(), currentCTLMetaId);

CTLSchemaMetaInfoDto ctlMetaDto = new CTLSchemaMetaInfoDto();
ctlMetaDto.setId(String.valueOf(currentCTLMetaId));
ctlMetaDto.setTenantId(tenantId);
ctlMetaDto.setFqn(fqn);
ctlMetaDto.setApplicationId(null);

CTLSchemaDto ctlDto = new CTLSchemaDto();
ctlDto.setId(String.valueOf(currentCtlId));
ctlDto.setVersion(ec.getVersion());
ctlDto.setBody(ec.getSchems());
ctlDto.setCreatedTime(createdTime);
ctlDto.setCreatedUsername(createUsername);
ctlDto.setDefaultRecord(defaultRecord);
ctlDto.setMetaInfo(ctlMetaDto);

//9
// aggregate configuration schemas with same fqn
if (ecsToCTL.containsKey(ctlDto)) {
List<EventClass> list = ecsToCTL.get(ctlDto);
list.add(ec);
ecsToCTL.put(ctlDto, list);
} else {
ecsToCTL.put(ctlDto, asList(ec));
}
ecsToCTL.put(ec, ctlDto);
}

//10
Long maxId = runner.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
//11
List<Object[]> params = new ArrayList<>();

for (CTLSchemaDto ctl : ecsToCTL.keySet()) {
for (EventClass ec : ecsToCTL.get(ctl)) {
runner.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId());
ec.setId(ec.getId() + maxId);
String ctlBody = ctl.getBody();

params.add(new Object[]{
ec.getId(),
ctl.getCreatedTime(),
ctl.getCreatedUsername(),
null,
parseName(ctlBody),
ctl.getVersion(),
null,
ctl.getId()
});
}
for (Map.Entry<EventClass, CTLSchemaDto> entry : ecsToCTL.entrySet()) {
EventClass ec = entry.getKey();
CTLSchemaDto ctlDto = entry.getValue();

runner.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId());
ec.setId(ec.getId() + maxId);

BaseSchemaDto baseDto = new BaseSchemaDto();
baseDto.setId(String.valueOf(ec.getId()));
baseDto.setCreatedTime(ctlDto.getCreatedTime());
baseDto.setCreatedUsername(ctlDto.getCreatedUsername());
baseDto.setDescription(null);
baseDto.setName(parseName(ctlDto.getBody()));
baseDto.setVersion(ctlDto.getVersion());
baseDto.setApplicationId(null);
baseDto.setCtlSchemaId(ctlDto.getId());

entityList.add(new MigrationEntity(ctlDto, baseDto));
}
runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[params.size()][]));

return entityList;
}

private EventSchemaVersion findParent(EventClass ec, List<EventSchemaVersion> versions) throws IOException {
Expand Down
Expand Up @@ -34,8 +34,8 @@ public void transform() throws IOException {
byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();

int updates = run.update(connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody, config.getId());
if (updates == 1) {
//TODO
if (updates != 1) {
System.err.println("Error: failed to update configuration: " + config);
}
}

Expand Down
@@ -0,0 +1,36 @@
package org.kaaproject.data_migration.model;

import org.kaaproject.kaa.common.dto.BaseSchemaDto;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;

/**
* Created by user482400 on 09.08.16.
*/
public class MigrationEntity {
private CTLSchemaDto ctl;
private BaseSchemaDto baseSchema;

public MigrationEntity() {
}

public MigrationEntity(CTLSchemaDto ctl, BaseSchemaDto baseSchema) {
this.ctl = ctl;
this.baseSchema = baseSchema;
}

public CTLSchemaDto getCtl() {
return ctl;
}

public void setCtl(CTLSchemaDto ctl) {
this.ctl = ctl;
}

public BaseSchemaDto getBaseSchema() {
return baseSchema;
}

public void setBaseSchema(BaseSchemaDto baseSchema) {
this.baseSchema = baseSchema;
}
}

0 comments on commit 0c3c8bb

Please sign in to comment.