In [1]:
import requests
import json
import os
import pandas
import util

In [2]:
# Bearer token sent in the Authorization header of every Beacon request.
# It is read from the BEACON_TOKEN environment variable so the secret never has
# to be typed into (and committed with) the notebook; when the variable is
# unset the value falls back to the empty string.
TOKEN = os.environ.get("BEACON_TOKEN", "")

# Query endpoint that the download requests are POSTed to.
BEACON_QUERY_URL = "https://beacon-cora-ts.maris.nl/api/query"

In [3]:
# Projection ("select" clause) of the Beacon query, written as one ordered
# list literal. Each entry comes from a `util` helper; judging by the printed
# query, these serialise to {"column": ...}, {"value": ...} or
# {"function": ..., "args": ...} objects with an optional "alias".
# The entry order is deliberate and must be kept: it is the order sent to Beacon.
query_parameters = [
    # --- Metadata ---------------------------------------------------------
    util.value("BEACON_CORA_TS", alias="SOURCE_BDI"),
    util.column("@identifier"),
    util.column("@identifier", alias="SOURCE_BDI_DATASET_ID"),
    util.column("PLATFORM_NUMBER"),
    util.column("WMO_INST_TYPE"),
    util.column("INSTITUTION_EDMO_CODE"),
    util.column("INSTITUTION_EDMO_CODE", alias="COMMON_EDMO_CODE"),
    # --- Common ODV tag: built from the dataset identifier and platform number
    util.function_call(
        "concat",
        ["@identifier", "PLATFORM_NUMBER"],
        "COMMON_ODV_TAG",
    ),
    # --- Time -------------------------------------------------------------
    util.column("TIME"),
    util.column("TIME_QC"),
    util.function_call("to_timestamp_nanos", ["TIME"], "COMMON_TIME"),
    util.column("TIME_QC", alias="COMMON_TIME_QC"),
    # --- Location: raw columns first, then the COMMON_* aliases -----------
    util.column("LONGITUDE"),
    util.column("LATITUDE"),
    util.column("POSITION_QC"),
    util.column("LONGITUDE", alias="COMMON_LONGITUDE"),
    util.column(
        "LONGITUDE.standard_name",
        alias="COMMON_LONGITUDE_STANDARD_NAME",
    ),
    util.column("LONGITUDE.units", alias="COMMON_LONGITUDE_UNITS"),
    util.value("SDN:P01::ALONZZ01", alias="COMMON_LONGITUDE_P01"),
    util.value("SDN:P06::DEGE", alias="COMMON_LONGITUDE_P06"),
    util.column(
        "LATITUDE.standard_name",
        alias="COMMON_LATITUDE_STANDARD_NAME",
    ),
    util.column("LATITUDE.units", alias="COMMON_LATITUDE_UNITS"),
    util.column("LATITUDE", alias="COMMON_LATITUDE"),
    util.value("SDN:P01::ALATZZ01", alias="COMMON_LATITUDE_P01"),
    util.value("SDN:P06::DEGN", alias="COMMON_LATITUDE_P06"),
    util.column("POSITION_QC", alias="COMMON_POSITION_QC"),
    # --- Depth ------------------------------------------------------------
    util.column("PRES"),
    util.column("PRES_QC"),
    util.column("DEPH"),
    util.column("DEPH_QC"),
    # Depth derived from pressure is listed before the DEPH column
    # (presumably the first non-null candidate wins -- confirm in util).
    util.coalesce_columns(
        [
            util.function_call("pressure_to_depth", ["PRES", "LATITUDE"]),
            "DEPH",
        ],
        "COMMON_DEPTH",
    ),
    util.coalesce_columns(["PRES_QC", "DEPH_QC"], "COMMON_DEPTH_QC"),
    util.value("SDN:P01::ADEPZZ", alias="COMMON_DEPTH_P01"),
    util.value("SDN:P06::ULAA", alias="COMMON_DEPTH_P06"),
    util.value("depth", alias="COMMON_DEPTH_STANDARD_NAME"),
    util.value("m", alias="COMMON_DEPTH_UNITS"),
    # --- Temperature: adjusted values are listed ahead of the raw ones ----
    util.column("TEMP"),
    util.column("TEMP_QC"),
    util.column("TEMP_ADJUSTED"),
    util.column("TEMP_ADJUSTED_QC"),
    util.coalesce_columns(["TEMP_ADJUSTED", "TEMP"], "COMMON_TEMPERATURE"),
    util.coalesce_columns(
        ["TEMP_ADJUSTED_QC", "TEMP_QC"],
        "COMMON_TEMPERATURE_QC",
    ),
    util.value("SDN:P01::TEMPPR01", alias="COMMON_TEMPERATURE_P01"),
    util.value("SDN:P06::UPAA", alias="COMMON_TEMPERATURE_P06"),
    util.value(
        "Temperature of the water body",
        alias="COMMON_TEMPERATURE_STANDARD_NAME",
    ),
    util.value("degrees_celsius", alias="COMMON_TEMPERATURE_UNITS"),
    # Instrument vocabulary mappings are fed the trimmed WMO_INST_TYPE,
    # cast via try_cast="Int64".
    util.function_call(
        "map_cora_instrument_l05",
        [util.function_call("btrim", ["WMO_INST_TYPE"], try_cast="Int64")],
        "COMMON_TEMPERATURE_L05",
    ),
    util.function_call(
        "map_cora_instrument_l22",
        [util.function_call("btrim", ["WMO_INST_TYPE"], try_cast="Int64")],
        "COMMON_TEMPERATURE_L22",
    ),
    # --- Salinity: same layout as the temperature section -----------------
    util.column("PSAL"),
    util.column("PSAL_QC"),
    util.column("PSAL_ADJUSTED"),
    util.column("PSAL_ADJUSTED_QC"),
    util.coalesce_columns(["PSAL_ADJUSTED", "PSAL"], "COMMON_SALINITY"),
    util.coalesce_columns(
        ["PSAL_ADJUSTED_QC", "PSAL_QC"],
        "COMMON_SALINITY_QC",
    ),
    util.value("SDN:P01::PSLTZZ01", alias="COMMON_SALINITY_P01"),
    util.value("SDN:P06::UUUU", alias="COMMON_SALINITY_P06"),
    util.value(
        "Salinity of the water body",
        alias="COMMON_SALINITY_STANDARD_NAME",
    ),
    util.value("Dimensionless", alias="COMMON_SALINITY_UNITS"),
    util.function_call(
        "map_cora_instrument_l05",
        [util.function_call("btrim", ["WMO_INST_TYPE"], try_cast="Int64")],
        "COMMON_SALINITY_L05",
    ),
    util.function_call(
        "map_cora_instrument_l22",
        [util.function_call("btrim", ["WMO_INST_TYPE"], try_cast="Int64")],
        "COMMON_SALINITY_L22",
    ),
]

