# Earthdata Crawler

For every collection, check the following:
- Presence of a cloud OPeNDAP URL
- Presence of a DMR++ URL
- Whether the DMR++ is openable by VirtualiZarr's DMR++ parser
- Whether the DMR++ is downloadable
- (Not yet implemented) Quality of the DMR++ files (are they flattened?)

Note: Borrowed `get_info` from https://github.com/opengeos/NASA-Earth-Data/blob/main/nasa_earth_data.py

In [1]:
import logging
import requests

import earthaccess
import pandas as pd
import virtualizarr as vz
from dask.distributed import LocalCluster, Client
from tqdm.notebook import tqdm


# Handy text formatting codes.
# These are ANSI escape sequences: `bold_start` switches bold text on and
# `bold_end` resets the text attributes.
# NOTE(review): neither name is referenced in the visible cells - confirm they are still needed.
bold_start = '\033[1m'
bold_end = '\033[0m'

In [2]:
# Log in through earthaccess. The return value is bound to `_` and not used
# afterwards - presumably to keep it from being echoed as the cell output.
_ = earthaccess.login()

In [3]:
%%time
# Search for datasets (collections) with a wildcard keyword so that nothing is
# filtered out by keyword, then show how many were returned.
collections = earthaccess.search_datasets(keyword="*")
len(collections)

CPU times: user 2.2 s, sys: 417 ms, total: 2.62 s
Wall time: 32.9 s


9905

In [4]:
def _lookup(mapping, *path, default=""):
    """Return the value found by following `path` through nested containers.

    Each element of `path` is used as a key (or index) into the current
    container. If any step is missing or the container cannot be indexed that
    way, `default` is returned instead of raising.
    """
    current = mapping
    try:
        for step in path:
            current = current[step]
    except (KeyError, IndexError, TypeError):
        return default
    return current


def get_info(dataset):
    """Extract a flat summary dict from a collection search result.

    Args:
        dataset: Mapping with "umm" and "meta" sub-mappings, as returned for
            each collection by the dataset search.

    Returns:
        dict with the keys "ShortName", "EntryTitle", "DOI", "concept-id",
        "provider-id", "s3-links", "start-time", "end-time" and "Linkage".
        Optional fields that are absent are set to the empty string.

    Raises:
        KeyError: If one of the required fields ("ShortName", "EntryTitle",
            "concept-id", "provider-id") is missing.
    """
    umm = dataset["umm"]
    meta = dataset["meta"]

    # Only the first range of the first temporal extent is reported.
    first_range = ("TemporalExtents", 0, "RangeDateTimes", 0)

    info = {
        "ShortName": umm["ShortName"],
        "EntryTitle": umm["EntryTitle"],
        "DOI": _lookup(umm, "DOI", "DOI"),
        "concept-id": meta["concept-id"],
        "provider-id": meta["provider-id"],
        "s3-links": _lookup(meta, "s3-links"),
        # info["granule-count"] = dataset["meta"]["granule-count"]
        "start-time": _lookup(umm, *first_range, "BeginningDateTime"),
        "end-time": _lookup(umm, *first_range, "EndingDateTime"),
    }

    # Build a resolvable DOI link only when a DOI is actually present.
    doi = info["DOI"]
    info["Linkage"] = "https://doi.org/" + doi if doi else ""

    return info

## Cycle through collections, checking a single granule for each.

### Set up session, cluster, logging.

In [5]:
# fsspec HTTPS session obtained from earthaccess; its `storage_options` are
# passed to VirtualiZarr inside `open_dmrpp_from_url`.
fs = earthaccess.get_fsspec_https_session()

In [6]:
# Create the local Dask cluster and client only if `dask_client` does not exist
# yet, so that re-running this cell reuses the existing client.
if "dask_client" not in locals():
    # cluster = LocalCluster(threads_per_worker=1)
    cluster = LocalCluster(processes=True)
    dask_client = Client(cluster)

# Trailing expression: shows the client summary as the cell output.
dask_client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: /user/danielfromearth/proxy/8787/status,

0,1
Dashboard: /user/danielfromearth/proxy/8787/status,Workers: 4
Total threads: 4,Total memory: 3.71 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:35233,Workers: 4
Dashboard: /user/danielfromearth/proxy/8787/status,Total threads: 4
Started: Just now,Total memory: 3.71 GiB

