Skip to content

Commit

Permalink
NIFI-12318: Fixed byte array generation in GenerateRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Dec 3, 2023
1 parent 3b0d810 commit 99f552d
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private Object generateValueFromRecordField(RecordField recordField, Faker faker
case BOOLEAN:
return FakerUtils.getFakeData("Bool.bool", faker);
case BYTE:
return faker.number().numberBetween(Byte.MIN_VALUE, Byte.MAX_VALUE);
return (byte) faker.number().numberBetween(Byte.MIN_VALUE, Byte.MAX_VALUE);
case CHAR:
return (char) faker.number().numberBetween(Character.MIN_VALUE, Character.MAX_VALUE);
case DATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@
*/
package org.apache.nifi.processors.standard;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.processors.standard.faker.FakerMethodHolder;
import org.apache.nifi.processors.standard.faker.FakerUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -43,8 +44,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -181,7 +184,7 @@ public void testGenerateNoNullableFieldsSchemaText() throws Exception {
testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); // Should be ignored
testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
testRunner.setProperty(GenerateRecord.NUM_RECORDS, "3");
testRunner.setProperty(GenerateRecord.NUM_RECORDS, "30");

testRunner.run();
testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
Expand All @@ -198,7 +201,7 @@ public void testGenerateNoNullableFieldsSchemaText() throws Exception {
@Test
public void testGenerateNullableFieldsZeroNullPercentageSchemaText() throws Exception {
String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc")));
final JsonRecordSetWriter recordWriter = new JsonRecordSetWriter();
final AvroRecordSetWriter recordWriter = new AvroRecordSetWriter();
testRunner.addControllerService("record-writer", recordWriter);
testRunner.enableControllerService(recordWriter);
testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
Expand All @@ -209,43 +212,55 @@ public void testGenerateNullableFieldsZeroNullPercentageSchemaText() throws Exce

testRunner.run();
testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
final String output = flowFile.getContent();
final JsonFactory jsonFactory = new JsonFactory();
try (JsonParser jsonParser = jsonFactory.createParser(output)) {
jsonParser.setCodec(new ObjectMapper());
JsonNode recordArray = jsonParser.readValueAsTree();
assertTrue(recordArray instanceof ArrayNode);
JsonNode recordNode = recordArray.get(0);
JsonNode systemNode = recordNode.get("System");
assertNotNull(systemNode);
JsonNode providerNode = systemNode.get("Provider");
assertNotNull(providerNode);
JsonNode guidNode = providerNode.get("Guid");
assertNotNull(guidNode);
assertNotNull(guidNode.asText());
JsonNode nameNode = providerNode.get("Name");
assertNotNull(nameNode);
assertNotNull(nameNode.asText());
JsonNode eventIdNode = systemNode.get("EventID");
assertNotNull(eventIdNode);
eventIdNode.asInt(); // This would throw a NullPointerException if the value was null
JsonNode eventDataNode = recordNode.get("EventData");
assertNotNull(eventDataNode);
JsonNode dataNode = eventDataNode.get("Data");
assertNotNull(dataNode);
assertTrue(dataNode instanceof ArrayNode);
assertTrue(dataNode.size() <= 10 && dataNode.size() >= 0);
for (int i = 0; i < dataNode.size(); i++) {
JsonNode dataElementNode = dataNode.get(i);
assertNotNull(dataElementNode);
JsonNode dataElementNameNode = dataElementNode.get("Name");
assertNotNull(dataElementNameNode);
assertNotNull(dataElementNameNode.asText());
JsonNode dataElementDataNode = dataElementNode.get("DataElement");
assertNotNull(dataElementDataNode);
assertNotNull(dataElementDataNode.asText());
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);

final AvroReader avroReader = new AvroReader();
testRunner.addControllerService("avroReader", avroReader);
testRunner.enableControllerService(avroReader);
final byte[] validFlowFileBytes = flowFile.toByteArray();
try (
final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFileBytes);
final RecordReader recordReader = avroReader.createRecordReader(flowFile.getAttributes(), resultContentStream, validFlowFileBytes.length, testRunner.getLogger());
) {
// Check correct schema
final RecordSchema resultSchema = recordReader.getSchema();

final Optional<RecordField> systemField = resultSchema.getField("System");
assertTrue(systemField.isPresent());
assertEquals(RecordFieldType.RECORD, systemField.get().getDataType().getFieldType());
RecordDataType systemRecordType = (RecordDataType) systemField.get().getDataType();
RecordSchema systemSchema = systemRecordType.getChildSchema();

final Optional<RecordField> providerField = systemSchema.getField("Provider");
assertTrue(providerField.isPresent());
assertEquals(RecordFieldType.RECORD, providerField.get().getDataType().getFieldType());
RecordDataType providerRecordType = (RecordDataType) providerField.get().getDataType();
RecordSchema providerSchema = providerRecordType.getChildSchema();

final Optional<RecordField> guidField = providerSchema.getField("Guid");
assertTrue(guidField.isPresent());
assertEquals(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()), guidField.get().getDataType());

// Check object type, class etc.
final org.apache.nifi.serialization.record.Record record = recordReader.nextRecord();
assertNotNull(record);
final Object systemObject = record.getValue("System");
assertNotNull(systemObject);
assertTrue(systemObject instanceof org.apache.nifi.serialization.record.Record);
final org.apache.nifi.serialization.record.Record systemRecord = (org.apache.nifi.serialization.record.Record) systemObject;
final Object providerObject = systemRecord.getValue("Provider");
assertNotNull(providerObject);
assertTrue(providerObject instanceof org.apache.nifi.serialization.record.Record);
final org.apache.nifi.serialization.record.Record providerRecord = (org.apache.nifi.serialization.record.Record) providerObject;
final Object guidObject = providerRecord.getValue("Guid");
assertNotNull(guidObject);
assertTrue(guidObject instanceof Object[]);
// Check for array of Byte objects if not empty
Object[] guidArray = (Object[]) guidObject;
if (guidArray.length > 0) {
assertTrue(guidArray[0] instanceof Byte);
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"name": "ProviderType",
"fields": [{
"name": "Guid",
"type": "string"
"type": "bytes"
}, {
"name": "Name",
"type": "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"name": "ProviderType",
"fields": [{
"name": "Guid",
"type": ["null", "string"]
"type": ["null", "bytes"]
}, {
"name": "Name",
"type": ["null", "string"]
Expand Down

0 comments on commit 99f552d

Please sign in to comment.