Skip to content

Commit

Permalink
Add cloudcache plugin (#4097)
Browse files Browse the repository at this point in the history
The nf-cloudcache plugin allows persisting the Nextflow cache metadata
to cloud object storage instead of using the embedded LevelDB engine.

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
bentsherman and pditommaso committed Jul 20, 2023
1 parent bda4756 commit ac90cc2
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 10 deletions.
5 changes: 5 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,11 @@ The following environment variables control the configuration of the Nextflow ru
`NXF_ASSETS`
: Defines the directory where downloaded pipeline repositories are stored (default: `$NXF_HOME/assets`)

`NXF_CLOUDCACHE_PATH`
: :::{versionadded} 23.07.0-edge
:::
: Defines the base cache path when using the cloud cache store.

`NXF_CHARLIECLOUD_CACHEDIR`
: Directory where remote Charliecloud images are stored. When using a computing cluster it must be a shared folder accessible from all compute nodes.

Expand Down
1 change: 1 addition & 0 deletions docs/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Currently the following functionalities are implemented as plugin components and
- `nf-console`: Implement Nextflow [REPL console](https://www.nextflow.io/blog/2015/introducing-nextflow-console.html).
- `nf-ga4gh`: Support [GA4GH APIs](https://www.ga4gh.org/).
- `nf-google`: Support for Google cloud.
- `nf-cloudcache`: Support for storing the Nextflow cache metadata in cloud object storage.
- `nf-tower`: Support for [Nextflow Tower](https://tower.nf) platform.

## Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.exception.AbortOperationException
import nextflow.plugin.Priority

/**
* Implements the default cache factory
Expand All @@ -30,6 +31,7 @@ import nextflow.exception.AbortOperationException
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
@Priority(0)
class DefaultCacheFactory extends CacheFactory {

@Override
Expand Down
41 changes: 41 additions & 0 deletions plugins/nf-cloudcache/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Build script for the nf-cloudcache plugin module.
apply plugin: 'groovy'

// Groovy-only layout: the Java source directories are emptied, main sources are
// read from `src/main`, resources from `src/resources` and tests from `src/test`.
sourceSets {
main.java.srcDirs = []
main.groovy.srcDirs = ['src/main']
main.resources.srcDirs = ['src/resources']
test.groovy.srcDirs = ['src/test']
test.java.srcDirs = []
test.resources.srcDirs = []
}

configurations {
// see https://docs.gradle.org/4.1/userguide/dependency_management.html#sub:exclude_transitive_dependencies
// Keeps slf4j-api off this module's runtime classpath.
// NOTE(review): presumably because the hosting Nextflow runtime supplies it -- confirm.
runtimeClasspath.exclude group: 'org.slf4j', module: 'slf4j-api'
}

dependencies {
// `compileOnly`: needed to compile the plugin but not added to its runtime classpath.
compileOnly project(':nextflow')
compileOnly 'org.slf4j:slf4j-api:2.0.7'
compileOnly 'org.pf4j:pf4j:3.4.1'

// Tests use the test fixtures published by the core `nextflow` project
// together with an explicit Groovy runtime.
testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.codehaus.groovy:groovy:3.0.18"
testImplementation "org.codehaus.groovy:groovy-nio:3.0.18"
}

31 changes: 31 additions & 0 deletions plugins/nf-cloudcache/src/main/nextflow/CloudCachePlugin.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

package nextflow

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
/**
 * Nextflow cloud cache plugin.
 *
 * Entry point class of the plugin. It adds no behaviour of its own and only
 * forwards the plugin wrapper to {@link BasePlugin}.
 *
 * @author Ben Sherman <bentshermann@gmail.com>
 */
@Slf4j
@CompileStatic
class CloudCachePlugin extends BasePlugin {

/**
 * Creates the plugin instance.
 *
 * @param wrapper the PF4J plugin wrapper, passed unchanged to the parent constructor
 */
CloudCachePlugin(PluginWrapper wrapper) {
super(wrapper)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cache

import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.cache.CacheDB
import nextflow.cache.CacheFactory
import nextflow.exception.AbortOperationException
import nextflow.plugin.Priority
/**
 * Cache factory that backs the Nextflow cache with a {@link CloudCacheStore}.
 *
 * NOTE(review): declared with priority -10 while the default factory uses 0;
 * presumably this makes the cloud factory take precedence when the plugin is
 * loaded -- confirm against the {@code Priority} ordering semantics.
 *
 * @see CloudCacheStore
 *
 * @author Ben Sherman <bentshermann@gmail.com>
 */
@CompileStatic
@Priority(-10)
class CloudCacheFactory extends CacheFactory {

    /**
     * Creates a cache DB whose entries are persisted through a cloud cache store.
     *
     * @param uniqueId the session UUID; must be provided
     * @param runName the run name; must be a non-empty string
     * @param home the base path handed over to the store
     * @return a new {@link CacheDB} wrapping a {@link CloudCacheStore}
     * @throws AbortOperationException when {@code uniqueId} or {@code runName} is missing
     */
    @Override
    protected CacheDB newInstance(UUID uniqueId, String runName, Path home) {
        if( !uniqueId ) {
            throw new AbortOperationException("Missing cache `uuid`")
        }
        if( !runName ) {
            throw new AbortOperationException("Missing cache `runName`")
        }
        return new CacheDB(new CloudCacheStore(uniqueId, runName, home))
    }

}
193 changes: 193 additions & 0 deletions plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheStore.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cache

import java.nio.file.Files
import java.nio.file.NoSuchFileException
import java.nio.file.Path

import com.google.common.hash.HashCode
import groovy.transform.CompileStatic
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.extension.FilesEx
import nextflow.util.CacheHelper
/**
 * Implements the cloud cache store.
 *
 * Cache data for a session is kept under {@code <basePath>/<session UUID>}:
 * one file per cache entry (named after the entry hash), one {@code index.<runName>}
 * file per run and a {@code LOCK} file marking the session as in use.
 *
 * @author Ben Sherman <bentshermann@gmail.com>
 */
@CompileStatic
class CloudCacheStore implements CacheStore {

    /** Name of the lock file created inside the session data path */
    private final String LOCK_NAME = 'LOCK'

    /** Size in bytes of a hash key, i.e. the length of the key part of an index record */
    private final int KEY_SIZE

    /** The session UUID */
    private UUID uniqueId

    /** The unique run name associated with this cache instance */
    private String runName

    /** The base path for the entire cache */
    private Path basePath

    /** The base path for this cache instance */
    private Path dataPath

    /** The lock file for this cache instance */
    private Path lock

    /** The path to the index file */
    private Path indexPath

    /** Index file input stream */
    private InputStream indexReader

    /** Index file output stream */
    private OutputStream indexWriter

    /**
     * Creates the store for the given session and run.
     *
     * @param uniqueId the session UUID, used as the name of the session data directory
     * @param runName the run name, used as the suffix of the index file and as lock content
     * @param basePath the cache base path; when not given it is taken from the
     *      {@code NXF_CLOUDCACHE_PATH} environment variable
     * @throws IllegalArgumentException when no base path is given and
     *      {@code NXF_CLOUDCACHE_PATH} is not defined
     */
    CloudCacheStore(UUID uniqueId, String runName, Path basePath=null) {
        // the key size is derived from the byte length of a sample hash
        this.KEY_SIZE = CacheHelper.hasher('x').hash().asBytes().size()
        this.uniqueId = uniqueId
        this.runName = runName
        this.basePath = basePath ?: defaultBasePath()
        this.dataPath = this.basePath.resolve("$uniqueId")
        this.lock = dataPath.resolve(LOCK_NAME)
        this.indexPath = dataPath.resolve("index.$runName")
    }

    /**
     * Resolves the base path from the {@code NXF_CLOUDCACHE_PATH} environment variable.
     *
     * @throws IllegalArgumentException when the variable is not defined or empty
     */
    private Path defaultBasePath() {
        final basePath = SysEnv.get('NXF_CLOUDCACHE_PATH')
        if( !basePath )
            throw new IllegalArgumentException("NXF_CLOUDCACHE_PATH must be defined when using the path-based cache store")

        return basePath as Path
    }

    /**
     * Opens the store for writing: takes the session lock and opens the index
     * file of the current run for output.
     *
     * @return this store
     * @throws IOException when the session is already locked or the index cannot be opened
     */
    @Override
    CloudCacheStore open() {
        acquireLock()
        try {
            indexWriter = new BufferedOutputStream(Files.newOutputStream(indexPath))
        }
        catch( Exception e ) {
            // the lock was taken by this call: do not leave it behind, otherwise
            // every following run of this session fails to acquire it
            releaseLockAfterFailure(e)
            throw e
        }
        return this
    }

    /**
     * Opens the store for reading: checks that the session data path exists,
     * takes the session lock and opens the index file of the current run for input.
     *
     * @return this store
     * @throws AbortOperationException when the session data path does not exist
     * @throws IOException when the session is already locked or the index cannot be opened
     */
    @Override
    CloudCacheStore openForRead() {
        if( !dataPath.exists() )
            throw new AbortOperationException("Missing cache directory: $dataPath")
        acquireLock()
        try {
            indexReader = Files.newInputStream(indexPath)
        }
        catch( Exception e ) {
            // same reasoning as in `open`: release the lock taken just above
            releaseLockAfterFailure(e)
            throw e
        }
        return this
    }

    /**
     * Creates the lock file, storing the run name as its content.
     *
     * @throws IOException when the lock file already exists
     */
    private void acquireLock() {
        if( lock.exists() ) {
            final msg = """
Unable to acquire lock for session with ID ${uniqueId}
Common reasons for this error are:
- You are trying to resume the execution of an already running pipeline
- A previous execution was abruptly interrupted, leaving the session open
You can see the name of the conflicting run by inspecting the contents of the following path: ${lock}
"""
            throw new IOException(msg)
        }

        lock.text = runName
    }

    /**
     * Deletes the lock file after a failed open, recording any deletion error
     * as suppressed so that the original failure is the one reported.
     */
    private void releaseLockAfterFailure(Exception cause) {
        try {
            lock.delete()
        }
        catch( Exception e ) {
            cause.addSuppressed(e)
        }
    }

    /** Deletes the whole data directory of this session. */
    @Override
    void drop() {
        dataPath.deleteDir()
    }

    /** Closes the index streams and removes the session lock file. */
    @Override
    void close() {
        FilesEx.closeQuietly(indexWriter)
        // the reader is only set by `openForRead`; close it as well to avoid leaking the stream
        if( indexReader != null )
            FilesEx.closeQuietly(indexReader)
        lock.delete()
    }

    /**
     * Appends a record to the index: the key bytes followed by one flag byte.
     *
     * @param key the entry hash
     * @param cached written as {@code 1} when true, {@code 0} otherwise
     */
    @Override
    void writeIndex(HashCode key, boolean cached) {
        indexWriter.write(key.asBytes())
        indexWriter.write(cached ? 1 : 0)
    }

    /** Deletes the index file of the current run. */
    @Override
    void deleteIndex() {
        indexPath.delete()
    }

    /**
     * Iterates over the records of the index file opened by {@link #openForRead()}.
     * Records are read lazily, one ahead of the consumer.
     */
    @Override
    Iterator<Index> iterateIndex() {
        return new Iterator<Index>() {
            /** The record returned by the following call to `next()`, null when exhausted */
            private Index next

            {
                next = fetch()
            }

            @Override
            boolean hasNext() {
                return next != null
            }

            @Override
            Index next() {
                final result = next
                next = fetch()
                return result
            }

            /**
             * Reads the next index record, or returns null at the end of the index.
             */
            private Index fetch() {
                final byte[] key = new byte[KEY_SIZE]
                // a single `read` may legally return fewer bytes than requested,
                // keep reading until the key is complete or the stream ends
                int offset = 0
                while( offset < KEY_SIZE ) {
                    final int count = indexReader.read(key, offset, KEY_SIZE - offset)
                    if( count == -1 )
                        break
                    offset += count
                }
                // end of stream, or a truncated trailing record that cannot be decoded
                if( offset < KEY_SIZE )
                    return null
                final cached = indexReader.read() == 1
                return new Index(HashCode.fromBytes(key), cached)
            }
        }
    }

    /**
     * Reads the content of a cache entry.
     *
     * @param key the entry hash
     * @return the entry bytes, or null when no file exists for the key
     */
    @Override
    byte[] getEntry(HashCode key) {
        try {
            return getCachePath(key).bytes
        }
        catch( NoSuchFileException e ) {
            return null
        }
    }

    /** Writes the content of a cache entry to the file named after its key. */
    @Override
    void putEntry(HashCode key, byte[] value) {
        getCachePath(key).bytes = value
    }

    /** Deletes the file holding the cache entry with the given key. */
    @Override
    void deleteEntry(HashCode key) {
        getCachePath(key).delete()
    }

    /** Maps an entry key to its file inside the session data path. */
    private Path getCachePath(HashCode key) {
        dataPath.resolve(key.toString())
    }
}
6 changes: 6 additions & 0 deletions plugins/nf-cloudcache/src/resources/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Manifest-Version: 1.0
Plugin-Class: nextflow.CloudCachePlugin
Plugin-Id: nf-cloudcache
Plugin-Version: 0.1.0
Plugin-Provider: Seqera Labs
Plugin-Requires: >=23.04.0
17 changes: 17 additions & 0 deletions plugins/nf-cloudcache/src/resources/META-INF/extensions.idx
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright 2013-2023, Seqera Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

nextflow.cache.CloudCacheFactory
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ include 'plugins:nf-console'
include 'plugins:nf-azure'
include 'plugins:nf-codecommit'
include 'plugins:nf-wave'
include 'plugins:nf-cloudcache'

0 comments on commit ac90cc2

Please sign in to comment.