In [10]:
import logging
import os
import sys
import json
from datetime import date
from firecloud import api as fapi
from oauth2client.client import GoogleCredentials
sys.path.insert(0, os.path.abspath('../../src/python'))
from terra_workspace_utils import get_workspace_submissions, get_gs_links_for_failed_workflows

#pip install firecloud
logging.basicConfig(filename='workspace-utils-app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')

In [5]:
def make_filter_by_date(target_date, newer=True):
    """
    Returns a filter function that filters submissions based on the submission date.

    Parameters:
    - target_date (datetime.date): The target date to filter by.
    - newer (bool, optional): If True, filters submissions with dates greater than the target date.
                              If False, filters submissions with dates less than the target date.
                              Defaults to True.

    Returns:
    - filter_by_date (function): The filter function that can be used to filter submissions.
    """
    def filter_by_date(x):
        if newer:
            return date.fromisoformat(x["submissionDate"].split("T")[0]) > target_date
        else:
            return date.fromisoformat(x["submissionDate"].split("T")[0]) < target_date
    return filter_by_date

def make_filter_by_submitter(email, exclude=False):
    """
    Returns a filter function that filters submissions based on the submitter's email.

    Parameters:
    - email (str): The email of the submitter to filter by.
    - exclude (bool, optional): If True, keep submissions that do not match the email.
                                If False, keep submissions that match the email.
                                Defaults to False.

    Returns:
    - filter_by_submitter (function): The filter function that can be used to filter submissions.
    """
    def filter_by_submitter(x):
        if exclude:
            return x["submitter"] != email
        else:
            return x["submitter"] == email
    return filter_by_submitter

def make_filter_by_workflow_status(status, exclude=False):
    """
    Returns a filter function that filters submissions based on the workflow status.

    Parameters:
    - status (str): The workflow status to filter by.
    - exclude (bool, optional): If True, keep submissions that do not match the status.
                                If False, keep submissions that match the status.
                                Defaults to False.

    Returns:
    - filter_by_workflow_status (function): The filter function that can be used to filter submissions.
    """
    def filter_by_workflow_status(x):
        if exclude:
            return any(x!=status for x in x["workflowStatuses"])
        else:
            return any(x==status for x in x["workflowStatuses"])
    return filter_by_workflow_status

def make_get_links_for_failed_workflows(workspace_project, workspace_namespace):
    """
    Returns a function that gets the Google Storage links for failed workflows in a workspace.

    Parameters:
    - workspace_project (str): The project of the workspace to get the links for.
    - workspace_namespace (str): The namespace of the workspace to get the links for.

    Returns:
    - get_links_for_failed_workflows (function): The function that can be used to get the links for failed workflows.
    """
    def get_links_for_failed_workflows(x):
        return get_gs_links_for_failed_workflows(workspace_project, workspace_namespace, x["submissionId"])
    return get_links_for_failed_workflows

In [None]:
# Terra workspace in the form of 'namespace/name'
workspace_project = ""
workspace_namespace = ""


# Get all submissions for a workspace
all_submissions = get_workspace_submissions(workspace_project, workspace_namespace)
print("\n".join(all_submissions[0].keys()))

deleteIntermediateOutputFiles
methodConfigurationDeleted
methodConfigurationName
methodConfigurationNamespace
status
submissionDate
submissionEntity
submissionId
submissionRoot
submitter
useCallCache
userComment
workflowStatuses


In [39]:
from terra_workspace_utils import get_workflows_by_submission, get_workflow_outputs

# Get all workflows for a submission
submission = all_submissions[1]
workflows = get_workflows_by_submission(workspace_project, workspace_namespace, submission['submissionId'])
print(len(workflows["workflows"]))
# This is the example for just the first workflow. You can loop through all workflows if you have more than one.
print(workflows["workflows"][0]["workflowId"])
outputs = get_workflow_outputs(workspace_project, workspace_namespace, submission['submissionId'], workflows["workflows"][0]["workflowId"])
print(outputs)

1
b46a1a0b-d0c6-459b-a193-2a0f3a6cf341
('{"causes":[{"causes":[{"causes":[],"exceptionClass":"java.util.NoSuchElementException","message":"key not found: outputs","source":"cromwell","stackTrace":[{"className":"scala.collection.MapOps","fileName":"Map.scala","lineNumber":289,"methodName":"default"},{"className":"scala.collection.MapOps","fileName":"Map.scala","lineNumber":288,"methodName":"default$"},{"className":"scala.collection.AbstractMap","fileName":"Map.scala","lineNumber":420,"methodName":"default"},{"className":"scala.collection.MapOps","fileName":"Map.scala","lineNumber":176,"methodName":"apply"},{"className":"scala.collection.MapOps","fileName":"Map.scala","lineNumber":175,"methodName":"apply$"},{"className":"scala.collection.AbstractMap","fileName":"Map.scala","lineNumber":420,"methodName":"apply"},{"className":"spray.json.ProductFormats","fileName":"ProductFormats.scala","lineNumber":58,"methodName":"fromField"},{"className":"spray.json.ProductFormats","fileName":"ProductFo

In [41]:
# Now we can sequentially apply filters to get the submissions we are interested in

# Let's restrict to submissions newer than certain date
target_date = date.fromisoformat("2020-01-01")
submissions_newer_than_target_date = filter(make_filter_by_date(target_date), all_submissions)

# Now let's find which submissions contains Failed or Aborted workflows
failed_submissions = list(filter(make_filter_by_workflow_status("Succeded", exclude=True), submissions_newer_than_target_date))


print(len(failed_submissions))

1670


In [43]:
folders_to_remove = list(map(make_get_links_for_failed_workflows(workspace_project, workspace_namespace), failed_submissions))
folders_to_remove_cleaned = [x for sublist in folders_to_remove for x in sublist if x]

print(len(folders_to_remove_cleaned))
with open("folders_to_remove_gro_share.txt", "w") as f:
    for item in folders_to_remove_cleaned:
        f.write(f"{item}\n")

2618
