oisst test #267

Open · wants to merge 16 commits into base: master
63 changes: 63 additions & 0 deletions feedstock/recipe.py
@@ -0,0 +1,63 @@
from dataclasses import dataclass

import apache_beam as beam
import pandas as pd
import s3fs
import xarray as xr
from beam_pyspark_runner.pyspark_runner import PySparkRunner
from pangeo_forge_ndpyramid.transforms import StoreToPyramid

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray

dates = pd.date_range('1981-09-01', '1981-09-03', freq='D')

URL_FORMAT = (
'https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/'
'v2.1/access/avhrr/{time:%Y%m}/oisst-avhrr-v02r01.{time:%Y%m%d}.nc'
)


def make_url(time):
    return URL_FORMAT.format(time=time)


time_concat_dim = ConcatDim('time', dates, nitems_per_file=1)
pattern = FilePattern(make_url, time_concat_dim)


# NOTE: target uses the EMR serverless execution role (veda-data-reader-dev)
target_fsspec_kwargs = {'anon': False, 'client_kwargs': {'region_name': 'us-west-2'}}
fs_target = s3fs.S3FileSystem(**target_fsspec_kwargs)
target_root = FSSpecTarget(fs_target, 's3://veda-pforge-emr-outputs-v4')


@dataclass
class SelectSingleZlev(beam.PTransform):
    """Select the single zlev (depth) level and drop the coordinate."""

    def select_single_zlev(self, ds: xr.Dataset) -> xr.Dataset:
        return ds.isel(zlev=0).drop('zlev')

    def expand(self, pcoll):
        return pcoll | 'Select single zlev' >> beam.MapTuple(
            lambda k, v: (k, self.select_single_zlev(v))
        )


with beam.Pipeline(runner=PySparkRunner()) as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenURLWithFSSpec(fsspec_sync_patch=True)
        | OpenWithXarray(file_type=pattern.file_type)
        | SelectSingleZlev()
        | 'Write Pyramid Levels'
        >> StoreToPyramid(
            target_root=target_root,
            store_name='oisst_pyramid_3_lvl_3_day_pyramid_sync.zarr',
            epsg_code='4326',
            rename_spatial_dims={'lon': 'longitude', 'lat': 'latitude'},
            levels=3,
            combine_dims=pattern.combine_dim_keys,
        )
    )
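
For context (illustrative only, not part of the diff): a minimal sketch of the URL the pattern resolves for the first date in `dates`, assuming only the `URL_FORMAT` string defined above; the hard-coded `pd.Timestamp` stands in for one entry of the `dates` range.

import pandas as pd

URL_FORMAT = (
    'https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/'
    'v2.1/access/avhrr/{time:%Y%m}/oisst-avhrr-v02r01.{time:%Y%m%d}.nc'
)

# First entry of the recipe's date range; each (index, url) pair yielded by
# FilePattern.items() is what OpenURLWithFSSpec consumes downstream.
time = pd.Timestamp('1981-09-01')
print(URL_FORMAT.format(time=time))
# https://www.ncei.noaa.gov/data/sea-surface-temperature-optimum-interpolation/
# v2.1/access/avhrr/198109/oisst-avhrr-v02r01.19810901.nc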
11 changes: 11 additions & 0 deletions feedstock/requirements.txt
@@ -0,0 +1,11 @@
s3fs
boto3
requests
git+https://github.com/ranchodeluxe/beam-pyspark-runner@patch-2
git+https://github.com/carbonplan/pangeo-forge-ndpyramid
apache-beam==2.53.0
git+https://github.com/pangeo-forge/pangeo-forge-recipes@feature/optional-sync-http
xarray>=2024.1.1
zarr==2.16.1
rioxarray
httpfs_sync
7 changes: 0 additions & 7 deletions recipes/README.md

This file was deleted.