From 66e0f6a2b169079c2baf9438d25d721ab5d038ca Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Fri, 21 Nov 2025 14:07:41 +0100 Subject: [PATCH 1/8] Adding LazyUDF --- caterva2/client.py | 36 +++++++++++++++++++++++----------- caterva2/models.py | 7 +++++-- caterva2/services/server.py | 39 ++++++++++++++++++++++++------------- 3 files changed, 55 insertions(+), 27 deletions(-) diff --git a/caterva2/client.py b/caterva2/client.py index 6343342b..9c4de674 100644 --- a/caterva2/client.py +++ b/caterva2/client.py @@ -9,6 +9,7 @@ import ast import functools +import inspect import io import pathlib import sys @@ -1542,7 +1543,7 @@ def lazyexpr(self, name, expression, operands=None, compute=False): ) return pathlib.PurePosixPath(dataset) - def upload_lazyexpr(self, remotepath, expression, compute=False): + def upload_lazyarr(self, remotepath, expression, compute=False): """ Creates a lazy expression dataset. @@ -1568,21 +1569,34 @@ def upload_lazyexpr(self, remotepath, expression, compute=False): Path of the created dataset. """ urlbase, remotepath = _format_paths(self.urlbase, remotepath) - if not isinstance(expression, blosc2.LazyExpr): - raise ValueError("argument ``expression`` must be blosc2.LazyExpr instance.") - operands = expression.operands + operands = expression.operands if hasattr(expression, "operands") else expression.inputs_dict if operands is not None: operands = {k: str(v) for k, v in operands.items()} else: operands = {} - expr = { - "name": None, - "expression": expression.expression, - "operands": operands, - "compute": compute, - } + if isinstance(expression, blosc2.LazyExpr): + expr = { + "name": None, + "expression": expression.expression, + "func": None, + "operands": operands, + "dtype": str(expression.dtype), + "shape": expression.shape, + "compute": compute, + } + elif isinstance(expression, blosc2.LazyUDF): + expr = { + "name": expression.func.__name__, + "expression": None, + "func": inspect.getsource(expression.func), + "operands": operands, + "dtype": str(expression.dtype), + "shape": expression.shape, + "compute": compute, + } + dataset = self._post( - f"{self.urlbase}/api/upload_lazyexpr/{remotepath}", + f"{self.urlbase}/api/upload_lazyarr/{remotepath}", expr, auth_cookie=self.cookie, timeout=self.timeout, diff --git a/caterva2/models.py b/caterva2/models.py index 62087179..9f86ea2b 100644 --- a/caterva2/models.py +++ b/caterva2/models.py @@ -74,10 +74,13 @@ class LazyArray(pydantic.BaseModel): mtime: datetime.datetime | None -class Cat2LazyExpr(pydantic.BaseModel): +class Cat2LazyArr(pydantic.BaseModel): name: str | None - expression: str + expression: str | None + func: str | None operands: dict[str, str] + dtype: str + shape: tuple compute: bool diff --git a/caterva2/services/server.py b/caterva2/services/server.py index 7570aace..3df6db5a 100644 --- a/caterva2/services/server.py +++ b/caterva2/services/server.py @@ -34,6 +34,7 @@ import markdown import nbconvert import nbformat +import numpy as np import PIL.Image import pygments import uvicorn @@ -656,11 +657,8 @@ async def get_chunk( def make_expr( - name: str | None, - expr: str, - operands: dict[str, str], + expr: models.Cat2LazyArr, user: db.User, - compute: bool = False, remotepath: pathlib.Path | None = None, ) -> str: """ @@ -692,7 +690,10 @@ def make_expr( raise srv_utils.raise_unauthorized("Creating lazy expressions requires authentication") # Parse expression - expr = expr.strip() + name = expr.name + operands = expr.operands + compute = expr.compute + expr = expr.expression if not expr or (not remotepath and not name): raise ValueError("name/remotepath and expression should not be empty") vars = blosc2.get_expr_operands(expr) @@ -706,8 +707,20 @@ def make_expr( abspath = get_writable_path(path, user) var_dict[var] = open_b2(abspath, path) - # Create the lazy expression dataset - arr = blosc2.lazyexpr(expr, var_dict) + if hasattr(expr, "func"): + local_ns = {} + exec(expr.func, {"np": np, "blosc2": blosc2}, local_ns) + + if name not in local_ns or not isinstance(local_ns[name], typing.types.FunctionType): + raise ValueError(f"User code must define a function called {name}") + arr = blosc2.lazyudf( + local_ns[name], tuple(var_dict[f"o{i}"] for i in range(len(var_dict))), expr.dtype, expr.shape + ) + + else: + expr = expr.strip() + # Create the lazy expression dataset + arr = blosc2.lazyexpr(expr, var_dict) # Handle name or path if name is None: # provided a path @@ -735,10 +748,10 @@ def make_expr( return path -@app.post("/api/upload_lazyexpr/{path:path}") -async def upload_lazyexpr( +@app.post("/api/upload_lazyarr/{path:path}") +async def upload_lazyarr( path: pathlib.Path, - expr: models.Cat2LazyExpr, + expr: models.Cat2LazyArr, user: db.User = Depends(current_active_user), ) -> str: """ @@ -754,10 +767,8 @@ async def upload_lazyexpr( str The path of the newly created (or overwritten) dataset. """ - if expr.name is not None: - raise ValueError("Cannot provide name and path.") try: - result_path = make_expr(expr.name, expr.expression, expr.operands, user, expr.compute, path) + result_path = make_expr(expr, user, path) except (SyntaxError, ValueError, TypeError) as exc: raise srv_utils.raise_bad_request(f"Invalid name or expression: {exc}") from exc except KeyError as ke: @@ -771,7 +782,7 @@ async def upload_lazyexpr( @app.post("/api/lazyexpr/") async def lazyexpr( - expr: models.Cat2LazyExpr, + expr: models.Cat2LazyArr, user: db.User = Depends(current_active_user), ) -> str: """ From 588448932379f3350c4a014851c7c5391b330b33 Mon Sep 17 00:00:00 2001 From: Luke Shaw Date: Fri, 21 Nov 2025 16:07:15 +0100 Subject: [PATCH 2/8] LazyUDF working for compute=True --- caterva2/client.py | 4 +++- caterva2/services/server.py | 26 +++++++++++--------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/caterva2/client.py b/caterva2/client.py index 9c4de674..d492caa4 100644 --- a/caterva2/client.py +++ b/caterva2/client.py @@ -1554,7 +1554,7 @@ def upload_lazyarr(self, remotepath, expression, compute=False): ---------- remotepath : str Path to save the lazy expression to. - expression : blosc2.LazyExpr + expression : blosc2.LazyExpr | blosc2.LazyUDF Expression to be evaluated. operands : dict Mapping of variables in the expression to their corresponding dataset paths. @@ -1594,6 +1594,8 @@ def upload_lazyarr(self, remotepath, expression, compute=False): "shape": expression.shape, "compute": compute, } + else: + raise ValueError("expr must be instance of blosc2.LazyUDF or blosc2.LazyExpr.") dataset = self._post( f"{self.urlbase}/api/upload_lazyarr/{remotepath}", diff --git a/caterva2/services/server.py b/caterva2/services/server.py index 3df6db5a..4f46ba4b 100644 --- a/caterva2/services/server.py +++ b/caterva2/services/server.py @@ -685,32 +685,28 @@ def make_expr( str The path of the newly created (or overwritten) dataset. """ - if not user: raise srv_utils.raise_unauthorized("Creating lazy expressions requires authentication") # Parse expression name = expr.name - operands = expr.operands + vars = expr.operands + func = expr.func compute = expr.compute - expr = expr.expression - if not expr or (not remotepath and not name): + expression = expr.expression + if (not expression and not func) or (not remotepath and not name): raise ValueError("name/remotepath and expression should not be empty") - vars = blosc2.get_expr_operands(expr) - # Open expression datasets var_dict = {} - for var in vars: - path = operands[var] + for var, path in vars.items(): # Detect special roots path = pathlib.Path(path) abspath = get_writable_path(path, user) var_dict[var] = open_b2(abspath, path) - if hasattr(expr, "func"): + if func is not None: local_ns = {} - exec(expr.func, {"np": np, "blosc2": blosc2}, local_ns) - + exec(func, {"np": np, "blosc2": blosc2}, local_ns) if name not in local_ns or not isinstance(local_ns[name], typing.types.FunctionType): raise ValueError(f"User code must define a function called {name}") arr = blosc2.lazyudf( @@ -718,9 +714,11 @@ def make_expr( ) else: - expr = expr.strip() + expression = expression.strip() # Create the lazy expression dataset - arr = blosc2.lazyexpr(expr, var_dict) + arr = blosc2.lazyexpr(expression, var_dict) + if any(method in arr.expression for method in linalg_funcs): + compute = True # Handle name or path if name is None: # provided a path @@ -738,8 +736,6 @@ def make_expr( abspath.mkdir(exist_ok=True, parents=True) - if any(method in expr for method in linalg_funcs): - compute = True if compute: arr.compute(urlpath=urlpath, mode="w") else: From 32293f3fa2efb6349981b24e203227704958c42f Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Mon, 24 Nov 2025 08:58:43 +0100 Subject: [PATCH 3/8] Resolve failing lazyexpr test --- caterva2/client.py | 10 +++++++++- caterva2/models.py | 4 ++-- caterva2/services/server.py | 2 +- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/caterva2/client.py b/caterva2/client.py index 9c4de674..d0f45ae8 100644 --- a/caterva2/client.py +++ b/caterva2/client.py @@ -1537,7 +1537,15 @@ def lazyexpr(self, name, expression, operands=None, compute=False): operands = {k: str(v) for k, v in operands.items()} else: operands = {} - expr = {"name": name, "expression": expression, "operands": operands, "compute": compute} + expr = { + "name": name, + "expression": expression, + "func": None, + "operands": operands, + "dtype": None, # calculated server-side + "shape": None, # calculated server-side + "compute": compute, + } dataset = self._post( f"{self.urlbase}/api/lazyexpr/", expr, auth_cookie=self.cookie, timeout=self.timeout ) diff --git a/caterva2/models.py b/caterva2/models.py index 9f86ea2b..bc0c8973 100644 --- a/caterva2/models.py +++ b/caterva2/models.py @@ -79,8 +79,8 @@ class Cat2LazyArr(pydantic.BaseModel): expression: str | None func: str | None operands: dict[str, str] - dtype: str - shape: tuple + dtype: str | None + shape: tuple | None compute: bool diff --git a/caterva2/services/server.py b/caterva2/services/server.py index 3df6db5a..ef460cd6 100644 --- a/caterva2/services/server.py +++ b/caterva2/services/server.py @@ -800,7 +800,7 @@ async def lazyexpr( """ try: - result_path = make_expr(expr.name, expr.expression, expr.operands, user, expr.compute) + result_path = make_expr(expr, user) except (SyntaxError, ValueError, TypeError) as exc: raise srv_utils.raise_bad_request(f"Invalid name or expression: {exc}") from exc except KeyError as ke: From 1a011646b912dfdbe826b5f0922ae65db262100c Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Mon, 24 Nov 2025 13:18:18 +0100 Subject: [PATCH 4/8] Standardise upload funcs --- caterva2/client.py | 162 ++++++++++++++++++--------------------------- 1 file changed, 63 insertions(+), 99 deletions(-) diff --git a/caterva2/client.py b/caterva2/client.py index 5d3454bb..3088cae9 100644 --- a/caterva2/client.py +++ b/caterva2/client.py @@ -240,9 +240,7 @@ def upload(self, local_dset, remotepath=None): remotepath = pathlib.PurePosixPath(self.name) / local_dset else: remotepath = pathlib.PurePosixPath(self.name) / pathlib.PurePosixPath(remotepath) - uploadpath = self.client.upload(local_dset, remotepath) - # Remove the first component of the upload path (the root name) and return a new File/Dataset - return self[str(uploadpath.relative_to(self.name))] + return self.client.upload(local_dset, remotepath) def load_from_url(self, urlpath, remotepath=None): """ @@ -266,9 +264,7 @@ def load_from_url(self, urlpath, remotepath=None): remotepath = pathlib.PurePosixPath(self.name) / urlpath else: remotepath = pathlib.PurePosixPath(self.name) / pathlib.PurePosixPath(remotepath) - uploadpath = self.client.load_from_url(urlpath, remotepath) - # Remove the first component of the upload path (the root name) and return a new File/Dataset - return self[str(uploadpath.relative_to(self.name))] + return self.client.load_from_url(urlpath, remotepath) class File: @@ -682,8 +678,8 @@ def append(self, data): Returns ------- - tuple - The new shape of the dataset. + out: Caterva2.Dataset + A pointer to the (modified) dataset. Examples -------- @@ -754,6 +750,16 @@ def __init__(self, urlbase, auth=None, timeout=5): ) def close(self): + """ + Close httpx.Client instance associated with Caterva2 Client. + + Parameters + ---------- + + Returns + ------- + None + """ self.httpx_client.close() def __enter__(self): @@ -930,12 +936,12 @@ def get_roots(self): def get(self, path): """ - Returns an object for the given path. + Returns an object for the given path or object. Parameters ---------- - path : Path - Path to the root, file or dataset. + path : Path | Dataset | File | Root + Either the desired object, or Path to the root, file or dataset. Returns ------- @@ -956,6 +962,8 @@ def get(self, path): >>> ds[:10] array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) """ + if isinstance(path, (File, Dataset, Root)): + return path # Normalize the path to a POSIX path path = pathlib.PurePosixPath(path).as_posix() # Check if the path is a root or a file/dataset @@ -1197,7 +1205,7 @@ def download(self, dataset, localpath=None): Returns ------- Path - The path to the downloaded file. + The path to the downloaded file on local disk. Examples -------- @@ -1224,12 +1232,17 @@ def _upload_file(self, local_dset, remotepath, urlbase, auth_cookie=None): headers = {"Cookie": auth_cookie} if auth_cookie else None if isinstance(local_dset, (str, pathlib.Path)): + if local_dset.suffix == ".b2nd": + obj = blosc2.open(local_dset) + if isinstance(obj, blosc2.LazyArray): # handle LazyArrays saved on-disk + return self._upload_lazyarr(remotepath, obj) with open(local_dset, "rb") as f: response = client.post(url, files={"file": f}, headers=headers) response.raise_for_status() else: - if isinstance(local_dset, blosc2.LazyExpr): - return self.upload_lazyexpr(remotepath, local_dset).path + if isinstance(local_dset, blosc2.LazyArray): + return self._upload_lazyarr(remotepath, local_dset) + # in-memory object ndarray = ( blosc2.asarray(local_dset) if hasattr(local_dset, "shape") @@ -1239,7 +1252,8 @@ def _upload_file(self, local_dset, remotepath, urlbase, auth_cookie=None): f = io.BytesIO(cframe) response = client.post(url, files={"file": f}, headers=headers) response.raise_for_status() - return pathlib.PurePosixPath(response.json()) + path = pathlib.PurePosixPath(response.json()) + return self.get(path) # return reference to object def upload(self, local_dset, remotepath): """ @@ -1258,8 +1272,8 @@ def upload(self, local_dset, remotepath): Returns ------- - Path - Path of the uploaded file on the server. + Object : File, Dataset + Object representing the file or dataset. Examples -------- @@ -1287,7 +1301,8 @@ def _load_from_url(self, urlpath, remotepath, urlbase, auth_cookie=None): headers = {"Cookie": auth_cookie} if auth_cookie else None response = client.post(url, data={"remote_url": urlpath}, headers=headers) response.raise_for_status() - return pathlib.PurePosixPath(response.json()) + path = pathlib.PurePosixPath(response.json()) + return self.get(path) # return reference to object def load_from_url(self, urlpath, dataset): """ @@ -1302,8 +1317,8 @@ def load_from_url(self, urlpath, dataset): Returns ------- - Path - Path of the uploaded file on the server. + Object : File, Dataset + Object representing the file or dataset. """ urlbase, _ = _format_paths(self.urlbase) return self._load_from_url( @@ -1326,8 +1341,8 @@ def append(self, remotepath, data): Returns ------- - tuple - The new shape of the dataset. + out: Dataset + Object representing the modified dataset. Examples -------- @@ -1350,13 +1365,22 @@ def append(self, remotepath, data): ndarray = blosc2.asarray(array) cframe = ndarray.to_cframe() file = io.BytesIO(cframe) + old_shape = self.get(remotepath).shape + append_shape = array.shape + loc_shape = (old_shape[0] + append_shape[0],) + old_shape[1:] client = self.httpx_client url = f"{self.urlbase}/api/append/{remotepath}" headers = {"Cookie": self.cookie} response = client.post(url, files={"file": file}, headers=headers, timeout=self.timeout) response.raise_for_status() - return tuple(response.json()) + new_shape = tuple(response.json()) + if loc_shape == new_shape: + return self.get(remotepath) + else: + raise RuntimeError( + f"Append shape incorrect: server-side shape is {new_shape} but should be {loc_shape}." + ) def unfold(self, remotepath): """ @@ -1372,8 +1396,8 @@ def unfold(self, remotepath): Returns ------- - Path - The path of the unfolded dataset. + out : str + Root of the unfolded dataset. Examples -------- @@ -1388,7 +1412,7 @@ def unfold(self, remotepath): result = self._post( f"{self.urlbase}/api/unfold/{path}", auth_cookie=self.cookie, timeout=self.timeout ) - return PurePosixPath(result) + return PurePosixPath(result) # return path to top directory of dset def remove(self, path): """ @@ -1424,7 +1448,7 @@ def remove(self, path): result = self._post( f"{self.urlbase}/api/remove/{path}", auth_cookie=self.cookie, timeout=self.timeout ) - return pathlib.PurePosixPath(result) + return pathlib.PurePosixPath(result) # path from which file removed def move(self, src, dst): """ @@ -1439,8 +1463,8 @@ def move(self, src, dst): Returns ------- - Path - New path of the moved dataset or directory. + Object : Dataset, File + Reference to object in new location. Examples -------- @@ -1464,7 +1488,8 @@ def move(self, src, dst): auth_cookie=self.cookie, timeout=self.timeout, ) - return pathlib.PurePosixPath(result) + path = pathlib.PurePosixPath(result) + return self.get(path) # get reference to object def copy(self, src, dst): """ @@ -1479,8 +1504,8 @@ def copy(self, src, dst): Returns ------- - Path - New path of the copied dataset or directory. + Object : Dataset, File + Reference to copied object in copy location. Examples -------- @@ -1507,67 +1532,10 @@ def copy(self, src, dst): auth_cookie=self.cookie, timeout=self.timeout, ) - return pathlib.PurePosixPath(result) - - def lazyexpr(self, name, expression, operands=None, compute=False): - """ - Creates a lazy expression dataset in personal space. - - A dataset with the specified name will be created or overwritten if already - exists. - - Parameters - ---------- - name : str - Name of the dataset to be created (without extension). - expression : str - Expression to be evaluated, which must yield a lazy expression. - operands : dict - Mapping of variables in the expression to their corresponding dataset paths. - compute : bool, optional - If false, generate lazyexpr and do not compute anything. - If true, compute lazy expression on creation and save (full) result. - Default false. - - Returns - ------- - Path - Path of the created dataset. - - Examples - -------- - >>> import caterva2 as cat2 - >>> import numpy as np - >>> # To create a lazyexpr you need to be a registered used - >>> client = cat2.Client('https://cat2.cloud/demo', ("joedoe@example.com", "foobar")) - >>> src_path = f'@personal/dir{np.random.randint(0, 100)}/ds-4d.b2nd' - >>> path = client.upload('root-example/dir1/ds-2d.b2nd', src_path) - >>> client.lazyexpr('example-expr', 'a + a', {'a': path}) - PurePosixPath('@personal/example-expr.b2nd') - >>> 'example-expr.b2nd' in client.get_list('@personal') - True - """ - urlbase, _ = _format_paths(self.urlbase) - # Convert possible Path objects in operands to strings so that they can be serialized - if operands is not None: - operands = {k: str(v) for k, v in operands.items()} - else: - operands = {} - expr = { - "name": name, - "expression": expression, - "func": None, - "operands": operands, - "dtype": None, # calculated server-side - "shape": None, # calculated server-side - "compute": compute, - } - dataset = self._post( - f"{self.urlbase}/api/lazyexpr/", expr, auth_cookie=self.cookie, timeout=self.timeout - ) - return pathlib.PurePosixPath(dataset) + path = pathlib.PurePosixPath(result) + return self.get(path) # get reference to object - def upload_lazyarr(self, remotepath, expression, compute=False): + def _upload_lazyarr(self, remotepath, expression, compute=False): """ Creates a lazy expression dataset. @@ -1589,8 +1557,8 @@ def upload_lazyarr(self, remotepath, expression, compute=False): Returns ------- - Path - Path of the created dataset. + Object: Dataset + Pointer to server-hosted lazy dataset. """ urlbase, remotepath = _format_paths(self.urlbase, remotepath) operands = expression.operands if hasattr(expression, "operands") else expression.inputs_dict @@ -1628,11 +1596,7 @@ def upload_lazyarr(self, remotepath, expression, compute=False): timeout=self.timeout, ) path = pathlib.PurePosixPath(dataset).as_posix() - - # Return file/dataset object - root_name, file_path = path.split("/", 1) - root = Root(self, root_name) - return root[file_path] + return self.get(path) # return reference to object def adduser(self, newuser, password=None, superuser=False): """ From 59ee761b1a3b541fcad5905e2dbf5ee6220cefb2 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Mon, 24 Nov 2025 16:01:15 +0100 Subject: [PATCH 5/8] Rectified tests --- caterva2/client.py | 44 +++++-- caterva2/tests/test_api.py | 200 ++++++++++++++---------------- caterva2/tests/test_hdf5_proxy.py | 7 +- 3 files changed, 128 insertions(+), 123 deletions(-) diff --git a/caterva2/client.py b/caterva2/client.py index 3088cae9..199a4064 100644 --- a/caterva2/client.py +++ b/caterva2/client.py @@ -200,7 +200,7 @@ def __len__(self): def __str__(self): return self.name - def upload(self, local_dset, remotepath=None): + def upload(self, local_dset, remotepath=None, compute=None): """ Uploads a local file to this root. @@ -211,6 +211,8 @@ def upload(self, local_dset, remotepath=None): remotepath : Path, optional Remote path where the file will be uploaded. If not provided, the file will be uploaded to the top level of this root. + compute: None | bool + For LazyArray objects, whether to compute the result eagerly or not. Returns ------- @@ -240,7 +242,7 @@ def upload(self, local_dset, remotepath=None): remotepath = pathlib.PurePosixPath(self.name) / local_dset else: remotepath = pathlib.PurePosixPath(self.name) / pathlib.PurePosixPath(remotepath) - return self.client.upload(local_dset, remotepath) + return self.client.upload(local_dset, remotepath, compute) def load_from_url(self, urlpath, remotepath=None): """ @@ -1005,7 +1007,7 @@ def get_info(self, path): Parameters ---------- - path : str + path : str | Dataset | File Path to the dataset. Returns @@ -1024,6 +1026,8 @@ def get_info(self, path): >>> info['shape'] [100, 200] """ + if isinstance(path, (Dataset, File)): + path = path.path urlbase, path = _format_paths(self.urlbase, path) return self._get(f"{self.urlbase}/api/info/{path}", auth_cookie=self.cookie, timeout=self.timeout) @@ -1033,8 +1037,8 @@ def fetch(self, path, slice_=None): Parameters ---------- - path : str - Path to the dataset. + path : str | Dataset + Path or reference to the dataset. slice_ : int, slice, tuple of ints and slices, or None Specifies the slice to fetch. If None, the whole dataset is fetched. @@ -1060,6 +1064,8 @@ def get_slice(self, path, key=None, as_blosc2=True, field=None): Parameters ---------- + path: str, Dataset, File + Desired object to slice. key : int, slice, sequence of slices or str The slice to retrieve. If a single slice is provided, it will be applied to the first dimension. If a sequence of slices is @@ -1086,6 +1092,8 @@ def get_slice(self, path, key=None, as_blosc2=True, field=None): [(1.0000500e-02, 1.0100005), (1.0050503e-02, 1.0100505)]], dtype=[('a', '>> info_schunk['chunksize'] / len(chunk) 6.453000645300064 """ + if isinstance(path, Dataset): + path = path.path urlbase, path = _format_paths(self.urlbase, path) data = self._xget( f"{self.urlbase}/api/chunk/{path}", @@ -1226,22 +1236,29 @@ def download(self, dataset, localpath=None): path = localpath return self._download_url(url, str(path), auth_cookie=self.cookie) - def _upload_file(self, local_dset, remotepath, urlbase, auth_cookie=None): + def _upload_file(self, local_dset, remotepath, urlbase, auth_cookie=None, compute=None): client = self.httpx_client url = f"{urlbase}/api/upload/{remotepath}" headers = {"Cookie": auth_cookie} if auth_cookie else None if isinstance(local_dset, (str, pathlib.Path)): - if local_dset.suffix == ".b2nd": + suffx = local_dset.suffix if hasattr(local_dset, "suffix") else local_dset[-5:] + if suffx == ".b2nd": obj = blosc2.open(local_dset) if isinstance(obj, blosc2.LazyArray): # handle LazyArrays saved on-disk - return self._upload_lazyarr(remotepath, obj) + compute = False if compute is None else compute + return self._upload_lazyarr(remotepath, obj, compute=compute) + if compute is not None: + raise ValueError("compute argument cannot be specified for non-LazyArray objects.") with open(local_dset, "rb") as f: response = client.post(url, files={"file": f}, headers=headers) response.raise_for_status() else: if isinstance(local_dset, blosc2.LazyArray): - return self._upload_lazyarr(remotepath, local_dset) + compute = False if compute is None else compute + return self._upload_lazyarr(remotepath, local_dset, compute=compute) + if compute is not None: + raise ValueError("compute argument cannot be specified for non-LazyArray objects.") # in-memory object ndarray = ( blosc2.asarray(local_dset) @@ -1255,7 +1272,7 @@ def _upload_file(self, local_dset, remotepath, urlbase, auth_cookie=None): path = pathlib.PurePosixPath(response.json()) return self.get(path) # return reference to object - def upload(self, local_dset, remotepath): + def upload(self, local_dset, remotepath, compute=None): """ Uploads a local dataset to a remote repository. @@ -1269,6 +1286,8 @@ def upload(self, local_dset, remotepath): Path to the local dataset or an in-memory object (convertible to blosc2.SChunk). remotepath : Path Remote path to upload the dataset to. + compute: None | bool + For LazyArray objects, boolean flag indicating whether to compute the result eagerly or not. Returns ------- @@ -1292,6 +1311,7 @@ def upload(self, local_dset, remotepath): remotepath, urlbase, auth_cookie=self.cookie, + compute=compute, ) def _load_from_url(self, urlpath, remotepath, urlbase, auth_cookie=None): diff --git a/caterva2/tests/test_api.py b/caterva2/tests/test_api.py index cf152531..c61b53c8 100644 --- a/caterva2/tests/test_api.py +++ b/caterva2/tests/test_api.py @@ -145,16 +145,16 @@ def test_move(auth_client, dirpath, final_dir, fill_auth): new_fname = f"{dirpath}" if dirpath else "" else: new_fname = f"{dirpath}/{fname}" if dirpath else fname - newpath = file.move(f"{myshared.name}/{new_fname}") + newobj = file.move(f"{myshared.name}/{new_fname}") assert fname not in mypublic if final_dir: basename = fname.split("/")[-1] new_path = f"{new_fname}/{basename}" if dirpath else basename - assert str(newpath) == f"{myshared.name}/{new_path}" - assert myshared[new_path].path == newpath + assert str(newobj.path) == f"{myshared.name}/{new_path}" + assert myshared[new_path].path == newobj.path else: - assert str(newpath) == f"{myshared.name}/{new_fname}" - assert myshared[new_fname].path == newpath + assert str(newobj.path) == f"{myshared.name}/{new_fname}" + assert myshared[new_fname].path == newobj.path return None @@ -188,16 +188,16 @@ def test_copy(auth_client, dirpath, final_dir, fill_auth): new_fname = f"{dirpath}" if dirpath else "" else: new_fname = f"{dirpath}/{fname}" if dirpath else fname - newpath = file.copy(f"{myshared.name}/{new_fname}") + newobj = file.copy(f"{myshared.name}/{new_fname}") assert fname in mypublic if final_dir: basename = fname.split("/")[-1] new_path = f"{new_fname}/{basename}" if dirpath else basename - assert str(newpath) == f"{myshared.name}/{new_path}" - assert myshared[new_path].path == newpath + assert str(newobj.path) == f"{myshared.name}/{new_path}" + assert myshared[new_path].path == newobj.path else: - assert str(newpath) == f"{myshared.name}/{new_fname}" - assert myshared[new_fname].path == newpath + assert str(newobj.path) == f"{myshared.name}/{new_fname}" + assert myshared[new_fname].path == newobj.path return None @@ -211,24 +211,20 @@ def test_concat(auth_client, fill_auth, examples_dir): # Copy a 1d dataset to the shared area file = mypublic["ds-1d.b2nd"] copyname = "a.b2nd" - newpath = file.copy(f"@shared/{copyname}") - assert newpath == myshared[copyname].path + newobj = file.copy(f"@shared/{copyname}") + assert newobj.path == myshared[copyname].path copyname2 = "b.b2nd" - newpath2 = file.copy(f"@shared/{copyname2}") - assert newpath2 == myshared[copyname2].path + newobj2 = file.copy(f"@shared/{copyname2}") + assert newobj2.path == myshared[copyname2].path copyname3 = "c.b2nd" - newpath3 = file.copy(f"@shared/{copyname3}") - assert newpath3 == myshared[copyname3].path + newobj3 = file.copy(f"@shared/{copyname3}") + assert newobj3.path == myshared[copyname3].path # Test for File class file = myshared[copyname] - resultname = "result" - finalpath = auth_client.lazyexpr( - resultname, - expression="concat([a, b, c], axis=0)", - operands={"a": newpath, "b": newpath2, "c": newpath3}, - ) - result_ds = mypersonal[resultname + ".b2nd"] + resultpath = "result.b2nd" + lexpr = blosc2.lazyexpr("concat([newobj, newobj2, newobj3], axis=0)") + result_ds = mypersonal.upload(lexpr, resultpath) assert result_ds.shape[0] == 3 * myshared[copyname].shape[0] # check eager evaluation assert "expression" not in auth_client.get_info(result_ds) @@ -237,8 +233,7 @@ def test_concat(auth_client, fill_auth, examples_dir): fname = examples_dir / "ds-1d.b2nd" a = blosc2.open(fname) locres = np.concat([a[:], a[:], a[:]], axis=0) - sfile = auth_client.get(finalpath) - return np.testing.assert_array_equal(sfile[:], locres) + return np.testing.assert_array_equal(result_ds[:], locres) def test_stack(auth_client, fill_auth, examples_dir): @@ -255,23 +250,23 @@ def test_stack(auth_client, fill_auth, examples_dir): s = file.shape news = (s[0], 3, *s[1:]) copyname = "a.b2nd" - newpath = file.copy(f"@shared/{copyname}") - assert newpath == myshared[copyname].path + newobj = file.copy(f"@shared/{copyname}") + assert newobj.path == myshared[copyname].path copyname2 = "b.b2nd" - newpath2 = file.copy(f"@shared/{copyname2}") - assert newpath2 == myshared[copyname2].path + newobj2 = file.copy(f"@shared/{copyname2}") + assert newobj2.path == myshared[copyname2].path copyname3 = "c.b2nd" - newpath3 = file.copy(f"@shared/{copyname3}") - assert newpath3 == myshared[copyname3].path + newobj3 = file.copy(f"@shared/{copyname3}") + assert newobj3.path == myshared[copyname3].path # Test for File class file = myshared[copyname] resultname = "result" - finalpath = auth_client.lazyexpr( - resultname, - expression="stack([a, b, c], axis=1)", - operands={"a": newpath, "b": newpath2, "c": newpath3}, + lexpr = blosc2.lazyexpr( + "stack([a, b, c], axis=1)", + operands={"a": newobj, "b": newobj2, "c": newobj3}, ) + sfile = mypersonal.upload(lexpr, resultname + ".b2nd") result_ds = mypersonal[resultname + ".b2nd"] assert result_ds.shape == news # check eager evaluation @@ -281,7 +276,6 @@ def test_stack(auth_client, fill_auth, examples_dir): fname = examples_dir / fstr a = blosc2.open(fname) locres = np.stack([a[:], a[:], a[:]], axis=1) - sfile = auth_client.get(finalpath) return np.testing.assert_array_equal(sfile[:], locres) @@ -295,8 +289,8 @@ def test_append(auth_client, fields, fill_auth, examples_dir): fname = "ds-1d.b2nd" if not fields else "ds-1d-fields.b2nd" # Copy a 1d dataset to the shared area file = mypublic[fname] - newpath = file.copy(f"@shared/{fname}") - assert newpath == myshared[fname].path + newobj = file.copy(f"@shared/{fname}") + assert newobj.path == myshared[fname].path # Append to the dataset if fields: data = np.asarray( @@ -306,8 +300,8 @@ def test_append(auth_client, fields, fill_auth, examples_dir): else: data = [1, 2, 3] sfile = myshared[fname] - new_shape = sfile.append(data) - assert new_shape == (len(data) + file.meta["shape"][0],) + new_obj = sfile.append(data) + assert new_obj.shape == (len(data) + file.meta["shape"][0],) # Check the data fname = examples_dir / fname @@ -734,36 +728,34 @@ def test_lazyexpr(auth_client): if not auth_client: pytest.skip("authentication support needed") - opnm = "ds" oppt = f"{TEST_CATERVA2_ROOT}/ds-1d.b2nd" - expression = f"{opnm} + 0" - operands = {opnm: oppt} - lxname = "my_expr" + opnm = auth_client.get(oppt) + expression = opnm + 0 + rpath = "@personal/my_expr.b2nd" opinfo = auth_client.get_info(oppt) - lxpath = auth_client.lazyexpr(lxname, expression, operands) - assert lxpath == pathlib.Path(f"@personal/{lxname}.b2nd") + lxobj = auth_client.upload(expression, rpath) + assert lxobj.path == pathlib.Path(rpath) # Check result metadata. - lxinfo = auth_client.get_info(lxpath) + lxinfo = auth_client.get_info(lxobj) assert lxinfo["shape"] == opinfo["shape"] assert lxinfo["dtype"] == opinfo["dtype"] - assert lxinfo["expression"] == expression - assert lxinfo["operands"] == operands + assert lxinfo["expression"] == expression.expression + assert lxinfo["operands"] == {k: str(v) for k, v in expression.operands.items()} # Check result data. - a = auth_client.fetch(oppt) - b = auth_client.fetch(lxpath) + a = opnm + b = lxobj np.testing.assert_array_equal(a[:], b[:]) # test streamlined API - a = auth_client.get(oppt) + a = opnm ls = blosc2.lazyexpr(f"linspace(0, 1, {a.shape[0]})") mylazyexpr = a + 0 mylazyexpr += 2 * ls res = a[:] + 2 * ls[:] - lxpath = auth_client.upload_lazyexpr("@shared/newexpr.b2nd", mylazyexpr) - b = auth_client.fetch(lxpath) + b = auth_client.upload(mylazyexpr, "@shared/newexpr.b2nd") np.testing.assert_array_equal(res, b[:]) @@ -799,11 +791,13 @@ def test_lazyexpr2(expression, examples_dir, tmp_path, auth_client): assert ds_b in remote_root operands = {"a": remote_a, "b": remote_b} - lxpath = auth_client.lazyexpr("myexpr", expression, operands) - assert lxpath == pathlib.Path("@personal/myexpr.b2nd") + rpath = "@personal/myexpr.b2nd" + expr = blosc2.lazyexpr(expression, operands) + lxobj = auth_client.upload(expr, rpath) + assert lxobj.path == pathlib.Path(rpath) # Compute the expression - result = auth_client.get_slice(lxpath) + result = auth_client.get_slice(lxobj) assert isinstance(result, blosc2.NDArray) # Check the data @@ -818,11 +812,12 @@ def test_lazyexpr_getchunk(auth_client, fill_public): opnm = "ds" oppt = f"{TEST_CATERVA2_ROOT}/ds-1d.b2nd" expression = f"{opnm} - 0" - operands = {opnm: oppt} - lxname = "my_expr" + operands = {opnm: auth_client.get(oppt)} + expr = blosc2.lazyexpr(expression, operands) + rpath = "@personal/my_expr.b2nd" - lxpath = auth_client.lazyexpr(lxname, expression, operands) - assert lxpath == pathlib.Path(f"@personal/{lxname}.b2nd") + lxobj = auth_client.upload(expr, rpath) + assert lxobj.path == pathlib.Path(rpath) # Check for chunksize and dtype opinfo = auth_client.get_info(oppt) @@ -831,7 +826,7 @@ def test_lazyexpr_getchunk(auth_client, fill_public): # Get the first chunks chunk_ds = auth_client.get_chunk(oppt, 0) - chunk_expr = auth_client.get_chunk(lxpath, 0) + chunk_expr = auth_client.get_chunk(lxobj, 0) # Check data out = np.empty(chunksize, dtype=dtype) blosc2.decompress2(chunk_ds, out) @@ -902,37 +897,36 @@ def test_expr_from_expr(auth_client): if not auth_client: pytest.skip("authentication support needed") - opnm = "ds" oppt = f"{TEST_CATERVA2_ROOT}/ds-1d.b2nd" - expression = f"{opnm} + 1" - operands = {opnm: oppt} + opnm = auth_client.get(oppt) + expr = opnm + 1 lxname = "my_expr" + rpath = f"@personal/{lxname}.b2nd" opinfo = auth_client.get_info(oppt) - lxpath = auth_client.lazyexpr(lxname, expression, operands) - assert lxpath == pathlib.Path(f"@personal/{lxname}.b2nd") + lxobj = auth_client.upload(expr, rpath) + assert lxobj.path == pathlib.Path(rpath) - expression2 = f"{opnm} * 2" - operands2 = {opnm: f"@personal/{lxname}.b2nd"} - lxname = "expr_from_expr" - lxpath2 = auth_client.lazyexpr(lxname, expression2, operands2) - assert lxpath2 == pathlib.Path(f"@personal/{lxname}.b2nd") + expr2 = lxobj * 2 + rpath = "@personal/expr_from_expr.b2nd" + lxobj2 = auth_client.upload(expr2, rpath) + assert lxobj2.path == pathlib.Path(rpath) # Check result metadata. - lxinfo = auth_client.get_info(lxpath) - lxinfo2 = auth_client.get_info(lxpath2) + lxinfo = auth_client.get_info(lxobj) + lxinfo2 = auth_client.get_info(lxobj2) assert lxinfo["shape"] == opinfo["shape"] == lxinfo2["shape"] assert lxinfo["dtype"] == opinfo["dtype"] == lxinfo2["dtype"] - assert lxinfo["operands"] == operands - assert lxinfo["expression"] == expression + assert lxinfo["operands"] == {k: str(v) for k, v in expr.operands.items()} + assert lxinfo["expression"] == expr.expression - assert lxinfo2["operands"] == operands2 - assert lxinfo2["expression"] == expression2 + assert lxinfo2["operands"] == {k: str(v) for k, v in expr2.operands.items()} + assert lxinfo2["expression"] == expr2.expression # Check result data. - a = auth_client.fetch(oppt) - b = auth_client.fetch(lxpath) - c = auth_client.fetch(lxpath2) + a = opnm + b = expr + c = expr2 np.testing.assert_array_equal(a[:] + 1, b[:]) np.testing.assert_array_equal((a[:] + 1) * 2, c[:]) @@ -941,23 +935,13 @@ def test_expr_no_operand(auth_client): if not auth_client: pytest.skip("authentication support needed") - expression = "linspace(0, 10, num=50)" - lxname = "my_expr" + expression = blosc2.lazyexpr("linspace(0, 10, num=50)") + rpath = "@personal/my_expr.b2nd" - lxpath = auth_client.lazyexpr(lxname, expression) - assert lxpath == pathlib.Path(f"@personal/{lxname}.b2nd") - c = auth_client.get(lxpath) + lxobj = auth_client.upload(expression, rpath) + assert lxobj.path == pathlib.Path(rpath) a = blosc2.linspace(0, 10, num=50) - np.testing.assert_array_equal(a[:], c[:]) - - # Check error when operand should be present but isn't - opnm = "ds" - oppt = f"{TEST_CATERVA2_ROOT}/ds-1d.b2nd" - expression = "ds + linspace(0, 10, num=50)" - lxname = "my_expr" - - with pytest.raises(Exception) as e_info: - lxpath = auth_client.lazyexpr(lxname, expression) + np.testing.assert_array_equal(a[:], lxobj[:]) def test_expr_force_compute(auth_client): @@ -966,18 +950,18 @@ def test_expr_force_compute(auth_client): expression = "linspace(0, 10, num=50)" lxname = "my_expr" + expr = blosc2.lazyexpr(expression) + rpath = f"@personal/{lxname}.b2nd" # Uncomputed lazyexpr is a blosc2 lazyexpr - lxpath = auth_client.lazyexpr(lxname, expression, compute=False) - assert lxpath == pathlib.Path(f"@personal/{lxname}.b2nd") - c = auth_client.get(lxpath) - assert c.meta["expression"] == expression + lxobj = auth_client.upload(expr, rpath, compute=False) + assert lxobj.path == pathlib.Path(rpath) + assert lxobj.meta["expression"] == f"({expression})" # blosc2 forces brackets # Computed lazyexpr is a blosc2 array - lxpath = auth_client.lazyexpr(lxname, expression, compute=True) - assert lxpath == pathlib.Path(f"@personal/{lxname}.b2nd") - c = auth_client.get(lxpath) - assert c.meta.get("expression", None) is None + lxobj = auth_client.upload(expr, rpath, compute=True) + assert lxobj.path == pathlib.Path(rpath) + assert lxobj.meta.get("expression", None) is None # User management @@ -1114,12 +1098,12 @@ def test_listusers_unauthorized(client, auth_client): def test_client_timeout(auth_client): if not auth_client: pytest.skip("authentication support needed") - - lxpath = auth_client.lazyexpr("expr", "linspace(0, 100, 1000_0000)", compute=True) - assert lxpath == pathlib.Path("@personal/expr.b2nd") + expr = blosc2.lazyexpr("linspace(0, 100, 1000_0000)") + lxobj = auth_client.upload(expr, "@personal/expr.b2nd", compute=True) + assert lxobj.path == pathlib.Path("@personal/expr.b2nd") auth_client.timeout = 0.0001 # Try again with pytest.raises(Exception) as e_info: - _ = auth_client.lazyexpr("expr", "linspace(0, 100, 1000_0000)", compute=True) + _ = auth_client.upload(expr, "@personal/expr.b2nd", compute=True) assert "Timeout" in str(e_info) auth_client.timeout = 5 # Reset timeout to default value diff --git a/caterva2/tests/test_hdf5_proxy.py b/caterva2/tests/test_hdf5_proxy.py index 5208c64e..ef43e91c 100644 --- a/caterva2/tests/test_hdf5_proxy.py +++ b/caterva2/tests/test_hdf5_proxy.py @@ -325,9 +325,10 @@ def test_expression(expression, examples_dir, tmp_path, auth_client): remote_b = remote_dir / (ds_b + ".b2nd") assert remote_b in remote_root - operands = {"a": str(remote_root) + "/" + str(remote_a), "b": str(remote_root) + "/" + str(remote_b)} - lxpath = auth_client.lazyexpr("myexpr", expression, operands) - assert lxpath == pathlib.Path("@personal/myexpr.b2nd") + operands = {"a": remote_root[remote_a], "b": remote_root[remote_b]} + lexpr = blosc2.lazyexpr(expression, operands) + lxpath = remote_root.upload(lexpr, "myexpr.b2nd") + assert lxpath.path == pathlib.Path("@shared/myexpr.b2nd") if expression == "matmul(a, b)": # check evaluated eagerly for linalg assert "expression" not in auth_client.get_info(lxpath) From 9780f45d047c86171f5f30bb980566e766c25e99 Mon Sep 17 00:00:00 2001 From: lshaw8317 Date: Tue, 25 Nov 2025 09:35:37 +0100 Subject: [PATCH 6/8] Minor edits --- caterva2/services/server.py | 7 +++++-- caterva2/tests/test_hdf5_proxy.py | 10 +++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/caterva2/services/server.py b/caterva2/services/server.py index 8b519443..a0fdb645 100644 --- a/caterva2/services/server.py +++ b/caterva2/services/server.py @@ -522,7 +522,7 @@ async def fetch_data( if field: container = container[field] - if isinstance(container, blosc2.NDArray | blosc2.LazyExpr | hdf5.HDF5Proxy | blosc2.NDField): + if isinstance(container, blosc2.NDArray | blosc2.LazyArray | hdf5.HDF5Proxy | blosc2.NDField): array = container schunk = getattr(array, "schunk", None) # not really needed typesize = array.dtype.itemsize @@ -554,7 +554,10 @@ async def fetch_data( if isinstance(array, hdf5.HDF5Proxy): data = array.to_cframe(() if slice_ is None else slice_) - elif isinstance(array, blosc2.LazyExpr | blosc2.NDField): + elif isinstance(array, blosc2.LazyArray): + data = array.compute(() if slice_ is None else slice_) + data = data.to_cframe() + elif isinstance(array, blosc2.NDField): data = array[() if slice_ is None else slice_] data = blosc2.asarray(data) data = data.to_cframe() diff --git a/caterva2/tests/test_hdf5_proxy.py b/caterva2/tests/test_hdf5_proxy.py index ef43e91c..2b1a48c1 100644 --- a/caterva2/tests/test_hdf5_proxy.py +++ b/caterva2/tests/test_hdf5_proxy.py @@ -327,13 +327,13 @@ def test_expression(expression, examples_dir, tmp_path, auth_client): operands = {"a": remote_root[remote_a], "b": remote_root[remote_b]} lexpr = blosc2.lazyexpr(expression, operands) - lxpath = remote_root.upload(lexpr, "myexpr.b2nd") - assert lxpath.path == pathlib.Path("@shared/myexpr.b2nd") + lxobj = remote_root.upload(lexpr, "myexpr.b2nd") + assert lxobj.path == pathlib.Path("@shared/myexpr.b2nd") if expression == "matmul(a, b)": # check evaluated eagerly for linalg - assert "expression" not in auth_client.get_info(lxpath) + assert "expression" not in auth_client.get_info(lxobj) # Compute the expression - result = auth_client.get_slice(lxpath) + result = auth_client.get_slice(lxobj) assert isinstance(result, blosc2.NDArray) # Check the data @@ -343,4 +343,4 @@ def test_expression(expression, examples_dir, tmp_path, auth_client): nresult = ne.evaluate(expression, {"a": na, "b": nb}) else: nresult = np.matmul(na, nb) - np.testing.assert_allclose(result[:], nresult) + np.testing.assert_allclose(result[()], nresult) From 0074fa16efd7a9848455c4c027d382e0c0a4bb4c Mon Sep 17 00:00:00 2001 From: Luke Shaw Date: Tue, 25 Nov 2025 10:49:39 +0100 Subject: [PATCH 7/8] Enable saving of lazyUDF --- caterva2/services/server.py | 7 ++++++- caterva2/tests/test_api.py | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/caterva2/services/server.py b/caterva2/services/server.py index a0fdb645..a91a6e71 100644 --- a/caterva2/services/server.py +++ b/caterva2/services/server.py @@ -15,6 +15,7 @@ import io import itertools import json +import linecache import mimetypes import os import pathlib @@ -709,7 +710,11 @@ def make_expr( if func is not None: local_ns = {} - exec(func, {"np": np, "blosc2": blosc2}, local_ns) + filename = f"<{name}>" # any unique name + # Register the source so inspect can find it when saving later on + linecache.cache[filename] = (len(func), None, func.splitlines(True), filename) + exec(compile(func, filename, "exec"), {"np": np, "blosc2": blosc2}, local_ns) + if name not in local_ns or not isinstance(local_ns[name], typing.types.FunctionType): raise ValueError(f"User code must define a function called {name}") arr = blosc2.lazyudf( diff --git a/caterva2/tests/test_api.py b/caterva2/tests/test_api.py index c61b53c8..d8312a56 100644 --- a/caterva2/tests/test_api.py +++ b/caterva2/tests/test_api.py @@ -759,6 +759,40 @@ def test_lazyexpr(auth_client): np.testing.assert_array_equal(res, b[:]) +def test_lazyudf(examples_dir, tmp_path, auth_client): + root = pathlib.Path("@shared") + remote_root = auth_client.get(root) + remote_dir = "arrays" + ds_a = f"{remote_dir}/3d-blosc2-a.b2nd" + ds_b = f"{remote_dir}/3d-blosc2-b.b2nd" + + with contextlib.chdir(tmp_path): + os.makedirs(remote_dir, exist_ok=True) + a = np.linspace(-1, 2, 1000).reshape(10, 10, 10) + blosc2.asarray(a, urlpath=ds_a, chunks=(5, 10, 10)) + remote_a = remote_root.upload(ds_a) + b = np.linspace(-1, 2, 1000).reshape(10, 10, 10) + blosc2.asarray(b, urlpath=ds_b, chunks=(3, 5, 5)) + remote_b = remote_root.upload(ds_b) + assert ds_a in remote_root + assert ds_b in remote_root + + def myudf(inputs, output, offset): + x1, x2 = inputs + output[:] = np.logaddexp(x1, x2) + + dtype = blosc2.result_type(remote_a, remote_b) + ludf = blosc2.lazyudf(myudf, (remote_a, remote_b), dtype=dtype, shape=remote_a.shape) + + # Try uploading with compute True + ludf_remote = auth_client.upload(remotepath="@shared/ludf.b2nd", local_dset=ludf, compute=True) + np.testing.assert_array_equal(ludf[:], ludf_remote[:]) + + # Try uploading with compute False + ludf_remote = auth_client.upload(remotepath="@shared/ludf.b2nd", local_dset=ludf, compute=False) + np.testing.assert_array_equal(ludf[:], ludf_remote[:]) + + # More exercises for the expression evaluation with Blosc2 arrays @pytest.mark.parametrize( "expression", From 1cbd04451b2f9de49988daaa8c84f727b9889e74 Mon Sep 17 00:00:00 2001 From: Luke Shaw Date: Tue, 25 Nov 2025 12:50:54 +0100 Subject: [PATCH 8/8] Rectify lazyudf test for auth --- caterva2/services/server.py | 2 +- caterva2/tests/test_api.py | 45 +++++++++++++++++-------------------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/caterva2/services/server.py b/caterva2/services/server.py index a91a6e71..0c0e4a7e 100644 --- a/caterva2/services/server.py +++ b/caterva2/services/server.py @@ -729,7 +729,7 @@ def make_expr( compute = True # Handle name or path - if name is None: # provided a path + if remotepath is not None: # provided a path # Get the absolute path for this user urlpath = get_writable_path(remotepath, user) abspath = urlpath.parent diff --git a/caterva2/tests/test_api.py b/caterva2/tests/test_api.py index d8312a56..8735043c 100644 --- a/caterva2/tests/test_api.py +++ b/caterva2/tests/test_api.py @@ -759,38 +759,33 @@ def test_lazyexpr(auth_client): np.testing.assert_array_equal(res, b[:]) -def test_lazyudf(examples_dir, tmp_path, auth_client): +def test_lazyudf(auth_client): + if not auth_client: + pytest.skip("authentication support needed") root = pathlib.Path("@shared") remote_root = auth_client.get(root) - remote_dir = "arrays" - ds_a = f"{remote_dir}/3d-blosc2-a.b2nd" - ds_b = f"{remote_dir}/3d-blosc2-b.b2nd" - with contextlib.chdir(tmp_path): - os.makedirs(remote_dir, exist_ok=True) - a = np.linspace(-1, 2, 1000).reshape(10, 10, 10) - blosc2.asarray(a, urlpath=ds_a, chunks=(5, 10, 10)) - remote_a = remote_root.upload(ds_a) - b = np.linspace(-1, 2, 1000).reshape(10, 10, 10) - blosc2.asarray(b, urlpath=ds_b, chunks=(3, 5, 5)) - remote_b = remote_root.upload(ds_b) - assert ds_a in remote_root - assert ds_b in remote_root + a = np.linspace(1, 2, 1000).reshape(10, 10, 10) + ds_a = blosc2.asarray(a, chunks=(5, 10, 10)) + remote_a = remote_root.upload(ds_a, "3d-blosc2-a.b2nd") + b = np.linspace(1, 2, 1000).reshape(10, 10, 10) + ds_b = blosc2.asarray(b, chunks=(3, 5, 5)) + remote_b = remote_root.upload(ds_b, "3d-blosc2-b.b2nd") - def myudf(inputs, output, offset): - x1, x2 = inputs - output[:] = np.logaddexp(x1, x2) + def myudf(inputs, output, offset): + x1, x2 = inputs + output[:] = np.logaddexp(x1, x2) - dtype = blosc2.result_type(remote_a, remote_b) - ludf = blosc2.lazyudf(myudf, (remote_a, remote_b), dtype=dtype, shape=remote_a.shape) + dtype = blosc2.result_type(remote_a, remote_b) + ludf = blosc2.lazyudf(myudf, (remote_a, remote_b), dtype=dtype, shape=remote_a.shape) - # Try uploading with compute True - ludf_remote = auth_client.upload(remotepath="@shared/ludf.b2nd", local_dset=ludf, compute=True) - np.testing.assert_array_equal(ludf[:], ludf_remote[:]) + # Try uploading with compute True + ludf_remote = auth_client.upload(remotepath="@shared/ludf.b2nd", local_dset=ludf, compute=True) + np.testing.assert_array_equal(ludf[:], ludf_remote[:]) - # Try uploading with compute False - ludf_remote = auth_client.upload(remotepath="@shared/ludf.b2nd", local_dset=ludf, compute=False) - np.testing.assert_array_equal(ludf[:], ludf_remote[:]) + # Try uploading with compute False + ludf_remote = auth_client.upload(remotepath="@shared/ludf.b2nd", local_dset=ludf, compute=False) + np.testing.assert_array_equal(ludf[:], ludf_remote[:]) # More exercises for the expression evaluation with Blosc2 arrays