Skip to content

Commit

Permalink
Merge pull request #2442 from opencb/TASK-6078
Browse files Browse the repository at this point in the history
TASK-6078 - Concurrent variant-index from same sample without --load-multi-file-data
  • Loading branch information
j-coll committed May 22, 2024
2 parents 9233577 + e1500e7 commit c74ea8a
Show file tree
Hide file tree
Showing 22 changed files with 960 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;

import java.net.URI;
import java.util.ArrayList;
Expand All @@ -52,6 +53,7 @@ public void removeStudy(String study, URI outdir, String token) throws CatalogEx

public void removeFile(String study, List<String> inputFiles, URI outdir, String token) throws CatalogException, StorageEngineException {
// Update study metadata BEFORE executing the operation and fetching files from Catalog
boolean force = variantStorageEngine.getOptions().getBoolean(VariantStorageOptions.FORCE.key());
StudyMetadata studyMetadata = synchronizeCatalogStudyFromStorage(study, token, true);

List<String> fileNames = new ArrayList<>();
Expand All @@ -62,7 +64,15 @@ public void removeFile(String study, List<String> inputFiles, URI outdir, String
if (!catalogIndexStatus.equals(VariantIndexStatus.READY)) {
// Might be partially loaded in VariantStorage. Check FileMetadata
FileMetadata fileMetadata = variantStorageEngine.getMetadataManager().getFileMetadata(studyMetadata.getId(), fileStr);
if (fileMetadata == null || fileMetadata.getIndexStatus() != TaskMetadata.Status.NONE) {
boolean canBeRemoved;
if (force) {
// When forcing remove, just require the file to be registered in the storage
canBeRemoved = fileMetadata != null;
} else {
// Otherwise, require the file to be in status NONE
canBeRemoved = fileMetadata != null && fileMetadata.getIndexStatus() != TaskMetadata.Status.NONE;
}
if (!canBeRemoved) {
throw new CatalogException("Unable to remove variants from file " + file.getName() + ". "
+ "IndexStatus = " + catalogIndexStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ protected void check() throws Exception {
throw new ToolException("Missing file/s");
}
params.put(VariantStorageOptions.RESUME.key(), variantFileDeleteParams.isResume());
params.put(VariantStorageOptions.FORCE.key(), variantFileDeleteParams.isForce());
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ public class IndexStatus extends InternalStatus {
*/
public static final String NONE = "NONE";
public static final String INDEXING = "INDEXING";
public static final String INVALID = "INVALID";

public static final List<String> STATUS_LIST = Arrays.asList(READY, DELETED, NONE, INDEXING);
public static final List<String> STATUS_LIST = Arrays.asList(READY, DELETED, NONE, INDEXING, INVALID);

public IndexStatus(String status, String message) {
if (isValid(status)) {
Expand Down Expand Up @@ -50,6 +51,7 @@ public static boolean isValid(String status) {
return status != null
&& (status.equals(READY)
|| status.equals(DELETED)
|| status.equals(INVALID)
|| status.equals(NONE)
|| status.equals(INDEXING));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class VariantIndexStatus extends IndexStatus {
public static final String TRANSFORMED = "TRANSFORMED";
public static final String LOADING = "LOADING";

public static final List<String> STATUS_LIST = Arrays.asList(READY, DELETED, NONE, TRANSFORMED, TRANSFORMING, LOADING, INDEXING);
public static final List<String> STATUS_LIST = Arrays.asList(READY, DELETED, NONE, TRANSFORMED, TRANSFORMING, LOADING, INDEXING,
INVALID);

public VariantIndexStatus(String status, String message) {
if (isValid(status)) {
Expand Down Expand Up @@ -56,6 +57,7 @@ public static boolean isValid(String status) {
&& (status.equals(READY)
|| status.equals(DELETED)
|| status.equals(NONE)
|| status.equals(INVALID)
|| status.equals(TRANSFORMED)
|| status.equals(TRANSFORMING)
|| status.equals(LOADING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public VariantFileDeleteParams(List<String> file, boolean resume) {

private List<String> file;
private boolean resume;
private boolean force;

public List<String> getFile() {
return file;
Expand All @@ -52,4 +53,13 @@ public VariantFileDeleteParams setResume(boolean resume) {
this.resume = resume;
return this;
}

public boolean isForce() {
return force;
}

public VariantFileDeleteParams setForce(boolean force) {
this.force = force;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public static StorageEngineException alreadyLoaded(int fileId, String fileName)
return unableToExecute("Already loaded", fileId, fileName);
}

public static StorageEngineException invalidFileStatus(int fileId, String fileName) {
return unableToExecute("File is in INVALID status. Unable to load. File needs to be deleted from the variant-storage",
fileId, fileName);
}

public static StorageEngineException otherOperationInProgressException(TaskMetadata operation, String jobOperationName,
List<Integer> fileIds,
VariantStorageMetadataManager mm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ public <E extends Exception> FileMetadata updateFileMetadata(int studyId, int fi
update.update(fileMetadata);
lock.checkLocked();
unsecureUpdateFileMetadata(studyId, fileMetadata);
fileIdIndexedCache.put(studyId, fileId, fileMetadata.isIndexed());
return fileMetadata;
} finally {
lock.unlock();
Expand Down Expand Up @@ -807,7 +808,6 @@ public void addIndexedFiles(int studyId, List<Integer> fileIds) throws StorageEn
.getName();
logger.info("Register file " + name + " as INDEXED");
}
fileDBAdaptor.addIndexedFiles(studyId, fileIds);
fileIdsFromSampleIdCache.clear();
fileIdIndexedCache.clear();
}
Expand All @@ -821,6 +821,7 @@ public void removeIndexedFiles(int studyId, Collection<Integer> fileIds) throws
fileMetadata.setIndexStatus(TaskMetadata.Status.NONE);
fileMetadata.setSecondaryAnnotationIndexStatus(TaskMetadata.Status.NONE);
fileMetadata.setAnnotationStatus(TaskMetadata.Status.NONE);
fileMetadata.getAttributes().remove(LOAD_ARCHIVE.key());
if (fileMetadata.getType() == FileMetadata.Type.VIRTUAL) {
partialFiles.addAll(fileMetadata.getAttributes().getAsIntegerList(FileMetadata.VIRTUAL_FILES));
}
Expand All @@ -833,6 +834,7 @@ public void removeIndexedFiles(int studyId, Collection<Integer> fileIds) throws
fileMetadata.setIndexStatus(TaskMetadata.Status.NONE);
fileMetadata.setSecondaryAnnotationIndexStatus(TaskMetadata.Status.NONE);
fileMetadata.setAnnotationStatus(TaskMetadata.Status.NONE);
fileMetadata.getAttributes().remove(LOAD_ARCHIVE.key());
});
// deleteVariantFileMetadata(studyId, fileId);
}
Expand All @@ -851,7 +853,6 @@ public void removeIndexedFiles(int studyId, Collection<Integer> fileIds) throws
}
});
}
fileDBAdaptor.removeIndexedFiles(studyId, fileIds);
}

public Iterable<FileMetadata> fileMetadataIterable(int studyId) {
Expand Down Expand Up @@ -1283,14 +1284,22 @@ public TaskMetadata getTask(int studyId, String taskName, List<Integer> fileIds)
return task;
}

public Iterable<TaskMetadata> taskIterable(int studyId) {
return () -> taskIterator(studyId, null, false);
}

public Iterator<TaskMetadata> taskIterator(int studyId) {
return taskIterator(studyId, null);
return taskIterator(studyId, null, false);
}

public Iterator<TaskMetadata> taskIterator(int studyId, List<TaskMetadata.Status> statusFilter) {
return taskIterator(studyId, statusFilter, false);
}

public Iterator<TaskMetadata> taskIterator(int studyId, TaskMetadata.Status... statusFilter) {
return taskIterator(studyId, Arrays.asList(statusFilter), false);
}

public Iterator<TaskMetadata> taskIterator(int studyId, List<TaskMetadata.Status> statusFilter, boolean reversed) {
return taskDBAdaptor.taskIterator(studyId, statusFilter, reversed);
}
Expand All @@ -1299,6 +1308,13 @@ public Iterable<TaskMetadata> getRunningTasks(int studyId) {
return taskDBAdaptor.getRunningTasks(studyId);
}

public Iterable<TaskMetadata> getActiveTasks(int studyId) {
return () -> taskIterator(studyId,
TaskMetadata.Status.RUNNING,
TaskMetadata.Status.DONE,
TaskMetadata.Status.ERROR);
}

public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEngineException {
task.setStudyId(studyId);
if (task.getId() == 0) {
Expand Down Expand Up @@ -1748,6 +1764,9 @@ private int registerFile(int studyId, String filePath, FileMetadata.Type type) t

if (fileId != null) {
updateFileMetadata(studyId, fileId, fileMetadata -> {
if (fileMetadata.getIndexStatus() == TaskMetadata.Status.INVALID) {
throw StorageEngineException.invalidFileStatus(fileMetadata.getId(), fileName);
}
if (fileMetadata.isIndexed()) {
throw StorageEngineException.alreadyLoaded(fileMetadata.getId(), fileName);
}
Expand Down Expand Up @@ -2085,47 +2104,35 @@ private TaskMetadata getRunningTaskCompatibleOrFail(int studyId, String jobOpera
TaskMetadata.Type type, Predicate<TaskMetadata> allowConcurrent)
throws StorageEngineException {
TaskMetadata resumeTask = null;
Iterator<TaskMetadata> iterator = taskIterator(studyId, Arrays.asList(
TaskMetadata.Status.DONE,
TaskMetadata.Status.RUNNING,
TaskMetadata.Status.ERROR));
while (iterator.hasNext()) {
TaskMetadata task = iterator.next();
for (TaskMetadata task : getActiveTasks(studyId)) {
TaskMetadata.Status currentStatus = task.currentStatus();

switch (currentStatus) {
case READY:
logger.warn("Unexpected READY task. IGNORE");
// Ignore ready operations
break;
case DONE:
case RUNNING:
if (!resume) {
if (task.sameOperation(fileIds, type, jobOperationName)) {
throw StorageEngineException.currentOperationInProgressException(task, this);
} else {
if (allowConcurrent.test(task)) {
break;
} else {
throw StorageEngineException.otherOperationInProgressException(task, jobOperationName, fileIds, this);
}
}
}
// DO NOT BREAK!. Resuming last loading, go to error case.
case ERROR:
if (!task.sameOperation(fileIds, type, jobOperationName)) {
if (allowConcurrent.test(task)) {
break;
} else {
throw StorageEngineException.otherOperationInProgressException(task, jobOperationName, fileIds, this, resume);
}
} else {
logger.info("Resuming last batch operation \"" + task.getName() + "\" due to error.");
resumeTask = task;
}
break;
default:
throw new IllegalArgumentException("Unknown Status " + currentStatus);
if (currentStatus != TaskMetadata.Status.DONE
&& currentStatus != TaskMetadata.Status.RUNNING
&& currentStatus != TaskMetadata.Status.ERROR) {
logger.warn("Unexpected {} task. IGNORE", currentStatus);
// Ignore ready operations
continue;
}

if (task.sameOperation(fileIds, type, jobOperationName)) {
if (currentStatus == TaskMetadata.Status.ERROR) {
// Automatically resume ERROR status tasks
logger.info("Resuming last batch operation \"" + task.getName() + "\" due to error.");
resumeTask = task;
} else if (resume) {
// Force resume
logger.info("Manually resuming last batch operation \"" + task.getName() + "\" in status " + currentStatus + ".");
resumeTask = task;
} else {
// Already being executed
throw StorageEngineException.currentOperationInProgressException(task, this);
}
} else {
// Check if it can be executed concurrently
if (!allowConcurrent.test(task)) {
throw StorageEngineException.otherOperationInProgressException(task, jobOperationName, fileIds, this);
}
}
}
return resumeTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ public String description() {

LinkedHashSet<Integer> getIndexedFiles(int studyId, boolean includePartial);

default void addIndexedFiles(int studyId, List<Integer> fileIds) {}

default void removeIndexedFiles(int studyId, Collection<Integer> fileIds) {};

default DataResult count() {
return count(new Query());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,39 @@ public class TaskMetadata {

public enum Status {
NONE,
/**
* Active task.
* Running, but not finished
*/
RUNNING,
DONE, // Finished, but some work still needed (optional)
/**
* Active task.
* Finished, but some work still needed (optional status)
*/
DONE,
/**
* Active task.
* Currently, paused.
* Errors found during the execution. Needs to be resumed or cleaned.
*/
ERROR,
/**
* Finished.
* Ready to be used
*/
READY,
ERROR
/**
* Finished.
* Task was aborted, cancelled or rolled back.
* Any needed clean might be executed by other running tasks
*/
ABORTED,
/**
* Finished.
* Task finished with invalid results.
* Similar to "ERROR" status, but this can't be resumed. Needs to be cleaned first.
*/
INVALID,
}

public enum Type {
Expand Down
Loading

0 comments on commit c74ea8a

Please sign in to comment.