Skip to content

Commit

Permalink
Refactor observers plugin system
Browse files Browse the repository at this point in the history
  • Loading branch information
pditommaso committed Aug 6, 2019
1 parent d2d6669 commit 9dab8f0
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 98 deletions.
107 changes: 9 additions & 98 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Expand Up @@ -60,14 +60,10 @@ import nextflow.script.ScriptFile
import nextflow.script.ScriptRunner
import nextflow.script.WorkflowMetadata
import nextflow.trace.AnsiLogObserver
import nextflow.trace.GraphObserver
import nextflow.trace.ReportObserver
import nextflow.trace.StatsObserver
import nextflow.trace.TimelineObserver
import nextflow.trace.TraceFileObserver
import nextflow.trace.TraceObserver
import nextflow.trace.TraceObserverFactory
import nextflow.trace.TraceRecord
import nextflow.trace.WebLogObserver
import nextflow.trace.WorkflowStats
import nextflow.util.Barrier
import nextflow.util.BlockingThreadExecutorFactory
Expand Down Expand Up @@ -236,9 +232,7 @@ class Session implements ISession {

boolean ansiLog

private AnsiLogObserver ansiLogObserver

AnsiLogObserver getAnsiLogObserver() { ansiLogObserver }
AnsiLogObserver ansiLogObserver

FilePorter getFilePorter() { filePorter }

Expand Down Expand Up @@ -386,105 +380,22 @@ class Session implements ISession {
*/
@PackageScope
List<TraceObserver> createObservers() {
def result = new ArrayList(10)

createStatsObserver(result) // keep as first, because following may depend on it
createTraceFileObserver(result)
createReportObserver(result)
createTimelineObserver(result)
createDagObserver(result)
createWebLogObserver(result)
createAnsiLogObserver(result)

return result
}

protected void createAnsiLogObserver(Collection<TraceObserver> result) {
if( ansiLog ) {
this.ansiLogObserver = new AnsiLogObserver()
result << ansiLogObserver
}
}

/**
* Create workflow message observer
* @param result
*/
protected void createWebLogObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('weblog.enabled') as Boolean
String url = config.navigate('weblog.url') as String
if (isEnabled) {
if ( !url ) url = WebLogObserver.DEF_URL
def observer = new WebLogObserver(url)
result << observer
}
}
def result = new ArrayList(10)

protected void createStatsObserver(Collection<TraceObserver> result) {
// stats is created as first because others may depend on it
final observer = new StatsObserver()
this.workflowStats = observer.stats
result << observer
}


/**
* Create workflow report file observer
*/
protected void createReportObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('report.enabled') as Boolean
if( isEnabled ) {
String fileName = config.navigate('report.file')
def maxTasks = config.navigate('report.maxTasks', ReportObserver.DEF_MAX_TASKS) as int
if( !fileName ) fileName = ReportObserver.DEF_FILE_NAME
def report = (fileName as Path).complete()
def observer = new ReportObserver(report)
observer.maxTasks = maxTasks

result << observer
for( TraceObserverFactory f : ServiceLoader.load(TraceObserverFactory) ) {
log.debug "Observer factory: ${f.class.simpleName}"
result.addAll(f.create(this))
}
}

/**
* Create timeline report file observer
*/
protected void createTimelineObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('timeline.enabled') as Boolean
if( isEnabled ) {
String fileName = config.navigate('timeline.file')
if( !fileName ) fileName = TimelineObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new TimelineObserver(traceFile)
result << observer
}
}

protected void createDagObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('dag.enabled') as Boolean
if( isEnabled ) {
String fileName = config.navigate('dag.file')
if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new GraphObserver(traceFile)
result << observer
}
return result
}

/*
* create the execution trace observer
*/
protected void createTraceFileObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('trace.enabled') as Boolean
if( isEnabled ) {
String fileName = config.navigate('trace.file')
if( !fileName ) fileName = TraceFileObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new TraceFileObserver(traceFile)
config.navigate('trace.raw') { it -> observer.useRawNumbers(it == true) }
config.navigate('trace.sep') { observer.separator = it }
config.navigate('trace.fields') { observer.setFieldsAndFormats(it) }
result << observer
}
}

