In [None]:
import subprocess
import sys

COLAB = "google.colab" in sys.modules


def _install(package):
    if COLAB:
        ans = input(f"Install { package }? [y/n]:")
        if ans.lower() in ["y", "yes"]:
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", "--quiet", package]
            )
            print(f"{ package } installed!")


def _colab_install_missing_deps(deps):
    import importlib.util

    for dep in deps:
        if importlib.util.find_spec(dep) is None:
            if dep == "iris":
                dep = "scitools-iris"
            _install(dep)


deps = ["ckanapi", "geopandas"]


_colab_install_missing_deps(deps)

# Programmatically query the IOOS Data Catalog for a specific observation type

Created: 2024-09-17

Updated: 2025-03-06

Author: [Mathew Biddle](mailto:mathew.biddle@noaa.gov)

In this notebook we highlight the ability to search the [IOOS Data Catalog](https://data.ioos.us/) for a specific subset of observations using the [CKAN](https://ckan.org/) web-accessible Application Programming Interface (API).

For this example, we want to look for observations of oxygen in the water column across the IOOS Catalog. Because the US IOOS community publishes datasets following the [IOOS Metadata Profile](https://ioos.github.io/ioos-metadata/), we can expect each Regional Association and Data Assembly Center (DAC) to follow the [Climate and Forecast (CF) Conventions](http://cfconventions.org/) and use CF `standard_names` to describe their datasets. With that assumption, we can search across the IOOS Data Catalog for datasets whose CF standard names contain both `oxygen` and `sea_water`. Then, we can build a simple map to show the geographical distribution of those datasets.

## Build CKAN API query base.

This uses the [ckanapi](https://github.com/ckan/ckanapi) package.

In [None]:
from ckanapi import RemoteCKAN


ioos_catalog = RemoteCKAN(
    address="https://data.ioos.us",
    user_agent="ckanapiioos/1.0 (+https://ioos.us/)",
)


ioos_catalog

## What organizations are in the catalog?

List the organizations that publish datasets in the catalog.

In [None]:
orgs = ioos_catalog.action.organization_list()
print(orgs)

## How many datasets are we searching across?

Run an unfiltered search and read the total number of datasets from the `count` field of the response.

In [None]:
datasets = ioos_catalog.action.package_search()
datasets["count"]

## Grab the most recent applicable CF standard names

Collect [CF standard names](https://cfconventions.org/Data/cf-standard-names/current/build/cf-standard-name-table.html) that contain `oxygen` and `sea_water` from the CF standard name list.

In [None]:
import pandas as pd


url = "https://cfconventions.org/Data/cf-standard-names/current/src/cf-standard-name-table.xml"

tbl_version = pd.read_xml(url, xpath="./*")["version_number"][0].astype(int)
df = pd.read_xml(url, xpath="entry")

std_names = df.loc[
    (df["id"].str.contains("oxygen") & df["id"].str.contains("sea_water"))
]

print(f"CF Standard Name Table: {tbl_version}")

std_names[["id", "description"]]
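
To make the filtering step easier to follow without a network connection, here is a minimal sketch of the same idea using only the standard library. The XML snippet is an illustrative stand-in for the real table (the version number and descriptions are placeholders, and the layout is assumed to mirror the published file), and `matching_standard_names` is a hypothetical helper, not part of any library.

```python
import xml.etree.ElementTree as ET

# Illustrative stand-in for the CF standard name table; the real file is far larger.
SAMPLE_XML = """
<standard_name_table>
  <version_number>0</version_number>
  <entry id="mass_concentration_of_oxygen_in_sea_water">
    <canonical_units>kg m-3</canonical_units>
    <description>placeholder</description>
  </entry>
  <entry id="fractional_saturation_of_oxygen_in_sea_water">
    <canonical_units>1</canonical_units>
    <description>placeholder</description>
  </entry>
  <entry id="sea_water_temperature">
    <canonical_units>K</canonical_units>
    <description>placeholder</description>
  </entry>
  <entry id="air_temperature">
    <canonical_units>K</canonical_units>
    <description>placeholder</description>
  </entry>
</standard_name_table>
"""


def matching_standard_names(xml_text, required=("oxygen", "sea_water")):
    """Return the table version and the entry ids containing every required word."""
    root = ET.fromstring(xml_text)
    version = int(root.findtext("version_number"))
    names = [
        entry.get("id")
        for entry in root.iter("entry")
        if all(word in entry.get("id") for word in required)
    ]
    return version, names


version, names = matching_standard_names(SAMPLE_XML)
print(version, names)
```

Only the ids that contain both words survive, which is the same condition the `pandas` filter above applies with `&`.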

## Search across IOOS Data Catalog using CKAN API

Search the IOOS Data Catalog for CF standard names that match those above.

In [None]:
import time

from ckanapi import RemoteCKAN
from ckanapi.errors import CKANAPIError

from urllib3.exceptions import IncompleteRead
from requests.exceptions import ChunkedEncodingError


ua = "ckanapiioos/1.0 (+https://ioos.us/)"

ioos_catalog = RemoteCKAN("https://data.ioos.us", user_agent=ua)
ioos_catalog

df_out = pd.DataFrame()

for std_name in std_names["id"]:

    print(std_name)

    fq = f"+cf_standard_names:{std_name}"

    result_count = 0

    df_std_name = pd.DataFrame()
    
    while True:
        try:
            datasets = ioos_catalog.action.package_search(fq=fq, rows=500, start=result_count)
        except (CKANAPIError, IncompleteRead, ChunkedEncodingError):
            # Transient server or network error: pause briefly, then retry the same page.
            time.sleep(1)
            continue

        num_results = datasets["count"]

        print(f"num_results: {num_results}, result_count: {result_count}")

        for dataset in datasets["results"]:
            df = pd.DataFrame(
                {
                    "title": [dataset["title"]],
                    "url": [dataset["resources"][0]["url"]],
                    "org": [dataset["organization"]["title"]],
                    "std_name": std_name,
                }
            )

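            df_std_name = pd.concat([df_std_name, df], ignore_index=True)

        # Number of records collected so far; also the offset of the next page.
        result_count = df_std_name.shape[0]

        time.sleep(1)

        # Stop once every match is collected, or if the server returns an empty page.
        if result_count >= num_results or not datasets["results"]:
            print(f"num_results: {num_results}, result_count: {result_count}")
            break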
            
    df_out = pd.concat([df_out, df_std_name], ignore_index=True)
    
    print(f"num_results: {num_results}, result_count: {result_count}, total_result_count: {df_out.shape[0]}")
    
df_out.shape
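
The paging logic in the loop above can be isolated into a small helper, sketched here under some assumptions. `fetch_all` and `make_fake_search` are hypothetical names, the fake search function stands in for `ioos_catalog.action.package_search` so the example runs offline, and `ConnectionError` stands in for the CKAN and network exceptions caught in the notebook. Unlike the loop above, this sketch gives up after a fixed number of consecutive failures.

```python
import time


def fetch_all(search, fq, rows=500, max_retries=3, pause=0.0):
    """Page through `search` until every record matching `fq` is collected."""
    results = []
    failures = 0
    while True:
        try:
            page = search(fq=fq, rows=rows, start=len(results))
        except ConnectionError:
            failures += 1
            if failures > max_retries:
                raise
            time.sleep(pause)
            continue
        failures = 0
        results.extend(page["results"])
        # Stop when everything is collected or the server returns an empty page.
        if not page["results"] or len(results) >= page["count"]:
            return results
        time.sleep(pause)


def make_fake_search(n_records, fail_on_call=2):
    """Build an offline stand-in that fails once to exercise the retry path."""
    data = [{"title": f"dataset {i}"} for i in range(n_records)]
    calls = {"n": 0}

    def search(fq, rows, start):
        calls["n"] += 1
        if calls["n"] == fail_on_call:
            raise ConnectionError("simulated dropped connection")
        return {"count": len(data), "results": data[start : start + rows]}

    return search


records = fetch_all(make_fake_search(1234), fq="+cf_standard_names:example")
print(len(records))
```

Using the number of records already collected as the `start` offset means a failed request is simply repeated for the same page.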

## Do some summarizing of the responses

The DataFrame of the matching datasets is quite large. I wonder what the distribution of those datasets across organizations looks like? Let's use [pandas.groupby()](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.groupby.html) to generate some statistics about how many datasets are provided, matching our criteria, by which organization.

In [None]:
df_out.groupby(by="org").count()

## Drop the Glider DAC data

Glider DAC data are already being delivered to NCEI (the National Centers for Environmental Information), so we can drop those entries.

In [None]:
df_out_no_glider = df_out.loc[~df_out["org"].str.contains("Glider DAC")]
df_out_no_glider.groupby(by="org").count()

## Digging into some of the nuances

There are still quite a lot of datasets from each organization. Because our search above looked for each CF standard name across all the datasets, a dataset that matches several CF standard names appears more than once. For example, one dataset might have both `mass_concentration_of_oxygen_in_sea_water` and `fractional_saturation_of_oxygen_in_sea_water`, but we only need to know that it's one dataset.

As we only need to know about the unique datasets, let's count how many unique dataset URLs we have.

In [None]:
df_out_no_glider.groupby(by="url").count()

## Drop duplicate records

As you can see above, many dataset URLs appear more than once. We identify duplicates by looking at the URL, which should be unique for each dataset, and keep only the last record for each one.

In [None]:
df_out_nodups_no_glider = df_out_no_glider.drop_duplicates(subset=["url"], keep="last")

df_out_nodups_no_glider
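
Dropping duplicates keeps a single `std_name` per URL. If it matters which standard names each dataset matched, one alternative is to group the names by URL instead. The following is a plain-Python sketch with made-up records and a hypothetical helper name, not the approach used in this notebook.

```python
from collections import defaultdict

# Made-up records shaped like the rows of the search results.
records = [
    {
        "url": "https://example.org/erddap/tabledap/a",
        "std_name": "mass_concentration_of_oxygen_in_sea_water",
    },
    {
        "url": "https://example.org/erddap/tabledap/a",
        "std_name": "fractional_saturation_of_oxygen_in_sea_water",
    },
    {
        "url": "https://example.org/erddap/tabledap/b",
        "std_name": "mass_concentration_of_oxygen_in_sea_water",
    },
]


def std_names_by_url(rows):
    """Map each dataset URL to the distinct standard names it matched."""
    grouped = defaultdict(list)
    for row in rows:
        if row["std_name"] not in grouped[row["url"]]:
            grouped[row["url"]].append(row["std_name"])
    return dict(grouped)


by_url = std_names_by_url(records)
for url, names in by_url.items():
    print(url, names)
```

The number of keys in `by_url` equals the number of unique datasets, the same count that `drop_duplicates` produces.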

## How many endpoints are not ERDDAP?

Now we have a unique list of datasets which match our CF standard name criteria. Since we have some background in using [ERDDAP to query for data](https://ioos.github.io/ioos_code_lab/content/code_gallery/data_access_notebooks/2017-03-21-ERDDAP_IOOS_Sensor_Map.html), let's take a look at what other endpoints each of the datasets are using.

_Hint: ERDDAP servers typically have `erddap` in their URLs._

In [None]:
df_out_nodups_no_glider.loc[~df_out_nodups_no_glider["url"].str.contains("erddap")]
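
The substring test is a quick heuristic. A slightly stricter check, sketched below, looks at the URL path and also reports which ERDDAP protocol (`tabledap` or `griddap`) the endpoint uses, which can be useful before sending tabular requests. `erddap_protocol` is a hypothetical helper and the URLs are made up for illustration.

```python
from urllib.parse import urlparse


def erddap_protocol(url):
    """Return 'tabledap', 'griddap', or 'other' for ERDDAP URLs, else None."""
    parts = [part for part in urlparse(url).path.lower().split("/") if part]
    if "erddap" not in parts:
        return None
    for protocol in ("tabledap", "griddap"):
        if protocol in parts:
            return protocol
    return "other"


for example in (
    "https://example.org/erddap/tabledap/station_1",
    "https://example.org/erddap/griddap/model_1",
    "https://example.org/thredds/dodsC/file.nc",
):
    print(example, "->", erddap_protocol(example))
```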

## What's the remaining distribution?

This is the distribution of unique datasets found in the IOOS Data Catalog which have a CF standard name that contains the words `oxygen` and `sea_water`. We've dropped the Glider DAC datasets as, theoretically, those are in NCEI already.

In [None]:
df_out_nodups_no_glider.groupby(by="org").count()

## Ingest data

Let's rip through all of the datasets, grab the unique positions of each one as a table (the `.csvp` response includes the units in the column names) and make a monster dictionary keyed by dataset title. This takes a bit.

In [None]:
import multiprocessing
from tqdm import tqdm
from urllib.error import HTTPError

import joblib
import stamina


@stamina.retry(on=HTTPError, attempts=3)
def request_df(url):
    """Thin layer to handle retries."""
    return pd.read_csv(url, low_memory=False)


def error_handling_layer(row):
    """Even with stamina we may hit servers that will fail."""
    title = row["title"]
    # Request only the unique positions.
    url = f"{row['url']}.csvp?latitude,longitude&distinct()"
    try:
        df = request_df(url)
    except Exception as err:
        msg = f"Failed to fetch {url}. {err}."
        print(msg)
        df = None
    return title, df




n_iter = len(df_out_nodups_no_glider)
num_cores = multiprocessing.cpu_count()

downloads = [
    r for r in tqdm(
        joblib.Parallel(return_as="generator", n_jobs=num_cores, max_nbytes=5000)(
            joblib.delayed(error_handling_layer)(row) for _, row in df_out_nodups_no_glider.iterrows()
        ), total=n_iter)
]


dict_out_final = dict(downloads)

Let's take a quick look at one of the DataFrames.

Transpose it when we print, so we can see all the columns.

In [None]:
dict_out_final[
    '"Deepwater CTD - pe972218.ctd.nc - 29.25N, -87.89W - 1997-03-21"'
].head(5).T
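
The column names in these tables carry their units in parentheses, for example `latitude (degrees_north)`. If the name and the units are needed separately, they can be split as in this minimal sketch. The sample payload is made up, and `split_header` is a hypothetical helper.

```python
import csv
import io
import re

# Made-up payload shaped like a .csvp response restricted to positions.
SAMPLE_CSVP = "latitude (degrees_north),longitude (degrees_east)\n29.25,-87.89\n"

# A name, optionally followed by units wrapped in parentheses.
HEADER = re.compile(r"^(?P<name>[^()]+?)\s*(?:\((?P<units>.*)\))?$")


def split_header(column):
    """Split 'name (units)' into (name, units); units is None when absent."""
    match = HEADER.match(column.strip())
    if match is None:
        return column, None
    return match.group("name"), match.group("units")


reader = csv.reader(io.StringIO(SAMPLE_CSVP))
columns = [split_header(col) for col in next(reader)]
print(columns)
```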

## Let's make a nice map of the distribution of observations

Below we create a mapping function to plot the unique dataset points on a map. Then, we use that function with our full response. We have to do a little reorganizing of the data to build one DataFrame for all the coordinates.

In [None]:
import cartopy.io.shapereader as shpreader
import geopandas as gpd
import matplotlib.pyplot as plt


def make_map(df):
    fig, ax = plt.subplots(figsize=(8, 6))
    shpfilename = shpreader.natural_earth(
        resolution="110m",
        category="cultural",
        name="admin_0_countries",
    )
    countries = gpd.read_file(shpfilename)

    countries[countries["NAME"] == "United States of America"].plot(
        color="lightgrey", ax=ax
    )

    df.plot(
        x="longitude (degrees_east)",
        y="latitude (degrees_north)",
        kind="scatter",
        ax=ax,
    )

    ax.grid(visible=True, alpha=0.5)

    return ax

In [None]:
coord_cols = ["latitude (degrees_north)", "longitude (degrees_east)"]

# Failed downloads are stored as None, so skip them before concatenating.
df_coords = pd.concat(
    [df[coord_cols] for df in dict_out_final.values() if df is not None]
)

# drop all duplicates
df_coords_clean = df_coords.drop_duplicates(ignore_index=True)

# make the map
make_map(df_coords_clean)

## Let's explore those points on an interactive map

Just for fun, we can use [`geopandas.explore()`](https://geopandas.org/en/stable/docs/reference/api/geopandas.GeoDataFrame.explore.html) to plot these points on an interactive map to browse around.

In [None]:
gdf = gpd.GeoDataFrame(
    df_coords_clean,
    geometry=gpd.points_from_xy(
        df_coords_clean["longitude (degrees_east)"],
        df_coords_clean["latitude (degrees_north)"],
    ),
    crs="EPSG:4326",
)

gdf.explore()

We hope this example demonstrates the flexibility of direct requests to the IOOS Data Catalog CKAN server and all the possibilities it provides. In this notebook we:

* Searched the IOOS Data Catalog CKAN API by CF standard name.
* Found datasets matching our specified criteria.
* Collected the unique positions from each of the datasets matching our criteria.
* Created a simple map of the distribution of datasets which match our criteria.

To take this one step further, a user could extend the requests to include the oxygen variables themselves, then integrate the observations from every dataset (keyed by title, as in `dict_out_final`) to start building a comprehensive dataset.

Additionally, a user could modify the CKAN query to search for terms outside of the CF standard names to potentially gather more datasets. 
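
As a starting point for such modifications, the filter query string can be assembled from several fields. This is a minimal sketch: `build_fq` is a hypothetical helper, `cf_standard_names` is the field used earlier in this notebook, and `organization` with the value `example-org` is an assumption for illustration that should be checked against the catalog before use.

```python
def build_fq(**fields):
    """Join field/value pairs into a filter query where every term is required."""
    terms = []
    for field, value in fields.items():
        text = str(value)
        # Quote values containing whitespace so they are treated as one phrase.
        if any(char.isspace() for char in text):
            text = f'"{text}"'
        terms.append(f"+{field}:{text}")
    return " ".join(terms)


print(build_fq(cf_standard_names="mass_concentration_of_oxygen_in_sea_water"))
print(
    build_fq(
        cf_standard_names="mass_concentration_of_oxygen_in_sea_water",
        organization="example-org",
    )
)
```

The resulting string would be passed as the `fq` argument of `package_search`, which also accepts a `q` argument for free-text searches.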