Skip to content

Commit

Permalink
Created scratch for data migration
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Aug 5, 2016
1 parent b9c025e commit cef2b97
Show file tree
Hide file tree
Showing 16 changed files with 1,578 additions and 0 deletions.
76 changes: 76 additions & 0 deletions server/upgrade/data-migration-0.9.0-0.10.0/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.kaaproject</groupId>
<artifactId>data-migration</artifactId>
<version>1.0-SNAPSHOT</version>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>


<properties>
<pgsql-driver.version>9.3-1101-jdbc41</pgsql-driver.version>
<mariadb-driver.version>1.4.3</mariadb-driver.version>
</properties>

<dependencies>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb-driver.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${pgsql-driver.version}</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.kaaproject.kaa.common</groupId>
<artifactId>core</artifactId>
<version>0.10.0-SNAPSHOT</version>
</dependency>
</dependencies>




</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.kaaproject.data_migration;


import org.apache.avro.Schema;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.BeanListHandler;
import org.kaaproject.data_migration.model.ConfigurationSchema;
import org.kaaproject.data_migration.model.Ctl;
import org.kaaproject.data_migration.model.CtlMetaInfo;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.*;


import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining;

public class CTLConfigurationMigration {

private Connection connection;
private final int NUM_OF_BASE_SCHEMA_FIELDS = 8;

public CTLConfigurationMigration(Connection connection) {
this.connection = connection;
}

public void transform() throws SQLException {
QueryRunner runner = new QueryRunner();
try {
List<ConfigurationSchema> schemas = runner.query(connection, "select conf.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " +
"from configuration_schems conf join schems s on conf.id = s.id", new BeanListHandler<ConfigurationSchema>(ConfigurationSchema.class));
String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));

runner.update(connection, "delete from schems where id in (" + toDelete + ")");

Long shift = runner.query(connection, "select max(id) as max_id from base_schems", rs -> rs.next() ? rs.getLong("max_id") : null);
runner.update(connection, "update configuration_schems set id = id + " + shift + " order by id desc");
schemas.forEach(s -> s.setId(s.getId() + shift));
Map<Ctl, List<ConfigurationSchema>> confSchemasToCTL = new HashMap<>();
Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);
// CTL creation
for (ConfigurationSchema schema : schemas) {
currentCTLMetaId++;
currentCtlId++;
Schema schemaBody = new Schema.Parser().parse(schema.getSchems());
String fqn = schemaBody.getFullName();
String defaultRecord = ""; //TODO
Long tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null);
runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", rs -> null, currentCTLMetaId, fqn, schema.getAppId(), tenantId);
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, schema.getSchems(), schema.getCreatedTime(),
schema.getCreatedUsername(), defaultRecord, schema.getVersion(), currentCTLMetaId);

// aggregate configuration schemas with same fqn
Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(fqn, schema.getAppId(), tenantId));
if (confSchemasToCTL.containsKey(ctl)) {
List<ConfigurationSchema> list = confSchemasToCTL.get(ctl);
list.add(schema);
confSchemasToCTL.put(ctl, list);
} else {
confSchemasToCTL.put(ctl, asList(schema));
}
}

List<Object[]> params = new ArrayList<>();
for (Ctl ctl : confSchemasToCTL.keySet()) {
for (ConfigurationSchema schema : confSchemasToCTL.get(ctl)) {
params.add(new Object[]{
schema.getId(),
schema.getCreatedTime(),
schema.getCreatedUsername(),
schema.getDescription(),
schema.getName(),
schema.getVersion(),
ctl.getMetaInfo().getAppId(),
ctl.getId()
});
}
}

runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[schemas.size()][]));
} catch (SQLException e) {
DbUtils.rollback(connection);
} finally {
DbUtils.closeQuietly(connection);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.kaaproject.data_migration;



import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.kaaproject.data_migration.model.gen.tables.pojos.Configuration;
import org.kaaproject.data_migration.model.gen.tables.records.ConfigurationRecord;
import org.kaaproject.data_migration.utils.DataSources;

import java.util.Base64;
import java.util.List;

import static org.kaaproject.data_migration.model.gen.Tables.CONFIGURATION;



public final class ConfigurationUpgrade {
private static final String UUID_FIELD = "__uuid";
private static final String UUID_VALUE = "org.kaaproject.configuration.uuidT";

private ConfigurationUpgrade() {}

public static void run() throws Exception {
updateUuids();
//todo: transformation into CTL
}

private static void updateUuids() throws Exception {
DSLContext create = DSL.using(DataSources.MARIADB.getDs(), SQLDialect.MARIADB);
List<Configuration> configs = create.select().from(CONFIGURATION).fetchInto(Configuration.class);

for (Configuration config : configs) {
JsonNode json = new ObjectMapper().readTree(config.getConfigurationBody());
JsonNode jsonEncoded = encodeUuids(json);
byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();
config.setConfigurationBody(encodedConfigurationBody);

ConfigurationRecord record = create.newRecord(CONFIGURATION, config);
create.executeUpdate(record);
}
}

private static JsonNode encodeUuids(JsonNode json) throws Exception {
if (json.has(UUID_FIELD)) {
JsonNode j = json.get(UUID_FIELD);
if (j.has(UUID_VALUE)) {
String value = j.get(UUID_VALUE).asText();
String encodedValue = Base64.getEncoder().encodeToString(value.getBytes("ISO-8859-1"));
((ObjectNode)j).put(UUID_VALUE, encodedValue);
}
}

for (JsonNode node : json) {
if (node.isContainerNode()) encodeUuids(node);
}

return json;
}

private static JsonNode decodeUuids(JsonNode json) throws Exception {
if (json.has(UUID_FIELD)) {
JsonNode j = json.get(UUID_FIELD);
if (j.has(UUID_VALUE)) {
String value = j.get(UUID_VALUE).asText();
String encodedValue = new String (Base64.getDecoder().decode(value.getBytes("ISO-8859-1")), "ISO-8859-1");
((ObjectNode)j).put(UUID_VALUE, encodedValue);
}
}

for (JsonNode node : json) {
if (node.isContainerNode()) decodeUuids(node);
}

return json;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.kaaproject.data_migration;


import org.apache.commons.dbutils.QueryRunner;
import org.kaaproject.data_migration.utils.DataSources;

import java.sql.SQLException;

import static org.kaaproject.data_migration.utils.DataSources.MARIADB;

public class MigrateData {
public static void main(String[] args) throws SQLException {
CTLConfigurationMigration migration = new CTLConfigurationMigration(MARIADB.getDs().getConnection());
migration.transform();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.kaaproject.data_migration.model;


public class ConfigurationSchema {

private Long id;

private int version;

private String name;

private String description;

private String createdUsername;

private long createdTime;

private Long appId;

private String schems;


public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public int getVersion() {
return version;
}

public void setVersion(int version) {
this.version = version;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getCreatedUsername() {
return createdUsername;
}

public void setCreatedUsername(String createdUsername) {
this.createdUsername = createdUsername;
}

public long getCreatedTime() {
return createdTime;
}

public void setCreatedTime(long createdTime) {
this.createdTime = createdTime;
}

public Long getAppId() {
return appId;
}

public void setAppId(Long appId) {
this.appId = appId;
}

public String getSchems() {
return schems;
}

public void setSchems(String schems) {
this.schems = schems;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.kaaproject.data_migration.model;


public class Ctl {
private final Long id;
private final CtlMetaInfo metaInfo;

public Ctl(Long id, CtlMetaInfo metaInfo) {
this.id = id;
this.metaInfo = metaInfo;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Ctl ctl = (Ctl) o;

if (id != null ? !id.equals(ctl.id) : ctl.id != null) return false;
return metaInfo.equals(ctl.metaInfo);

}

@Override
public int hashCode() {
return metaInfo.hashCode();
}

public Long getId() {
return id;
}

public CtlMetaInfo getMetaInfo() {
return metaInfo;
}
}

0 comments on commit cef2b97

Please sign in to comment.