/*
* intercepts interruption signal i.e. CTRL+C
Expand Down Expand Up @@ -610,7 +521,7 @@ class Session implements ISession {
}

protected Map<String,Path> findBinEntries(Path path) {
def result = new LinkedHashMap(10)
Map<String,Path> result = new LinkedHashMap(10)
path
.listFiles { file -> Files.isExecutable(file) }
.each { Path file -> result.put(file.name,file) }
Expand Down
@@ -0,0 +1,116 @@
package nextflow.trace

import java.nio.file.Path

import nextflow.Session

/**
* Creates Nextflow observes object
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
class DefaultObserverFactory implements TraceObserverFactory {

private Map config
private Session session

@Override
Collection<TraceObserver> create(Session session) {
this.session = session
this.config = session.config

final result = new ArrayList(10)
createTraceFileObserver(result)
createReportObserver(result)
createTimelineObserver(result)
createDagObserver(result)
createWebLogObserver(result)
createAnsiLogObserver(result)
return result
}

protected void createAnsiLogObserver(Collection<TraceObserver> result) {
if( session.ansiLog ) {
session.ansiLogObserver = new AnsiLogObserver()
result << session.ansiLogObserver
}
}

/**
* Create workflow message observer
* @param result
*/
protected void createWebLogObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('weblog.enabled') as Boolean
String url = config.navigate('weblog.url') as String
if ( isEnabled ) {
if ( !url ) url = WebLogObserver.DEF_URL
def observer = new WebLogObserver(url)
result << observer
}
}

/**
* Create workflow report file observer
*/
protected void createReportObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('report.enabled') as Boolean
if( !isEnabled )
return

String fileName = config.navigate('report.file')
def maxTasks = config.navigate('report.maxTasks', ReportObserver.DEF_MAX_TASKS) as int
if( !fileName ) fileName = ReportObserver.DEF_FILE_NAME
def report = (fileName as Path).complete()
def observer = new ReportObserver(report)
observer.maxTasks = maxTasks

result << observer
}

/**
* Create timeline report file observer
*/
protected void createTimelineObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('timeline.enabled') as Boolean
if( !isEnabled )
return

String fileName = config.navigate('timeline.file')
if( !fileName ) fileName = TimelineObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new TimelineObserver(traceFile)
result << observer
}

protected void createDagObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('dag.enabled') as Boolean
if( !isEnabled )
return

String fileName = config.navigate('dag.file')
if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new GraphObserver(traceFile)
result << observer
}

/*
* create the execution trace observer
*/
protected void createTraceFileObserver(Collection<TraceObserver> result) {
Boolean isEnabled = config.navigate('trace.enabled') as Boolean
if( !isEnabled )
return

String fileName = config.navigate('trace.file')
if( !fileName ) fileName = TraceFileObserver.DEF_FILE_NAME
def traceFile = (fileName as Path).complete()
def observer = new TraceFileObserver(traceFile)
config.navigate('trace.raw') { it -> observer.useRawNumbers(it == true) }
config.navigate('trace.sep') { observer.separator = it }
config.navigate('trace.fields') { observer.setFieldsAndFormats(it) }
result << observer
}

}
@@ -0,0 +1,20 @@
package nextflow.trace

import nextflow.Session

/**
* Factory class creating {@link TraceObserver} instances
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
interface TraceObserverFactory {

/**
* Register the observer on the current session object
*
* @param session The current {@link nextflow.Session} instance
* @return One or more instances of {@link TraceObserver} objects
*/
Collection<TraceObserver> create(Session session)

}
@@ -0,0 +1,17 @@
#
# Copyright 2019, Seqera Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

nextflow.trace.DefaultObserverFactory

0 comments on commit 9dab8f0

Please sign in to comment.