Skip to content

Commit

Permalink
Issue #72. Enhanced collectFile sorting performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Sep 21, 2015
1 parent 0946a41 commit 2d06125
Show file tree
Hide file tree
Showing 8 changed files with 1,645 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/main/groovy/nextflow/file/FileCollector.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ abstract class FileCollector implements Closeable {
return value.newInputStream()

if( value instanceof CharSequence )
return new ByteArrayInputStream(value.toString().getBytes())
return new FastByteArrayInputStream(value.toString().getBytes())

if( value instanceof byte[] )
return new ByteArrayInputStream(value as byte[])
return new FastByteArrayInputStream(value as byte[])

throw new IllegalArgumentException("Not a valid file collector argument [${value.class.name}]: $value")
}
Expand Down
173 changes: 173 additions & 0 deletions src/main/groovy/nextflow/file/SequentialFileStore.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (c) 2013-2015, Centre for Genomic Regulation (CRG).
* Copyright (c) 2013-2015, Paolo Di Tommaso and the respective authors.
*
* This file is part of 'Nextflow'.
*
* Nextflow is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Nextflow is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Nextflow. If not, see <http://www.gnu.org/licenses/>.
*/

package nextflow.file

import java.nio.file.Path

import groovy.transform.CompileStatic

/**
 * A buffered sequential storage. When the data written is greater than the
 * local buffer, the content is saved to a file.
 *
 * <p>
 * The store is first used in <i>write</i> mode; invoking {@link #flip()} turns it
 * into <i>read</i> mode, after which the values can be read back in the same
 * order in which they were written.
 *
 * <p>
 * Usage idiom:
 *
 * <pre>
 *     def store = new SequentialFileStore(temp_file)
 *     store.writeInt(x)
 *     store.writeLong(y)
 *     :
 *     store.flip()
 *     int x = store.readInt()
 *     long y = store.readLong()
 *     :
 *     store.close()
 * </pre>
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@CompileStatic
class SequentialFileStore implements Closeable {

    private static final int KB = 1024

    /** Buffer size (in bytes) used when the caller does not specify one */
    private static final int DEFAULT_BUFFER_SIZE = 80 * KB

    /** Path of the backing file */
    final private Path path

    /** Size (in bytes) of the in-memory buffer */
    private int limit

    /** Backing file, opened in read/write mode by the constructor */
    private RandomAccessFile file

    /** Stream used by the {@code write*} methods */
    private DataOutputStream outputStream

    /** Stream used by the {@code read*} methods; it is {@code null} until {@link #flip()} is invoked */
    private DataInputStream inputStream

    /** In-memory buffer shared by the output stream and, in read mode, by the input stream */
    private byte[] buffer

    /**
     * Creates the store and opens the backing file in read/write mode.
     *
     * @param path The file used to hold the data
     * @param size The size in bytes of the in-memory buffer; {@code 0} (the default)
     *      selects {@link #DEFAULT_BUFFER_SIZE}
     * @throws IllegalArgumentException when {@code size} is negative
     */
    SequentialFileStore(Path path, int size = 0) {
        if( size < 0 )
            throw new IllegalArgumentException("Buffer size cannot be negative: $size")

        this.path = path
        this.limit = size ?: DEFAULT_BUFFER_SIZE
        this.buffer = new byte[limit]
        this.file = new RandomAccessFile(path.toFile(), 'rw')
        this.outputStream = new DataOutputStream(new FastBufferedOutputStream(new FileOutputStream(file.getFD()), buffer))
    }


    /** Writes a boolean value and returns this store to allow call chaining */
    SequentialFileStore writeBool( boolean value ) {
        outputStream.writeBoolean(value)
        return this
    }

    /** Writes a single byte and returns this store to allow call chaining */
    SequentialFileStore writeByte( byte value ) {
        outputStream.writeByte(value)
        return this
    }

    /** Writes a char value and returns this store to allow call chaining */
    SequentialFileStore writeChar( char value ) {
        outputStream.writeChar(value as int)
        return this
    }

    /** Writes a short value and returns this store to allow call chaining */
    SequentialFileStore writeShort( short value ) {
        outputStream.writeShort(value)
        return this
    }

    /** Writes an int value and returns this store to allow call chaining */
    SequentialFileStore writeInt( int v ) {
        outputStream.writeInt(v)
        return this
    }

    /** Writes a long value and returns this store to allow call chaining */
    SequentialFileStore writeLong( long v ) {
        outputStream.writeLong(v)
        return this
    }


    /** Writes all the bytes in the given array and returns this store to allow call chaining */
    SequentialFileStore writeBytes( byte[] bytes ) {
        outputStream.write(bytes)
        return this
    }

    /**
     * Fills the given array with the next {@code bytes.length} bytes in the store.
     *
     * @param bytes The array to be filled
     * @throws EOFException when the store holds fewer bytes than requested
     */
    void readBytes( byte[] bytes ) {
        // a plain `read` may return fewer bytes than requested, leaving
        // the array silently truncated; `readFully` fills it or fails
        inputStream.readFully(bytes)
    }

    /**
     * Reads the next {@code len} bytes in the store.
     *
     * @param len The number of bytes to read
     * @return A newly allocated array holding exactly {@code len} bytes
     * @throws EOFException when the store holds fewer bytes than requested
     */
    byte[] readBytes( int len ) {
        final result = new byte[len]
        // see the note in `readBytes(byte[])`: a partial read must not go unnoticed
        inputStream.readFully(result)
        return result
    }

    /** Reads the next value as a boolean */
    boolean readBool() {
        inputStream.readBoolean()
    }

    /** Reads the next value as a byte */
    byte readByte() {
        inputStream.readByte()
    }

    /** Reads the next value as a char */
    char readChar() {
        inputStream.readChar()
    }

    /** Reads the next value as a short */
    short readShort() {
        inputStream.readShort()
    }

    /** Reads the next value as an int */
    int readInt() {
        inputStream.readInt()
    }

    /** Reads the next value as a long */
    long readLong() {
        inputStream.readLong()
    }

    /**
     * @return The number of bytes written so far, as reported by the underlying
     *      {@link DataOutputStream#size()} (an {@code int} counter which saturates
     *      at {@code Integer.MAX_VALUE})
     */
    long size() {
        outputStream.size()
    }

    /**
     * Turn the buffer in read mode
     */
    void flip() {

        final int written = outputStream.size()
        if( written > limit ) {
            // more data than the buffer can hold: flush the remaining content
            // to the file and read everything back from the beginning of it
            outputStream.flush()
            file.seek(0)
            inputStream = new DataInputStream(new FastBufferedInputStream(new FileInputStream(file.getFD()), buffer))
        }
        else {
            // do not flush the buffer and use its content directly
            // NOTE(review): this assumes FastBufferedOutputStream keeps every byte in `buffer`
            // when no more than `limit` bytes were written -- confirm for a single write of exactly `limit` bytes
            inputStream = new DataInputStream( new FastBufferedInputStream(new FastByteArrayInputStream(buffer, 0, written)))
        }

    }

    /**
     * Closes the backing file. The file itself is not deleted.
     */
    @Override
    void close() throws IOException {
        file.close()
    }
}
67 changes: 42 additions & 25 deletions src/main/groovy/nextflow/file/SortFileCollector.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,12 @@ class SortFileCollector extends FileCollector implements Closeable {

private DB store;

private LevelDbSort<IndexEntry> index;
private LevelDbSort<IndexEntry> index

private Path fTempDir
private Path tempDir

private SequentialFileStore sortedIndex

private List directory

void setSort( def value ) {
if( value == null || value instanceof Closure || value instanceof Comparator )
Expand Down Expand Up @@ -174,8 +175,8 @@ class SortFileCollector extends FileCollector implements Closeable {
if( sliceMaxItems ) result.sliceMaxItems(sliceMaxItems)

index = result.create()
tempDir = getTempDir()

fTempDir = getTempDir()
}

private IndexSort createSortComparator() {
Expand Down Expand Up @@ -206,10 +207,10 @@ class SortFileCollector extends FileCollector implements Closeable {
return value.newInputStream()

if( value instanceof CharSequence )
return new ByteArrayInputStream(value.toString().getBytes())
return new FastByteArrayInputStream(value.toString().getBytes())

if( value instanceof byte[] )
return new ByteArrayInputStream(value as byte[])
return new FastByteArrayInputStream(value as byte[])

throw new IllegalArgumentException("Not a valid file collector argument [${value.class.name}]: $value")
}
Expand Down Expand Up @@ -279,24 +280,30 @@ class SortFileCollector extends FileCollector implements Closeable {
if( !index )
return null

def last = null

Hasher hasher = cacheable ? CacheHelper.hasher( hashKeys, CacheHelper.HashMode.STANDARD ) : null
if( hasher && log.isTraceEnabled() ) {
log.trace " hasher: ${CacheHelper.hasher( hashKeys, CacheHelper.HashMode.STANDARD ) } \n"
}
directory = new LinkedList()

// index route file
sortedIndex = new SequentialFileStore(tempDir.resolve('sort.index'))

def last = null
index.sort { IndexEntry entry ->
if( last != entry.group ) {
directory.add(-1)
directory.add(entry.group)
last = entry.group
def bytes = KryoHelper.serialize(last)
sortedIndex.writeLong( -bytes.length )
sortedIndex.writeBytes( bytes )
}
directory.add(entry.index)
sortedIndex.writeLong(entry.index)

if( hasher ) {
hasher = CacheHelper.hasher(hasher, entry.hash, CacheHelper.HashMode.STANDARD)
if( log.isTraceEnabled() )
log.trace " index: $entry.index - ${CacheHelper.hasher(entry.hash).hash()} \n"
if( log.isTraceEnabled() ) {
log.trace " index: $entry.index - ${CacheHelper.hasher(entry.hash).hash()} \n"
}
}

}
Expand All @@ -309,18 +316,28 @@ class SortFileCollector extends FileCollector implements Closeable {
*/
void saveFile( Closure<Path> closure ) {

if( !directory )
if( !sortedIndex )
return

def last = null
def name = null
OutputStream output = null

def itr = directory.iterator()
while( itr.hasNext() ) {
def item = itr.next()
if( item == -1 ) {
name = itr.next()
// turn the index buffer into 'read' mode
sortedIndex.flip()

while( true ) {
long item
try {
item = sortedIndex.readLong()
}
catch( EOFException e ) {
break
}

if( item < 0 ) {
def data = sortedIndex.readBytes((int)-item)
name = KryoHelper.deserialize(data)
continue
}

Expand All @@ -339,8 +356,8 @@ class SortFileCollector extends FileCollector implements Closeable {
/*
* write the 'seed' value
*/
if( seed instanceof Map ) {
if( ((Map)seed).containsKey(name) ) appendStream(normalizeToStream(((Map)seed).get(name)), output)
if( seed instanceof Map && ((Map)seed).containsKey(name) ) {
appendStream(normalizeToStream(((Map)seed).get(name)), output)
}
else if( seed ) {
appendStream(normalizeToStream(seed), output)
Expand All @@ -350,8 +367,7 @@ class SortFileCollector extends FileCollector implements Closeable {
/*
* add the current value
*/
def index = (long)item
def bytes = (byte[])store.get(bytes(index))
def bytes = (byte[])store.get(bytes(item))
def val = KryoHelper.deserialize(bytes)
appendStream(normalizeToStream(val), output)
}
Expand All @@ -368,12 +384,13 @@ class SortFileCollector extends FileCollector implements Closeable {
log.trace "Closing sorting dbs"
store?.closeQuietly()
index?.closeQuietly()
sortedIndex?.closeQuietly()

// finally invoke the parent close
super.close()

if( deleteTempFilesOnClose && fTempDir ) {
fTempDir.deleteDir()
if( deleteTempFilesOnClose && tempDir ) {
tempDir.deleteDir()
}
else {
log.debug "FileCollector temp dir not removed: $tempDir"
Expand Down
Loading

0 comments on commit 2d06125

Please sign in to comment.