# Parallel bucket upload

In [10]:
from dask.diagnostics import ProgressBar
from pathlib import Path
from dask import bag
# Create a dask ProgressBar and register it as a global callback; the same
# object is also used as a context manager around the upload further down.
pbar = ProgressBar()
pbar.register()

def iterput(*args, profile="COSEMPDSAdmin"):
    """Upload a batch of files to S3 through one filesystem handle.

    Parameters
    ----------
    *args
        Argument tuples; each one is unpacked into ``fs.put``. The notebook
        passes ``(local_path, remote_path)`` pairs.
    profile : str, keyword-only
        AWS profile name handed to ``s3fs.S3FileSystem``. Defaults to the
        role this notebook was written for, so existing calls are unchanged.

    Returns
    -------
    bool
        ``True`` once every ``put`` call has returned.
    """
    # Imported inside the function rather than at cell level -- presumably so
    # that each dask worker process imports s3fs itself; TODO confirm.
    import s3fs

    # One handle is shared by every transfer in this batch.
    fs = s3fs.S3FileSystem(profile=profile)
    for put_args in args:
        fs.put(*put_args)
    return True
        

In [19]:
# Shell escape: list what is currently in the local s3_testing directory.
!ls /nrs/cosem/davis/s3_testing/

hela2	   HeLa_b.n5  Jurkat.n5      Pancreas_islets.n5
hela_a.n5  HeLa_c.n5  macrophage.n5  SUM159.n5


In [29]:
# Destination bucket and the local directory tree that will be uploaded.
bucket = 'janelia-cosem-dev'
# NOTE(review): the directory listing recorded earlier in this notebook shows
# `macrophage.n5` (lowercase m) -- confirm the case used here matches the disk.
source_path = Path('/nrs/cosem/davis/s3_testing/Macrophage.n5/')
# Objects are placed under <bucket>/<name without suffix>/<name with suffix>.
target_path = Path(bucket, source_path.stem, source_path.name)

In [30]:
# Every non-directory entry found recursively beneath source_path.
all_files = [entry for entry in source_path.rglob('*') if not entry.is_dir()]
# Destination for each file: its location relative to source_path, re-rooted
# at target_path. Same order as all_files, so the two lists can be zipped.
all_object_names = [
    target_path / local_file.relative_to(source_path)
    for local_file in all_files
]

In [31]:
# Pair each local file with its destination object name and hand the pairs to
# dask.bag, requesting 5000 partitions; the upload cell below calls iterput
# once per partition.
tmp = bag.from_sequence(zip(all_files, all_object_names), npartitions=5000)

In [32]:
# No scheduler is passed to compute(), so dask.bag uses its default
# (multiprocessing) scheduler, which the author found gave the best results.
def _upload_partition(partition):
    """Upload one partition: spread its argument tuples into iterput."""
    return iterput(*partition)

# NOTE(review): pbar is also registered globally in the first cell, so this
# context manager may be redundant -- confirm.
with pbar:
    tmp.map_partitions(_upload_partition).compute()

