Skip to content

Commit

Permalink
Unit test to reproduce the error with QueryRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
pvillard31 committed Oct 12, 2020
1 parent c296099 commit 1016b4a
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 0 deletions.
Expand Up @@ -605,6 +605,9 @@
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-schema.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
<exclude>src/test/resources/TestQueryRecord/schema/fields-value-name.avsc</exclude>
<exclude>src/test/resources/TestQueryRecord/input/fields-value-name.json</exclude>
<exclude>src/test/resources/TestQueryRecord/output/fields-value-name.json</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
Expand Up @@ -17,8 +17,11 @@
package org.apache.nifi.processors.standard;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
Expand All @@ -42,6 +45,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.ArrayList;
Expand Down Expand Up @@ -510,6 +515,41 @@ public void testCompareResultsOfTwoRecordPathsAgainstArray() throws Initializati
assertEquals("Software Engineer", output.getValue("title"));
}

@Test
public void testRecordPathWithArrayWithJSONReaderWriter() throws InitializationException, IOException {
TestRunner runner = getRunner();

final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestQueryRecord/schema/fields-value-name.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestQueryRecord/schema/fields-value-name.avsc")));

final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.enableControllerService(jsonReader);

final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.enableControllerService(jsonWriter);


runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");

runner.enqueue(Paths.get("src/test/resources/TestQueryRecord/input/fields-value-name.json"));
runner.setProperty(REL_NAME,
"SELECT *" +
" FROM FLOWFILE" +
" WHERE RPATH(field, '/country[/language = ''French'']') = 'France'");

runner.run();
runner.assertAllFlowFilesTransferred(REL_NAME, 1);
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestQueryRecord/output/fields-value-name.json")));
runner.getFlowFilesForRelationship(REL_NAME).get(0).assertContentEquals(expectedOutput);
}


@Test
public void testRecordPathWithArrayAndOnlyOneElementMatchingRPath() throws InitializationException {
Expand Down
@@ -0,0 +1,31 @@
[
{
"field" :
[
{
"name" : "id",
"value" : "id-1"
}, {
"name" : "country",
"value" : "France"
}, {
"name" : "language",
"value" : "French"
}
]
}, {
"field" :
[
{
"name" : "id",
"value" : "id-2"
}, {
"name" : "country",
"value" : "Belgium"
}, {
"name" : "language",
"value" : "French"
}
]
}
]
@@ -0,0 +1,17 @@
[
{
"field" :
[
{
"name" : "id",
"value" : "id-1"
}, {
"name" : "country",
"value" : "France"
}, {
"name" : "language",
"value" : "French"
}
]
}
]
@@ -0,0 +1,20 @@
{
"name": "docs",
"namespace": "doc",
"type": "record",
"fields": [ {
"name" : "field",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "record",
"namespace" : "name",
"fields" : [
{"name": "name", "type": "string"},
{"name": "value", "type": "string"}
]
}
}
} ]
}

0 comments on commit 1016b4a

Please sign in to comment.