Skip to content

Commit

Permalink
Fixed Configuration CTL migration, refactoring and added default reco…
Browse files Browse the repository at this point in the history
…rd generation
  • Loading branch information
Kirill380 committed Aug 8, 2016
1 parent f99b97b commit 992535f
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 36 deletions.
4 changes: 4 additions & 0 deletions server/upgrade/data-migration-0.9.0-0.10.0/pom.xml
Expand Up @@ -49,6 +49,10 @@
<groupId>org.kaaproject.kaa.server.common</groupId> <groupId>org.kaaproject.kaa.server.common</groupId>
<artifactId>admin-rest-client</artifactId> <artifactId>admin-rest-client</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.kaaproject.kaa.common</groupId>
<artifactId>core</artifactId>
</dependency>
</dependencies> </dependencies>


<build> <build>
Expand Down
Expand Up @@ -13,10 +13,16 @@
import org.kaaproject.data_migration.model.ConfigurationSchema; import org.kaaproject.data_migration.model.ConfigurationSchema;
import org.kaaproject.data_migration.model.Ctl; import org.kaaproject.data_migration.model.Ctl;
import org.kaaproject.data_migration.model.CtlMetaInfo; import org.kaaproject.data_migration.model.CtlMetaInfo;
import org.kaaproject.data_migration.utils.Utils;
import org.kaaproject.kaa.common.dto.Util;
import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto; import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
import org.kaaproject.kaa.server.common.admin.AdminClient; import org.kaaproject.kaa.server.common.admin.AdminClient;
import org.slf4j.Logger; import org.kaaproject.kaa.server.common.core.algorithms.generation.ConfigurationGenerationException;
import org.slf4j.LoggerFactory; import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithm;
import org.kaaproject.kaa.server.common.core.algorithms.generation.DefaultRecordGenerationAlgorithmImpl;
import org.kaaproject.kaa.server.common.core.configuration.RawData;
import org.kaaproject.kaa.server.common.core.configuration.RawDataFactory;
import org.kaaproject.kaa.server.common.core.schema.RawSchema;




import java.io.IOException; import java.io.IOException;
Expand All @@ -27,6 +33,8 @@


import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.joining;
import static org.kaaproject.data_migration.utils.Constants.HOST;
import static org.kaaproject.data_migration.utils.Constants.PORT;


