Making appending work in the beam refactor #447

Open
rabernat opened this issue Dec 5, 2022 · 10 comments

Comments

@rabernat
Contributor

rabernat commented Dec 5, 2022

This issue is for @alxmrs, who indicated some willingness to work on it.

In order to append to existing Zarr datasets, we need to update the StoreToZarr PTransform such that it can handle an existing dataset at the target_url.

That PTransform first figures out the schema of the input data, then passes it to the PrepareZarrTarget PTransform:

@dataclass
class PrepareZarrTarget(beam.PTransform):
    """From a singleton PCollection containing a dataset schema, initialize a
    Zarr store with the correct variables, dimensions, attributes and chunking.
    Note that the dimension coordinates will be initialized with dummy values.

    :param target_url: Where to store the target Zarr dataset.
    :param target_chunks: Dictionary mapping dimension names to chunk sizes.
        If a dimension is not named, the chunks will be inferred from the schema.
        If chunking is present in the schema for a given dimension, the length of
        the first chunk will be used. Otherwise, the dimension will not be chunked.
    """

    target_url: str
    target_chunks: Dict[str, int] = field(default_factory=dict)

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        target = FSSpecTarget.from_url(self.target_url)
        store = target.get_mapper()
        initialized_target = pcoll | beam.Map(
            schema_to_zarr, target_store=store, target_chunks=self.target_chunks
        )
        return initialized_target

which ultimately calls the function schema_to_zarr:

def schema_to_zarr(
    schema: XarraySchema,
    target_store: zarr.storage.FSStore,
    target_chunks: Optional[Dict[str, int]] = None,
) -> zarr.storage.FSStore:
    """Initialize a zarr group based on a schema."""
    ds = schema_to_template_ds(schema, specified_chunks=target_chunks)
    # using mode="w" makes this function idempotent
    ds.to_zarr(target_store, mode="w", compute=False)
    return target_store

One way to implement this would be to add an append option to this function. If active, we could first try to open the target zarr dataset and check whether it is compatible with the schema (e.g. same variables and dimensions [other than the concat dim]). If so, we could skip initializing / overwriting it. Instead, we would simply have to resize the arrays (e.g. using the zarr resize function). The original zarr dataset size would be used to determine an offset for future writes. (Note there is a tricky edge case if the offset is not a multiple of the target chunk size.)
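To make the compatibility check and the offset calculation concrete, here is a minimal sketch in plain Python. The function name `plan_append` and its arguments are hypothetical and not part of pangeo-forge-recipes; it assumes the dimension sizes of the existing store and of the incoming schema have already been read into dictionaries. The actual resizing of the zarr arrays is not shown.

```python
from typing import Dict, Tuple


def plan_append(
    existing_sizes: Dict[str, int],
    incoming_sizes: Dict[str, int],
    concat_dim: str,
    chunk_size: int,
) -> Tuple[int, int, bool]:
    """Return (offset, new_length, chunk_aligned) for an append along concat_dim.

    Hypothetical helper: both dicts map dimension name -> length.
    """
    if concat_dim not in existing_sizes or concat_dim not in incoming_sizes:
        raise ValueError(f"concat dim {concat_dim!r} missing from target or schema")
    if set(existing_sizes) != set(incoming_sizes):
        raise ValueError("dimension names differ between target and schema")
    # Every dimension other than the concat dim must have the same length.
    for dim, size in existing_sizes.items():
        if dim != concat_dim and incoming_sizes[dim] != size:
            raise ValueError(
                f"dimension {dim!r} has length {incoming_sizes[dim]}, expected {size}"
            )
    # New data starts where the existing data ends.
    offset = existing_sizes[concat_dim]
    new_length = offset + incoming_sizes[concat_dim]
    # The edge case: the first new write shares a chunk with existing data
    # whenever the offset is not a multiple of the chunk size.
    chunk_aligned = offset % chunk_size == 0
    return offset, new_length, chunk_aligned


offset, new_length, aligned = plan_append(
    {"time": 10, "lat": 4}, {"time": 5, "lat": 4}, concat_dim="time", chunk_size=4
)
```

In this example the existing store has 10 time steps and a chunk size of 4, so the offset is 10, the resized length is 15, and `aligned` is `False`: the last existing chunk is only half full, which is the edge case mentioned above.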

The other change that would have to be made is in store_dataset_fragment:

def store_dataset_fragment(
    item: Tuple[Index, xr.Dataset], target_store: zarr.storage.FSStore
) -> None:
    """Store a piece of a dataset in a Zarr store.

    :param item: The index and dataset to be stored
    :param target_store: The destination to store in
    """
    index, ds = item
    zgroup = zarr.open_group(target_store)
    # TODO: check that the dataset and the index are compatible

    # only store coords if this is the first item in a merge dim
    if _is_first_in_merge_dim(index):
        for vname, da in ds.coords.items():
            # if this variable contains a concat dim, we always store it
            possible_concat_dims = [index.find_concat_dim(dim) for dim in da.dims]
            if any(possible_concat_dims) or _is_first_item(index):
                _store_data(vname, da.variable, index, zgroup)
    for vname, da in ds.data_vars.items():
        _store_data(vname, da.variable, index, zgroup)

We would need some way to pass the offset (determined in schema_to_zarr as described above) through the pipeline so that the new data is written at the correct location.
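As a rough illustration of what "writing at the correct location" could mean, the sketch below shifts a fragment's write region along the concat dim by the offset. The helper `shift_region` and the representation of a region as a dict of slices are assumptions made for this example; the real `Index` and `_store_data` may work differently. How the offset reaches each worker is a separate question (a Beam side input might be one option).

```python
from typing import Dict


def shift_region(
    region: Dict[str, slice], concat_dim: str, offset: int
) -> Dict[str, slice]:
    """Shift the concat_dim slice of a write region by `offset` elements.

    Hypothetical helper: `region` maps dimension name -> slice with explicit bounds.
    """
    if offset < 0:
        raise ValueError("offset must be non-negative")
    original = region[concat_dim]
    if original.start is None or original.stop is None:
        raise ValueError("region slices need explicit start and stop")
    shifted = dict(region)  # copy so the caller's region is left untouched
    shifted[concat_dim] = slice(original.start + offset, original.stop + offset)
    return shifted


# A fragment that would occupy time steps 0..5 in a fresh store lands at
# 10..15 when appended to a store that already holds 10 time steps.
new_region = shift_region({"time": slice(0, 5), "lat": slice(0, 4)}, "time", 10)
```

Only the concat dim is shifted; all other dimensions keep their original slices.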

xref:

@DarshanSP19

@rabernat The first step, adding an append option and verifying the dimensions and variables, sounds fine. For the other half, can we just append the new data to the arrays? zarr already has a function for appending data (here). Could this be an approach to consider?

@alxmrs
Contributor

alxmrs commented Aug 18, 2023

Darshan is working on an implementation of this feature. However, it occurs to us that this may be made more complicated by the existence of consolidated dimensions (like time): #556 (comment)

@rabernat or @cisaacstern: Any pointers or ideas on how the two features could be compatible?

@cisaacstern
Member

@alxmrs & @DarshanSP19 thanks for looking into this!

As a naive starting place, could we just say that appending is incompatible with consolidated dims? And then deal with the more complex case of appending + consolidated dims in a follow-on PR?
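A first-pass guard along these lines could be as small as the sketch below. The function and parameter names (`validate_append_options`, `append`, `consolidated_dims`) are hypothetical and only illustrate the idea of rejecting the combination up front.

```python
from typing import Sequence


def validate_append_options(
    append: bool, consolidated_dims: Sequence[str] = ()
) -> None:
    """Reject appending when any dimensions are configured for consolidation."""
    if append and consolidated_dims:
        raise NotImplementedError(
            "appending is not yet supported together with consolidated dims: "
            + ", ".join(consolidated_dims)
        )


validate_append_options(append=True)  # fine: nothing is consolidated
validate_append_options(append=False, consolidated_dims=["time"])  # fine: no append
```

Calling it with `append=True` and `consolidated_dims=["time"]` would raise `NotImplementedError`, which leaves room to lift the restriction in a follow-on PR.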

@cisaacstern
Member

@alxmrs & @DarshanSP19, before we get too deep into implementation here, @rabernat requested that we do a design review. Could I ask that we all fill out the following when2meet with our availability: https://www.when2meet.com/?21157486-cpxSV ?

Note the poll covers:

  • ✅ Week of Mon Sept 4 (next week)
  • Week of Mon Sept 11 (Ryan and I will be at a conference)
  • ✅ Week of Mon Sept 18
  • ✅ Week of Mon Sept 25

@DarshanSP19 I'm not sure what timezone you are in; if the time ranges in the poll need to be adjusted, please let me know.

@cisaacstern
Member

Quick re-ping for @rabernat and @DarshanSP19 to complete the when2meet.

I was going to suggest we just meet about this during the Pangeo Forge Coordination Meeting at 11am ET on Sept 25, but based on your when2meet response, it looks like you are not available then, @alxmrs?

@alxmrs
Contributor

alxmrs commented Sep 8, 2023 via email

@alxmrs
Contributor

alxmrs commented Sep 15, 2023

Charles, would you mind sending out another When2Meet with earlier times as options? I'd like to better accommodate Darshan's availability (IST time zone).

@cisaacstern
Member

cisaacstern commented Sep 21, 2023

Apologies for the delay on this. Here is a new poll, with times available in the range of:

  • PT 6am - 10am
  • ET 9am - 1pm
  • IST 6:30pm - 10:30pm

@alxmrs & @DarshanSP19, does this look workable?

@alxmrs
Contributor

alxmrs commented Sep 22, 2023

Looks great to me! Thanks for your flexibility in times. I submitted my availability.

@cisaacstern
Member

Thanks all for filling out the poll. I've emailed an invite (to Alex and Ryan):

Design Review - Pangeo Forge Zarr Appending
Thursday, October 5 · 10:00 – 11:00am
Time zone: America/New_York
Google Meet joining info
Video call link: https://meet.google.com/rzz-tmeb-hxa
Or dial: (US) +1 470-228-6632 PIN: 884 316 756#

@DarshanSP19 could you share your email so I can add you to the invite? Either here or cstern@ldeo.columbia.edu.