0,1
Comm: tcp://127.0.0.1:42161,Total threads: 1
Dashboard: /user/danielfromearth/proxy/44977/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:43877,
Local directory: /tmp/dask-scratch-space/worker-h9koeau3,Local directory: /tmp/dask-scratch-space/worker-h9koeau3

0,1
Comm: tcp://127.0.0.1:39047,Total threads: 1
Dashboard: /user/danielfromearth/proxy/45991/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:38043,
Local directory: /tmp/dask-scratch-space/worker-zwcnndth,Local directory: /tmp/dask-scratch-space/worker-zwcnndth

0,1
Comm: tcp://127.0.0.1:34525,Total threads: 1
Dashboard: /user/danielfromearth/proxy/43875/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:45111,
Local directory: /tmp/dask-scratch-space/worker-zj73ku6p,Local directory: /tmp/dask-scratch-space/worker-zj73ku6p

0,1
Comm: tcp://127.0.0.1:35419,Total threads: 1
Dashboard: /user/danielfromearth/proxy/36897/status,Memory: 0.93 GiB
Nanny: tcp://127.0.0.1:36857,
Local directory: /tmp/dask-scratch-space/worker-j6eh43__,Local directory: /tmp/dask-scratch-space/worker-j6eh43__


In [7]:
# Logger used by the crawler functions defined in the cells below.
# NOTE(review): the "distributed.worker" name is presumably chosen so that the
# messages show up in the Dask worker logs - confirm.
logger = logging.getLogger("distributed.worker")

In [8]:
# dask_client.shutdown()

In [9]:
# Many HTTP 404 "Not Found" errors are expected while probing URLs, so the
# corresponding log records are suppressed rather than printed.
class MyFilter(logging.Filter):
    """Logging filter that drops download-error records."""

    def filter(self, record):
        """Return False for records mentioning a failed file download, True otherwise."""
        is_download_error = "Error while downloading the file" in record.getMessage()
        return not is_download_error


# NOTE(review): this attaches the filter in the current process only; confirm
# whether the Dask worker processes need it as well.
logging.getLogger("earthaccess.store").addFilter(MyFilter())

### Main logic is in these functions.

In [10]:
def get_single_granule_from_collection(collection_concept_id: str):
    """Fetch one granule of a collection together with its data links.

    Args:
        collection_concept_id: Concept ID of the collection to search in.

    Returns:
        A `(granule, links)` pair, where `links` is what the granule's
        `data_links(access="indirect")` returns. If the search raises
        `RuntimeError`, or an `IndexError` occurs while picking the first
        granule or reading its links, the error is logged as a warning and
        `(None, [])` is returned.
    """
    try:
        search_hits = earthaccess.search_data(
            concept_id=collection_concept_id,
            count=1,
        )
    except RuntimeError as search_error:
        logger.warning(search_error)
        return None, []

    try:
        # An empty result list raises IndexError here.
        first_granule = search_hits[0]
        indirect_links = first_granule.data_links(access="indirect")
    except IndexError as lookup_error:
        logger.warning(lookup_error)
        return None, []

    return first_granule, indirect_links

In [11]:
def open_dmrpp_from_url(a_url: str):
    """Try opening, and if not openable, then at least try downloading, a DMR++ file.

    The DMR++ location is derived by appending ".dmrpp" to `a_url`.

    Args:
        a_url: URL of a data file whose DMR++ sidecar should be probed.

    Returns:
        A `(downloadable, openable)` pair of booleans. Both are True when
        VirtualiZarr opened the DMR++; only `downloadable` is True when opening
        failed but the download succeeded; both are False otherwise.
    """
    downloadable = False
    openable = False

    dmrpp_url = a_url + ".dmrpp"
    logger.info(f"check_dmrpp_for_a_url for {dmrpp_url}")

    # TEST 1 - Try opening the DMR++ with VirtualiZarr
    try:
        vz.open_virtual_dataset(
            dmrpp_url,
            filetype="dmrpp",
            indexes={},
            reader_options={"storage_options": fs.storage_options},
        )
    except FileNotFoundError:
        # The file does not exist, so a download attempt would be pointless.
        logger.info("Not downloadable")
    except Exception as err:
        # TEST 2 - Try downloading the DMR++
        # If opening didn't work, let's just try downloading.
        logger.info(f"Unexpected error when opening dmrpp: {err}. Trying download.")
        try:
            earthaccess.download(dmrpp_url, "./dmrpp/", pqdm_kwargs={"disable": True})
        except Exception as errh:
            # Best effort: any download failure (RuntimeError included) is
            # logged and reported as "not downloadable" rather than raised.
            logger.info(f"exception when downloading dmrpp: {errh}.")
        else:
            downloadable = True
            logger.info("downloaded dmrpp")
    else:
        downloadable = True
        openable = True
        logger.info("opened_dmrpp")

    # TEST 3 - Traverse all groups.
    # TODO - walk groups inside the DMR++, and pass each group to virtualizarr.open_virtual_dataset().

    return downloadable, openable