public class CTLConfigurationMigration { public class CTLConfigurationMigration {
// private static final Logger LOG = LoggerFactory.getLogger(CTLConfigurationMigration.class.getSimpleName()); // private static final Logger LOG = LoggerFactory.getLogger(CTLConfigurationMigration.class.getSimpleName());
Expand All @@ -36,18 +44,27 @@ public class CTLConfigurationMigration {


private Connection connection; private Connection connection;
private final int NUM_OF_BASE_SCHEMA_FIELDS = 8; private final int NUM_OF_BASE_SCHEMA_FIELDS = 8;
private AdminClient client = new AdminClient("localhost", 8080); private AdminClient client;


public CTLConfigurationMigration(Connection connection) { public CTLConfigurationMigration(Connection connection) {
this.connection = connection; this.connection = connection;
this.client = new AdminClient(HOST, PORT);
}

public CTLConfigurationMigration(Connection connection, String host, int port) {
this.connection = connection;
this.client = new AdminClient(host, port);
} }


public void transform() throws SQLException, IOException { public void transform() throws SQLException, IOException {
updateUuids(); updateUuids();


QueryRunner runner = new QueryRunner(); QueryRunner runner = new QueryRunner();
try { try {
List<ConfigurationSchema> schemas = runner.query(connection, "select conf.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " + // Utils.runFile(runner, connection, "doc/constraint_update_before.sql");

List<ConfigurationSchema> schemas = runner.query(connection,
"select conf.id as id, created_time as createdTime, created_username as createdUsername, description, name, schems, version, application_id as appId " +
"from configuration_schems conf join schems s on conf.id = s.id", new BeanListHandler<ConfigurationSchema>(ConfigurationSchema.class)); "from configuration_schems conf join schems s on conf.id = s.id", new BeanListHandler<ConfigurationSchema>(ConfigurationSchema.class));
String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", ")); String toDelete = schemas.stream().map(s -> s.getId().toString()).collect(joining(", "));


Expand All @@ -58,12 +75,24 @@ public void transform() throws SQLException, IOException {
schemas.forEach(s -> s.setId(s.getId() + shift)); schemas.forEach(s -> s.setId(s.getId() + shift));
Map<Ctl, List<ConfigurationSchema>> confSchemasToCTL = new HashMap<>(); Map<Ctl, List<ConfigurationSchema>> confSchemasToCTL = new HashMap<>();


Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);
// CTL creation // CTL creation
for (ConfigurationSchema schema : schemas) { for (ConfigurationSchema schema : schemas) {
currentCTLMetaId++;
currentCtlId++;
Schema schemaBody = new Schema.Parser().parse(schema.getSchems());
String fqn = schemaBody.getFullName();
RawSchema rawSchema = new RawSchema(schemaBody.toString());
DefaultRecordGenerationAlgorithm<RawData> algotithm = new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory());
String defaultRecord = algotithm.getRootData().getRawData();
Long tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null); Long tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null);
CTLSchemaDto ctlSchemaDto = client.saveCTLSchemaWithAppToken(schema.getSchems(), tenantId.toString(), schema.getAppId().toString()); runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", rs -> null, currentCTLMetaId, fqn, schema.getAppId(), tenantId);
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", rs -> null, currentCtlId, schema.getSchems(), schema.getCreatedTime(),
schema.getCreatedUsername(), defaultRecord, schema.getVersion(), currentCTLMetaId);

// aggregate configuration schemas with same fqn // aggregate configuration schemas with same fqn
Ctl ctl = new Ctl(Long.parseLong(ctlSchemaDto.getId()), new CtlMetaInfo(ctlSchemaDto.getMetaInfo().getFqn(), schema.getAppId(), tenantId)); Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(fqn, schema.getAppId(), tenantId));
if (confSchemasToCTL.containsKey(ctl)) { if (confSchemasToCTL.containsKey(ctl)) {
List<ConfigurationSchema> list = confSchemasToCTL.get(ctl); List<ConfigurationSchema> list = confSchemasToCTL.get(ctl);
list.add(schema); list.add(schema);
Expand All @@ -90,8 +119,8 @@ public void transform() throws SQLException, IOException {
} }


runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[schemas.size()][])); runner.batch(connection, "insert into base_schems values(?, ?, ?, ?, ?, ?, ?, ?)", params.toArray(new Object[schemas.size()][]));
} catch (SQLException e) { } catch (SQLException | ConfigurationGenerationException | IOException e) {
DbUtils.rollback(connection); System.err.println("Error: " + e.getMessage());
} finally { } finally {
DbUtils.closeQuietly(connection); DbUtils.closeQuietly(connection);
} }
Expand All @@ -102,24 +131,16 @@ private void updateUuids() throws SQLException, IOException {
QueryRunner run = new QueryRunner(); QueryRunner run = new QueryRunner();


ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<Configuration>(Configuration.class); ResultSetHandler<List<Configuration>> rsHandler = new BeanListHandler<Configuration>(Configuration.class);
try { List<Configuration> configs = run.query(this.connection, "SELECT * FROM configuration", rsHandler);
List<Configuration> configs = run.query(this.connection, "SELECT * FROM configuration", rsHandler); for (Configuration config : configs) {
for (Configuration config : configs) { JsonNode json = new ObjectMapper().readTree(config.getConfiguration_body());
JsonNode json = new ObjectMapper().readTree(config.getConfiguration_body()); JsonNode jsonEncoded = encodeUuids(json);
JsonNode jsonEncoded = encodeUuids(json); byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();
byte[] encodedConfigurationBody = jsonEncoded.toString().getBytes();

int updates = run.update(this.connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody,config.getId());
int updates = run.update(this.connection, "UPDATE configuration SET configuration_body=? WHERE id=?", encodedConfigurationBody,config.getId()); if (updates == 1) {
if (updates != 1) { //TODO
// LOG.error("Failed to update configuration: {}", config);
} else {
// LOG.info("Updated configuration: {}", config);
}
} }
} catch (SQLException e) {
// LOG.error("Failed to load configurations. {}", e);
} finally {
DbUtils.close(this.connection);
} }
} }


Expand Down
@@ -0,0 +1,10 @@
package org.kaaproject.data_migration.utils;


