Skip to content

Commit

Permalink
Polish bd53168
Browse files Browse the repository at this point in the history
  • Loading branch information
fmbenhassine committed Jul 30, 2019
1 parent bd53168 commit f26165e
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 38 deletions.
8 changes: 8 additions & 0 deletions spring-batch-docs/asciidoc/whatsnew.adoc
Expand Up @@ -10,6 +10,7 @@ Spring Batch 4.2 adds the following features:

* Support for batch metrics with https://micrometer.io[Micrometer]
* Support for reading/writing data from/to https://kafka.apache.org[Apache Kafka] topics
* Support for reading/writing data from/to https://avro.apache.org[Apache Avro] resources
* Improved documentation

[[whatsNewMetrics]]
Expand All @@ -32,6 +33,13 @@ This release adds a new `KafkaItemReader` and `KafkaItemWriter` to read data fro
write it to Kafka topics. For more details about these new components, please refer
to the https://docs.spring.io/spring-batch/4.2.x/api/index.html[Javadoc].

[[whatsNewAvro]]
=== Apache Avro item reader/writer

This release adds a new `AvroItemReader` and `AvroItemWriter` to read data from and
write it to Avro resources. For more details about these new components, please refer
to the https://docs.spring.io/spring-batch/4.2.x/api/index.html[Javadoc].

[[whatsNewDocs]]
=== Documentation updates