In [21]:
def inspect_collection(a_dataset):
    """Inspect one collection for OPeNDAP / DMR++ availability using a single granule.

    Args:
        a_dataset: Collection search result, passed on to `get_info`.

    Returns:
        A `(collection_info, url_results)` pair. `collection_info` is the dict
        from `get_info`; `url_results` holds the collection and provider IDs,
        the last cloud OPeNDAP URL and last ".dmrpp" URL seen in the granule's
        RelatedUrls (or None), and two flags telling whether a DMR++ derived
        from any of the granule's data links was downloadable / openable.
    """
    collection_info = get_info(a_dataset)
    collection_id = collection_info['concept-id']

    # Defaults describe a collection for which nothing was found.
    url_results = {
        "concept_id": collection_id,
        "provider_id": collection_info['provider-id'],
        "cloud_opendap_url": None,
        "dmrpp_url_in_cmr": None,
        "downloadable_dmrpp": False,
        "openable_dmrpp": False,
    }

    granule, data_urls = get_single_granule_from_collection(collection_id)
    if granule is None:
        return collection_info, url_results

    try:
        related_urls = granule['umm']['RelatedUrls']
    except KeyError:
        logger.warning("Keys ['umm']['RelatedUrls'] were not found.")
        return collection_info, url_results

    for related in related_urls:
        url = related['URL']
        logger.info(f"url-{url}")

        # Record whether the metadata advertises a cloud OPeNDAP or a .dmrpp URL.
        if url.startswith("https://opendap.earthdata.nasa.gov/"):
            url_results["cloud_opendap_url"] = url
        elif url.endswith(".dmrpp"):
            url_results["dmrpp_url_in_cmr"] = url

        # Only URLs that are also data links get probed for a DMR++ sidecar.
        if url not in data_urls:
            continue

        downloaded, opened = open_dmrpp_from_url(url)
        if downloaded:
            url_results["downloadable_dmrpp"] = True
        if opened:
            url_results["openable_dmrpp"] = True

    return collection_info, url_results

### Execution of functions happens here.

In [22]:
# Only the first 10 collections are inspected here; widen the slice to crawl more.
collections_to_check = collections[0:10]
# Used as the denominator for the per-dataset percentages in the summary cell.
num_collections = len(collections_to_check)

In [23]:
# Run `inspect_collection` once per collection on the Dask cluster. Each
# gathered result is the (collection_info, url_results) tuple that
# `inspect_collection` returns.
futures = dask_client.map(inspect_collection, collections_to_check) # Map the function to a sequence of inputs
inspection_results = dask_client.gather(futures) # Collect the results

In [24]:
# dask_client.get_worker_logs()

In [25]:
# Maps each `results_df` column name to the key it is read from in the
# `url_results` dict produced by `inspect_collection`.
column_to_key = {
    "collection_id": "concept_id",
    "provider_id": "provider_id",
    "cloud_opendap_url": "cloud_opendap_url",
    "dmrpp_url_in_cmr": "dmrpp_url_in_cmr",
    "downloadable_dmrpp": "downloadable_dmrpp",
    "openable_dmrpp": "openable_dmrpp",
}

# One row per inspected collection; the collection_info half of each result is
# not needed for the table.
expanded_results = [
    tuple(url_results_dict[key] for key in column_to_key.values())
    for _collection_info_dict, url_results_dict in inspection_results
]

# Naming the columns up front keeps them present even when there are no
# results, so the cells below can still select them.
results_df = pd.DataFrame(expanded_results, columns=list(column_to_key))

In [26]:
# Show the full results table (one row per inspected collection).
results_df

