## Setup Flow with Prefect Cloud

**This only needs to be done once.**

Set the backend to Prefect Cloud (skip this step if you are running Prefect Server instead).

In [1]:
!prefect backend cloud

Backend switched to cloud


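Authenticate with a Prefect Cloud API key (see https://docs.prefect.io/orchestration/tutorial/overview.html#create-an-api-key). The key is read with `getpass` so that it is not displayed in the notebook.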

In [2]:
from getpass import getpass
apikey = getpass(prompt='Enter your Prefect Cloud API key: ')

Enter your Prefect Cloud API key: ······················


In [3]:
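# Create the config directory if it is missing, then append a [cloud] section with the key.
# Note: running this cell twice appends a duplicate [cloud] table; remove it by hand if so.
!mkdir -p ~/.prefect
!printf '[cloud]\nauth_token = "%s"\n' "$apikey" >> ~/.prefect/config.toml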
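Appending with `>>` adds a second `[cloud]` table if the cell is run twice, and TOML does not allow a table to be defined more than once. A minimal pure-Python sketch of an idempotent alternative follows. It assumes the same `[cloud] auth_token` key used above (check the Prefect documentation for the key name your Prefect version expects), and `write_cloud_key` is a hypothetical helper, not part of Prefect.

```python
import re
from pathlib import Path


def write_cloud_key(api_key: str, config_path: Path) -> bool:
    """Append a [cloud] auth_token entry unless a [cloud] table already exists.

    Hypothetical helper (not part of Prefect). Returns True if the file was written.
    """
    config_path.parent.mkdir(parents=True, exist_ok=True)
    existing = config_path.read_text() if config_path.exists() else ""
    # TOML forbids defining the same table twice, so never add a second [cloud].
    if re.search(r"^\s*\[cloud\]\s*$", existing, flags=re.MULTILINE):
        return False
    with config_path.open("a") as fh:
        if existing and not existing.endswith("\n"):
            fh.write("\n")  # keep the new table header on its own line
        fh.write(f'[cloud]\nauth_token = "{api_key}"\n')
    return True


# Usage in the notebook (apikey comes from the getpass cell):
# write_cloud_key(apikey, Path.home() / ".prefect" / "config.toml")
```

Returning a boolean instead of overwriting an existing entry keeps any hand-edited settings intact; replacing an existing key is left to the user.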

Next, create a project on Prefect Cloud (see https://docs.prefect.io/orchestration/tutorial/first.html#creating-a-project). The flow below is registered to a project named `Neon_AOP_BRDF_Mosaic`.

## Create Dask Execution Cluster

In [1]:
import os
import dask
import numpy as np
from dask.distributed import Client, LocalCluster
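The `distributed.dashboard.link` setting used in the next cells is a format-string template rather than a literal URL. A minimal sketch of how such a template expands follows, assuming Dask fills it from the process environment variables plus `host`, `port` and `scheme`. Under that assumption, placeholder names must match environment variable names exactly, and those are case-sensitive on Linux (`{USER}` and `{user}` are different). `expand_dashboard_link` is a hypothetical helper, not Dask's own code.

```python
import os
from typing import Mapping, Optional


def expand_dashboard_link(template: str, host: str, port: int,
                          env: Optional[Mapping[str, str]] = None) -> str:
    """Imitate how a distributed.dashboard.link template is expanded.

    Assumption: Dask merges environment variables with scheme/host/port and
    calls str.format on the template. Hypothetical helper, not Dask's API.
    """
    fields = dict(os.environ if env is None else env)
    fields.update(scheme="http", host=host, port=port)
    return template.format(**fields)


print(expand_dashboard_link("/proxy/{port}/status", "127.0.0.1", 8787, env={}))
# → /proxy/8787/status
```

The result matches the `Dashboard: /proxy/8787/status` value reported for the local cluster.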

In [2]:
cluster_type = 'local'
#cluster_type = 'HPC'

In [3]:
if cluster_type == 'local':
    dask.config.set({'distributed.dashboard.link': '/proxy/{port}/status'})
    cluster = LocalCluster(n_workers=6)  # optionally also pass threads_per_worker=2
    cl = Client(cluster)
elif cluster_type == 'HPC':
    import dask_jobqueue as jq
    dask.config.set({'distributed.dashboard.link': '/user/{USER}/proxy/{port}/status'})
    partition = 'brief-low'  # other partitions: debug, mem, mem-low
    num_processes = 10
    num_threads_per_processes = 4
    mem = 3.2*num_processes*num_threads_per_processes  # 3.2 GB per thread
    n_cores_per_job = num_processes*num_threads_per_processes
    container = 'docker://rowangaffney/data_science_im_rs:latest'
    env = 'py_geo'
    clust = jq.SLURMCluster(queue=partition,
                            processes=num_processes,
                            cores=n_cores_per_job,
                            memory=str(mem)+'GB',
                            interface='ib0',
                            local_directory='$TMPDIR',
                            death_timeout=30,
                            python="singularity -vv exec {} /opt/conda/envs/{}/bin/python".format(container,env),
                            walltime='02:00:00',
                            job_extra=["--output=/dev/null","--error=/dev/null"])
    cl=Client(clust)
    dash_addr = f"/user/{os.environ['USER']}/proxy/{cl.scheduler_info()['services']['dashboard']}/status"
    print('Dask Lab Extension Address (paste into the search box): ' + dash_addr)
    
    #Scale Cluster 
    num_jobs=8
    clust.scale(n=num_jobs*num_processes)
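else:
    raise ValueError(f"Unknown cluster_type: {cluster_type!r}")
# Threads per worker; passed to BRDF_TOPO_Config below as its `cpus` argument
ncpus = int(np.unique(np.array(list(cl.nthreads().values())))[0])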
cl

| Client | Cluster |
| --- | --- |
| Scheduler: tcp://127.0.0.1:41697 | Workers: 6 |
| Dashboard: /proxy/8787/status | Cores: 12 |
| | Memory: 24.87 GiB |
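The HPC branch sizes each SLURM job from a few numbers, and `ncpus` is the per-worker thread count read back from the running cluster. The arithmetic is small enough to check in plain Python. The 3.2 GB-per-thread figure and the 10 processes, 4 threads and 8 jobs come from the cell above; the helper names are hypothetical. Unlike `np.unique(...)[0]`, which silently takes the smallest thread count if workers differ, this sketch raises so that a mixed cluster is noticed. For the local cluster above (6 workers, 12 cores) the same logic is consistent with `ncpus = 2`.

```python
def slurm_job_size(num_processes: int, threads_per_process: int,
                   gb_per_thread: float = 3.2) -> tuple:
    """Cores and memory string for one SLURM job, mirroring the HPC branch above."""
    cores = num_processes * threads_per_process
    mem_gb = gb_per_thread * cores
    return cores, f"{mem_gb:g}GB"


def threads_per_worker(nthreads: dict) -> int:
    """Common thread count from a Client.nthreads()-style {address: threads} dict."""
    counts = set(nthreads.values())
    if len(counts) != 1:
        raise ValueError(f"workers have different thread counts: {sorted(counts)}")
    return counts.pop()


cores, mem = slurm_job_size(num_processes=10, threads_per_process=4)
num_jobs = 8
print(cores, mem)                # → 40 128GB
print(num_jobs * 10, "workers")  # → 80 workers, i.e. clust.scale(n=80)
print(threads_per_worker({"tcp://127.0.0.1:1": 2, "tcp://127.0.0.1:2": 2}))  # → 2
```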


## Build Flow

In [8]:
#Import Prefect Packages
from prefect import task, Flow, unmapped, Parameter
from prefect.executors import DaskExecutor
import prefect

#Import custom tasks
from prefect_tasks import tasks as t

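In the flow below, `task.map(...)` runs a task once per element of the mapped argument, while `unmapped(...)` passes the same value to every one of those runs; a non-mapped task that receives a mapped result (such as `write_pipeline_meta`) gets the whole list. A plain-Python analogue makes the shape of the data explicit. `run_mapped`, `fake_download` and the file names are hypothetical stand-ins; in Prefect each call becomes a separate task run that the `DaskExecutor` can schedule in parallel, whereas this loop runs them one after another.

```python
def run_mapped(fn, mapped_items, **unmapped_kwargs):
    """Sequential analogue of fn.map(pipeline_dict=mapped_items, k=unmapped(v), ...)."""
    return [fn(pipeline_dict=item, **unmapped_kwargs) for item in mapped_items]


def fake_download(pipeline_dict, site, processDate):
    """Hypothetical stand-in for a mapped task: builds a ./{site}_{processDate}/ path."""
    return f"./{site}_{processDate}/{pipeline_dict['name']}"


files = [{"name": "line_01.h5"}, {"name": "line_02.h5"}]
paths = run_mapped(fake_download, files, site="CPER", processDate="2017-05")
print(paths)  # → ['./CPER_2017-05/line_01.h5', './CPER_2017-05/line_02.h5']

# A downstream non-mapped task would receive the whole list at once.
print(len(paths))  # → 2
```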


In [9]:
with Flow(name = 'Mosaic_NEON_AOP_Hyper',
          executor=DaskExecutor(address=cl.scheduler_info()['address'])
         ) as flow:
    
    #### Define Parameters ###
    site_p = Parameter('site_p', default = 'CPER')
    processDate_p = Parameter('processDate_p', default = '2017-05')
    
    
    #Task1: Get metadata about the data product for a specific site / date
    site_dict = t.query_data_urls(site=site_p,
                                processDate=processDate_p)
    
    #Task2: Get the URLS for all the h5 files.
    h5_files = t.query_file_urls(site_dict=site_dict,
                               site=site_p,
                               processDate=processDate_p)
    
    #Task3: Set up the BRDF and TOPO correction configurations
    workflow_meta = t.BRDF_TOPO_Config.map(pipeline_dict = h5_files,
                                        site=unmapped(site_p),
                                        processDate=unmapped(processDate_p),
                                        cpus=unmapped(ncpus))
    
    #Task4: Download the files to folder ./{site}_{processDate}/
    download_res = t.download_file.map(pipeline_dict = workflow_meta,
                                 site=unmapped(site_p),
                                 processDate=unmapped(processDate_p))
    
    #Task5: Get the metadata for each file
    workflow_meta = t.get_file_meta.map(pipeline_dict=download_res)
    
    #Task6: Write the metadata for each file to a human-readable file (.json)
    metadata_exported = t.write_pipeline_meta(pipeline_dict=workflow_meta,
                                              site=site_p,
                                              processDate=processDate_p)
    
    #Task7: Apply the BRDF and TOPO corrections to the data
    ht_pipeline = t.apply_corrections_mosaic.map(pipeline_dict=workflow_meta,
                                               site=unmapped(site_p),
                                               processDate=unmapped(processDate_p))
    
    #Task8: Get the mask for each flight line for mosaicing the flights together
    ht_pipeline2 = t.pixel_mosaic_mask.map(pipeline_dict = ht_pipeline,
                                        pipeline_list=unmapped(ht_pipeline))
    
    #Task9: Get the extents of all the flights for the final mosaic.
    extent = t.moasic_extent(ht_pipeline2)
    
    #Task10: Mosaic the BRDF- and TOPO-corrected flights, taking each pixel from the flight line with the lowest sensor zenith angle
    success = t.mosaic(pipeline_list=ht_pipeline,
                     extents=extent,
                     site=site_p,
                     processDate=processDate_p)
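Task 10 keeps, for each output pixel, the value from whichever flight line viewed it with the lowest sensor zenith angle (closest to nadir). The `t.mosaic` task itself is not shown here, so the NumPy sketch below only illustrates that selection rule under simplifying assumptions: every flight line has already been resampled onto the common grid given by the mosaic extent, and `NaN` marks pixels a line does not cover. `min_zenith_mosaic` is a hypothetical helper, not the author's implementation.

```python
import numpy as np


def min_zenith_mosaic(values, zenith):
    """Per pixel, keep the value from the flight line with the smallest sensor zenith angle.

    values, zenith: arrays of shape (n_lines, rows, cols) on a common grid.
    NaN in `zenith` marks pixels a flight line does not cover.
    Returns (mosaic, index of the chosen flight line per pixel).
    """
    values = np.asarray(values, dtype=float)
    zenith = np.asarray(zenith, dtype=float)
    covered = ~np.isnan(zenith)
    # Uncovered pixels get an infinite angle so they never win the argmin.
    best = np.argmin(np.where(covered, zenith, np.inf), axis=0)
    mosaic = np.take_along_axis(values, best[np.newaxis, ...], axis=0)[0]
    # Pixels that no flight line covers stay empty.
    mosaic[~covered.any(axis=0)] = np.nan
    return mosaic, best


# Two flight lines over a 1 x 3 strip; the third pixel is covered by neither.
vals = np.array([[[1.0, 2.0, 3.0]], [[10.0, 20.0, 30.0]]])
zen = np.array([[[2.0, np.nan, np.nan]], [[3.0, 7.0, np.nan]]])
mosaic, line_idx = min_zenith_mosaic(vals, zen)
print(mosaic.tolist())    # → [[1.0, 20.0, nan]]
print(line_idx.tolist())  # → [[0, 1, 0]]
```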

## Register the Flow and Start a Local Agent

In [10]:
# Register the flow under the "Neon_AOP_BRDF_Mosaic" project
flow.register(project_name="Neon_AOP_BRDF_Mosaic")

Flow URL: https://cloud.prefect.io/rowan-data-workflows/flow/a7562972-e524-435d-8076-edb5643e5f07
 └── ID: b585ff3c-3ace-4080-9664-2738b028294e
 └── Project: Neon_AOP_BRDF_Mosaic
 └── Labels: ['a57d20bac99f']


'b585ff3c-3ace-4080-9664-2738b028294e'

In [None]:
# Start a local agent; the cell keeps running and waits for flow runs until interrupted
!prefect agent local start

[2021-07-01 03:58:53,608] INFO - agent | Registering agent...
[2021-07-01 03:58:53,830] INFO - agent | Registration successful!

 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2021-07-01 03:58:54,003] INFO - agent | Starting LocalAgent with labels ['a57d20bac99f']
[2021-07-01 03:58:54,003] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2021-07-01 03:58:54,003] INFO - agent | Waiting for flow runs...
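The registered flow carries the label `a57d20bac99f`, and the local agent started with the same label, which is why it can pick up runs of this flow. A label of this form appears to be the machine or container hostname, which Prefect adds by default for locally stored flows so that only an agent on the same machine runs them. The matching rule in Prefect 0.x, as understood here, is that an agent picks up a flow run only if the flow's labels are a subset of the agent's labels. A tiny sketch of that rule (`agent_can_run` is a hypothetical helper, not Prefect's code):

```python
def agent_can_run(flow_labels, agent_labels) -> bool:
    """Label rule sketch: every flow label must also be one of the agent's labels."""
    return set(flow_labels) <= set(agent_labels)


flow_labels = ["a57d20bac99f"]  # from the registration output above
print(agent_can_run(flow_labels, ["a57d20bac99f"]))         # → True
print(agent_can_run(flow_labels, ["a57d20bac99f", "gpu"]))  # → True
print(agent_can_run(flow_labels, ["some-other-host"]))      # → False
```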
