In [None]:
# load required source files and packages
from __future__ import print_function
import os.path
import pandas as pd
import numpy as np
import re
from IPython.display import Image,display
import dalmatian as dm
from IPython.core.display import HTML 
import sys
sys.path.insert(0, '../JKBio/')
from IPython.core.debugger import set_trace
%load_ext autoreload
%autoreload 2
%load_ext rpy2.ipython
import ipdb

# Check google bucket access

The purpose of this notebook is to determine, for any workspace, which files (input files and outputs from running workflows) the user does not have access to. In particular, running this notebook will generate a list of the buckets used in the workspace that the user cannot access.

The user will need to obtain access to these buckets before they will be able to execute all of the workflows. That being said, if the user is only interested in one or two select workflows, the below code can be edited such that the user can determine what they'll need to get access to in order to run that particular workflow.

This notebook also finds google storage file paths that have been directly handed into a workflow as an input parameter. This isn't good practice, and should probably be fixed if found by putting the file into the workspace data and then updating the workflow input parameter to point to the appropriate item in the workspace data. Note that this method currently identifies only bucket paths (not the actual file paths), but this can easily be changed if one of these inputs is found.

In [None]:
# Record and show the kernel's current working directory.
cwd = os.getcwd()
print(cwd)

In [None]:
def flatten_list_of_lists(lol):
    """Flatten exactly one level of nesting.

    Args:
        lol: an iterable whose elements are themselves iterable.

    Returns:
        A new list holding every inner element, in the order encountered.
    """
    return [element for inner in lol for element in inner]

def str_remove_quotes_brackets(string):
    """Return `string` with every single quote, double quote, '[' and ']' deleted."""
    for unwanted in ("'", '"', "[", "]"):
        string = string.replace(unwanted, "")
    return string

def str_to_list_if_comma(string):
    """Split comma-separated text into a list; leave comma-free text alone.

    Returns:
        A list of whitespace-stripped pieces when `string` contains a comma,
        otherwise `string` itself, unchanged.
    """
    if "," not in string:
        return string  # just a string
    # this string is actually a list: break it up
    return [piece.strip() for piece in string.split(",")]

In [None]:
# we want to check whether the entity contains any gs bucket link.
# we also need to deal with chance that we're being passed multiple paths (should be comma separated)
# If it does, we want to check access using gsutil. If we don't have access, add the link to the noAccess links.
# path may be given as a single path or a character string with a list of paths (e.g. "gs://path/1, gs://path/2")

# format lists from terra TSVs: terra sometimes has things as str that are actually lists
def str_to_list_remove_quotes_brackets(string):
    """Turn a comma-separated string into a list of cleaned pieces.

    When `string` contains a comma, single quotes and square brackets are
    removed, the text is split on commas and each piece is stripped of
    surrounding whitespace. A string without a comma is returned untouched
    (its quotes and brackets are NOT removed). Double quotes are never removed.
    """
    if "," not in string:
        return string  # just a string
    for unwanted in ("'", "[", "]"):
        string = string.replace(unwanted, "")
    return [piece.strip() for piece in string.split(",")]


def _collect_gs_paths(cells):
    """Return the set of all strings starting with 'gs://' found in `cells`.

    Each cell may be a string (possibly a comma-separated or list-formatted
    string, as found in Terra TSVs), a list, or anything else. Non-string
    values that are not lists are ignored.
    """
    found = set()
    for cell in cells:
        if isinstance(cell, str):
            # Strip quotes/brackets, then split into a list if comma-separated.
            cell = str_to_list_if_comma(str_remove_quotes_brackets(cell))
        candidates = cell if isinstance(cell, list) else [cell]
        # Keep every gs:// entry individually, so a list that mixes gs://
        # paths with other values still contributes its gs:// paths.
        found.update(
            item for item in candidates
            if isinstance(item, str) and item.startswith('gs://')
        )
    return found


