diff --git a/spring-batch-docs/asciidoc/whatsnew.adoc b/spring-batch-docs/asciidoc/whatsnew.adoc index 15299c40ec..c0beda4a16 100644 --- a/spring-batch-docs/asciidoc/whatsnew.adoc +++ b/spring-batch-docs/asciidoc/whatsnew.adoc @@ -10,6 +10,7 @@ Spring Batch 4.2 adds the following features: * Support for batch metrics with https://micrometer.io[Micrometer] * Support for reading/writing data from/to https://kafka.apache.org[Apache Kafka] topics +* Support for reading/writing data from/to https://avro.apache.org[Apache Avro] resources * Improved documentation [[whatsNewMetrics]] @@ -32,6 +33,13 @@ This release adds a new `KafkaItemReader` and `KafkaItemWriter` to read data fro write it to Kafka topics. For more details about these new components, please refer to the https://docs.spring.io/spring-batch/4.2.x/api/index.html[Javadoc]. +[[whatsNewAvro]] +=== Apache Avro item reader/writer + +This release adds a new `AvroItemReader` and `AvroItemWriter` to read data from and +write it to Avro resources. For more details about these new components, please refer +to the https://docs.spring.io/spring-batch/4.2.x/api/index.html[Javadoc]. + [[whatsNewDocs]] === Documentation updates diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemReader.java index 4b0fe0b9d2..e34dc83079 100755 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemReader.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemReader.java @@ -39,6 +39,7 @@ * An {@link ItemReader} that deserializes data from a {@link Resource} containing serialized Avro objects. * * @author David Turanski + * @author Mahmoud Ben Hassine * @since 4.2 */ public class AvroItemReader extends AbstractItemCountingItemStreamItemReader { @@ -102,10 +103,10 @@ public void setEmbeddedSchema(boolean embeddedSchema) { @Override protected T doRead() throws Exception { - if (inputStreamReader != null) { - return inputStreamReader.read(); + if (this.inputStreamReader != null) { + return this.inputStreamReader.read(); } - return dataFileReader.hasNext()? dataFileReader.next(): null; + return this.dataFileReader.hasNext()? this.dataFileReader.next(): null; } @Override @@ -116,10 +117,10 @@ protected void doOpen() throws Exception { @Override protected void doClose() throws Exception { if (this.inputStreamReader != null) { - inputStreamReader.close(); + this.inputStreamReader.close(); return; } - dataFileReader.close(); + this.dataFileReader.close(); } private void initializeReader() throws IOException { @@ -160,8 +161,8 @@ private InputStreamReader(InputStream inputStream, DatumReader datumReader) { } private T read() throws Exception { - if (!binaryDecoder.isEnd()) { - return datumReader.read(null, binaryDecoder); + if (!this.binaryDecoder.isEnd()) { + return this.datumReader.read(null, this.binaryDecoder); } return null; } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemWriter.java index 5fef17ce9b..48b54aa1d2 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/AvroItemWriter.java @@ -46,6 +46,7 @@ * * @since 4.2 * @author David Turanski + * @author Mahmoud Ben Hassine */ public class AvroItemWriter extends AbstractItemStreamItemWriter { @@ -55,8 +56,6 @@ public class AvroItemWriter extends AbstractItemStreamItemWriter { private WritableResource resource; - private Schema schema; - private Resource schemaResource; private Class clazz; @@ -109,6 +108,7 @@ public void write(List items) throws Exception { */ @Override public void open(ExecutionContext executionContext) { + super.open(executionContext); try { initializeWriter(); } catch (IOException e) { @@ -120,7 +120,7 @@ public void open(ExecutionContext executionContext) { public void close() { try { if (this.dataFileWriter != null) { - dataFileWriter.close(); + this.dataFileWriter.close(); } else { this.outputStreamWriter.close(); @@ -132,22 +132,23 @@ public void close() { } private void initializeWriter() throws IOException { - Assert.notNull(resource, "'resource' is required."); - Assert.notNull(clazz, "'class' is required."); + Assert.notNull(this.resource, "'resource' is required."); + Assert.notNull(this.clazz, "'class' is required."); if (this.embedSchema) { Assert.notNull(this.schemaResource, "'schema' is required."); Assert.state(this.schemaResource.exists(), "'schema' " + this.schemaResource.getFilename() + " does not exist."); + Schema schema; try { - this.schema = new Schema.Parser().parse(this.schemaResource.getInputStream()); + schema = new Schema.Parser().parse(this.schemaResource.getInputStream()); } catch (IOException e) { throw new IllegalArgumentException(e.getMessage(), e); } this.dataFileWriter = new DataFileWriter<>(datumWriterForClass(this.clazz)); - this.dataFileWriter.create(this.schema, this.resource.getOutputStream()); + this.dataFileWriter.create(schema, this.resource.getOutputStream()); } else { - this.outputStreamWriter = createOutputStreamWriter(resource.getOutputStream(), + this.outputStreamWriter = createOutputStreamWriter(this.resource.getOutputStream(), datumWriterForClass(this.clazz)); } @@ -184,8 +185,8 @@ private OutputStreamWriter(OutputStream outputStream, DatumWriter datumWriter } private void write(T datum) throws Exception { - datumWriter.write(datum, binaryEncoder); - binaryEncoder.flush(); + this.datumWriter.write(datum, this.binaryEncoder); + this.binaryEncoder.flush(); } private void close() { diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemReaderBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemReaderBuilder.java index 449357d69e..9ae3c8ef19 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemReaderBuilder.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemReaderBuilder.java @@ -25,9 +25,10 @@ import org.springframework.util.StringUtils; /** - * A builder implementation for the {@link AvroItemReader} + * A builder implementation for the {@link AvroItemReader}. * * @author David Turanski + * @author Mahmoud Ben Hassine * @since 4.2 */ public class AvroItemReaderBuilder { @@ -196,7 +197,7 @@ private AvroItemReader buildForType() { private AvroItemReader buildForSchema() { Assert.notNull(this.schema, "'schema' is required."); - return new AvroItemReader<>(resource, schema); + return new AvroItemReader<>(this.resource, this.schema); } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemWriterBuilder.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemWriterBuilder.java index 1c48cd287f..a929991b28 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemWriterBuilder.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/avro/builder/AvroItemWriterBuilder.java @@ -23,7 +23,10 @@ import org.springframework.util.Assert; /** + * A builder implementation for the {@link AvroItemWriter}. + * * @author David Turanski + * @author Mahmoud Ben Hassine * @since 4.2 */ public class AvroItemWriterBuilder { @@ -107,11 +110,10 @@ public AvroItemWriter build() { Assert.notNull(this.type, "A 'type' is required."); - AvroItemWriter avroItemWriter = this.schema != null ? - new AvroItemWriter(this.resource, this.schema, this.type): + new AvroItemWriter<>(this.resource, this.schema, this.type): new AvroItemWriter<>(this.resource, this.type); - avroItemWriter.setName(name); + avroItemWriter.setName(this.name); return avroItemWriter; } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/AvroItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/AvroItemWriterTests.java index 3eb9349486..d1f672f15d 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/AvroItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/AvroItemWriterTests.java @@ -28,6 +28,7 @@ /** * @author David Turanski + * @author Mahmoud Ben Hassine */ public class AvroItemWriterTests extends AvroItemWriterTestSupport { @@ -38,62 +39,60 @@ public class AvroItemWriterTests extends AvroItemWriterTestSupport { @Test public void itemWriterForAvroGeneratedClass() throws Exception { - AvroItemWriter avroItemWriter = new AvroItemWriter<>(output,schemaResource, User.class); + AvroItemWriter avroItemWriter = new AvroItemWriter<>(this.output, this.schemaResource, User.class); avroItemWriter.open(new ExecutionContext()); avroItemWriter.write(this.avroGeneratedUsers()); avroItemWriter.close(); - verifyRecordsWithEmbeddedHeader(outputStream.toByteArray(), this.avroGeneratedUsers(), User.class); + verifyRecordsWithEmbeddedHeader(this.outputStream.toByteArray(), this.avroGeneratedUsers(), User.class); } @Test public void itemWriterForGenericRecords() throws Exception { AvroItemWriter avroItemWriter = - new AvroItemWriter<>(output,plainOldUserSchemaResource,GenericRecord.class); + new AvroItemWriter<>(this.output, this.plainOldUserSchemaResource, GenericRecord.class); avroItemWriter.open(new ExecutionContext()); avroItemWriter.write(this.genericPlainOldUsers()); avroItemWriter.close(); - verifyRecordsWithEmbeddedHeader(outputStream.toByteArray(), this.genericPlainOldUsers(), GenericRecord.class); + verifyRecordsWithEmbeddedHeader(this.outputStream.toByteArray(), this.genericPlainOldUsers(), GenericRecord.class); } @Test public void itemWriterForPojos() throws Exception { - AvroItemWriter avroItemWriter = new AvroItemWriter(output,plainOldUserSchemaResource, PlainOldUser.class); + AvroItemWriter avroItemWriter = new AvroItemWriter<>(this.output, this.plainOldUserSchemaResource, PlainOldUser.class); avroItemWriter.open(new ExecutionContext()); avroItemWriter.write(this.plainOldUsers()); avroItemWriter.close(); - verifyRecordsWithEmbeddedHeader(outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class); + verifyRecordsWithEmbeddedHeader(this.outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class); } @Test public void itemWriterWithNoEmbeddedHeaders() throws Exception { - AvroItemWriter avroItemWriter = new AvroItemWriter(output, PlainOldUser.class); + AvroItemWriter avroItemWriter = new AvroItemWriter<>(this.output, PlainOldUser.class); avroItemWriter.open(new ExecutionContext()); avroItemWriter.write(this.plainOldUsers()); avroItemWriter.close(); - verifyRecords(outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class, false); + verifyRecords(this.outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class, false); } @Test(expected = IllegalArgumentException.class) public void shouldFailWitNoOutput() { - - new AvroItemWriter(null, schemaResource, User.class).open(new ExecutionContext());; + new AvroItemWriter<>(null, this.schemaResource, User.class).open(new ExecutionContext()); } @Test(expected = IllegalArgumentException.class) public void shouldFailWitNoType() { - new AvroItemWriter(output, schemaResource, null).open(new ExecutionContext()); - + new AvroItemWriter<>(this.output, this.schemaResource, null).open(new ExecutionContext()); } } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/example/AvroTestUtils.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/example/AvroTestUtils.java index c782f9a171..be78295e8c 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/example/AvroTestUtils.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/example/AvroTestUtils.java @@ -23,9 +23,7 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumWriter; -import org.springframework.batch.item.avro.support.AvroTestFixtures; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; @@ -33,6 +31,7 @@ * Used to create test data. See http://avro.apache.org/docs/1.9.0/gettingstartedjava.html * * @author David Turanski + * @author Mahmoud Ben Hassine */ class AvroTestUtils { diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/support/AvroTestFixtures.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/support/AvroTestFixtures.java index 11af7ac80e..2d1a6a7f06 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/support/AvroTestFixtures.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/avro/support/AvroTestFixtures.java @@ -40,10 +40,10 @@ /** * @author David Turanski + * @author Mahmoud Ben Hassine */ public abstract class AvroTestFixtures { - //@formatter:off private final List avroGeneratedUsers = Arrays.asList( new User("David", 20, "blue"), @@ -51,7 +51,7 @@ public abstract class AvroTestFixtures { new User("Alana", 13, "yellow"), new User("Joe", 1, "pink")); - List plainOldUsers = Arrays.asList( + private List plainOldUsers = Arrays.asList( new PlainOldUser("David", 20, "blue"), new PlainOldUser("Sue", 4, "red"), new PlainOldUser("Alana", 13, "yellow"),