Skip to content

Commit

Permalink
Implemented ctl aggregation algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Aug 11, 2016
1 parent e48403a commit 76e15dd
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 17 deletions.
Expand Up @@ -13,19 +13,28 @@
import org.kaaproject.kaa.server.common.core.configuration.RawData;
import org.kaaproject.kaa.server.common.core.configuration.RawDataFactory;
import org.kaaproject.kaa.server.common.core.schema.RawSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Arrays.binarySearch;

public class CTLAggregation {
private Connection connection;
private QueryRunner runner;
private DataDefinition dd;

private static final Logger LOG = LoggerFactory.getLogger(CTLAggregation.class);
private Map<Ctl, List<Schema>> confSchemasToCTL;
private Set<Ctl> ctls;


public CTLAggregation(Connection connection) {
this.connection = connection;
runner = new QueryRunner();
Expand All @@ -34,12 +43,12 @@ public CTLAggregation(Connection connection) {


public Map<Ctl, List<Schema>> aggregate(List<Schema> schemas) throws SQLException, ConfigurationGenerationException, IOException {
Map<Ctl, List<Schema>> confSchemasToCTL = new HashMap<>();
confSchemasToCTL = new HashMap<>();
ctls = new HashSet<>();
Long currentCTLMetaId = runner.query(connection, "select max(id) as max_id from ctl_metainfo", rs -> rs.next() ? rs.getLong("max_id") : null);
Long currentCtlId = runner.query(connection, "select max(id) as max_id from ctl", rs -> rs.next() ? rs.getLong("max_id") : null);

// CTL creation
//TODO add check for already existed CTL schema to avoid constrain violation
for (Schema schema : schemas) {
currentCTLMetaId++;
currentCtlId++;
Expand All @@ -49,16 +58,114 @@ public Map<Ctl, List<Schema>> aggregate(List<Schema> schemas) throws SQLExceptio
DefaultRecordGenerationAlgorithm<RawData> algotithm = new DefaultRecordGenerationAlgorithmImpl<>(rawSchema, new RawDataFactory());
String defaultRecord = algotithm.getRootData().getRawData();
Long tenantId = runner.query(connection, "select tenant_id from application where id = " + schema.getAppId(), rs -> rs.next() ? rs.getLong("tenant_id") : null);
Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(fqn, schema.getAppId(), tenantId));
Ctl ctl = new Ctl(currentCtlId, new CtlMetaInfo(currentCTLMetaId, fqn, schema.getAppId(), tenantId), defaultRecord);

if (ctls.isEmpty()) {
confSchemasToCTL.put(ctl, asList(schema));
ctls.add(ctl);
} else {
Ctl ctlToCompare = sameFqn(ctls, ctl);

if (ctlToCompare != null) {

if (bothAppIdNull(ctlToCompare, ctl)) {

runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", new ScalarHandler<Long>(), currentCTLMetaId, fqn, schema.getAppId(), tenantId);
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", new ScalarHandler<Long>(), currentCtlId, schema.getSchems(), schema.getCreatedTime(),
schema.getCreatedUsername(), defaultRecord, schema.getVersion(), currentCTLMetaId);
if (sameTenant(ctlToCompare, ctl)) {
aggregateSchemas(ctlToCompare, ctl, schema);
} else {
putToMapSchema(ctlToCompare, ctl, schema, "tenant");
}

} else {

if (sameAppId(ctlToCompare, ctl)) {
aggregateSchemas(ctlToCompare, ctl, schema);
} else {
putToMapSchema(ctlToCompare, ctl, schema, "application");
}

}

} else {
ctlToCompare = sameBody(ctls, ctl);
if (ctlToCompare != null) {
LOG.warn("Schemas {} and {} have different fqn but same body {}", ctl.getMetaInfo().getFqn(), ctlToCompare.getMetaInfo().getFqn(), ctl.getDefaultRecord());
}
confSchemasToCTL.put(ctl, new ArrayList<>(asList(schema)));
ctls.add(ctl);
}
}

confSchemasToCTL.put(ctl, schema);
}

for (Ctl ctl : ctls) {
CtlMetaInfo mi = ctl.getMetaInfo();
Schema s = confSchemasToCTL.get(ctl).get(0);
runner.insert(connection, "insert into ctl_metainfo values(?, ?, ?, ?)", new ScalarHandler<Long>(), mi.getId(), mi.getFqn(), mi.getAppId(), mi.getTenatnId());
runner.insert(connection, "insert into ctl values(?, ?, ?, ?, ?, ?, ?)", new ScalarHandler<Long>(), ctl.getId(), s.getSchems(), s.getCreatedTime(),
s.getCreatedUsername(), ctl.getDefaultRecord(), s.getVersion(), mi.getId());

}

return confSchemasToCTL;

}


private Ctl sameFqn(Set<Ctl> set, Ctl ctl) {
for (Ctl ctl1 : set) {
if (ctl1.getMetaInfo().getFqn().equals(ctl.getMetaInfo().getFqn())) {
return ctl1;
}
}
return null;
}


private Ctl sameBody(Set<Ctl> set, Ctl ctl) {
for (Ctl ctl1 : set) {
if (ctl1.getMetaInfo().getFqn().equals(ctl.getMetaInfo().getFqn())) {
return ctl1;
}
}
return null;
}

private boolean bothAppIdNull(Ctl c1, Ctl c2) {
return c1.getMetaInfo().getAppId() != null && c2.getMetaInfo().getAppId() != null;
}


private boolean sameBody(Ctl c1, Ctl c2) {
return c1.getDefaultRecord().equals(c2.getDefaultRecord());
}

private boolean sameAppId(Ctl c1, Ctl c2) {
return c1.getMetaInfo().getAppId().equals(c2.getMetaInfo().getAppId());
}


private boolean sameTenant(Ctl c1, Ctl c2) {
return c1.getMetaInfo().getTenatnId().equals(c2.getMetaInfo().getTenatnId());
}


///TODO in future add npromotion if needed
private void putToMapSchema(Ctl c1, Ctl newCtl, Schema schema, String scope) {
if (!sameBody(c1, newCtl)) {
LOG.warn("Schemas in different %ss' scopes have different bodies {} and {} but the same fqn {}", scope, newCtl.getDefaultRecord(), c1.getDefaultRecord(), newCtl.getMetaInfo().getFqn());
}
confSchemasToCTL.put(newCtl, new ArrayList<>(asList(schema)));
ctls.add(newCtl);
}

private void aggregateSchemas(Ctl c1, Ctl c2, Schema schema) {
if (!sameBody(c1, c2)) {
CtlMetaInfo mi = c1.getMetaInfo();
String message = format("Unable to do migrate due to schemas with same fqn[%s] and scope[appId=%d, tenant=%d] but different bodies", mi.getFqn(), mi.getAppId(), mi.getTenatnId());
throw new MigrationException(message);
}
List<Schema> sc = confSchemasToCTL.get(c1);
sc.add(schema);
}
}
@@ -0,0 +1,11 @@
package org.kaaproject.data_migration;


