Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modules/nf-commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,9 @@ dependencies {
/* testImplementation inherited from top gradle build file */
testImplementation(testFixtures(project(":nextflow")))
testFixturesImplementation(project(":nextflow"))

testImplementation "org.apache.groovy:groovy-json:4.0.26" // needed by wiremock
testImplementation ('com.github.tomakehurst:wiremock:3.0.0-beta-1') { exclude module: 'groovy-all' }
testImplementation ('com.github.tomjankes:wiremock-groovy:0.2.0') { exclude module: 'groovy-all' }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package nextflow.plugin

import com.google.gson.Gson
import dev.failsafe.Failsafe
import dev.failsafe.FailsafeExecutor
import dev.failsafe.Fallback
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.BuildInfo
import org.pf4j.PluginRuntimeException
import org.pf4j.update.FileDownloader
import org.pf4j.update.FileVerifier
import org.pf4j.update.PluginInfo
import org.pf4j.update.SimpleFileDownloader
import org.pf4j.update.verifier.CompoundVerifier

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

/**
* Represents an update repository served via an HTTP api.
*
* It implements PrefetchUpdateRepository so that all relevant
* plugin metadata can be loaded with a single HTTP request, rather
* than a request-per-plugin.
*
* Metadata is prefetched into memory when Nextflow starts and expires
* upon termination (or when 'refresh()' is called).
*/
@Slf4j
@CompileStatic
class HttpPluginRepository implements PrefetchUpdateRepository {
private final HttpClient client = HttpClient.newHttpClient()
private final String id
private final URI url

private Map<String, PluginInfo> plugins = new HashMap<>()

HttpPluginRepository(String id, URI url) {
this.id = id
// ensure url ends with a slash
this.url = !url.toString().endsWith("/")
? URI.create(url.toString() + "/")
: url
}

// NOTE ON PREFETCHING
//
// The prefetch mechanism is used to work around a limitation in the
// UpdateRepository interface from pf4j.
//
// Specifically, p4fj expects that getPlugins() returns a Map<> of all
// metadata about all plugins. To implement this for an HTTP repository
// would require either downloading the entire contents of the remote
// repository or implementing a lazy map and making an HTTP request for
// each required plugin.
//
// Instead we can use the list of configured plugins to load all relevant
// metadata in a single HTTP request at startup, and use this to populate
// the map. Once the prefetch is complete, this repository will behave
// like any other implementation of UpdateRepository.
@Override
void prefetch(List<PluginSpec> plugins) {
if (plugins && !plugins.isEmpty()) {
this.plugins = fetchMetadata(plugins)
}
}

@Override
String getId() {
return id
}

@Override
URL getUrl() {
return url.toURL()
}

@Override
Map<String, PluginInfo> getPlugins() {
if (plugins.isEmpty()) {
log.warn "getPlugins() called before prefetch() - plugins map will be empty"
return Map.of()
}
return Collections.unmodifiableMap(plugins)
}

@Override
PluginInfo getPlugin(String id) {
return plugins.computeIfAbsent(id) { key -> fetchMetadataByIds([key]).get(key) }
}

@Override
void refresh() {
plugins = fetchMetadataByIds(plugins.keySet())
}

@Override
FileDownloader getFileDownloader() {
return new SimpleFileDownloader()
}

@Override
FileVerifier getFileVerifier() {
return new CompoundVerifier()
}

// ----------------------------------------------------------------------------
// http handling

private Map<String, PluginInfo> fetchMetadataByIds(Collection<String> ids) {
def specs = ids.collect(id -> new PluginSpec(id, null))
return fetchMetadata(specs)
}

private Map<String, PluginInfo> fetchMetadata(Collection<PluginSpec> specs) {
final ordered = specs.sort(false)
final CheckedSupplier<Map<String, PluginInfo>> supplier = () -> fetchMetadata0(ordered)
return retry().get(supplier)
}

private Map<String, PluginInfo> fetchMetadata0(List<PluginSpec> specs) {
final gson = new Gson()

def reqBody = gson.toJson([
'nextflowVersion': BuildInfo.version,
'plugins' : specs
])

def req = HttpRequest.newBuilder()
.uri(url.resolve("plugins/collect"))
.POST(HttpRequest.BodyPublishers.ofString(reqBody))
.build()

def rep = client.send(req, HttpResponse.BodyHandlers.ofString())
if (rep.statusCode() != 200) throw new PluginRuntimeException(errorMessage(rep, gson))

try {
def repBody = gson.fromJson(rep.body(), FetchResponse)
return repBody.plugins.collectEntries { p -> Map.entry(p.id, p) }
} catch (Exception e) {
log.info("Plugin metadata response body: '${rep.body()}'")
throw new PluginRuntimeException("Failed to parse response body", e)
}
}

// create a retry executor using failsafe
private static FailsafeExecutor retry() {
EventListener<ExecutionAttemptedEvent> logAttempt = (ExecutionAttemptedEvent attempt) -> {
log.debug("Retrying download of plugins metadata - attempt ${attempt.attemptCount}, ${attempt.lastFailure.message}", attempt.lastFailure)
}
Fallback fallback = Fallback.ofException { e ->
e.lastFailure instanceof ConnectException
? new ConnectException("Failed to download plugins metadata")
: new PluginRuntimeException("Failed to download plugin metadata: ${e.lastFailure.message}")
}
final policy = RetryPolicy.builder()
.withMaxAttempts(3)
.handle(ConnectException)
.onRetry(logAttempt)
.build()
return Failsafe.with(fallback, policy)
}

private static String errorMessage(HttpResponse<String> rep, Gson gson) {
try {
def err = gson.fromJson(rep.body(), ErrorResponse)
return "${err.type} - ${err.message}"
} catch (Exception e) {
return rep.body()
}
}

// ---------------------

/**
* Response format object expected from repository
*/
private static class FetchResponse {
List<PluginInfo> plugins
}

private static class ErrorResponse {
String type
String message
}
}
20 changes: 19 additions & 1 deletion modules/nf-commons/src/main/nextflow/plugin/PluginUpdater.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ class PluginUpdater extends UpdateManager {
result.add(new LocalUpdateRepository('downloaded', local))
}
else {
result.add(new DefaultUpdateRepository('nextflow.io', remote))
def remoteRepo = remote.path.endsWith('.json')
? new DefaultUpdateRepository('nextflow.io', remote)
: new HttpPluginRepository('registry', remote.toURI())

result.add(remoteRepo)
result.addAll(customRepos())
}
return result
Expand Down Expand Up @@ -138,6 +142,20 @@ class PluginUpdater extends UpdateManager {
return new DefaultUpdateRepository('uri', new URL(uri), fileName)
}

/**
* Prefetch metadata for plugins. This gives an opportunity for certain
* repository types to perform some data-loading optimisations.
*/
void prefetchMetadata(List<PluginSpec> plugins) {
// use direct field access to avoid the refresh() call in getRepositories()
// which could fail anything which hasn't had a chance to prefetch yet
for( def repo : this.@repositories ) {
if( repo instanceof PrefetchUpdateRepository ) {
repo.prefetch(plugins)
}
}
}

/**
* Resolve a plugin installing or updating the dependencies if necessary
* and start the plugin
Expand Down
5 changes: 4 additions & 1 deletion modules/nf-commons/src/main/nextflow/plugin/Plugins.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import org.pf4j.PluginManager
@CompileStatic
class Plugins {

public static final String DEFAULT_PLUGINS_REPO = 'https://raw.githubusercontent.com/nextflow-io/plugins/main/plugins.json'
// this is deprecated and should not be used to avoid accessing this static attribute
// cause the instantiation of the PluginsFacade class
@Deprecated
public static final String DEFAULT_PLUGINS_REPO = PluginsFacade.DEFAULT_PLUGINS_REPO

private final static PluginsFacade INSTANCE = new PluginsFacade()

Expand Down
98 changes: 72 additions & 26 deletions modules/nf-commons/src/main/nextflow/plugin/PluginsFacade.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio.file.Paths

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
Expand All @@ -41,6 +42,9 @@ import org.pf4j.PluginStateListener
@CompileStatic
class PluginsFacade implements PluginStateListener {

@PackageScope
final static String DEFAULT_PLUGINS_REPO = 'https://raw.githubusercontent.com/nextflow-io/plugins/main/plugins.json'

private static final String DEV_MODE = 'dev'
private static final String PROD_MODE = 'prod'
private Map<String,String> env = SysEnv.get()
Expand All @@ -51,22 +55,25 @@ class PluginsFacade implements PluginStateListener {
private PluginUpdater updater
private CustomPluginManager manager
private DefaultPlugins defaultPlugins = DefaultPlugins.INSTANCE
private String indexUrl = Plugins.DEFAULT_PLUGINS_REPO
private String indexUrl
private boolean embedded

PluginsFacade() {
mode = getPluginsMode()
root = getPluginsDir()
indexUrl = getPluginsIndexUrl()
offline = env.get('NXF_OFFLINE') == 'true'
if( mode==DEV_MODE && root.toString()=='plugins' && !isRunningFromDistArchive() )
root = detectPluginsDevRoot()
System.setProperty('pf4j.mode', mode)
}

PluginsFacade(Path root, String mode=PROD_MODE, boolean offline=false) {
PluginsFacade(Path root, String mode=PROD_MODE, boolean offline=false,
String indexUrl=DEFAULT_PLUGINS_REPO) {
this.mode = mode
this.root = root
this.offline = offline
this.indexUrl = indexUrl
System.setProperty('pf4j.mode', mode)
}

Expand Down Expand Up @@ -96,6 +103,45 @@ class PluginsFacade implements PluginStateListener {
}
}

static protected boolean isSupportedIndex(String url) {
if( !url ) {
throw new IllegalArgumentException("Missing plugins registry URL")
}
if( !url.startsWith('https://') && !url.startsWith('http://') ) {
throw new IllegalArgumentException("Plugins registry URL must start with 'http://' or 'https://': $url")
}
if( url == DEFAULT_PLUGINS_REPO ) {
return true
}
final hostname = URI.create(url).authority
return hostname.endsWith('.nextflow.io')
|| hostname.endsWith('.nextflow.com')
|| hostname.endsWith('.seqera.io')
}

protected String getPluginsIndexUrl() {
final url = env.get('NXF_PLUGINS_INDEX_URL')
if( !url ) {
log.trace "Using default plugins url"
return DEFAULT_PLUGINS_REPO
}
log.debug "Detected NXF_PLUGINS_INDEX_URL=$url"
if( !isSupportedIndex(url) ) {
// warn that this is experimental behaviour
log.warn """\
=======================================================================
= WARNING =
= This workflow run us using an unofficial plugins registry. =
= =
= ${url}
= =
= Its usage is unsupported and not recommended for production workloads. =
=============================================================================
""".stripIndent(true)
}
return url
}

private boolean isNextflowDevRoot(File file) {
file.name=='nextflow' && file.isDirectory() && new File(file, 'settings.gradle').isFile()
}
Expand Down Expand Up @@ -324,33 +370,33 @@ class PluginsFacade implements PluginStateListener {
new DefaultPluginManager()
}

void start( String pluginId ) {
if( !isAllowed(pluginId) ) {
throw new AbortOperationException("Refuse to use plugin '$pluginId' - allowed plugins are: ${allowedPluginsString()}")
}
if( isEmbedded() && defaultPlugins.hasPlugin(pluginId) ) {
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugin: $pluginId"
return
}

start(PluginSpec.parse(pluginId, defaultPlugins))
}

void start(PluginSpec plugin) {
if( !isAllowed(plugin) ) {
throw new AbortOperationException("Refuse to use plugin '$plugin' -- allowed plugins are: ${allowedPluginsString()}")
}
if( isEmbedded() && defaultPlugins.hasPlugin(plugin.id) ) {
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugin: $plugin.id"
return
}

updater.prepareAndStart(plugin.id, plugin.version)
void start(String pluginId) {
start(List.of(PluginSpec.parse(pluginId, defaultPlugins)))
}

void start(List<PluginSpec> specs) {
for( PluginSpec it : specs ) {
start(it)
// check if the plugins are allowed to start
final disallow = specs.find(it-> !isAllowed(it))
if( disallow ) {
throw new AbortOperationException("Refuse to use plugin '${disallow.id}' - allowed plugins are: ${allowedPluginsString()}")
}
// when running in embedded mode, default plugins should not be started
// the following split partition the "specs" collection in to list
// - the first holding the plugins for which "start" is not required
// - the second all remaining plugins that requires a start invocation
final split = specs.split(plugin -> isEmbedded() && defaultPlugins.hasPlugin(plugin.id))
final skippable = split[0]
final startable = split[1]
// just report a debug line for the skipped ones
if( skippable ) {
final skippedIds = skippable.collect{ plugin -> plugin.id }
log.debug "Plugin 'start' is not required in embedded mode -- ignoring for plugins: $skippedIds"
}
// prefetch the plugins meta
updater.prefetchMetadata(startable)
// finally start the plugins
for( PluginSpec plugin : startable ) {
updater.prepareAndStart(plugin.id, plugin.version)
}
}

Expand Down
Loading