Skip to content

Commit

Permalink
KAA-1298 upgrade events
Browse files Browse the repository at this point in the history
  • Loading branch information
nocs00 committed Aug 9, 2016
1 parent df8f1e9 commit 77dc288
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 43 deletions.
Expand Up @@ -127,21 +127,31 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
CTLSchemaDto other = (CTLSchemaDto) obj;

if (id == null) {
if (other.id != null)
return false;
} else if (!id.equals(other.id))
return false;

if (dependencySet == null) {
if (other.dependencySet != null)
return false;
} else if (!dependencySet.equals(other.dependencySet))
return false;

if (metaInfo == null) {
if (other.metaInfo != null)
return false;
} else if (!metaInfo.equals(other.metaInfo))
return false;

if (version == null) {
if (other.version != null)
return false;
} else if (!version.equals(other.version))
return false;

return true;
}

Expand Down
Expand Up @@ -133,18 +133,20 @@ private void updateUuids() throws SQLException, IOException {
byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();

int updates = run.update(this.connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody,config.getId());
if (updates == 1) {
//TODO
if (updates != 1) {
System.err.println("Error: failed to update configuration: " + config);
}
}
}

private JsonNode encodeUuids(JsonNode json) throws IOException {
final String LATIN_1 = "ISO-8859-1";

if (json.has(UUID_FIELD)) {
JsonNode j = json.get(UUID_FIELD);
if (j.has(UUID_VALUE)) {
String value = j.get(UUID_VALUE).asText();
String encodedValue = Base64.getEncoder().encodeToString(value.getBytes("ISO-8859-1"));
String encodedValue = Base64.getEncoder().encodeToString(value.getBytes(LATIN_1));
((ObjectNode)j).put(UUID_VALUE, encodedValue);
}
}
Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.dbutils.handlers.BeanListHandler;
Expand All @@ -10,6 +11,11 @@
import org.kaaproject.data_migration.utils.Utils;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
import org.kaaproject.kaa.server.common.admin.AdminClient;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithm;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithmImpl;
import org.kaaproject.kaa.server.common.core.configuration.RawData;
import org.kaaproject.kaa.server.common.core.configuration.RawDataFactory;
import org.kaaproject.kaa.server.common.core.schema.RawSchema;

import java.io.IOException;
import java.sql.Connection;
Expand All @@ -19,9 +25,13 @@
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.kaaproject.data_migration.utils.Constants.HOST;
import static org.kaaproject.data_migration.utils.Constants.PORT;

public class CTLEventsMigration {
private Connection connection;
private AdminClient client = new AdminClient("localhost", 8080);
private AdminClient client;

private static final String EVENT_SCHEMA_VERSION_TABLE_NAME = "event_schems_versions";
private static final String EVENT_CLASS_FAMILY_TABLE_NAME = "events_class_family";
Expand All @@ -32,9 +42,15 @@ public class CTLEventsMigration {

public CTLEventsMigration(Connection connection) {
this.connection = connection;
this.client = new AdminClient(HOST, PORT);
}

public void transform() throws SQLException, IOException {
public CTLEventsMigration(Connection connection, String host, int port) {
this.connection = connection;
this.client = new AdminClient(host, port);
}

public void transform() throws SQLException, IOException, Exception {
/** Steps to migrate
*
* 1. remember (id,schems) :event_schems_versions
Expand All @@ -55,74 +71,117 @@ public void transform() throws SQLException, IOException {
* 12. profit
*/

QueryRunner run = new QueryRunner();
QueryRunner runner = new QueryRunner();

//1
ResultSetHandler<List<EventSchemaVersion>> esvHandler = new BeanListHandler<EventSchemaVersion>(EventSchemaVersion.class);
List<EventSchemaVersion> oldESVs = run.query(connection, "SELECT id, schems FROM " + EVENT_SCHEMA_VERSION_TABLE_NAME, esvHandler);
List<EventSchemaVersion> oldESVs = runner.query(connection,
"SELECT id, schems, created_time, created_username FROM " + EVENT_SCHEMA_VERSION_TABLE_NAME,
esvHandler);
//2
run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " DROP COLUMN schems");
runner.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " DROP COLUMN schems");
//3
run.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " RENAME " + EVENT_CLASS_FAMILY_VERSION_TABLE_NAME);
runner.update(connection, "ALTER TABLE " + EVENT_SCHEMA_VERSION_TABLE_NAME + " RENAME " + EVENT_CLASS_FAMILY_VERSION_TABLE_NAME);
//4
Utils.dropFK(connection, EVENT_CLASS_TABLE_NAME, EVENT_CLASS_FAMILY_TABLE_NAME);
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)");
runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " CHANGE events_class_family_id events_class_family_versions_id bigint(20)");
//5
ResultSetHandler<List<EventClass>> ecHandler = new BeanListHandler<EventClass>(EventClass.class);
List<EventClass> oldECs = run.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME, ecHandler);
List<EventClass> oldECs = runner.query(connection, "SELECT id, schems, version FROM " + EVENT_CLASS_TABLE_NAME, ecHandler);
//6
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN schems");
run.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN version");
runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN schems");
runner.update(connection, "ALTER TABLE " + EVENT_CLASS_TABLE_NAME + " DROP COLUMN version");
//7
for (EventClass ec : oldECs) {
updateFamilyVersionId(ec, oldESVs, run);
updateFamilyVersionId(ec, oldESVs, runner);
}
//8
Map<EventClass, CTLSchemaDto> eventClassCTLMap = new HashMap<>();
Map<CTLSchemaDto, List<EventClass>> ecsToCTL = new HashMap<>();

Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);
for (EventClass ec : oldECs) {
String tenantId = run.query(connection, "select tenant_id from " + EVENT_CLASS_TABLE_NAME + " where id=?",
currentCTLMetaId++;
currentCtlId++;

Schema schemaBody = new Schema.Parser().parse(ec.getSchems());
String fqn = schemaBody.getFullName();

RawSchema rawSchema = new RawSchema(schemaBody.toString());
DefaultRecordGenerationAlgorithm<RawData> algotithm = new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory());
String defaultRecord = algotithm.getRootData().getRawData();

String tenantId = runner.query(connection, "select tenant_id from " + EVENT_CLASS_TABLE_NAME + " where id=?",
rs -> rs.next() ? rs.getString("tenant_id") : null, ec.getId());

CTLSchemaDto ctlDto = client.saveCTLSchemaWithAppToken(ec.getSchems(), tenantId, null);
EventSchemaVersion esv = findParent(ec, oldESVs);
Long createdTime = esv.getCreatedTime();
String createUsername = esv.getCreatedUsername();
runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", rs -> null, currentCTLMetaId, fqn, null, tenantId);
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, ec.getSchems(), createdTime,
createUsername, defaultRecord, ec.getVersion(), currentCTLMetaId);

