Skip to content

Commit

Permalink
Merge pull request #250 from chsfleury/master
Browse files Browse the repository at this point in the history
java 8 stream support
  • Loading branch information
fmbenhassine committed Oct 20, 2016
2 parents 8d7517e + 5e63c30 commit cbac281
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 0 deletions.
104 changes: 104 additions & 0 deletions easybatch-extensions/easybatch-stream/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>easybatch-extensions</artifactId>
<groupId>org.easybatch</groupId>
<version>5.0.0-RC3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>easybatch-stream</artifactId>

<name>easybatch-stream</name>
<description>Extension to support java 8 streams</description>
<url>http://www.easybatch.org</url>

<properties>
<java.version>1.8</java.version>
</properties>

<developers>
<developer>
<id>benas</id>
<name>Mahmoud Ben Hassine</name>
<url>http://benas.github.io</url>
<email>mahmoud.benhassine@icloud.com</email>
<roles>
<role>Project founder</role>
</roles>
</developer>
<developer>
<id>chsfleury</id>
<name>Charles Fleury</name>
<url>https://github.com/chsfleury</url>
<email>chs.fleury@gmail.com</email>
<roles>
<role>Contributor</role>
</roles>
</developer>
</developers>

<scm>
<url>git@github.com:easybatch/easybatch-framework.git</url>
<connection>scm:git:git@github.com:easybatch/easybatch-framework.git</connection>
<developerConnection>scm:git:git@github.com:easybatch/easybatch-framework.git</developerConnection>
<tag>HEAD</tag>
</scm>

<issueManagement>
<system>GitHub</system>
<url>https://github.com/easybatch/easybatch-framework/issues</url>
</issueManagement>

<ciManagement>
<system>Travis CI</system>
<url>https://travis-ci.org/EasyBatch/easybatch-framework</url>
</ciManagement>

<licenses>
<license>
<name>MIT License</name>
<url>http://opensource.org/licenses/mit-license.php</url>
</license>
</licenses>

<dependencies>
<dependency>
<groupId>org.easybatch</groupId>
<artifactId>easybatch-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<verbose>true</verbose>
<source>${java.version}</source>
<target>${java.version}</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.easybatch.extensions.stream;

import org.easybatch.core.reader.RecordReader;
import org.easybatch.core.record.Record;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

import static java.nio.file.Files.list;
import static java.nio.file.Files.walk;