def _gsutil_access_denied(bucketpath):
    """Return True when `gsutil ls <bucketpath>` reports AccessDeniedException.

    The command is run with an argument list (no shell), so characters in the
    bucket path are never interpreted by a shell. stdout and stderr are read
    together and searched as a whole, so an empty listing or an error that is
    not on the first output line is handled.

    Raises:
        RuntimeError: if no `gsutil` executable can be found on PATH.
    """
    import shutil
    import subprocess

    gsutil = shutil.which("gsutil")
    if gsutil is None:
        raise RuntimeError(
            "gsutil was not found on PATH; cannot check access for " + bucketpath
        )
    completed = subprocess.run(
        [gsutil, "ls", bucketpath],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    return "AccessDeniedException" in completed.stdout


def check_access(entity, is_denied=None):
    """Find the google storage buckets referenced by `entity` that are not accessible.

    Every value of `entity` is scanned for gs:// paths. Only the bucket part
    ("gs://<bucket>/") of each path is checked, and each bucket is checked once.

    Args:
        entity: either a pandas DataFrame (all of its cells are scanned) or a
            flat list of values.
        is_denied: optional callable taking a bucket path ("gs://<bucket>/")
            and returning True when access is denied. Defaults to a check
            based on `gsutil ls`.

    Returns:
        A set of bucket paths for which access was denied.

    Raises:
        RuntimeError: from the default checker when `gsutil` is not on PATH.
    """
    if is_denied is None:
        is_denied = _gsutil_access_denied

    if isinstance(entity, list):
        flattened_entity = entity  # when list as input
    else:
        flattened_entity = entity.values.flatten().tolist()

    paths = _collect_gs_paths(flattened_entity)
    print("We have a list of "+str(len(paths))+" file paths to check for access. \n")

    tested_buckets = set()
    noAccess = set()
    for loopNum, path in enumerate(paths, start=1):
        if loopNum % 10000 == 0:
            print("We're on path number "+str(loopNum)+" of "+str(len(paths))+".")

        # Only the bucket is checked (more efficient and avoids duplication).
        # The bucket name must be followed by "/" or the end of the string, so
        # a bare "gs://bucket" is accepted as well.
        match = re.match(r"^gs://([^:/\s]+)(?:/|$)", path)
        if not match:
            continue
        bucketpath = "gs://" + match.group(1) + "/"
        if bucketpath in tested_buckets:
            continue
        tested_buckets.add(bucketpath)
        print("We're using gsutil to check access for "+bucketpath)
        if is_denied(bucketpath):
            noAccess.add(bucketpath)
    return noAccess

# create a list of each google storage file path you cannot access
def check_workspace_bucket_access(func, wmfrom, wmto=None):
    """Apply `func` to every entity table of a workspace and merge the results.

    The participants, samples, pair_sets, pairs and sample_sets tables are
    fetched through the matching `wmfrom.get_<name>()` method. A table whose
    getter fails (or does not exist) is reported with a "no <name>" message
    and skipped.

    Args:
        func: callable applied to each fetched table; it must return an
            iterable of items to add to the result (e.g. `check_access`).
        wmfrom: workspace manager object providing the `get_*` methods.
        wmto: unused; kept so existing callers keep working.

    Returns:
        A set with the union of everything `func` returned.
    """
    entity_types = ('participants', 'samples', 'pair_sets', 'pairs', 'sample_sets')

    data = {}
    for entity_type in entity_types:
        try:
            data[entity_type] = getattr(wmfrom, 'get_' + entity_type)()
        except Exception:
            # Best effort: a missing table must not stop the other checks.
            # `Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit still stop the notebook.
            print('no ' + entity_type)

    result = set()
    for key, entity in data.items():
        print("working on key: ", key)
        result.update(func(entity))
    print("\n Done checking for access.")
    return result


## INPUT WORKSPACE INFO HERE
Edit the cell below (tagged with 'parameters') to specify your namespace and workspace of interest.

In [None]:
# Workspace identifier handed to dm.WorkspaceManager in the next cell; edit this value.
# NOTE(review): presumably "<namespace>/<workspace name>"; the "%20" looks like a
# URL-encoded space — confirm dalmatian expects the name in encoded form.
workspace="nci-mimoun-bi-org/PANCAN_TWIST%20copy"


In [None]:
# Workspace manager used by every check below.
wmfrom = dm.WorkspaceManager(workspace)
# Checker that check_workspace_bucket_access applies to each entity table.
func = check_access

In [None]:
# Display the workspace attributes (the same call feeds the workspace-data check further down).
wmfrom.get_attributes()

## DeniedAccess paths for all workspace entities (e.g. samples, sample sets)

This allows the user to see if they have access to all of the files produced by the workflows.

In [None]:
# Run the access check over each entity table of the workspace and show the denied bucket paths.
denied_entities = check_workspace_bucket_access(func, wmfrom)
denied_entities

## DeniedAccess paths for Workspace data TSV

This allows the user to see if they have access to all of the buckets used for the workspace data. The workspace data usually contains items that are passed in as input parameters to various workflows. Running workflows may require access to a large portion of the files/objects in the workspace data.

In [None]:
# Take the workspace-data attribute values (the keys are not needed), flatten
# them into a single list and check google bucket access on that list.
a = wmfrom.get_attributes()

list_dict_inputs_outputs = [list(a.values())]  # values of the attribute dict, wrapped in a list
workspace_data_list = [value for group in list_dict_inputs_outputs for value in group]

denied_workspace = check_access(workspace_data_list)
denied_workspace

## DeniedAccess paths for Workflow inputs and outputs

This allows the user to see if they have access to all of the inputs in a workflow that are passed as direct google storage file paths. This direct passing of paths is not good practice, and should probably be fixed at some point when it occurs (except if it's public data that will always be public, like BAMs from Terra tutorials and such).

In [None]:
# Fetch the workflow configurations of the workspace and display them.
workflows = wmfrom.get_configs()
workflows


In [None]:
# Tell the user up front when the workflow check below has nothing to do.
if len(workflows) == 0:
    print("There are no workflows to check.")

In [None]:
# Check bucket access for the input and output values of every workflow configuration.
denied_workflows = set()
if len(workflows) != 0:
    for workflow_name in workflows['name']:
        print(workflow_name)
        try:
            config = wmfrom.get_config(workflow_name)
        except Exception:
            # Best effort: report the workflow and move on to the next one.
            # `Exception` (not a bare except) keeps KeyboardInterrupt working.
            print("We did not find "+workflow_name+" in the namespace specified above. \n")
            continue

        # Flattened list of all input and output values of the workflow.
        # A configuration without an "inputs" or "outputs" section is treated
        # as having an empty one instead of raising AttributeError.
        workflow_data_list = [
            value
            for section in ("inputs", "outputs")
            for value in (config.get(section) or {}).values()
        ]

        # on this list, we want to check google bucket access.
        denied_workflows.update(check_access(workflow_data_list))

denied_workflows

## All found DeniedAccess bucket paths

In [None]:
# Union of every bucket path the user cannot access, across the entity,
# workspace-data and workflow checks.
denied_all = set().union(denied_entities, denied_workspace, denied_workflows)
denied_all

In [None]:
# Number of distinct bucket paths collected in denied_all.
len(denied_all)