[########################################] | 100% Completed | 13min 43.9s
[########################################] | 100% Completed | 13min 44.0s


In [6]:
import s3fs
fs = s3fs.S3FileSystem(profile='COSEMPDSAdmin')
# Destructive: deletes the README.md object under HeLa_c in the
# janelia-cosem-dev bucket.
fs.rm('janelia-cosem-dev/HeLa_c/README.md')


In [1]:
# upload readme file to bucket
# NOTE(review): the destination is the `janelia-cosem` bucket, whereas the
# other cells in this notebook use `janelia-cosem-dev` -- confirm this is
# intended before re-running.
import s3fs
role = "COSEMPDSAdmin"
fs = s3fs.S3FileSystem(profile=role)
# Copies the local cosem-introduction.md to the object README.md at the bucket root.
fs.put('/groups/scicompsoft/home/bennettd/dev/aws-opendata/cosem-introduction.md', 'janelia-cosem/README.md')

# Experimenting with parallelized recursive glob on S3

In [1]:
# Bucket name, plus a source object path and a target key for HeLa_b's
# attributes.json.
# NOTE(review): none of these three names is referenced by the later cells
# visible in this section -- possibly leftovers from an earlier experiment.
bucket = 'janelia-cosem-dev'
source = 'janelia-cosem-dev/HeLa_b.n5/attributes.json'
target = 'HeLa_b/HeLa_b.n5/attributes.json'

In [2]:
from s3fs import S3FileSystem
# Notebook-level filesystem handle. s3glob (defined below) builds its own
# handle on every call instead of using this one.
fs = S3FileSystem(profile='COSEMPDSAdmin')


In [4]:
def s3glob(paths, detail=True, profile='COSEMPDSAdmin'):
    """Glob one pattern, or several, against S3.

    Parameters
    ----------
    paths : str or iterable of str
        A single glob pattern, or an iterable of patterns that are globbed
        one after another with the same filesystem handle.
    detail : bool
        Forwarded to ``fs.glob``.
    profile : str
        AWS profile name passed to ``S3FileSystem``; defaults to the role
        this notebook was written for.

    Returns
    -------
    dict or list
        For a single pattern, exactly what ``fs.glob`` returns. For several
        patterns, the per-pattern results merged in order: one dict when
        ``detail`` is true (so the per-path records are kept rather than
        reduced to their keys), otherwise one list.
    """
    from s3fs import S3FileSystem
    fs = S3FileSystem(profile=profile)

    if isinstance(paths, str):
        return fs.glob(paths, detail=detail)

    # With detail=True each glob result is a mapping; extending a list with a
    # mapping would keep only its keys, so merge mappings into a mapping.
    merged = {} if detail else []
    for pattern in paths:
        found = fs.glob(pattern, detail=detail)
        if detail:
            merged.update(found)
        else:
            merged.extend(found)
    return merged

In [6]:
# Glob a few levels below every *.n5 container, asking for per-path details.
root_contents = s3glob('janelia-cosem-dev/*.n5/*/*[a-z/]*/*', detail=True)
from pathlib import Path
discard, keep = [],[]
root_paths = tuple(root_contents.keys())
# Compare each path with its successor: when the successor is a direct child
# of the current path, the current directory's contents were already globbed,
# so it is discarded; otherwise it is kept.
# NOTE(review): this relies on the glob result listing a directory immediately
# before its first child -- confirm the key ordering.
for val, following in zip(root_paths, root_paths[1:]):
    # discard directories that have been globbed out already
    if Path(val).parts != Path(following).parts[:-1]:
        keep.append(val)
    else:
        discard.append(val)
# The final path has no successor, so nothing beneath it can have been globbed
# already; keep it instead of silently dropping it (no-op when the glob is empty).
keep.extend(root_paths[-1:])

In [13]:
from dask.diagnostics import ProgressBar
# Same progress-bar setup as in the upload section: create a ProgressBar and
# register it as a global callback.
pbar = ProgressBar()
pbar.register()

In [14]:
import dask.bag as bag

In [15]:
# One '<path>**' glob pattern per kept entry whose detail record has type
# 'directory', spread over 50 partitions.
# NOTE(review): the pattern is built without a '/' separator, so it may also
# match sibling entries that merely share the prefix -- confirm this is intended.
bg = bag.from_sequence(
    [
        pth + '**'
        for pth in keep
        if root_contents[pth]['type'] == 'directory'
    ],
    npartitions=50,
)

In [24]:
# Call s3glob once per partition of patterns (default detail=True) using the
# threaded scheduler; `result` holds whatever compute() returns for the bag.
with pbar:
    result = bg.map_partitions(s3glob).compute(scheduler='threads')

[########################################] | 100% Completed |  2min 37.7s


In [26]:
# Build (and display the repr of) a bag from at most the first ten kept paths;
# the bag is not assigned to a name.
# NOTE(review): the recorded output reports npartitions=50, although no
# npartitions is passed here and at most ten items are supplied -- the output
# may be stale; confirm by re-running.
bag.from_sequence([pth + '**' for pth in keep[:10] if root_contents[pth]['type']=='directory'])

dask.bag<from_sequence, npartitions=50>