CTLSchemaDto ctlDto = new CTLSchemaDto();
ctlDto.setId(String.valueOf(currentCtlId));
ctlDto.setVersion(ec.getVersion());
ctlDto.setBody(ec.getSchems());
ctlDto.setCreatedTime(createdTime);
ctlDto.setCreatedUsername(createUsername);

//9
eventClassCTLMap.put(ec, ctlDto);
// aggregate configuration schemas with same fqn
if (ecsToCTL.containsKey(ctlDto)) {
List<EventClass> list = ecsToCTL.get(ctlDto);
list.add(ec);
ecsToCTL.put(ctlDto, list);
} else {
ecsToCTL.put(ctlDto, asList(ec));
}
}

//10
Long maxId = run.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
Long maxId = runner.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
//11
List<Object[]> params = new ArrayList<>();
for (Map.Entry<EventClass, CTLSchemaDto> entry : eventClassCTLMap.entrySet()) {
EventClass ec = entry.getKey();
CTLSchemaDto ctl = entry.getValue();

run.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId());
ec.setId(ec.getId()+maxId);

String ctlBody = run.query(connection, "select body from " + CTL_TABLE_NAME + " where id = ?",
rs -> rs.next() ? rs.getString("tenant_id") : null, ctl.getId());

params.add(new Object[]{
ec.getId(),
ctl.getCreatedTime(),
ctl.getCreatedUsername(),
null,
parseName(ctlBody),
ctl.getVersion(),
null,
ctl.getId()
});

for (CTLSchemaDto ctl : ecsToCTL.keySet()) {
for (EventClass ec : ecsToCTL.get(ctl)) {
runner.update(connection, "update " + EVENT_CLASS_TABLE_NAME + " set id = (id + ?) where id = ?", maxId, ec.getId());
ec.setId(ec.getId() + maxId);
String ctlBody = ctl.getBody();

params.add(new Object[]{
ec.getId(),
ctl.getCreatedTime(),
ctl.getCreatedUsername(),
null,
parseName(ctlBody),
ctl.getVersion(),
null,
ctl.getId()
});
}
}
runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[params.size()][]));
}

private EventSchemaVersion findParent(EventClass ec, List<EventSchemaVersion> versions) throws IOException {
for (EventSchemaVersion esv : versions) {
if (ecBelongToThisFamilyVersion(ec, esv)) return esv;
}
run.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[params.size()][]));
return null;
}

private void updateFamilyVersionId(EventClass ec, List<EventSchemaVersion> versions, QueryRunner run) throws SQLException, IOException {
private void updateFamilyVersionId(EventClass ec, List<EventSchemaVersion> versions, QueryRunner runner) throws SQLException, IOException {
for (EventSchemaVersion esv : versions) {
int updateCount = 0;
if (ecBelongToThisFamilyVersion(ec, esv)) {
updateCount = run.update(this.connection,
updateCount = runner.update(this.connection,
"UPDATE " + EVENT_CLASS_TABLE_NAME + " SET events_class_family_versions_id=? WHERE id=?",
esv.getId(), ec.getId());
if (updateCount > 0) ; //success
if (updateCount != 1) {
System.err.println("Error: failed to update event class's reference to ECFV: " + ec);
}

break;
}
Expand Down
Expand Up @@ -3,6 +3,8 @@
public class EventSchemaVersion {
private Long id;
private String schems;
private Long created_time;
private String created_username;

public Long getId() {
return id;
Expand All @@ -27,4 +29,28 @@ public String toString() {
", schems=" + schems +
'}';
}

public Long getCreatedTime() {
return created_time;
}

public Long getCreated_time() {
return created_time;
}

public void setCreated_time(Long created_time) {
this.created_time = created_time;
}

public String getCreatedUsername() {
return created_username;
}

public String getCreated_username() {
return created_username;
}

public void setCreated_username(String created_username) {
this.created_username = created_username;
}
}

0 comments on commit 77dc288

Please sign in to comment.