Unnamed: 0,collection_id,provider_id,cloud_opendap_url,dmrpp_url_in_cmr,downloadable_dmrpp,openable_dmrpp
0,C2763266360-LPCLOUD,LPCLOUD,,,False,False
1,C1964798938-LAADS,LAADS,,,False,False
2,C2343115666-LPCLOUD,LPCLOUD,,,False,False
3,C2408750690-LPCLOUD,LPCLOUD,https://opendap.earthdata.nasa.gov/collections...,s3://lp-prod-public/EMITL2ARFL.001/EMIT_L2A_RF...,True,True
4,C1996881146-POCLOUD,POCLOUD,https://opendap.earthdata.nasa.gov/providers/P...,,True,True
5,C2202498116-LPCLOUD,LPCLOUD,,,False,False
6,C2271754179-LPCLOUD,LPCLOUD,,,False,False
7,C1711961296-LPCLOUD,LPCLOUD,,,False,False
8,C1748066515-LPCLOUD,LPCLOUD,,,False,False
9,C2278858993-LPCLOUD,LPCLOUD,,,False,False


In [27]:
# Keep only the collections for which a cloud OPeNDAP URL was recorded.
results_df[results_df["cloud_opendap_url"].notnull()]

Unnamed: 0,collection_id,provider_id,cloud_opendap_url,dmrpp_url_in_cmr,downloadable_dmrpp,openable_dmrpp
3,C2408750690-LPCLOUD,LPCLOUD,https://opendap.earthdata.nasa.gov/collections...,s3://lp-prod-public/EMITL2ARFL.001/EMIT_L2A_RF...,True,True
4,C1996881146-POCLOUD,POCLOUD,https://opendap.earthdata.nasa.gov/providers/P...,,True,True


In [28]:
# List the DMR++ URLs that were recorded from the granule metadata.
results_df[results_df["dmrpp_url_in_cmr"].notnull()]["dmrpp_url_in_cmr"].to_list()

['s3://lp-prod-public/EMITL2ARFL.001/EMIT_L2A_RFL_001_20220810T034103_2222203_001/EMIT_L2A_MASK_001_20220810T034103_2222203_001.nc.dmrpp']

In [29]:
# Summary statistics over the inspection results.

# Number of rows in results_df, i.e. one per inspected collection.
# NOTE(review): the messages below label this count as "URLs" - confirm the intended meaning.
num_url_results = results_df.shape[0]

# NOTE(review): `summary_stats` is not filled or read in this cell - confirm it is still needed.
summary_stats = {}

num_cloud_opendap_urls = results_df["cloud_opendap_url"].notnull().sum()
# NOTE(review): despite its name, this counts rows that have a DMR++ URL
# recorded (the "dmrpp_url_in_cmr" column), and is printed as such below.
num_non_cloud_opendap_urls = results_df["dmrpp_url_in_cmr"].notnull().sum()
num_downloadable_dmrpp = results_df["downloadable_dmrpp"].sum()
num_openable_dmrpp = results_df["openable_dmrpp"].sum()

# The "fraction_*" values are percentages (already multiplied by 100).
# A zero `num_collections` or `num_url_results` would make these divisions fail.
fraction_downloadable_collections = num_downloadable_dmrpp / num_collections * 100
fraction_openable_collections = num_openable_dmrpp / num_collections * 100
fraction_downloadable_urls = num_downloadable_dmrpp / num_url_results * 100
fraction_openable_urls = num_openable_dmrpp / num_url_results * 100

print(f"Number cloud opendap URLs: {num_cloud_opendap_urls}")
print(f"Number dmr++ URLs in CMR: {num_non_cloud_opendap_urls}")
# The two adjacent f-strings are concatenated without a separating space, which
# is why the output reads "datasets(20 %)".
print(f"Fraction of downloadable DMR++ files (by appending '.dmrpp') found: {num_downloadable_dmrpp} out of {num_collections} datasets" 
      f"({fraction_downloadable_collections:0.0f} %) and {num_url_results} URLs ({fraction_downloadable_urls:0.0f} %).")
print(f"Fraction of openable DMR++ files (by appending '.dmrpp') found: {num_openable_dmrpp} out of {num_collections} datasets" 
      f"({fraction_openable_collections:0.0f} %) and {num_url_results} URLs ({fraction_openable_urls:0.0f} %).")

Number cloud opendap URLs: 2
Number dmr++ URLs in CMR: 1
Fraction of downloadable DMR++ files (by appending '.dmrpp') found: 2 out of 10 datasets(20 %) and 10 URLs (20 %).
Fraction of openable DMR++ files (by appending '.dmrpp') found: 2 out of 10 datasets(20 %) and 10 URLs (20 %).