Expand Down
Expand Up @@ -39,6 +39,7 @@
* An {@link ItemReader} that deserializes data from a {@link Resource} containing serialized Avro objects.
*
* @author David Turanski
* @author Mahmoud Ben Hassine
* @since 4.2
*/
public class AvroItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> {
Expand Down Expand Up @@ -102,10 +103,10 @@ public void setEmbeddedSchema(boolean embeddedSchema) {

@Override
protected T doRead() throws Exception {
if (inputStreamReader != null) {
return inputStreamReader.read();
if (this.inputStreamReader != null) {
return this.inputStreamReader.read();
}
return dataFileReader.hasNext()? dataFileReader.next(): null;
return this.dataFileReader.hasNext()? this.dataFileReader.next(): null;
}

@Override
Expand All @@ -116,10 +117,10 @@ protected void doOpen() throws Exception {
@Override
protected void doClose() throws Exception {
if (this.inputStreamReader != null) {
inputStreamReader.close();
this.inputStreamReader.close();
return;
}
dataFileReader.close();
this.dataFileReader.close();
}

private void initializeReader() throws IOException {
Expand Down Expand Up @@ -160,8 +161,8 @@ private InputStreamReader(InputStream inputStream, DatumReader<T> datumReader) {
}

private T read() throws Exception {
if (!binaryDecoder.isEnd()) {
return datumReader.read(null, binaryDecoder);
if (!this.binaryDecoder.isEnd()) {
return this.datumReader.read(null, this.binaryDecoder);
}
return null;
}
Expand Down
Expand Up @@ -46,6 +46,7 @@
*
* @since 4.2
* @author David Turanski
* @author Mahmoud Ben Hassine
*/
public class AvroItemWriter<T> extends AbstractItemStreamItemWriter<T> {

Expand All @@ -55,8 +56,6 @@ public class AvroItemWriter<T> extends AbstractItemStreamItemWriter<T> {

private WritableResource resource;

private Schema schema;

private Resource schemaResource;

private Class<T> clazz;
Expand Down Expand Up @@ -109,6 +108,7 @@ public void write(List<? extends T> items) throws Exception {
*/
@Override
public void open(ExecutionContext executionContext) {
super.open(executionContext);
try {
initializeWriter();
} catch (IOException e) {
Expand All @@ -120,7 +120,7 @@ public void open(ExecutionContext executionContext) {
public void close() {
try {
if (this.dataFileWriter != null) {
dataFileWriter.close();
this.dataFileWriter.close();
}
else {
this.outputStreamWriter.close();
Expand All @@ -132,22 +132,23 @@ public void close() {
}

private void initializeWriter() throws IOException {
Assert.notNull(resource, "'resource' is required.");
Assert.notNull(clazz, "'class' is required.");
Assert.notNull(this.resource, "'resource' is required.");
Assert.notNull(this.clazz, "'class' is required.");

if (this.embedSchema) {
Assert.notNull(this.schemaResource, "'schema' is required.");
Assert.state(this.schemaResource.exists(),
"'schema' " + this.schemaResource.getFilename() + " does not exist.");
Schema schema;
try {
this.schema = new Schema.Parser().parse(this.schemaResource.getInputStream());
schema = new Schema.Parser().parse(this.schemaResource.getInputStream());
} catch (IOException e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
this.dataFileWriter = new DataFileWriter<>(datumWriterForClass(this.clazz));
this.dataFileWriter.create(this.schema, this.resource.getOutputStream());
this.dataFileWriter.create(schema, this.resource.getOutputStream());
} else {
this.outputStreamWriter = createOutputStreamWriter(resource.getOutputStream(),
this.outputStreamWriter = createOutputStreamWriter(this.resource.getOutputStream(),
datumWriterForClass(this.clazz));
}

Expand Down Expand Up @@ -184,8 +185,8 @@ private OutputStreamWriter(OutputStream outputStream, DatumWriter<T> datumWriter
}

private void write(T datum) throws Exception {
datumWriter.write(datum, binaryEncoder);
binaryEncoder.flush();
this.datumWriter.write(datum, this.binaryEncoder);
this.binaryEncoder.flush();
}

private void close() {
Expand Down
Expand Up @@ -25,9 +25,10 @@
import org.springframework.util.StringUtils;

/**
* A builder implementation for the {@link AvroItemReader}
* A builder implementation for the {@link AvroItemReader}.
*
* @author David Turanski
* @author Mahmoud Ben Hassine
* @since 4.2
*/
public class AvroItemReaderBuilder<T> {
Expand Down Expand Up @@ -196,7 +197,7 @@ private AvroItemReader<T> buildForType() {

private AvroItemReader<T> buildForSchema() {
Assert.notNull(this.schema, "'schema' is required.");
return new AvroItemReader<>(resource, schema);
return new AvroItemReader<>(this.resource, this.schema);
}


Expand Down
Expand Up @@ -23,7 +23,10 @@
import org.springframework.util.Assert;

/**
* A builder implementation for the {@link AvroItemWriter}.
*
* @author David Turanski
* @author Mahmoud Ben Hassine
* @since 4.2
*/
public class AvroItemWriterBuilder<T> {
Expand Down Expand Up @@ -107,11 +110,10 @@ public AvroItemWriter<T> build() {

Assert.notNull(this.type, "A 'type' is required.");


AvroItemWriter<T> avroItemWriter = this.schema != null ?
new AvroItemWriter(this.resource, this.schema, this.type):
new AvroItemWriter<>(this.resource, this.schema, this.type):
new AvroItemWriter<>(this.resource, this.type);
avroItemWriter.setName(name);
avroItemWriter.setName(this.name);
return avroItemWriter;
}

Expand Down
Expand Up @@ -28,6 +28,7 @@

/**
* @author David Turanski
* @author Mahmoud Ben Hassine
*/
public class AvroItemWriterTests extends AvroItemWriterTestSupport {

Expand All @@ -38,62 +39,60 @@ public class AvroItemWriterTests extends AvroItemWriterTestSupport {
@Test
public void itemWriterForAvroGeneratedClass() throws Exception {

AvroItemWriter<User> avroItemWriter = new AvroItemWriter<>(output,schemaResource, User.class);
AvroItemWriter<User> avroItemWriter = new AvroItemWriter<>(this.output, this.schemaResource, User.class);
avroItemWriter.open(new ExecutionContext());
avroItemWriter.write(this.avroGeneratedUsers());
avroItemWriter.close();

verifyRecordsWithEmbeddedHeader(outputStream.toByteArray(), this.avroGeneratedUsers(), User.class);
verifyRecordsWithEmbeddedHeader(this.outputStream.toByteArray(), this.avroGeneratedUsers(), User.class);
}

@Test
public void itemWriterForGenericRecords() throws Exception {

AvroItemWriter<GenericRecord> avroItemWriter =
new AvroItemWriter<>(output,plainOldUserSchemaResource,GenericRecord.class);
new AvroItemWriter<>(this.output, this.plainOldUserSchemaResource, GenericRecord.class);

avroItemWriter.open(new ExecutionContext());
avroItemWriter.write(this.genericPlainOldUsers());
avroItemWriter.close();

verifyRecordsWithEmbeddedHeader(outputStream.toByteArray(), this.genericPlainOldUsers(), GenericRecord.class);
verifyRecordsWithEmbeddedHeader(this.outputStream.toByteArray(), this.genericPlainOldUsers(), GenericRecord.class);

}

@Test
public void itemWriterForPojos() throws Exception {

AvroItemWriter<PlainOldUser> avroItemWriter = new AvroItemWriter(output,plainOldUserSchemaResource, PlainOldUser.class);
AvroItemWriter<PlainOldUser> avroItemWriter = new AvroItemWriter<>(this.output, this.plainOldUserSchemaResource, PlainOldUser.class);
avroItemWriter.open(new ExecutionContext());
avroItemWriter.write(this.plainOldUsers());
avroItemWriter.close();

verifyRecordsWithEmbeddedHeader(outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class);
verifyRecordsWithEmbeddedHeader(this.outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class);

}

@Test
public void itemWriterWithNoEmbeddedHeaders() throws Exception {

AvroItemWriter<PlainOldUser> avroItemWriter = new AvroItemWriter(output, PlainOldUser.class);
AvroItemWriter<PlainOldUser> avroItemWriter = new AvroItemWriter<>(this.output, PlainOldUser.class);
avroItemWriter.open(new ExecutionContext());
avroItemWriter.write(this.plainOldUsers());
avroItemWriter.close();

verifyRecords(outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class, false);
verifyRecords(this.outputStream.toByteArray(), this.plainOldUsers(), PlainOldUser.class, false);

}

@Test(expected = IllegalArgumentException.class)
public void shouldFailWitNoOutput() {

new AvroItemWriter(null, schemaResource, User.class).open(new ExecutionContext());;
new AvroItemWriter<>(null, this.schemaResource, User.class).open(new ExecutionContext());

}

@Test(expected = IllegalArgumentException.class)
public void shouldFailWitNoType() {
new AvroItemWriter(output, schemaResource, null).open(new ExecutionContext());

new AvroItemWriter<>(this.output, this.schemaResource, null).open(new ExecutionContext());
}
}
Expand Up @@ -23,16 +23,15 @@
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.springframework.batch.item.avro.support.AvroTestFixtures;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

/**
* Used to create test data. See http://avro.apache.org/docs/1.9.0/gettingstartedjava.html
*
* @author David Turanski
* @author Mahmoud Ben Hassine
*/
class AvroTestUtils {

Expand Down
Expand Up @@ -40,18 +40,18 @@

/**
* @author David Turanski
* @author Mahmoud Ben Hassine
*/
public abstract class AvroTestFixtures {


//@formatter:off
private final List<User> avroGeneratedUsers = Arrays.asList(
new User("David", 20, "blue"),
new User("Sue", 4, "red"),
new User("Alana", 13, "yellow"),
new User("Joe", 1, "pink"));

List<PlainOldUser> plainOldUsers = Arrays.asList(
private List<PlainOldUser> plainOldUsers = Arrays.asList(
new PlainOldUser("David", 20, "blue"),
new PlainOldUser("Sue", 4, "red"),
new PlainOldUser("Alana", 13, "yellow"),
Expand Down

0 comments on commit f26165e

Please sign in to comment.