
Commit

NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession (apache#3913)

* NIFI-6925: Fixed JoltTransformRecord for RecordReaders, improved MockProcessSession

* Fixed logic for no records, added unit test

* Fixed PutElasticsearchHttpRecord and PutHive3Streaming, same bug as JoltTransformRecord

* Added null checks
mattyb149 authored and patricker committed Jan 22, 2020
1 parent ff4e22e commit ed08a95
Showing 6 changed files with 80 additions and 53 deletions.
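
The thread running through these fixes is the ordering of ProcessSession calls relative to the InputStream returned by session.read(): each of the affected processors transferred or removed FlowFiles while that stream was still open, which the stricter MockProcessSession bookkeeping can now flag. A minimal sketch of the anti-pattern and its correction, assuming a hypothetical process(in) helper and a processor's own REL_SUCCESS/REL_FAILURE relationships (illustrative only, not code from this commit):

    // Anti-pattern: transferring the FlowFile while the stream from session.read()
    // is still open; the improved MockProcessSession can detect this at commit time.
    try (final InputStream in = session.read(flowFile)) {
        process(in);
        session.transfer(flowFile, REL_SUCCESS); // stream not yet closed
    }

    // Fix: decide the FlowFile's fate only after the read stream has been closed.
    try (final InputStream in = session.read(flowFile)) {
        process(in);
    } catch (final IOException e) {
        session.transfer(flowFile, REL_FAILURE);
        return;
    }
    session.transfer(flowFile, REL_SUCCESS);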
@@ -589,6 +589,7 @@ public InputStream read(FlowFile flowFile) {
final MockFlowFile mock = validateState(flowFile);

final ByteArrayInputStream bais = new ByteArrayInputStream(mock.getData());
incrementReadCount(flowFile);
final InputStream errorHandlingStream = new InputStream() {
@Override
public int read() throws IOException {
@@ -602,6 +603,7 @@ public int read(byte[] b, int off, int len) throws IOException {

@Override
public void close() throws IOException {
decrementReadCount(flowFile);
openInputStreams.remove(mock);
bais.close();
}
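
The incrementReadCount/decrementReadCount calls added above pair each InputStream handed out by read() with its close(), so the session can tell at transfer, remove, or commit time whether a FlowFile still has open streams. A rough sketch of the bookkeeping such calls imply (field name, map type, and method bodies are assumptions, not the actual MockProcessSession internals):

    private final Map<FlowFile, Integer> readCounts = new HashMap<>();

    private void incrementReadCount(final FlowFile flowFile) {
        readCounts.merge(flowFile, 1, Integer::sum);
    }

    private void decrementReadCount(final FlowFile flowFile) {
        // drop the entry entirely once the last open stream for this FlowFile is closed
        readCounts.computeIfPresent(flowFile, (ff, count) -> count == 1 ? null : count - 1);
    }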
@@ -51,13 +51,11 @@ public void testReadWithoutCloseThrowsExceptionOnCommit() throws IOException {

assertEquals("hello, world", new String(buffer));

session.remove(flowFile);

try {
session.commit();
Assert.fail("Was able to commit session without closing InputStream");
} catch (final FlowFileHandlingException ffhe) {
System.out.println(ffhe.toString());
} catch (final FlowFileHandlingException | IllegalStateException e) {
System.out.println(e.toString());
}
}

@@ -576,17 +576,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
i++;
}
}

session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));

// Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));

session.transfer(successFlowFile, REL_SUCCESS);
session.transfer(failedFlowFile, REL_FAILURE);
session.remove(inputFlowFile);

} catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
// We failed while handling individual failures. Not much else we can do other than log, and route the whole thing to failure.
getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[] {flowFile, e});
@@ -597,7 +586,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (failedFlowFile != null) {
session.remove(failedFlowFile);
}
return;
}
session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));

// Normal behavior is to output with record.count. In order to not break backwards compatibility, set both here.
session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
session.transfer(successFlowFile, REL_SUCCESS);
session.transfer(failedFlowFile, REL_FAILURE);
session.remove(inputFlowFile);
}
}

@@ -384,10 +384,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro

StreamingConnection hiveStreamingConnection = null;

try (final InputStream in = session.read(flowFile)) {
try {
final RecordReader reader;

try {
try (final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
@@ -409,7 +409,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
session.transfer(flowFile, REL_SUCCESS);
} catch (TransactionError te) {
if (rollbackOnFailure) {
throw new ProcessException(te.getLocalizedMessage(), te);
@@ -426,8 +425,10 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
rrfe
);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
session.transfer(flowFile, REL_SUCCESS);
} catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
if (rollbackOnFailure) {
if (hiveStreamingConnection != null) {
@@ -59,6 +59,7 @@
import org.apache.nifi.util.StringUtils;

import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -296,42 +297,44 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

final RecordSchema schema;
FlowFile transformed = null;

try (final InputStream in = session.read(original);
final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema());

FlowFile transformed = session.create(original);
final Map<String, String> attributes = new HashMap<>();
final WriteResult writeResult;
transformed = session.create(original);

// We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure
// and therefore a different Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the
// Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the
// schema determined by the Record Reader.
final Record firstRecord = reader.nextRecord();
if (firstRecord == null) {
try (final OutputStream out = session.write(transformed);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) {

try {
// We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure
// and therefore a different Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the
// Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the
// schema determined by the Record Reader.
final Record firstRecord = reader.nextRecord();
if (firstRecord == null) {
try (final OutputStream out = session.write(transformed);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) {

writer.beginRecordSet();
writeResult = writer.finishRecordSet();

attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
}
writer.beginRecordSet();
writeResult = writer.finishRecordSet();

transformed = session.putAllAttributes(transformed, attributes);
session.transfer(transformed, REL_SUCCESS);
session.transfer(original, REL_ORIGINAL);
logger.info("{} had no Records to transform", new Object[]{original});
return;
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
}

transformed = session.putAllAttributes(transformed, attributes);
logger.info("{} had no Records to transform", new Object[]{original});
} else {

final JoltTransform transform = getTransform(context, original);
final Record transformedFirstRecord = transform(firstRecord, transform);

if (transformedFirstRecord == null) {
throw new ProcessException("Error transforming the first record");
}

final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema());

// TODO: Is it possible that two Records with the same input schema could have different schemas after transformation?
@@ -353,27 +356,34 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro

writeResult = writer.finishRecordSet();

try {
writer.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close Writer for {}", new Object[]{transformed});
}

attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.putAll(writeResult.getAttributes());
}
} catch (Exception e) {
logger.error("Unable to write transformed records {} due to {}", new Object[]{original, e.toString(), e});
session.remove(transformed);
session.transfer(original, REL_FAILURE);
return;
}

final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAllAttributes(transformed, attributes);
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(original, REL_ORIGINAL);
logger.debug("Transformed {}", new Object[]{original});
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAllAttributes(transformed, attributes);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.debug("Transformed {}", new Object[]{original});
}
} catch (final Exception ex) {
logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex});
session.transfer(original, REL_FAILURE);
if (transformed != null) {
session.remove(transformed);
}
return;
}
if (transformed != null) {
session.transfer(transformed, REL_SUCCESS);
}
session.transfer(original, REL_ORIGINAL);
}

private Record transform(final Record record, final JoltTransform transform) {
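
Condensed view of the restructured onTrigger flow after this change (heavily elided; only the ordering and the new null checks are shown, mirroring the diff above):

    FlowFile transformed = null;
    try (final InputStream in = session.read(original);
         final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
        transformed = session.create(original);
        // ... transform the records and write them to 'transformed', collecting attributes ...
    } catch (final Exception ex) {
        logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex});
        session.transfer(original, REL_FAILURE);
        if (transformed != null) {
            session.remove(transformed);
        }
        return;
    }
    if (transformed != null) {
        session.transfer(transformed, REL_SUCCESS);
    }
    session.transfer(original, REL_ORIGINAL);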
@@ -164,6 +164,24 @@ public void testNoFlowFileContent() throws IOException {
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0);
}

@Test
public void testNoRecords() throws IOException {
generateTestData(0, null);
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc")));
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(writer, "Pretty Print JSON", "true");
runner.enableControllerService(writer);
final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json")));
runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec);
runner.enqueue("{}");
runner.run();
runner.assertQueueEmpty();
runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0);
runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1);
runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1);
}

@Test
public void testInvalidFlowFileContent() throws IOException {
generateTestData(1, null);