Skip to content

Commit

Permalink
KAA-1298 upgrade events
Browse files Browse the repository at this point in the history
  • Loading branch information
nocs00 committed Aug 8, 2016
1 parent 992535f commit df8f1e9
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 54 deletions.
@@ -1,6 +1,5 @@
package org.kaaproject.data_migration;


import org.apache.avro.Schema;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
Expand All @@ -24,13 +23,11 @@
import org.kaaproject.kaa.server.common.core.configuration.RawDataFactory;
import org.kaaproject.kaa.server.common.core.schema.RawSchema;


import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;


import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining;
import static org.kaaproject.data_migration.utils.Constants.HOST;
Expand All @@ -57,11 +54,9 @@ public CTLConfigurationMigration(Connection connection, String host, int port) {
}

public void transform() throws SQLException, IOException {
updateUuids();

QueryRunner runner = new QueryRunner();
try {
// Utils.runFile(runner, connection, "doc/constraint_update_before.sql");
updateUuids();

List<ConfigurationSchema> schemas = runner.query(connection,
"select conf.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " +
Expand Down
Expand Up @@ -5,37 +5,31 @@
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.ConfigurationSchema;
import org.kaaproject.data_migration.model.Ctl;
import org.kaaproject.data_migration.model.EventClass;
import org.kaaproject.data_migration.model.EventSchemaVersion;
import org.kaaproject.data_migration.utils.Utils;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
import org.kaaproject.kaa.server.common.admin.AdminClient;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Created by user482400 on 05.08.16.
*/
public class CTLEventsMigration {
private Connection connection;
private AdminClient client = new AdminClient("localhost", 8080);

private static final String EVENT_SCHEMA_VERSION_TABLE_NAME = "event_schems_versions";
private static final String EVENT_CLASS_FAMILY_VERSION_TABLE_NAME = "event_class_family_versions";
private static final String EVENT_CLASS_FAMILY_TABLE_NAME = "events_class_family";
private static final String EVENT_CLASS_FAMILY_VERSION_TABLE_NAME = "events_class_family_versions";
private static final String EVENT_CLASS_TABLE_NAME = "events_class";
private static final String BASE_SCHEMA_TABLE_NAME = "base_schems";
private static final String CTL_TABLE_NAME = "ctl";


public CTLEventsMigration(Connection connection) {
this.connection = connection;
}
Expand All @@ -50,7 +44,7 @@ public void transform() throws SQLException, IOException {
* 5. remember (id,schems,version) : events_class
* 6. drop column (schems,version) : events_class
* 7. (p.6 schems) search them as substring in (p.1 schems) and update (p.1 id) with (p.4 events_class_family_versions_id)
* 8. AdminClient: save ctl schems based on bodies (p.5 schems) , application_id = null, tenant_id as select :event_class by (p.5 id)
* 8. AdminClient: save ctl schems based on bodies (p.5 schems) , application_id = null, tenant_id as select :events_class by (p.5 id)
* 9. remember (p.5 EC, p.8 CTLDto)
* 10. find max(id) :base_schems
* 11. save base_schems:
Expand All @@ -65,27 +59,30 @@ public void transform() throws SQLException, IOException {

//1
ResultSetHandler<List<EventSchemaVersion>> esvHandler = new BeanListHandler<EventSchemaVersion>(EventSchemaVersion.class);
List<EventSchemaVersion> oldESVs = run.query(connection, "SELECT id, schems FROM ?", esvHandler, EVENT_SCHEMA_VERSION_TABLE_NAME);
List<EventSchemaVersion> oldESVs = run.query(connection, "SELECT id, schems FROM " + EVENT_SCHEMA_VERSION_TABLE_NAME, esvHandler);
//2
run.update(connection, "ALTER TABLE ? DROP COLUMN schems", EVENT_SCHEMA_VERSION_TABLE_NAME);
run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " DROP COLUMN schems");
//3
run.update(connection, "ALTER TABLE ? RENAME ?", EVENT_SCHEMA_VERSION_TABLE_NAME, EVENT_CLASS_FAMILY_VERSION_TABLE_NAME);
run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " RENAME " + EVENT_CLASS_FAMILY_VERSION_TABLE_NAME);
//4
run.update(connection, "ALTER TABLE ? CHANGE events_class_family_id events_class_family_versions_id bigint(20)", EVENT_CLASS_TABLE_NAME);
Utils.dropFK(connection, EVENT_CLASS_TABLE_NAME, EVENT_CLASS_FAMILY_TABLE_NAME);
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)");
//5
ResultSetHandler<List<EventClass>> ecHandler = new BeanListHandler<EventClass>(EventClass.class);
List<EventClass> oldECs = run.query(connection, "SELECT id, schems, version FROM ?", ecHandler, EVENT_CLASS_TABLE_NAME);
List<EventClass> oldECs = run.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME, ecHandler);
//6
run.update(connection, "ALTER TABLE ? DROP COLUMN schems", EVENT_CLASS_TABLE_NAME);
run.update(connection, "ALTER TABLE ? DROP COLUMN version", EVENT_CLASS_TABLE_NAME);
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN schems");
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN version");
//7
for (EventClass ec : oldECs) {
updateFamilyVersionId(ec, oldESVs, run);
}
//8
Map<EventClass, CTLSchemaDto> eventClassCTLMap = new HashMap<>();
for (EventClass ec : oldECs) {
String tenantId = run.query(connection, "select tenant_id from ?", rs -> rs.next() ? rs.getString("tenant_id") : null, BASE_SCHEMA_TABLE_NAME);
String tenantId = run.query(connection, "select tenant_id from " + EVENT_CLASS_TABLE_NAME + " where id=?",
rs -> rs.next() ? rs.getString("tenant_id") : null, ec.getId());

CTLSchemaDto ctlDto = client.saveCTLSchemaWithAppToken(ec.getSchems(), tenantId, null);
//9
eventClassCTLMap.put(ec, ctlDto);
Expand All @@ -98,10 +95,11 @@ public void transform() throws SQLException, IOException {
EventClass ec = entry.getKey();
CTLSchemaDto ctl = entry.getValue();

run.update(connection, "update ? set id = (id + ?) where id = ?", EVENT_CLASS_TABLE_NAME, maxId, ec.getId());
run.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId());
ec.setId(ec.getId()+maxId);

String ctlBody = run.query(connection, "select body from ? where id = ?", rs -> rs.next() ? rs.getString("tenant_id") : null, CTL_TABLE_NAME, ctl.getId());
String ctlBody = run.query(connection, "select body from " + CTL_TABLE_NAME + " where id = ?",
rs -> rs.next() ? rs.getString("tenant_id") : null, ctl.getId());

params.add(new Object[]{
ec.getId(),
Expand All @@ -117,22 +115,26 @@ public void transform() throws SQLException, IOException {
run.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[params.size()][]));
}

private void updateFamilyVersionId(EventClass ec, List<EventSchemaVersion> versions, QueryRunner run) throws SQLException {
private void updateFamilyVersionId(EventClass ec, List<EventSchemaVersion> versions, QueryRunner run) throws SQLException, IOException {
for (EventSchemaVersion esv : versions) {
int updateCount = 0;
if (ecBelongToThisFamilyVersion(ec, esv)) {
updateCount = run.update(this.connection,
"UPDATE ? SET events_class_family_versions_id=? WHERE id=?",
EVENT_CLASS_TABLE_NAME, esv.getId(), ec.getId());
"UPDATE " + EVENT_CLASS_TABLE_NAME + " SET events_class_family_versions_id=? WHERE id=?",
esv.getId(), ec.getId());
if (updateCount > 0) ; //success

break;
}
}
}

private boolean ecBelongToThisFamilyVersion(EventClass ec, EventSchemaVersion esv) {
return new String(esv.getSchems()).contains(ec.getSchems());
private boolean ecBelongToThisFamilyVersion(EventClass ec, EventSchemaVersion esv) throws IOException {
JsonNode jsonNode = new ObjectMapper().readTree(ec.getSchems());
String namespace = jsonNode.get("namespace").asText();
String name = jsonNode.get("name").asText();

return esv.getSchems().contains(name) && esv.getSchems().contains(namespace);
}

private String parseName(String body) throws IOException {
Expand Down
@@ -1,16 +1,16 @@
package org.kaaproject.data_migration;


import org.apache.commons.dbutils.QueryRunner;
import org.kaaproject.data_migration.utils.DataSources;

import java.io.IOException;
import java.sql.SQLException;

import static org.kaaproject.data_migration.utils.DataSources.MARIADB;

public class MigrateData {
public static void main(String[] args) throws SQLException {
public static void main(String[] args) throws SQLException, IOException {
CTLConfigurationMigration migration = new CTLConfigurationMigration(MARIADB.getDs().getConnection());
migration.transform();

CTLEventsMigration eventsMigration = new CTLEventsMigration(MARIADB.getDs().getConnection());
eventsMigration.transform();
}
}
Expand Up @@ -2,9 +2,6 @@

import java.io.Serializable;

/**
* Created by user482400 on 04.08.16.
*/
public class Configuration implements Serializable {

private static final long serialVersionUID = -1176562073;
Expand Down
@@ -1,6 +1,5 @@
package org.kaaproject.data_migration.model;


public class ConfigurationSchema {

private Long id;
Expand Down
@@ -1,6 +1,5 @@
package org.kaaproject.data_migration.model;


public class Ctl {
private final Long id;
private final CtlMetaInfo metaInfo;
Expand Down
@@ -1,6 +1,5 @@
package org.kaaproject.data_migration.model;


public class CtlMetaInfo {

private final String fqn;
Expand Down
@@ -1,8 +1,5 @@
package org.kaaproject.data_migration.model;

/**
* Created by user482400 on 04.08.16.
*/
public class EventClass {
private Long id;
private String schems;
Expand Down
@@ -1,13 +1,8 @@
package org.kaaproject.data_migration.model;

import java.util.Arrays;

/**
* Created by user482400 on 04.08.16.
*/
public class EventSchemaVersion {
private Long id;
private byte[] schems;
private String schems;

public Long getId() {
return id;
Expand All @@ -17,19 +12,19 @@ public void setId(Long id) {
this.id = id;
}

public byte[] getSchems() {
public String getSchems() {
return schems;
}

public void setSchems(byte[] schems) {
public void setSchems(String schems) {
this.schems = schems;
}

@Override
public String toString() {
return "EventSchemaVersion{" +
"id=" + id +
", schems=" + Arrays.toString(schems) +
", schems=" + schems +
'}';
}
}
Expand Up @@ -12,6 +12,9 @@ public enum DataSources {
MARIADB(getMariaDB()), POSTGRES(getPostgreSQL());

private final DataSource ds;
private static final String USER_NAME = "root";
private static final String PASSWORD = "root";
private static final String DB_NAME = "kaa";


DataSources(DataSource ds) {
Expand Down
Expand Up @@ -10,9 +10,19 @@

public class Utils {

private static final String QUERY_FIND_FK_NAME = "SELECT CONSTRAINT_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE REFERENCED_TABLE_SCHEMA = 'kaa' AND TABLE_NAME = '%s' and referenced_table_name='%s'";

public static void runFile(QueryRunner runner, Connection connection, String fileName) throws IOException, SQLException {
String query = IOUtils.toString(Utils.class.getClassLoader().getResourceAsStream(fileName));
runner.update(connection, query);
}

public static void dropFK(Connection connection, String tableName, String referencedTableName) throws SQLException {
QueryRunner runner = new QueryRunner();

String query = String.format(QUERY_FIND_FK_NAME, tableName, referencedTableName);
String fkName = runner.query(connection, query, rs -> rs.next() ? rs.getString(1) : null);
runner.update(connection, "ALTER TABLE " + tableName + " DROP FOREIGN KEY " + fkName);
}

}

0 comments on commit df8f1e9

Please sign in to comment.