In [4]:
def build_query(start_time, end_time) -> dict:
    """Assemble the Beacon query body for one time window.

    Args:
        start_time: Lower bound placed in the TIME filter's "min" field.
        end_time: Upper bound placed in the TIME filter's "max" field.

    Returns:
        A dict with three keys, in this order:
        "select" (the module-level ``query_parameters`` list itself, not a
        copy), "filters" (the TIME range plus an "or" filter requiring TEMP or
        PSAL to be non-null) and "output" (parquet format).
    """
    time_window = {"column": "TIME", "min": start_time, "max": end_time}

    # Keep a row when at least one of the two measurements is present.
    has_measurement = {
        "or": [{"is_not_null": {"column": name}} for name in ("TEMP", "PSAL")]
    }

    query = {}
    query["select"] = query_parameters
    query["filters"] = [time_window, has_measurement]
    query["output"] = {"format": "parquet"}
    return query

In [5]:
# Download one parquet file per year into ./data.
os.makedirs("data", exist_ok=True)

for year in range(2000, 2001):
    start_time = f"{year}-01-01T00:00:00.000"
    end_time = f"{year}-12-31T23:59:59.999"

    # Build the query once so the JSON printed below is exactly what is sent.
    query = build_query(start_time, end_time)
    print(f"Downloading {start_time} - {end_time}")
    print(json.dumps(query))

    target_path = f"data/CORA_TS_{year}.parquet"
    # Stream into a temporary sibling file first: an interrupted download must
    # not leave a truncated file under the final .parquet name.
    partial_path = f"{target_path}.part"

    # Download data
    with requests.post(
        BEACON_QUERY_URL,
        json=query,
        headers={"Authorization": f"Bearer {TOKEN}"},
        stream=True,
    ) as response:
        if response.status_code != 200:
            print(f"Error: {response.status_code}")
            print(response.text)
            # Raise instead of calling exit(): in a notebook exit() targets the
            # kernel rather than cleanly aborting the cell, and an exception
            # also covers non-200 statuses that raise_for_status() ignores.
            raise requests.HTTPError(
                f"Beacon query for {year} failed with HTTP {response.status_code}",
                response=response,
            )
        with open(partial_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=4096 * 128):
                if chunk:
                    f.write(chunk)

    # Only a fully written download is moved to its final name.
    os.replace(partial_path, target_path)