public class MigrationException extends RuntimeException {

public MigrationException(String message) {
super(message);
}


}
@@ -1,12 +1,17 @@
package org.kaaproject.data_migration.model;


import com.sun.istack.internal.NotNull;

public class Ctl {
private final Long id;
private final CtlMetaInfo metaInfo;
private final String defaultRecord;

public Ctl(Long id, CtlMetaInfo metaInfo) {
public Ctl(Long id, CtlMetaInfo metaInfo, String defaultRecord) {
this.id = id;
this.metaInfo = metaInfo;
this.defaultRecord = defaultRecord;
}

@Override
Expand All @@ -16,14 +21,16 @@ public boolean equals(Object o) {

Ctl ctl = (Ctl) o;

if (id != null ? !id.equals(ctl.id) : ctl.id != null) return false;
return metaInfo.equals(ctl.metaInfo);
if (!metaInfo.equals(ctl.metaInfo)) return false;
return defaultRecord.equals(ctl.defaultRecord);

}

@Override
public int hashCode() {
return metaInfo.hashCode();
int result = metaInfo.hashCode();
result = 31 * result + defaultRecord.hashCode();
return result;
}

public Long getId() {
Expand All @@ -33,4 +40,8 @@ public Long getId() {
public CtlMetaInfo getMetaInfo() {
return metaInfo;
}

public String getDefaultRecord() {
return defaultRecord;
}
}
@@ -1,12 +1,13 @@
package org.kaaproject.data_migration.model;

public class CtlMetaInfo {

private final Long id;
private final String fqn;
private final Long appId;
private final Long tenatnId;

public CtlMetaInfo(String fqn, Long appId, Long tenatnId) {
public CtlMetaInfo(Long id, String fqn, Long appId, Long tenatnId) {
this.id = id;
this.fqn = fqn;
this.appId = appId;
this.tenatnId = tenatnId;
Expand All @@ -20,14 +21,16 @@ public boolean equals(Object o) {
CtlMetaInfo that = (CtlMetaInfo) o;

if (!fqn.equals(that.fqn)) return false;
return appId != null ? appId.equals(that.appId) : that.appId == null;
if (appId != null ? !appId.equals(that.appId) : that.appId != null) return false;
return tenatnId != null ? tenatnId.equals(that.tenatnId) : that.tenatnId == null;

}

@Override
public int hashCode() {
int result = fqn.hashCode();
result = 31 * result + (appId != null ? appId.hashCode() : 0);
result = 31 * result + (tenatnId != null ? tenatnId.hashCode() : 0);
return result;
}

Expand All @@ -42,4 +45,8 @@ public Long getTenatnId() {
public String getFqn() {
return fqn;
}

public Long getId() {
return id;
}
}

0 comments on commit 76e15dd

Please sign in to comment.