In [1]:
import logging
import urllib.request
import shutil
import tarfile

from pathlib import Path

from Pegasus.api import *

# Configure the root logger at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# Set up properties.
# "nonsharedfs" is the data configuration the site catalog in this cell is built for.
# NOTE(review): "dagman.retry" = "0" presumably disables DAGMan retries — confirm
# against the Pegasus properties documentation.
props = Properties()
props["pegasus.data.configuration"] = "nonsharedfs"
props["dagman.retry"] = "0"
# No path is passed, so the properties go to the library's default location.
props.write()

# Site catalog for the nonsharedfs data configuration: one execution site
# ("condorpool") and one staging site ("stage").
sc = SiteCatalog()

# Execution site: condor-style Pegasus profile, vanilla universe.
# NOTE(review): the keyword `auxillary_local` is kept exactly as written; confirm
# the spelling against the Pegasus API before "correcting" it.
condorpool_site = (
    Site("condorpool", arch=Arch.X86_64, os_type=OS.LINUX)
    .add_pegasus_profile(style="condor")
    .add_condor_profile(universe="vanilla")
    .add_pegasus_profile(auxillary_local="true")
)

# Staging site: /tmp/ is declared as shared scratch and exposed through a
# file:// server that accepts every operation.
staging_site = (
    Site("stage", arch=Arch.X86_64, os_type=OS.LINUX)
    .add_directories(
        Directory(Directory.SHARED_SCRATCH, "/tmp/").add_file_servers(
            FileServer("file:///tmp/", Operation.ALL)
        )
    )
    .add_pegasus_profile(auxillary_local="true")
)

# Register both sites, then serialize the catalog (default location).
sc.add_sites(condorpool_site, staging_site)
sc.write()

# Download the initial input (not using a Pegasus job, because we need to
# iterate through it and add every file to the replica catalog).
with urllib.request.urlopen("http://www.isi.edu/~tanaka/nested-dir.tar.gz") as response,\
    open("nested-dir.tar.gz", "wb") as f:
    shutil.copyfileobj(response, f)

# The archive was fetched over plain HTTP, so treat it as untrusted: an
# unfiltered extractall() lets a crafted member name (absolute path or "..")
# write outside the working directory.
with tarfile.open("nested-dir.tar.gz") as tar:
    if hasattr(tarfile, "data_filter"):
        # Python versions with extraction filters: reject unsafe members.
        tar.extractall(filter="data")
    else:
        # Older Python: refuse any member that would land outside the current
        # directory. This only covers path traversal by name; link members are
        # not inspected here.
        extract_root = Path.cwd().resolve()
        for member in tar.getmembers():
            target = (extract_root / member.name).resolve()
            if target != extract_root and extract_root not in target.parents:
                raise tarfile.TarError(
                    "Refusing to extract {!r} outside of {}".format(
                        member.name, extract_root
                    )
                )
        tar.extractall()
    
# Replica catalog that will map each input LFN to its physical location.
rc = ReplicaCatalog()

# Layout of the extracted archive that is walked to populate the catalog:
# nested-dir
# ├── cats
# │   └── cat_sounds.txt
# └── dogs
#     └── dog_sounds.txt

# Collects the File objects that become the inputs of the workflow job.
in_files = []
def get_files(d: Path) -> None:
    """Recursively register every regular file under ``d`` as a workflow input.

    For each regular file found, a ``File`` whose LFN is the bare file name is
    appended to the module-level ``in_files`` list, and a replica mapping that
    LFN to the file's resolved absolute path on the "local" site is added to
    the module-level replica catalog ``rc``. One line is printed per file.

    Note: the LFN is only the file name, not the path relative to ``d``, so
    same-named files in different sub-directories map to the same LFN.

    Args:
        d: Directory to walk. Sub-directories are descended into. Entries that
            are neither regular files nor directories (for example dangling
            symlinks, FIFOs or sockets) are skipped rather than recursed into,
            because calling ``iterdir()`` on them would raise.
    """
    for p in d.iterdir():
        if p.is_file():
            # Resolve once; the same absolute path is stored and reported.
            pfn = str(p.resolve())
            in_files.append(File(p.name))
            rc.add_replica("local", p.name, pfn)

            print("Added lfn: {}, pfn: {}".format(p.name, pfn))
        elif p.is_dir():
            get_files(p)

# Walk the extracted "nested-dir" tree, registering every file it contains.
get_files(Path("nested-dir"))

# Serialize the replica catalog (no path is given, so the library default is used).
rc.write()


# Transformation catalog with a single entry, "print_file_contents": a Python
# script located on the "local" site and flagged as stageable.
# Note: `tc` is bound to whatever the trailing .write() call returns.
tc = (
    TransformationCatalog()
    .add_transformations(
        Transformation(
            "print_file_contents",
            site="local",
            pfn="/home/scitech/shared-data/sample-deep-lfn-condorio-wf/print_file_contents.py",
            is_stageable=True,
            arch=Arch.X86_64,
            os_type=OS.LINUX,
        )
    )
    .write()
)

# Workflow with one job: run "print_file_contents" with every registered file
# as input and send the job's stdout to "out.txt".
wf = Workflow("nested-dir-tutorial")
wf.add_jobs(
    Job("print_file_contents").add_inputs(*in_files).set_stdout("out.txt")
)


# Plan and submit the workflow: jobs run on "condorpool", with "stage" as its
# staging site. If the Pegasus client fails, print its captured output
# instead of letting the exception propagate.
try:
    wf.plan(
        sites=["condorpool"],
        staging_sites={"condorpool": "stage"},
        submit=True,
    )
except PegasusClientError as err:
    print(err.output)


Added lfn: dog_sounds.txt, pfn: /home/scitech/shared-data/sample-deep-lfn-nonsharedfs-wf/nested-dir/dogs/dog_sounds.txt
Added lfn: cat_sounds.txt, pfn: /home/scitech/shared-data/sample-deep-lfn-nonsharedfs-wf/nested-dir/cats/cat_sounds.txt


Plan:
[main] WARN  schema.JsonMetaSchema  - Unknown keyword $defs - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[main] WARN  schema.JsonMetaSchema  - Unknown keyword additionalItems - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[main] WARN  schema.JsonMetaSchema  - Unknown keyword examples - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
2020.07.23 16:56:12.788 UTC:    
2020.07.23 16:56:12.793 UTC:   ----------------------------------------------------------------------- 
2020.07.23 16:56:12.799 UTC:   File for submitting this DAG to HTCondor           : nested-dir-tutorial-0.dag.condor.sub 
2020.07.23 16:56:12.804 UTC:   Log of DAGMan debugging messages                 : nested-dir-tutorial-0.dag.dagman.out 
2020.07.23 16:56:12.810 UTC:   Log of HTCondor library output    