Skip to content

Commit

Permalink
Safe match inserts (#67)
Browse files Browse the repository at this point in the history
## What is the goal of this PR?

We validate that match-inserts only match 1 answer before doing inserts. This is disableable with a new flag `--allowMultiInserts`

## What are the changes implemented in this PR?

* Fix a bunch of tests that relied on `toString()` for equality checks on TypeQL insert queries
* Verify that match-inserts only receive 1 `match` answer before doing inserts. We rewrite `match-insert` with constraints into a `match` using IIDs or Labels to do avoid doing the complex `match` multiple times
  • Loading branch information
flyingsilverfin committed Oct 21, 2022
1 parent ce50107 commit 1ffae11
Show file tree
Hide file tree
Showing 19 changed files with 550 additions and 441 deletions.
Binary file added .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group 'com.vaticle.typedb-osi'
version '1.4.2'
version '1.5.0'

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class LoadOptions {
@CommandLine.Option(names = {"-ls", "--loadSchema"}, description = "optional - reload schema when continuing a migration (ignored when clean migration)", defaultValue = "false")
public boolean loadSchema;

@CommandLine.Option(names = {"-mi", "--allowMultiInsert"}, description = "Allow match-inserts to match multiple answers and insert for each.", defaultValue = "false")
public boolean multiInsert;

public static LoadOptions parse(String[] args) {
CommandLine commandLine = new CommandLine(new TypeDBLoaderCLI())
.addSubcommand("load", new LoadOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package com.vaticle.typedb.osi.loader.generator;

import com.vaticle.typedb.client.api.TypeDBTransaction;
import com.vaticle.typedb.client.api.answer.ConceptMap;
import com.vaticle.typedb.client.common.exception.TypeDBClientException;
import com.vaticle.typedb.osi.loader.config.Configuration;
import com.vaticle.typedb.osi.loader.io.FileLogger;
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
import com.vaticle.typedb.osi.loader.util.Util;
import com.vaticle.typeql.lang.TypeQL;
import com.vaticle.typeql.lang.pattern.constraint.ThingConstraint;
Expand All @@ -33,6 +35,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import static com.vaticle.typedb.osi.loader.util.TypeDBUtil.safeInsert;

public class AppendAttributeGenerator implements Generator {
private static final Logger dataLogger = LogManager.getLogger("com.vaticle.typedb.osi.loader.error");
Expand All @@ -48,8 +53,8 @@ public AppendAttributeGenerator(String filePath, Configuration.Generator.AppendA
this.fileSeparator = fileSeparator;
}

public void write(TypeDBTransaction tx,
String[] row) {
@Override
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
String fileName = FilenameUtils.getName(filePath);
String fileNoExtension = FilenameUtils.removeExtension(fileName);
String originalRow = String.join(Character.toString(fileSeparator), row);
Expand All @@ -59,18 +64,24 @@ public void write(TypeDBTransaction tx,
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
}

TypeQLInsert statement = generateMatchInsertStatement(row);
TypeQLInsert query = generateMatchInsertStatement(row);

if (appendAttributeInsertStatementValid(statement)) {
if (appendAttributeInsertStatementValid(query)) {
try {
tx.query().insert(statement);
Iterator<ConceptMap> answers = TypeDBUtil.executeMatch(tx, query);
if (!answers.hasNext()) {
FileLogger.getLogger().logNoMatches(fileName, originalRow);
dataLogger.error("Match-insert failed - File <" + filePath + "> row <" + originalRow + "> generates query <" + query + "> which matched no answers.");
} else {
safeInsert(tx, query, answers, allowMultiInsert, filePath, originalRow, dataLogger);
}
} catch (TypeDBClientException typeDBClientException) {
FileLogger.getLogger().logUnavailable(fileName, originalRow);
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
}
} else {
FileLogger.getLogger().logInvalid(fileName, originalRow);
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + statement.toString().replace("\n", " ") + ">");
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + query.toString().replace("\n", " ") + ">");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.vaticle.typedb.osi.loader.config.Configuration;
import com.vaticle.typedb.osi.loader.io.FileLogger;
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
import com.vaticle.typedb.osi.loader.util.Util;
import com.vaticle.typeql.lang.TypeQL;
import com.vaticle.typeql.lang.pattern.constraint.ThingConstraint;
Expand All @@ -34,8 +35,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.stream.Stream;

import static com.vaticle.typedb.osi.loader.util.TypeDBUtil.safeInsert;

public class AppendAttributeOrInsertThingGenerator implements Generator {
private static final Logger dataLogger = LogManager.getLogger("com.vaticle.typedb.osi.loader.error");
private final String filePath;
Expand All @@ -50,8 +54,8 @@ public AppendAttributeOrInsertThingGenerator(String filePath, Configuration.Gene
this.fileSeparator = fileSeparator;
}

public void write(TypeDBTransaction tx,
String[] row) {
@Override
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
String fileName = FilenameUtils.getName(filePath);
String fileNoExtension = FilenameUtils.removeExtension(fileName);
String originalRow = String.join(Character.toString(fileSeparator), row);
Expand All @@ -61,30 +65,32 @@ public void write(TypeDBTransaction tx,
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
}

TypeQLInsert appendStatement = generateMatchInsertStatement(row);
TypeQLInsert insertStatement = generateThingInsertStatement(row);
TypeQLInsert appendQuery = generateMatchInsertStatement(row);
TypeQLInsert insertQuery = generateThingInsertStatement(row);

if (appendAttributeInsertStatementValid(appendStatement)) {
if (appendAttributeInsertStatementValid(appendQuery)) {
try {
final Stream<ConceptMap> insertedStream = tx.query().insert(appendStatement);
if (insertedStream.count() == 0) {
if (thingInsertStatementValid(insertStatement)) {
tx.query().insert(insertStatement);
Iterator<ConceptMap> answers = TypeDBUtil.executeMatch(tx, appendQuery);
if (!answers.hasNext()) {
if (thingInsertStatementValid(insertQuery)) {
tx.query().insert(insertQuery);
} else {
FileLogger.getLogger().logInvalid(fileName, originalRow);
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + insertStatement.toString().replace("\n", " ") + ">");
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + insertQuery.toString().replace("\n", " ") + ">");
}
} else {
safeInsert(tx, appendQuery, answers, allowMultiInsert, filePath, originalRow, dataLogger);
}
} catch (TypeDBClientException typeDBClientException) {
FileLogger.getLogger().logUnavailable(fileName, originalRow);
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
}
} else {
if (thingInsertStatementValid(insertStatement)) {
tx.query().insert(insertStatement);
if (thingInsertStatementValid(insertQuery)) {
tx.query().insert(insertQuery);
} else {
FileLogger.getLogger().logInvalid(fileName, originalRow);
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statements: <" + appendStatement.toString().replace("\n", " ") + "> and <" + insertStatement.toString().replace("\n", " ") + ">");
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statements: <" + appendQuery.toString().replace("\n", " ") + "> and <" + insertQuery.toString().replace("\n", " ") + ">");
}
}
}
Expand Down Expand Up @@ -149,7 +155,7 @@ public boolean appendAttributeInsertStatementValid(TypeQLInsert insert) {
if (insert == null) return false;
if (!insert.toString().contains("isa " + appendOrInsertConfiguration.getMatch().getType())) return false;
for (Configuration.Definition.Attribute ownershipThingGetter : appendOrInsertConfiguration.getMatch().getOwnerships()) {
if (!insert.toString().contains(", has " + ownershipThingGetter.getAttribute())) return false;
if (!insert.toString().contains("has " + ownershipThingGetter.getAttribute())) return false;
}
if (appendOrInsertConfiguration.getInsert().getRequiredOwnerships() != null) {
for (Configuration.Definition.Attribute attribute : appendOrInsertConfiguration.getInsert().getRequiredOwnerships()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public AttributeGenerator(String filePath, Configuration.Generator.Attribute att
this.fileSeparator = fileSeparator;
}

public void write(TypeDBTransaction tx, String[] row) {
@Override
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {

String fileName = FilenameUtils.getName(filePath);
String fileNoExtension = FilenameUtils.removeExtension(fileName);
Expand All @@ -62,7 +63,7 @@ public void write(TypeDBTransaction tx, String[] row) {
if (isValid(statement)) {
try {
tx.query().insert(statement);
} catch (TypeDBClientException graknClientException) {
} catch (TypeDBClientException clientException) {
FileLogger.getLogger().logUnavailable(fileName, originalRow);
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.vaticle.typedb.osi.loader.config.Configuration;
import com.vaticle.typedb.osi.loader.io.FileLogger;
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
import com.vaticle.typedb.osi.loader.util.Util;
import com.vaticle.typeql.lang.TypeQL;
import com.vaticle.typeql.lang.pattern.variable.ThingVariable;
Expand All @@ -45,8 +46,8 @@ public EntityGenerator(String filePath, Configuration.Generator.Entity entityCon
this.fileSeparator = fileSeparator;
}

public void write(TypeDBTransaction tx,
String[] row) {
@Override
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
String fileName = FilenameUtils.getName(filePath);
String fileNoExtension = FilenameUtils.removeExtension(fileName);
String originalRow = String.join(Character.toString(fileSeparator), row);
Expand All @@ -56,17 +57,17 @@ public void write(TypeDBTransaction tx,
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
}

TypeQLInsert statement = generateThingInsertStatement(row);
if (valid(statement)) {
TypeQLInsert query = generateThingInsertStatement(row);
if (valid(query)) {
try {
tx.query().insert(statement);
tx.query().insert(query);
} catch (TypeDBClientException typeDBClientException) {
FileLogger.getLogger().logUnavailable(fileName, originalRow);
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
}
} else {
FileLogger.getLogger().logInvalid(fileName, originalRow);
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + statement.toString().replace("\n", " ") + ">");
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + query.toString().replace("\n", " ") + ">");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@
import com.vaticle.typedb.client.api.TypeDBTransaction;

public interface Generator {
void write(TypeDBTransaction tx, String[] row);
void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert);
char getFileSeparator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package com.vaticle.typedb.osi.loader.generator;

import com.vaticle.typedb.client.api.TypeDBTransaction;
import com.vaticle.typedb.client.api.answer.ConceptMap;
import com.vaticle.typedb.client.common.exception.TypeDBClientException;
import com.vaticle.typedb.osi.loader.config.Configuration;
import com.vaticle.typedb.osi.loader.io.FileLogger;
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
import com.vaticle.typedb.osi.loader.util.Util;
import com.vaticle.typeql.lang.TypeQL;
import com.vaticle.typeql.lang.pattern.constraint.ThingConstraint;
Expand All @@ -33,8 +35,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import static com.vaticle.typedb.osi.loader.util.GeneratorUtil.constrainThingWithHasAttributes;
import static com.vaticle.typedb.osi.loader.util.TypeDBUtil.safeInsert;

public class RelationGenerator implements Generator {
private static final Logger dataLogger = LogManager.getLogger("com.vaticle.typedb.osi.loader.error");
Expand All @@ -50,8 +54,8 @@ public RelationGenerator(String filePath, Configuration.Generator.Relation relat
this.fileSeparator = fileSeparator;
}

public void write(TypeDBTransaction tx,
String[] row) {
@Override
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
String fileName = FilenameUtils.getName(filePath);
String fileNoExtension = FilenameUtils.removeExtension(fileName);
String originalRow = String.join(Character.toString(fileSeparator), row);
Expand All @@ -61,18 +65,24 @@ public void write(TypeDBTransaction tx,
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
}

TypeQLInsert statement = generateMatchInsertStatement(row);
TypeQLInsert query = generateMatchInsertStatement(row);

if (relationInsertStatementValid(statement)) {
if (relationInsertStatementValid(query)) {
try {
tx.query().insert(statement);
Iterator<ConceptMap> answers = TypeDBUtil.executeMatch(tx, query);
if (!answers.hasNext()) {
FileLogger.getLogger().logNoMatches(fileName, originalRow);
dataLogger.error("Match-insert failed - File <" + filePath + "> row <" + originalRow + "> generates query <" + query + "> which matched no answers.");
} else {
safeInsert(tx, query, answers, allowMultiInsert, filePath, originalRow, dataLogger);
}
} catch (TypeDBClientException typeDBClientException) {
FileLogger.getLogger().logUnavailable(fileName, originalRow);
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
}
} else {
FileLogger.getLogger().logInvalid(fileName, originalRow);
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + statement.toString().replace("\n", " ") + ">");
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + query.toString().replace("\n", " ") + ">");
}
}

Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/vaticle/typedb/osi/loader/io/FileLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ public synchronized void logUnavailable(String sourceFile, String errorString) {
}
}

public void logNoMatches(String sourceFile, String row) {
try {
FileWriter fw = new FileWriter(directoryString + "/" + FilenameUtils.removeExtension(sourceFile) + "_no_matches.log", true);
fw.append(row.replace("null", ""));
fw.append("\n");
fw.flush();
fw.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}

public void logTooManyMatches(String sourceFile, String row) {
try {
FileWriter fw = new FileWriter(directoryString + "/" + FilenameUtils.removeExtension(sourceFile) + "_too_many_matches.log", true);
fw.append(row.replace("null", ""));
fw.append("\n");
fw.flush();
fw.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}

public synchronized void logColumnWarnings(String sourceFile, String errorString) {
try {
FileWriter fw = new FileWriter(directoryString + "/" + FilenameUtils.removeExtension(sourceFile) + "_column_type.log", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.vaticle.typedb.client.api.TypeDBTransaction;
import com.vaticle.typedb.common.collection.Either;
import com.vaticle.typedb.common.concurrent.NamedThreadFactory;
import com.vaticle.typedb.osi.loader.cli.LoadOptions;
import com.vaticle.typedb.osi.loader.config.Configuration;
import com.vaticle.typedb.osi.loader.generator.AppendAttributeGenerator;
import com.vaticle.typedb.osi.loader.generator.AppendAttributeOrInsertThingGenerator;
Expand Down Expand Up @@ -52,23 +53,25 @@ public class AsyncLoaderWorker {

private static final DecimalFormat countFormat = new DecimalFormat("#,###");
private static final DecimalFormat decimalFormat = new DecimalFormat("#,###.00");
private final Configuration dc;
private final LoadOptions loadOptions;
private final ExecutorService executor;
private final int threads;
private final String databaseName;
private final AtomicBoolean hasError;
private final int batchGroup;
private final Configuration dc;
private Status status;

private enum Status {OK, ERROR}

public AsyncLoaderWorker(Configuration dc, String databaseName) {
public AsyncLoaderWorker(Configuration dc, LoadOptions loadOptions) {
this.dc = dc;
this.loadOptions = loadOptions;
this.threads = dc.getGlobalConfig().getParallelisation();
this.databaseName = databaseName;
this.databaseName = loadOptions.databaseName;
this.hasError = new AtomicBoolean(false);
this.batchGroup = 1;
this.executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory(databaseName));
this.executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory(this.databaseName));
this.status = Status.OK;
}

Expand Down Expand Up @@ -379,7 +382,7 @@ private CompletableFuture<Void> asyncWrite(int id,
try (TypeDBTransaction tx = session.transaction(TypeDBTransaction.Type.WRITE)) {
rows.forEach(csv -> {
Util.debug("async-writer-{}: {}", id, csv);
gen.write(tx, csv);
gen.write(tx, csv, loadOptions.multiInsert);
});
tx.commit();
}
Expand Down
Loading

0 comments on commit 1ffae11

Please sign in to comment.