Skip to content

Commit

Permalink
Add join options failOnDuplicate and failOnMismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Jun 29, 2020
1 parent 4ba5ac1 commit 387366a
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 18 deletions.
2 changes: 2 additions & 0 deletions docs/operator.rst
Expand Up @@ -1164,6 +1164,8 @@ by The index (zero based) of the element to be used as grouping key
A key composed by multiple elements can be defined specifying a list of indices e.g. ``by: [0,2]``
remainder When ``false`` incomplete tuples (i.e. with less than `size` grouped items)
are discarded (default). When ``true`` incomplete tuples are emitted as the ending emission.
failOnDuplicate An error is reported when the same key is found more than once.
failOnMismatch An error is reported when a channel emits a value for which there isn't a corresponding element in the joining channel. This option cannot be used with ``remainder``.
=============== ========================


Expand Down
6 changes: 3 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Expand Up @@ -597,7 +597,7 @@ class Session implements ISession {
try {
log.trace "Session > destroying"
if( !aborted ) {
allOperatorsJoin()
joinAllOperators()
log.trace "Session > after processors join"
}

Expand Down Expand Up @@ -626,7 +626,7 @@ class Session implements ISession {
}
}

final private allOperatorsJoin() {
final protected void joinAllOperators() {
int attempts=0

while( allOperators.size() ) {
Expand Down Expand Up @@ -712,7 +712,7 @@ class Session implements ISession {
log.debug(status)
// force termination
notifyError(null)
executorFactory.signalExecutors()
executorFactory?.signalExecutors()
processesBarrier.forceTermination()
monitorsBarrier.forceTermination()
operatorsForceTermination()
Expand Down
2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
Expand Up @@ -292,7 +292,7 @@ class DAG {

@PackageScope
void resolveEdgeNames() {
for( Edge edge : edges ) {
for( Edge edge : new ArrayList<>(edges) ) {
final name = lookupVariable(edge.channel)
if( name )
edge.label = name
Expand Down
94 changes: 81 additions & 13 deletions modules/nextflow/src/main/groovy/nextflow/extension/JoinOp.groovy
Expand Up @@ -16,6 +16,8 @@

package nextflow.extension

import static nextflow.extension.DataflowHelper.*

import java.util.concurrent.atomic.AtomicInteger

import groovy.transform.CompileStatic
Expand All @@ -24,8 +26,9 @@ import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
import nextflow.NF
import nextflow.exception.AbortOperationException
import nextflow.util.CheckHelper
import static nextflow.extension.DataflowHelper.addToList
/**
* Implements {@link OperatorEx#join} operator logic
*
Expand All @@ -35,7 +38,7 @@ import static nextflow.extension.DataflowHelper.addToList
@CompileStatic
class JoinOp {

static final private Map JOIN_PARAMS = [remainder: Boolean, by: [List,Integer]]
static final private Map JOIN_PARAMS = [remainder: Boolean, by: [List,Integer], failOnMismatch: Boolean, failOnDuplicate: Boolean]

private DataflowReadChannel source

Expand All @@ -47,12 +50,25 @@ class JoinOp {

private Boolean[] singleton = [null,null] as Boolean[]

private boolean failOnMismatch

private boolean failOnDuplicate

private volatile boolean failed

private Set uniqueKeys = new LinkedHashSet()

JoinOp( DataflowReadChannel source, DataflowReadChannel target, Map params = null ) {
CheckHelper.checkParams('join', params, JOIN_PARAMS)
this.source = source
this.target = target
this.pivot = parsePivot(params?.by)
this.remainder = params?.remainder ? params.remainder as boolean : false
this.failOnMismatch = params?.failOnMismatch || (!this.remainder && NF.isStrictMode())
this.failOnDuplicate = params?.failOnDuplicate || NF.isStrictMode()
// sanity check
if( remainder && failOnMismatch )
throw new AbortOperationException("Conflicting join operator options `remainder` and `failOnMismatch`")
}

private List<Integer> parsePivot(value) {
Expand Down Expand Up @@ -96,18 +112,31 @@ class JoinOp {
final Map<String,Closure> result = new HashMap<>(2)

result.onNext = {
synchronized (buffer) {
def entries = join0(buffer, size, index, it)
if( entries ) {
target.bind( entries.size()==1 ? entries[0] : entries )
synchronized (this) {
if(!failed) try {
def entries = join0(buffer, size, index, it)
if( entries ) {
target.bind( entries.size()==1 ? entries[0] : entries )
}
}
catch (Exception e) {
failed = true
target << Channel.STOP
throw e
}
}}

result.onComplete = {
if( stopCount.decrementAndGet()==0) {
if( remainder )
remainder0(buffer,size,target)
target << Channel.STOP
if( stopCount.decrementAndGet()==0 && !failed ) {
try {
if( remainder )
remainder0(buffer,size,target)
if( failOnMismatch )
checkForMismatch(buffer)
}
finally {
target << Channel.STOP
}
}}

return result
Expand Down Expand Up @@ -145,7 +174,7 @@ class JoinOp {
final item0 = DataflowHelper.makeKey(pivot, data)

// given a key we expect to receive on object with the same key on each channel
def channels = buffer.get(item0.keys)
Map<Integer,List> channels = buffer.get(item0.keys)
if( channels==null ) {
channels = new TreeMap<Integer, List>()
buffer[item0.keys] = channels
Expand All @@ -154,6 +183,7 @@ class JoinOp {
if( !channels.containsKey(index) ) {
channels[index] = []
}

def entries = channels[index]

// add the received item to the list
Expand Down Expand Up @@ -183,6 +213,7 @@ class JoinOp {
}
}

checkForDuplicate(item0.keys, result)
return result
}

Expand Down Expand Up @@ -210,15 +241,52 @@ class JoinOp {
}
}

if( fill )
target.bind( singleton() ? result[0] : result )
if( fill ) {
final value = singleton() ? result[0] : result
checkForDuplicate(key,value)
target.bind(value)
}
else
break
}

}
}

protected void checkForDuplicate( key, value ) {
if( failOnDuplicate && !uniqueKeys.add(key) )
throw new AbortOperationException("Detected join operation duplicate element -- offending key=${csv0(key,',')}; value=${csv0(value,',')}")
}

protected void checkForMismatch( Map<Object,Map<Integer,List>> buffers ) {
final result = new HashMap<Object,List>()
for( Object key : buffers.keySet() ) {
Map<Integer,List> el = buffers.get(key)
final reminder = el.entrySet()
if( !reminder )
continue

result[key] = []
for( Map.Entry entry : reminder ) {
result[key].add(csv0(entry.value,','))
}
}

if( result ) {
final list = result.take(10).collect { k,v -> "key=${csv0(k,';')} values=${csv0(v,';')}" }
final str = result.size() == 1 ? list[0] : list.collect { "\n- $it" }.join(' ')
def message = "Join mismatch for the following entries: $str"
if( list.size()!=result.size() )
message += '\n(more omitted)'
throw new AbortOperationException(message)
}
}


private String csv0(value, String sep) {
value instanceof List ? value.join(sep) : value.toString()
}

private boolean singleton(int i=-1) {
if(i==-1)
return singleton[0]!=false && singleton[1]!=false
Expand Down
131 changes: 130 additions & 1 deletion modules/nextflow/src/test/groovy/nextflow/extension/JoinOpTest.groovy
Expand Up @@ -16,10 +16,12 @@

package nextflow.extension

import groovyx.gpars.dataflow.DataflowReadChannel
import nextflow.Channel
import nextflow.Global
import nextflow.Session
import nextflow.exception.AbortOperationException
import spock.lang.Specification

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Expand Down Expand Up @@ -205,4 +207,131 @@ class JoinOpTest extends Specification {

}


def 'should not fail on mismatches' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['Y', 2])
def ch2 = (DataflowReadChannel) Channel.of(['X', 6], ['Y', 5])

when:
def op = new JoinOp(ch1, ch2, [failOnMismatch:true])
def result = op.apply().toList().getVal()
then:
result.size() == 2
result.contains( ['X', 1, 6] )
result.contains( ['Y', 2, 5] )
}

def 'should should fail on mismatches' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1])
def ch2 = (DataflowReadChannel) Channel.of(['X', 6], ['Y', 5])
and:
def sess = Global.session as Session

when:
def op = new JoinOp(ch1, ch2, [failOnMismatch:true])
def result = op.apply().toList().getVal()
and:
await(sess)
then:
sess.isAborted()
sess.getError().message == 'Join mismatch for the following entries: key=Y values=[5]'
}

def 'should format error message '() {
given:
def op = new JoinOp(Mock(DataflowReadChannel), Mock(DataflowReadChannel), [:])

when:
op.checkForMismatch([:])
then:
noExceptionThrown()

when:
def buffer1 = [
X: [:],
Y: [(1): ['a','b']] ]
op.checkForMismatch(buffer1)
then:
def e1 = thrown(AbortOperationException)
e1.message == 'Join mismatch for the following entries: key=Y values=a,b'

when:
def buffer2 = [
X: [(0): ['foo']],
Y: [(1): ['a','b']] ]
op.checkForMismatch(buffer2)
then:
def e2 = thrown(AbortOperationException)
e2.message == 'Join mismatch for the following entries: \n- key=X values=foo \n- key=Y values=a,b'
}

def 'should not fail on duplicate matches' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['X', 3])
def ch2 = (DataflowReadChannel) Channel.of(['X', 2], ['X', 4])

when:
def op = new JoinOp(ch1, ch2, [:])
def result = op.apply().toList().getVal()
then:
result.size() == 2
result.contains( ['X', 1, 2] )
result.contains( ['X', 3, 4] )
}

def 'should fail on duplicate matches' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['X', 3], ['X', 5])
def ch2 = (DataflowReadChannel) Channel.of(['X', 2], ['X', 4], ['X', 6])
and:
def sess = Global.session as Session

when:
def op = new JoinOp(ch1, ch2, [failOnDuplicate:true])
def result = op.apply().toList().getVal()
println "result=$result"
and:
await(sess)
then:
sess.isAborted()
sess.getError().message.startsWith('Detected join operation duplicate element -- offending key=X')
}

def 'should fail on duplicate with reminder' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['X', 3])
def ch2 = (DataflowReadChannel) Channel.of(['X', 2])
and:
def sess = Global.session as Session

when:
def op = new JoinOp(ch1, ch2, [failOnDuplicate:true, remainder: true])
def result = op.apply().toList().getVal()
and:
await(sess)
then:
sess.isAborted()
sess.getError().message.startsWith('Detected join operation duplicate element -- offending key=X')
}

def 'should not fail on duplicate without reminder' () {
given:
def ch1 = (DataflowReadChannel) Channel.of(['X', 1], ['X', 3])
def ch2 = (DataflowReadChannel) Channel.of(['X', 2])
and:

when:
def op = new JoinOp(ch1, ch2, [failOnDuplicate:true])
def result = op.apply().toList().getVal()
then:
result == [ ['X',1,2] ]
}

protected void await(Session session) {
def begin = System.currentTimeMillis()
while( !session.isAborted() && System.currentTimeMillis()-begin<5_000 )
sleep 100
}
}

0 comments on commit 387366a

Please sign in to comment.