Add cloud cache plugin #4097

Merged (12 commits) on Jul 20, 2023
5 changes: 5 additions & 0 deletions docs/config.md
@@ -1516,6 +1516,11 @@ The following environment variables control the configuration of the Nextflow ru
`NXF_ASSETS`
: Defines the directory where downloaded pipeline repositories are stored (default: `$NXF_HOME/assets`)

`NXF_CLOUDCACHE_PATH`
: :::{versionadded} 23.07.0-edge
  :::
: Defines the base cache path when using the cloud cache store.

`NXF_CHARLIECLOUD_CACHEDIR`
: Directory where remote Charliecloud images are stored. When using a computing cluster it must be a shared folder accessible from all compute nodes.
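
As a usage note for the `NXF_CLOUDCACHE_PATH` entry added in this hunk, the fragment below sketches how the plugin might be switched on for a run. It assumes the plugin has to be requested explicitly through the `plugins` scope (this diff does not show any automatic activation), and the bucket path in the comment is hypothetical.

```groovy
// nextflow.config: a sketch under the assumptions stated above, not part of this PR
plugins {
    id 'nf-cloudcache'
}

// Assumption: the base cache path is exported in the launching shell, for example
//   export NXF_CLOUDCACHE_PATH=s3://my-bucket/nextflow-cache   (hypothetical bucket)
// If the variable is unset and no base path is passed in, CloudCacheStore
// throws an IllegalArgumentException (see defaultBasePath in this PR).
```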

1 change: 1 addition & 0 deletions docs/plugins.md
@@ -13,6 +13,7 @@ Currently the following functionalities are implemented as plugin components and
- `nf-console`: Implement Nextflow [REPL console](https://www.nextflow.io/blog/2015/introducing-nextflow-console.html).
- `nf-ga4gh`: Support [GA4GH APIs](https://www.ga4gh.org/).
- `nf-google`: Support for Google cloud.
- `nf-cloudcache`: Support for Nextflow cache in object storage.
- `nf-tower`: Support for [Nextflow Tower](https://tower.nf) platform.

## Configuration
@@ -21,6 +21,7 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.exception.AbortOperationException
import nextflow.plugin.Priority

/**
* Implements the default cache factory
@@ -30,6 +31,7 @@ import nextflow.exception.AbortOperationException
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@CompileStatic
@Priority(0)
class DefaultCacheFactory extends CacheFactory {

@Override
41 changes: 41 additions & 0 deletions plugins/nf-cloudcache/build.gradle
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
apply plugin: 'groovy'

sourceSets {
    main.java.srcDirs = []
    main.groovy.srcDirs = ['src/main']
    main.resources.srcDirs = ['src/resources']
    test.groovy.srcDirs = ['src/test']
    test.java.srcDirs = []
    test.resources.srcDirs = []
}

configurations {
    // see https://docs.gradle.org/4.1/userguide/dependency_management.html#sub:exclude_transitive_dependencies
    runtimeClasspath.exclude group: 'org.slf4j', module: 'slf4j-api'
}

dependencies {
    compileOnly project(':nextflow')
    compileOnly 'org.slf4j:slf4j-api:2.0.7'
    compileOnly 'org.pf4j:pf4j:3.4.1'

    testImplementation(testFixtures(project(":nextflow")))
    testImplementation "org.codehaus.groovy:groovy:3.0.18"
    testImplementation "org.codehaus.groovy:groovy-nio:3.0.18"
}

31 changes: 31 additions & 0 deletions plugins/nf-cloudcache/src/main/nextflow/CloudCachePlugin.groovy
@@ -0,0 +1,31 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* This Source Code Form is "Incompatible With Secondary Licenses", as
* defined by the Mozilla Public License, v. 2.0.
*/

package nextflow

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
/**
* Nextflow cloud cache plugin
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@Slf4j
@CompileStatic
class CloudCachePlugin extends BasePlugin {

    CloudCachePlugin(PluginWrapper wrapper) {
        super(wrapper)
    }

}
@@ -0,0 +1,46 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cache

import java.nio.file.Path

import groovy.transform.CompileStatic
import nextflow.cache.CacheDB
import nextflow.cache.CacheFactory
import nextflow.exception.AbortOperationException
import nextflow.plugin.Priority
/**
* Implements the cloud cache factory
*
* @see CloudCacheStore
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@CompileStatic
@Priority(-10)
class CloudCacheFactory extends CacheFactory {

    @Override
    protected CacheDB newInstance(UUID uniqueId, String runName, Path home) {
        if( !uniqueId ) throw new AbortOperationException("Missing cache `uuid`")
        if( !runName ) throw new AbortOperationException("Missing cache `runName`")
        final store = new CloudCacheStore(uniqueId, runName, home)
        return new CacheDB(store)
    }

}
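
The two `@Priority` values in this PR (`0` on `DefaultCacheFactory`, `-10` on `CloudCacheFactory`) suggest that the factory with the lowest value is preferred once the plugin is loaded. The sketch below illustrates that selection rule only. `DemoPriority`, `DemoDefaultFactory`, `DemoCloudFactory` and `DemoSelector` are hypothetical stand-ins, and the "sort ascending, take the first" rule is an assumption about how the plugin system resolves candidates, not code from this diff.

```groovy
import java.lang.annotation.Retention
import java.lang.annotation.RetentionPolicy

// Hypothetical stand-in for nextflow.plugin.Priority
@Retention(RetentionPolicy.RUNTIME)
@interface DemoPriority {
    int value()
}

// Hypothetical stand-ins for the two factories touched by this PR
@DemoPriority(0)
class DemoDefaultFactory {}

@DemoPriority(-10)
class DemoCloudFactory {}

class DemoSelector {
    // Read the annotation value, treating a missing annotation as priority 0
    static int priorityOf(Class type) {
        final ann = type.getAnnotation(DemoPriority)
        return ann != null ? ann.value() : 0
    }

    // Assumption: candidates are sorted ascending and the first one wins
    static Class select(List<Class> candidates) {
        return candidates.sort(false) { Class c -> priorityOf(c) }.first()
    }
}

final chosen = DemoSelector.select([DemoDefaultFactory, DemoCloudFactory])
println chosen.simpleName
```

Under that assumption the cloud stub is selected whenever both candidates are present, which would match the intent of giving the plugin factory a negative priority.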
193 changes: 193 additions & 0 deletions plugins/nf-cloudcache/src/main/nextflow/cache/CloudCacheStore.groovy
@@ -0,0 +1,193 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cache

import java.nio.file.Files
import java.nio.file.NoSuchFileException
import java.nio.file.Path

import com.google.common.hash.HashCode
import groovy.transform.CompileStatic
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.extension.FilesEx
import nextflow.util.CacheHelper
/**
* Implements the cloud cache store
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@CompileStatic
class CloudCacheStore implements CacheStore {

    private final String LOCK_NAME = 'LOCK'

    private final int KEY_SIZE

    /** The session UUID */
    private UUID uniqueId

    /** The unique run name associated with this cache instance */
    private String runName

    /** The base path for the entire cache */
    private Path basePath

    /** The base path for this cache instance */
    private Path dataPath

    /** The lock file for this cache instance */
    private Path lock

    /** The path to the index file */
    private Path indexPath

    /** Index file input stream */
    private InputStream indexReader

    /** Index file output stream */
    private OutputStream indexWriter

    CloudCacheStore(UUID uniqueId, String runName, Path basePath=null) {
        this.KEY_SIZE = CacheHelper.hasher('x').hash().asBytes().size()
        this.uniqueId = uniqueId
        this.runName = runName
        this.basePath = basePath ?: defaultBasePath()
        this.dataPath = this.basePath.resolve("$uniqueId")
        this.lock = dataPath.resolve(LOCK_NAME)
        this.indexPath = dataPath.resolve("index.$runName")
    }

    private Path defaultBasePath() {
        final basePath = SysEnv.get('NXF_CLOUDCACHE_PATH')
        if( !basePath )
            throw new IllegalArgumentException("NXF_CLOUDCACHE_PATH must be defined when using the path-based cache store")

        return basePath as Path
    }

    @Override
    CloudCacheStore open() {
        acquireLock()
        indexWriter = new BufferedOutputStream(Files.newOutputStream(indexPath))
        return this
    }

    @Override
    CloudCacheStore openForRead() {
        if( !dataPath.exists() )
            throw new AbortOperationException("Missing cache directory: $dataPath")
        acquireLock()
        indexReader = Files.newInputStream(indexPath)
        return this
    }

    private void acquireLock() {
        if( lock.exists() ) {
            final msg = """
                Unable to acquire lock for session with ID ${uniqueId}

                Common reasons for this error are:
                - You are trying to resume the execution of an already running pipeline
                - A previous execution was abruptly interrupted, leaving the session open

                You can see the name of the conflicting run by inspecting the contents of the following path: ${lock}
                """
            throw new IOException(msg)
        }

        lock.text = runName
    }

    @Override
    void drop() {
        dataPath.deleteDir()
    }

    @Override
    void close() {
        FilesEx.closeQuietly(indexWriter)
        lock.delete()
    }

    @Override
    void writeIndex(HashCode key, boolean cached) {
        indexWriter.write(key.asBytes())
        indexWriter.write(cached ? 1 : 0)
    }

    @Override
    void deleteIndex() {
        indexPath.delete()
    }

    @Override
    Iterator<Index> iterateIndex() {
        return new Iterator<Index>() {
            private Index next

            {
                next = fetch()
            }

            @Override
            boolean hasNext() {
                return next != null
            }

            @Override
            Index next() {
                final result = next
                next = fetch()
                return result
            }

            private Index fetch() {
                byte[] key = new byte[KEY_SIZE]
                if( indexReader.read(key) == -1 )
                    return null
                final cached = indexReader.read() == 1
                return new Index(HashCode.fromBytes(key), cached)
            }
        }
    }

    @Override
    byte[] getEntry(HashCode key) {
        try {
            return getCachePath(key).bytes
        }
        catch( NoSuchFileException e ) {
            return null
        }
    }

    @Override
    void putEntry(HashCode key, byte[] value) {
        getCachePath(key).bytes = value
    }

    @Override
    void deleteEntry(HashCode key) {
        getCachePath(key).delete()
    }

    private Path getCachePath(HashCode key) {
        dataPath.resolve(key.toString())
    }
}
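
To make the on-disk layout used by `CloudCacheStore` easier to follow, here is a minimal standalone sketch of the same ideas against a local temporary directory: a `<base>/<uuid>/` data path, a `LOCK` file holding the run name, and an `index.<runName>` file made of fixed-size key bytes followed by one flag byte. `DemoIndexStore` is a hypothetical class, the 16-byte key size is an assumption for illustration (the PR derives it from `CacheHelper`), and reading with `readNBytes` (Java 9+) is a swapped-in variation that tolerates short reads, not what the merged `fetch()` does.

```groovy
import java.nio.file.Files
import java.nio.file.Path

class DemoIndexStore {
    // Assumption: 16-byte keys, chosen only for this illustration
    static final int KEY_SIZE = 16

    final String runName
    final Path dataPath
    final Path lock
    final Path indexPath

    DemoIndexStore(Path basePath, UUID uniqueId, String runName) {
        this.runName = runName
        dataPath = basePath.resolve(uniqueId.toString())
        lock = dataPath.resolve('LOCK')
        indexPath = dataPath.resolve('index.' + runName)
    }

    // Same check-then-write pattern as the PR; note that it is not atomic
    void acquireLock() {
        Files.createDirectories(dataPath)
        if( Files.exists(lock) )
            throw new IOException('Lock already held: ' + lock)
        Files.write(lock, runName.getBytes('UTF-8'))
    }

    void releaseLock() {
        Files.deleteIfExists(lock)
    }

    // Each record is KEY_SIZE key bytes followed by a single 0/1 flag byte
    void writeIndex(List<Map> records) {
        Files.newOutputStream(indexPath).withCloseable { OutputStream out ->
            for( rec in records ) {
                out.write((byte[]) rec.key)
                out.write(rec.cached ? 1 : 0)
            }
        }
    }

    List<Map> readIndex() {
        final result = []
        Files.newInputStream(indexPath).withCloseable { InputStream input ->
            while( true ) {
                final key = new byte[KEY_SIZE]
                // readNBytes keeps reading until the buffer is full or the stream ends
                final n = input.readNBytes(key, 0, KEY_SIZE)
                if( n < KEY_SIZE )
                    break
                final flag = input.read()
                if( flag == -1 )
                    break
                result << [key: key, cached: flag == 1]
            }
        }
        return result
    }
}

final base = Files.createTempDirectory('cloudcache-demo')
final store = new DemoIndexStore(base, UUID.randomUUID(), 'demo_run')
store.acquireLock()
final key = new byte[DemoIndexStore.KEY_SIZE]
Arrays.fill(key, (byte) 7)
store.writeIndex([[key: key, cached: true], [key: key, cached: false]])
final records = store.readIndex()
store.releaseLock()
println records.size()
```

On object storage the existence check and the write of the lock file are separate requests, so two runs could in principle both pass the check. The sketch keeps that behaviour on purpose so it mirrors the PR rather than proposing a different locking scheme.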
6 changes: 6 additions & 0 deletions plugins/nf-cloudcache/src/resources/META-INF/MANIFEST.MF
@@ -0,0 +1,6 @@
Manifest-Version: 1.0
Plugin-Class: nextflow.CloudCachePlugin
Plugin-Id: nf-cloudcache
Plugin-Version: 0.1.0
Plugin-Provider: Seqera Labs
Plugin-Requires: >=23.04.0
17 changes: 17 additions & 0 deletions plugins/nf-cloudcache/src/resources/META-INF/extensions.idx
@@ -0,0 +1,17 @@
#
# Copyright 2013-2023, Seqera Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

nextflow.cache.CloudCacheFactory
1 change: 1 addition & 0 deletions settings.gradle
@@ -33,3 +33,4 @@ include 'plugins:nf-console'
include 'plugins:nf-azure'
include 'plugins:nf-codecommit'
include 'plugins:nf-wave'
include 'plugins:nf-cloudcache'