/**
* @author Charles Fleury
* @since 17/10/16.
*/
public class PathRecordReader implements RecordReader {

private Path directory;
private boolean recursive;
private StreamRecordReader<Path> delegate;

/**
* Create a {@link PathRecordReader} to read record from java 8 stream.
*
* @param directory to read files from
*/
public PathRecordReader(final Path directory) {
this(directory, false);
}

/**
* Create a {@link PathRecordReader} to read record from java 8 stream.
*
* @param directory to read files from
* @param recursive if the reader should be recursive
*/
public PathRecordReader(final Path directory, final boolean recursive) {
this.directory = directory;
this.recursive = recursive;
}

/**
* Open the reader.
*/
@Override
public void open() throws Exception {
try {
Stream<Path> stream;
if (recursive) {
stream = walk(directory);
} else {
stream = list(directory);
}
delegate = new StreamRecordReader<>(stream.filter(Files::isRegularFile), directory.toAbsolutePath().toString());
delegate.open();
} catch (IOException e) {
throw new IllegalArgumentException("cannot read directory", e);
}
}

/**
* Read next record from the data source.
*
* @return the next record from the data source.
*/
@Override
public Record<Path> readRecord() throws Exception {
return delegate.readRecord();
}

/**
* Close the reader.
*/
@Override
public void close() throws Exception {
if(delegate != null) {
delegate.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.easybatch.extensions.stream;

import org.easybatch.core.reader.RecordReader;
import org.easybatch.core.record.GenericRecord;
import org.easybatch.core.record.Header;
import org.easybatch.core.record.Record;

import java.util.Date;
import java.util.Iterator;
import java.util.stream.Stream;

/**
* @author Charles Fleury
* @since 17/10/16.
*/
public class StreamRecordReader<T> implements RecordReader {

private static final String DEFAULT_DATASOURCE_NAME = "DATASOURCE";

protected String datasource;
protected Stream<T> stream;
protected Iterator<T> iterator;
protected long currentRecordNumber;

/**
* Create a {@link StreamRecordReader} to read record from java 8 stream.
*
* @param stream to read record from
* @param datasource name (default to DEFAULT_DATASOURCE_NAME)
*/
public StreamRecordReader(final Stream<T> stream, final String datasource) {
this.stream = stream;
this.datasource = datasource;
}

/**
* Open the reader.
*/
@Override
public void open() throws Exception {
if(stream == null) {
throw new IllegalArgumentException("stream must not be null");
}

if(datasource == null || datasource.isEmpty()) {
datasource = DEFAULT_DATASOURCE_NAME;
}

currentRecordNumber = 0;
iterator = stream.iterator();
}

/**
* Read next record from the data source.
*
* @return the next record from the data source.
*/
@Override
public Record<T> readRecord() throws Exception {
if(iterator.hasNext()) {
Header header = new Header(++currentRecordNumber, datasource, new Date());
return new GenericRecord<>(header, iterator.next());
} else {
return null;
}
}

/**
* Close the reader.
*/
@Override
public void close() throws Exception {
// no op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* The MIT License
*
* Copyright (c) 2016, Charles Fleury (chs.fleury@gmail.com)
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

/**
* This package contains java 8 stream support classes.
*/
package org.easybatch.extensions.stream;
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.easybatch.extensions.stream;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @author Charles Fleury
* @since 17/10/16.
*/
public class PathRecordReaderTest {

private PathRecordReader pathRecordReader;

private Path emptyDirectory;

@Before
public void setUp() throws Exception {
emptyDirectory = Files.createTempDirectory("easybatch_test");
}

@Test
public void whenDirectoryIsNotEmpty_thenThereShouldBeANextRecordToRead() throws Exception {
pathRecordReader = new PathRecordReader(Paths.get("src/main/java/org/easybatch/extensions/stream"));
pathRecordReader.open();
assertThat(pathRecordReader.readRecord()).isNotNull();
}

@Test
public void whenDirectoryIsNotEmpty_thenThereShouldBeANextRecordToRead_recursive() throws Exception {
pathRecordReader = new PathRecordReader(Paths.get("src/main/java/org/easybatch/extensions"), true);
pathRecordReader.open();
assertThat(pathRecordReader.readRecord()).isNotNull();
}

@Test
public void whenDirectoryIsNotEmptyButDirs_thenThereShouldBeNoNextRecordToRead_not_recursive() throws Exception {
pathRecordReader = new PathRecordReader(Paths.get("src/main/java/org/easybatch/extensions"), false);
pathRecordReader.open();
assertThat(pathRecordReader.readRecord()).isNull();
}

@Test
public void whenDirectoryIsEmpty_thenThereShouldBeNoNextRecordToRead() throws Exception {
pathRecordReader = new PathRecordReader(emptyDirectory);
pathRecordReader.open();
assertThat(pathRecordReader.readRecord()).isNull();
}

@Test(expected = IllegalArgumentException.class)
public void whenDirectoryDoesNotExist_thenShouldThrowAnIllegalArgumentException() throws Exception {
pathRecordReader = new PathRecordReader(Paths.get("src/main/java/ImSureThisDirectoryDoesNotExist"));
pathRecordReader.open();
}

@After
public void tearDown() throws Exception {
pathRecordReader.close();
Files.delete(emptyDirectory);
}


}
Loading

0 comments on commit cbac281

Please sign in to comment.