In [None]:
import xarray as xr

In [2]:
from dask_jobqueue import SLURMCluster
# Create a dask cluster object whose workers would run as SLURM jobs. No arguments are
# passed, so its settings presumably come from the dask_jobqueue configuration — confirm.
# NOTE(review): the Client cell below connects to a fixed scheduler address and
# `Client(cluster)` is commented out there, so this object is not what the client uses.
cluster = SLURMCluster()
# Display the cluster summary/widget.
cluster

In [3]:
# from distributed import LocalCluster
# cluster = LocalCluster()
# cluster

In [4]:
from distributed import Client
# Connect to a dask scheduler that is already running at a fixed address, rather than
# to the `cluster` object created above (use the commented-out line for that instead).
# client=Client(cluster)
client=Client('tcp://localhost:8789')
# Displaying the client shows the scheduler/dashboard links and a worker summary.
client

0,1
Client  Scheduler: tcp://localhost:8789  Dashboard: http://localhost:8787/status,Cluster  Workers: 40  Cores: 80  Memory: 600.00 GB


In [5]:
# If a number of workers get killed, things just seem a bit haywire, or you want to clear
# the workers' memory, it can be useful to run this:
# it kills and restarts the Python process on each worker.

# client.restart()

# Demonstrate the usage of intake-thredds

In [6]:
import intake

# Root THREDDS catalogue for the IMOS Aqua P1D gridded product; its entries are
# per-year sub-catalogues (e.g. cat['2007']).
catalog_url = 'http://thredds.aodn.org.au/thredds/catalog/IMOS/SRS/OC/gridded/aqua/P1D/catalog.xml'
cat = intake.open_thredds_cat(catalog_url)

Pick out a month. The `cat['...']()` syntax is a little ugly, but calling each entry returns the nested catalogue object.

In [7]:
# Calling a catalogue entry returns the nested catalogue: first the year, then the month.
year_cat = cat['2007']()
month = year_cat['02']()
month.url

'http://thredds.aodn.org.au/thredds/catalog/IMOS/SRS/OC/gridded/aqua/P1D/2007/02/catalog.xml'

# Use Dask to walk through the THREDDS catalog in parallel

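Use Dask to traverse the catalog in parallel. This involves creating some delayed functions that are chained together: `obtain_month_urls` runs once per year and, for each month, calls `obtain_day_urls`, which lists that month's K_490 files.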

In [8]:
from dask import delayed
from dask.distributed import get_client

@delayed
def obtain_day_urls(url):
    """Return the OPeNDAP URLs of the K_490 files listed in one month's catalogue.

    Runs as a dask task: opens the THREDDS catalogue at ``url`` and keeps only
    entries whose name contains 'K_490' (the light-attenuation files), in
    catalogue order.

    Parameters
    ----------
    url : str
        URL of a month-level THREDDS ``catalog.xml``.

    Returns
    -------
    list
        The ``urlpath`` of each matching catalogue entry.
    """
    month_catalog = intake.open_thredds_cat(url)
    return [month_catalog[entry].urlpath
            for entry in month_catalog
            if 'K_490' in entry]

@delayed
def obtain_month_urls(url):
    """Gather the day-file URL requests for every month in one year's catalogue.

    Runs as a dask task. For each month sub-catalogue it calls the delayed
    ``obtain_day_urls`` without computing it, so the value this task returns is
    itself a ``Delayed`` (a lazy concatenation of the per-month lists). The only
    exception is a year catalogue with no entries, which returns a plain ``[]``.
    The caller therefore has to compute the result a second time to get URL strings.

    Parameters
    ----------
    url : str
        URL of a year-level THREDDS ``catalog.xml``.
    """
    year_catalog = intake.open_thredds_cat(url)
    collected = []
    for month_name in year_catalog:
        month_url = year_catalog[month_name].url
        # `list + Delayed` builds a lazy concatenation. Do NOT switch to `+=`:
        # list.__iadd__ would try to iterate the Delayed object and fail.
        collected = collected + obtain_day_urls(month_url)
    return collected


This builds one delayed function call per year (not yet evaluated); each will request the catalogs for that year's months.

In [9]:
# Record one delayed obtain_month_urls call per year sub-catalogue. Nothing is fetched
# yet: calling a @delayed function only builds a task, so `futures` holds Delayed
# objects (not distributed futures, despite the name).
futures=[]
for year in cat:
    # if year in ['2014',]: #could filter here
    print(year)  # echo each year key found in the root catalogue
    futures.append(obtain_month_urls(cat[year].url))
# Display the list of (still unevaluated) Delayed objects.
futures

2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019


[Delayed('obtain_month_urls-541f2418-97b7-40c9-bf62-e37025e2b494'),
 Delayed('obtain_month_urls-9b6cc6fc-8327-47f2-bb32-1b7623f5f236'),
 Delayed('obtain_month_urls-750ff324-ebc5-44ad-a639-528436c67fed'),
 Delayed('obtain_month_urls-c65720c9-23cb-4ea2-98e1-98fd97fc4b64'),
 Delayed('obtain_month_urls-b74b3e7b-90e3-4f10-8560-8745d103cafb'),
 Delayed('obtain_month_urls-b6c42bd6-a559-492e-a706-84ddf770d75b'),
 Delayed('obtain_month_urls-1ff14604-a98a-42d9-9acf-6a212230bea2'),
 Delayed('obtain_month_urls-7bda8d8d-eee9-4c6d-8afd-bb9c196017d0'),
 Delayed('obtain_month_urls-71e80985-8159-424f-9929-1c533f5b95d9'),
 Delayed('obtain_month_urls-e43841ea-da48-4093-957a-1d422e369dcc'),
 Delayed('obtain_month_urls-40b29213-ae88-4353-ac6b-e6941c3f6a1f'),
 Delayed('obtain_month_urls-a0e7e3be-04f9-45a7-a624-afe6afdb5dc8'),
 Delayed('obtain_month_urls-af1cc414-0d84-4e06-bbb8-78a7875c858a'),
 Delayed('obtain_month_urls-500f6b5a-45ef-441e-a510-086b662ac07b'),
 Delayed('obtain_month_urls-b9b92d9c-a173-49cc-8

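Now send these delayed function calls to the Dask scheduler to be computed. The first `compute` runs one task per year, which returns the (still unevaluated) requests for each month's file list. The second `compute` evaluates those requests, giving a list of file URLs for each year.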

In [10]:
from dask import compute

# Stage 1: run the per-year tasks. Each returns a still-lazy Delayed that concatenates
# its per-month obtain_day_urls calls (or a plain [] for a year with no months).
(month_urls,) = compute(futures)
# Stage 2: evaluate those per-year results -> one list of day-file URLs per year.
(day_urls,) = compute(month_urls)

In [11]:
# day_urls holds one list of URLs per year (one per obtain_month_urls task).
# Flatten them, preserving order, into a single list. A comprehension avoids the
# quadratic cost of repeatedly rebuilding the list with `all_urls + m`.
all_urls = [url for year_urls in day_urls for url in year_urls]
len(all_urls)

6102

Note that we now have the THREDDS 'dodsC' OPeNDAP URL for each netCDF file; this is what's needed to open the file directly with xarray.

In [12]:
# Peek at the first five URLs.
all_urls[:5]

['http://thredds.aodn.org.au/thredds/dodsC/IMOS/SRS/OC/gridded/aqua/P1D/2002/07/A.P1D.20020706T000000Z.aust.K_490.nc',
 'http://thredds.aodn.org.au/thredds/dodsC/IMOS/SRS/OC/gridded/aqua/P1D/2002/07/A.P1D.20020707T000000Z.aust.K_490.nc',
 'http://thredds.aodn.org.au/thredds/dodsC/IMOS/SRS/OC/gridded/aqua/P1D/2002/07/A.P1D.20020708T000000Z.aust.K_490.nc',
 'http://thredds.aodn.org.au/thredds/dodsC/IMOS/SRS/OC/gridded/aqua/P1D/2002/07/A.P1D.20020709T000000Z.aust.K_490.nc',
 'http://thredds.aodn.org.au/thredds/dodsC/IMOS/SRS/OC/gridded/aqua/P1D/2002/07/A.P1D.20020710T000000Z.aust.K_490.nc']

### Archive all the URLs locally — reloading them is quicker than hitting the THREDDS server again

In [13]:
import yaml
# Save the URL list so a later session can reload it (see the commented-out cell below)
# instead of re-walking the THREDDS catalogue.
# safe_dump mirrors the yaml.safe_load used to read the file back: it only emits plain
# YAML types, so anything written here is guaranteed to load again. It raises at write
# time if given something that safe_load could not read.
with open('K490_url.yml', 'w', encoding='utf-8') as outfile:
    yaml.safe_dump(all_urls, outfile, default_flow_style=False)

In [14]:
# with open('K490_url.yml', 'r') as infile:
#     all_urls=yaml.safe_load(infile)
# all_urls[0:10]

### And download all the files using the available workers

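We can use Dask a bit like multiprocessing's `map`, calling a delayed function for each URL. `download_file_tds` substitutes 'fileServer' for 'dodsC' so the whole file is downloaded over HTTP rather than through an OPeNDAP request. `download_file_s3`, which is the one actually run below, instead rewrites the URL to fetch the file directly from the IMOS S3 bucket. We can monitor progress from the Dask dashboard link provided above.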

In [None]:
import urllib.request, os

# The catalogue walk above yields THREDDS OPeNDAP ('dodsC') URLs; these prefixes are
# used to rewrite them into direct-download URLs.
THREDDS_DODS_PREFIX = 'http://thredds.aodn.org.au/thredds/dodsC/'
S3_PREFIX = 'https://s3-ap-southeast-2.amazonaws.com/imos-data/'


def _local_destination(url):
    """Return the scratch path for ``url``'s file, creating the directory if needed.

    Evaluated inside each dask task, so MYSCRATCH is read from the worker's
    environment (KeyError if it is unset there).
    """
    dest_dir = os.path.join(os.environ['MYSCRATCH'], 'aqua_files')
    # urlretrieve does not create missing directories; exist_ok keeps this safe when
    # many tasks try to create it concurrently.
    os.makedirs(dest_dir, exist_ok=True)
    return os.path.join(dest_dir, url.split('/')[-1])


@delayed
def download_file_tds(url):
    """Download one file through the THREDDS HTTP 'fileServer' service.

    Not called below (the S3 variant is used), but kept as an alternative.
    Returns the local path written.
    """
    dest = _local_destination(url)
    # Swap the OPeNDAP service for the whole-file HTTP service. count=1 limits the
    # rewrite to the service segment of the path.
    urllib.request.urlretrieve(url.replace('dodsC', 'fileServer', 1), dest)
    return dest


# url format to go directly to S3: https://s3-ap-southeast-2.amazonaws.com/imos-data/IMOS/SRS/OC/gridded/aqua/P1D/2002/07/A.P1D.20020705T000000Z.aust.dt.nc
# with 40 workers on Pawsey, the files downloaded at approximately 2GB/s (!)
@delayed
def download_file_s3(url):
    """Download one file directly from the IMOS S3 bucket, bypassing THREDDS.

    Parameters
    ----------
    url : str
        A THREDDS OPeNDAP URL starting with ``THREDDS_DODS_PREFIX``.

    Returns
    -------
    str
        The local path written.

    Raises
    ------
    ValueError
        If ``url`` does not start with ``THREDDS_DODS_PREFIX``. Without this check the
        rewrite would silently fall back to requesting the OPeNDAP URL itself.
    """
    if not url.startswith(THREDDS_DODS_PREFIX):
        raise ValueError(f'Unexpected URL (expected prefix {THREDDS_DODS_PREFIX!r}): {url}')
    dest = _local_destination(url)
    urllib.request.urlretrieve(S3_PREFIX + url[len(THREDDS_DODS_PREFIX):], dest)
    return dest


# One delayed download per URL; dask spreads them across the workers (progress is
# visible on the dashboard). Note: this rebinds `futures`, which earlier held the
# per-year catalogue tasks.
futures = [download_file_s3(url) for url in all_urls]

files = compute(futures)[0]