final public class Constants {
public static final String USER_NAME = "sqladmin";
public static final String PASSWORD = "admin";
public static final String DB_NAME = "kaa";
public static final String HOST = "10.2.1.130";
public static final Integer PORT = 8080;
}
Expand Up @@ -5,14 +5,14 @@


import javax.sql.DataSource; import javax.sql.DataSource;


import static org.kaaproject.data_migration.utils.Constants.*;

public enum DataSources { public enum DataSources {


MARIADB(getMariaDB()), POSTGRES(getPostgreSQL()); MARIADB(getMariaDB()), POSTGRES(getPostgreSQL());


private final DataSource ds; private final DataSource ds;
private static final String USER_NAME = "root";
private static final String PASSWORD = "kaa";
private static final String DB_NAME = "kaa";


DataSources(DataSource ds) { DataSources(DataSource ds) {
this.ds = ds; this.ds = ds;
Expand All @@ -25,7 +25,7 @@ public DataSource getDs() {
private static DataSource getPostgreSQL() { private static DataSource getPostgreSQL() {
BasicDataSource bds = new BasicDataSource(); BasicDataSource bds = new BasicDataSource();
bds.setDriverClassName("org.postgresql.Driver"); bds.setDriverClassName("org.postgresql.Driver");
bds.setUrl("jdbc:postgresql://localhost:5432/" + DB_NAME); bds.setUrl("jdbc:postgresql://" + HOST +":5432/" + DB_NAME);
bds.setUsername(USER_NAME); bds.setUsername(USER_NAME);
bds.setPassword(PASSWORD); bds.setPassword(PASSWORD);
return bds; return bds;
Expand All @@ -35,7 +35,7 @@ private static DataSource getPostgreSQL() {
private static DataSource getMariaDB() { private static DataSource getMariaDB() {
BasicDataSource bds = new BasicDataSource(); BasicDataSource bds = new BasicDataSource();
bds.setDriverClassName("org.mariadb.jdbc.Driver"); bds.setDriverClassName("org.mariadb.jdbc.Driver");
bds.setUrl("jdbc:mysql://localhost:3306/" + DB_NAME); bds.setUrl("jdbc:mysql://" + HOST +":3306/" + DB_NAME);
bds.setUsername(USER_NAME); bds.setUsername(USER_NAME);
bds.setPassword(PASSWORD); bds.setPassword(PASSWORD);
// bds.setDefaultAutoCommit(false); // bds.setDefaultAutoCommit(false);
Expand Down
@@ -0,0 +1,18 @@
package org.kaaproject.data_migration.utils;


import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;

public class Utils {

public static void runFile(QueryRunner runner, Connection connection, String fileName) throws IOException, SQLException {
String query = IOUtils.toString(Utils.class.getClassLoader().getResourceAsStream(fileName));
runner.update(connection, query);
}

}

This file was deleted.

@@ -0,0 +1,14 @@
SET @const_name = (SELECT CONSTRAINT_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE REFERENCED_TABLE_SCHEMA = 'kaa' AND TABLE_NAME = 'configuration_schems' and referenced_table_name='schems');
SET @sql = CONCAT('ALTER TABLE configuration_schems DROP FOREIGN KEY ', @const_name);
PREPARE s from @sql;
EXECUTE s;
DEALLOCATE PREPARE s;

SET @const_name = (SELECT CONSTRAINT_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE REFERENCED_TABLE_SCHEMA = 'kaa' AND TABLE_NAME = 'configuration' and referenced_table_name='configuration_schems');
SET @sql = CONCAT('ALTER TABLE configuration DROP FOREIGN KEY ', @const_name);
PREPARE s from @sql;
EXECUTE s;
DEALLOCATE PREPARE s;

ALTER TABLE configuration add constraint `FK_configuration_schems_id`
FOREIGN KEY (`configuration_schems_id`) REFERENCES `configuration_schems` (`id`) ON DELETE CASCADE ON UPDATE CASCADE;
Expand Up @@ -1081,4 +1081,4 @@ UNLOCK TABLES;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;


-- Dump completed on 2016-08-01 3:23:12 -- Dump completed on 2016-08-08 5:55:48

0 comments on commit 992535f

Please sign in to comment.