Downloading 2000-01-01T00:00:00.000 - 2000-12-31T23:59:59.999
{"select": [{"value": "BEACON_CORA_TS", "alias": "SOURCE_BDI"}, {"column": "@identifier", "alias": null}, {"column": "@identifier", "alias": "SOURCE_BDI_DATASET_ID"}, {"column": "PLATFORM_NUMBER", "alias": null}, {"column": "WMO_INST_TYPE", "alias": null}, {"column": "INSTITUTION_EDMO_CODE", "alias": null}, {"column": "INSTITUTION_EDMO_CODE", "alias": "COMMON_EDMO_CODE"}, {"function": "concat", "args": ["@identifier", "PLATFORM_NUMBER"], "alias": "COMMON_ODV_TAG"}, {"column": "TIME", "alias": null}, {"column": "TIME_QC", "alias": null}, {"function": "to_timestamp_nanos", "args": ["TIME"], "alias": "COMMON_TIME"}, {"column": "TIME_QC", "alias": "COMMON_TIME_QC"}, {"column": "LONGITUDE", "alias": null}, {"column": "LATITUDE", "alias": null}, {"column": "POSITION_QC", "alias": null}, {"column": "LONGITUDE", "alias": "COMMON_LONGITUDE"}, {"column": "LONGITUDE.standard_name", "alias": "COMMON_LONGITUDE_STANDARD_NAME"}, {"column":

In [6]:
import pyarrow.parquet as pq

# Open the file downloaded for 2000 and report how many row groups it holds.
parquet_file = pq.ParquetFile("data/CORA_TS_2000.parquet")
print(parquet_file.num_row_groups)

# Only the first row group is loaded and converted to a pandas DataFrame.
table = parquet_file.read_row_group(0)
df = table.to_pandas()

# Columns shown in the preview; the trailing expression is the cell's output.
preview_columns = [
    "PLATFORM_NUMBER",
    "COMMON_TIME",
    "COMMON_DEPTH",
    "COMMON_TEMPERATURE",
    "COMMON_SALINITY",
]
df[preview_columns]

21


Unnamed: 0,PLATFORM_NUMBER,COMMON_TIME,COMMON_DEPTH,COMMON_TEMPERATURE,COMMON_SALINITY
0,FinnMaid,2000-05-05 00:25:05,5.0,5.300000,6.500000
1,FinnMaid,2000-05-05 01:25:00,5.0,5.100000,6.100000
2,FinnMaid,2000-05-05 02:20:05,5.0,5.000000,5.800000
3,FinnMaid,2000-05-05 03:16:59,5.0,4.600000,6.200000
4,FinnMaid,2000-05-05 04:15:04,5.0,5.800000,5.800000
...,...,...,...,...,...
1048571,52084,2000-04-15 05:30:00,50.0,29.129002,34.590000
1048572,52084,2000-04-15 05:30:00,75.0,29.074001,34.755001
1048573,52084,2000-04-15 05:30:00,100.0,29.056002,34.853001
1048574,52084,2000-04-15 05:30:00,125.0,27.440001,34.490002
