
feat: add dask_to_root #1085

Merged
merged 28 commits into main from feat-add-dask-to-root on Jan 23, 2024
Changes from 26 commits
Commits
28 commits
0b95eda
Dask write first push. Does have expected output with a simple example
zbilodea Jan 15, 2024
51eb7a9
dask to root, seems to work with simple examples
zbilodea Jan 15, 2024
26bccd2
Merge branch 'main' into feat-add-dask-to-root
zbilodea Jan 19, 2024
2a4807c
Merge branch 'main' into feat-add-dask-to-root
zbilodea Jan 19, 2024
c69a21c
Added dask dependency to fix webdocs build...will change back later
zbilodea Jan 19, 2024
660e923
switched dependency again
zbilodea Jan 19, 2024
eb2fc69
changing dependencies again for building webdocs.
zbilodea Jan 19, 2024
1d5b3e3
build issues
zbilodea Jan 19, 2024
2202d01
style: pre-commit fixes
pre-commit-ci[bot] Jan 19, 2024
ade66d8
added more tests
zbilodea Jan 19, 2024
8eb786f
Merge branch 'feat-add-dask-to-root' of https://github.com/scikit-hep…
zbilodea Jan 19, 2024
e633885
style: pre-commit fixes
pre-commit-ci[bot] Jan 19, 2024
d72f864
Fixed storage options not passing to uproot.recreate
zbilodea Jan 19, 2024
7db1363
Merge branch 'feat-add-dask-to-root' of https://github.com/scikit-hep…
zbilodea Jan 19, 2024
e35036f
style: pre-commit fixes
pre-commit-ci[bot] Jan 19, 2024
74fe866
removed dependencies and no longer tracking version.py, added test
zbilodea Jan 22, 2024
0610018
style: pre-commit fixes
pre-commit-ci[bot] Jan 22, 2024
e016a1f
fix
zbilodea Jan 22, 2024
098b7b4
Merge branch 'feat-add-dask-to-root' of https://github.com/scikit-hep…
zbilodea Jan 22, 2024
2ea4d6d
some fixes didn't work.
zbilodea Jan 22, 2024
3d77e18
left a test directory in samples
zbilodea Jan 22, 2024
31f6e54
fixing pytest error 'PosixPath is not iterable'
zbilodea Jan 22, 2024
4585a84
trying to fix pytest
zbilodea Jan 22, 2024
271a15a
still pytest issue
zbilodea Jan 22, 2024
b9f003f
style: pre-commit fixes
pre-commit-ci[bot] Jan 22, 2024
a7bc155
Merge branch 'main' into feat-add-dask-to-root
zbilodea Jan 22, 2024
4bd8a29
Update tests/test_1085_dask_write.py
zbilodea Jan 22, 2024
335fc77
Update tests/test_1085_dask_write.py
zbilodea Jan 22, 2024
Binary file added docs-img/.DS_Store
1 change: 1 addition & 0 deletions src/uproot/__init__.py
@@ -123,6 +123,7 @@
from uproot.writing import WritableTree
from uproot.writing import WritableBranch
from uproot.writing import to_writable
from uproot.writing import dask_write

import uproot.models.TObject
import uproot.models.TString
2 changes: 2 additions & 0 deletions src/uproot/writing/__init__.py
@@ -11,6 +11,7 @@
"""
from __future__ import annotations

from uproot.writing._dask_write import dask_write
from uproot.writing.identify import (
to_TArray,
to_TH1x,
@@ -51,4 +52,5 @@
"create",
"recreate",
"update",
"dask_write",
]
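
These two export changes make the new function reachable from the package root as well as from the writing submodule; a quick sanity check of the import paths only (nothing is written here):

```python
import uproot
from uproot.writing import dask_write

# Both names refer to the same function exposed by this change.
assert uproot.dask_write is dask_write
```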
185 changes: 185 additions & 0 deletions src/uproot/writing/_dask_write.py
@@ -0,0 +1,185 @@
from __future__ import annotations

import math
from typing import Any

from fsspec import AbstractFileSystem
from fsspec.core import url_to_fs

import uproot


class _ToROOTFn:
def __init__(
self,
fs: AbstractFileSystem,
path: str,
npartitions: int,
prefix: str | None = None,
storage_options: dict | None = None,
**kwargs: Any,
):
self.fs = fs
self.path = path
self.prefix = prefix
self.zfill = math.ceil(math.log(npartitions, 10))
self.storage_options = storage_options
self.fs.mkdirs(self.path, exist_ok=True)
self.protocol = (
self.fs.protocol
if isinstance(self.fs.protocol, str)
else self.fs.protocol[0]
)
self.kwargs = kwargs

def __call__(self, data, block_index):
filename = f"part{str(block_index[0]).zfill(self.zfill)}.root"
if self.prefix is not None:
filename = f"{self.prefix}-{filename}"
filename = f"{self.protocol}://{self.path}/{filename}"
return ak_to_root(
filename, data, **self.kwargs, storage_options=self.storage_options
)


def dask_write(
array,
destination,
compute=True,
storage_options=None,
prefix: str | None = None,
tree_name="tree",
title="",
field_name=lambda outer, inner: inner if outer == "" else outer + "_" + inner,
initial_basket_capacity=10,
counter_name=lambda counted: "n" + counted,
resize_factor=10.0,
compression="zlib",
compression_level=1,
):
"""
Args:
array (`dask_awkward.Array`): The :obj:`dask_awkward.Array` collection to write to disk.
destination (path-like): Where to store the output; this can be a local filesystem path
or a remote filesystem path.
compute (bool): If ``True``, immediately compute the result (write the data to disk). If ``False``, a Scalar collection is returned so that ``compute`` can be called explicitly.
storage_options (dict or None): Options passed to ``fsspec`` when resolving and writing to ``destination``.
prefix (str or None): An additional prefix for the output files. If ``None``, the parts
inside the destination directory are named ``partN.root``; if
defined, they are named ``f"{prefix}-partN.root"``.
tree_name (str): Name of the TTree to write to. Default is ``"tree"``.
title (str): Title of the TTree. Default is ``""``.
field_name (callable of str \u2192 str): Function to generate TBranch
names for columns of an Awkward record array or a Pandas DataFrame.
initial_basket_capacity (int): Number of TBaskets that can be written to the
TTree without rewriting the TTree metadata to make room.
counter_name (callable of str \u2192 str): Function to generate counter-TBranch
names for Awkward Arrays of variable-length lists.
resize_factor (float): When the TTree metadata needs to be rewritten,
this specifies how many more TBasket slots to allocate as a multiplicative
factor.
compression (str): Compression algorithm for the output files; one of
``"zlib"``, ``"lzma"``, ``"lz4"``, or ``"zstd"``. Default is ``"zlib"``.
compression_level (int): Compression level to use with ``compression``. Default is ``1``.

Writes a dask-awkward array to a set of ROOT files, one file per partition; the data
in each partition is written to a TTree.

.. code-block:: python

import awkward as ak
import dask_awkward as dak
a = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
d = dak.from_awkward(a, npartitions=2)
uproot.dask_write(d, "/path/to/output", prefix="data")

"""
from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.highlevelgraph import HighLevelGraph
from dask_awkward.layers.layers import AwkwardMaterializedLayer
from dask_awkward.lib.core import map_partitions, new_scalar_object

fs, path = url_to_fs(destination, **(storage_options or {}))
name = f"write-root-{tokenize(fs, array, destination)}"

map_res = map_partitions(
_ToROOTFn(
fs=fs,
path=path,
npartitions=array.npartitions,
prefix=prefix,
tree_name=tree_name,
compression=compression,
compression_level=compression_level,
title=title,
field_name=field_name,
counter_name=counter_name,
resize_factor=resize_factor,
initial_basket_capacity=initial_basket_capacity,
),
array,
BlockIndex((array.npartitions,)),
label="to-root",
meta=array._meta,
)
map_res.dask.layers[map_res.name].annotations = {"ak_output": True}
dsk = {}
final_name = name + "-finalize"
dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__())

graph = HighLevelGraph.from_collections(
final_name,
AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]),
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")
if compute:
out.compute()
return None
else:
return out


def ak_to_root(
destination,
array,
tree_name,
compression,
compression_level,
title,
counter_name,
field_name,
initial_basket_capacity,
resize_factor,
storage_options,
):
if compression in ("LZMA", "lzma"):
compression_code = uproot.const.kLZMA
elif compression in ("ZLIB", "zlib"):
compression_code = uproot.const.kZLIB
elif compression in ("LZ4", "lz4"):
compression_code = uproot.const.kLZ4
elif compression in ("ZSTD", "zstd"):
compression_code = uproot.const.kZSTD
else:
msg = f"unrecognized compression algorithm: {compression}. Only ZLIB, LZMA, LZ4, and ZSTD are accepted."
raise ValueError(msg)

out_file = uproot.recreate(
destination,
compression=uproot.compression.Compression.from_code_pair(
compression_code, compression_level
),
**(storage_options or {}),
)

branch_types = {name: array[name].type for name in array.fields}

out_file.mktree(
tree_name,
branch_types,
title=title,
counter_name=counter_name,
field_name=field_name,
initial_basket_capacity=initial_basket_capacity,
resize_factor=resize_factor,
)

out_file[tree_name].extend({name: array[name] for name in array.fields})
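
For reviewers trying the new API end to end, here is a minimal sketch of how this diff's `uproot.dask_write` is meant to be called; the array contents, the output directory, and the `prefix` values are illustrative, not part of the change.

```python
import awkward as ak
import dask_awkward as dak
import uproot

# A small dask-awkward collection with two partitions (illustrative data).
arr = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
dask_arr = dak.from_awkward(arr, npartitions=2)

# Eager write: one ROOT file per partition, named data-part0.root, data-part1.root, ...
# each containing a TTree called "tree" (the default tree_name).
uproot.dask_write(dask_arr, "/path/to/output", prefix="data")

# Lazy write: with compute=False a dask Scalar is returned instead,
# so the write can be triggered later or combined with other collections.
task = uproot.dask_write(dask_arr, "/path/to/output", prefix="lazy", compute=False)
task.compute()
```

With `compute=False`, the returned Scalar also works with `dask.compute(...)` alongside other graphs, which is what `test_compute` in the tests below exercises with a `distributed` client.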
87 changes: 87 additions & 0 deletions tests/test_1085_dask_write.py
@@ -0,0 +1,87 @@
import math
import pytest
import awkward as ak
import skhep_testdata
import os

dask = pytest.importorskip("dask")
da = pytest.importorskip("dask.array")
dd = pytest.importorskip("dask.distributed")
dask_awkward = pytest.importorskip("dask_awkward")

from distributed import Client
import uproot


def test_simple(tmp_path):
partitions = 2
arr = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
dask_arr = dask_awkward.from_awkward(arr, npartitions=partitions)
uproot.dask_write(dask_arr, tmp_path, prefix="data")
file_1 = uproot.open(os.path.join(tmp_path, "data-part0.root"))
assert len(file_1["tree"]["a"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["a"].arrays()["a"][0] == arr[0]["a"])
file_2 = uproot.open(os.path.join(tmp_path, "data-part1.root"))
assert len(file_2["tree"]["a"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["a"].arrays()["a"][0] == arr[1]["a"])


def test_HZZ(tmp_path):
"""
Write data from the HZZ sample with two partitions.
"""
partitions = 2
arr = uproot.open(skhep_testdata.data_path("uproot-HZZ.root"))["events"].arrays()
dask_arr = dask_awkward.from_awkward(ak.from_iter(arr), partitions)
uproot.dask_write(dask_arr, tmp_path, prefix="data")
file_1 = uproot.open(os.path.join(tmp_path, "data-part0.root"))
assert len(file_1["tree"]["Jet_Px"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])
file_2 = uproot.open(os.path.join(tmp_path, "data-part1.root"))
assert len(file_2["tree"]["Jet_Px"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])


def test_graph(tmp_path):
"""
Test compute parameter (want to return highlevelgraph)
"""
partitions = 2
arr = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
dask_arr = dask_awkward.from_awkward(arr, npartitions=partitions)
graph = uproot.dask_write(
dask_arr,
str(tmp_path),
prefix="compute",
compute=False,
)
graph.compute()
file_1 = uproot.open(os.path.join(tmp_path, "compute-part0.root"))
assert len(file_1["tree"]["a"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["a"].arrays()["a"][0] == arr[0]["a"])
file_2 = uproot.open(os.path.join(tmp_path, "compute-part1.root"))
assert len(file_2["tree"]["a"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["a"].arrays()["a"][0] == arr[1]["a"])


def test_compute(tmp_path):
partitions = 2
arr = uproot.open(skhep_testdata.data_path("uproot-HZZ.root"))["events"].arrays()
dask_arr = dask_awkward.from_awkward(ak.from_iter(arr), partitions)
with Client() as _:
graph = uproot.dask_write(
dask_arr, str(tmp_path), prefix="distribute", compute=False
)
dask.compute(graph)
file_1 = uproot.open(os.path.join(tmp_path, "distribute-part0.root"))
assert len(file_1["tree"]["Jet_Px"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])
file_2 = uproot.open(os.path.join(tmp_path, "distribute-part1.root"))
assert len(file_2["tree"]["Jet_Px"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])


# if __name__ == "__main__":
# test_compute("\my-output")
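
A quick way to check the output outside of pytest is to read all partition files back with uproot's existing wildcard support; the directory and prefix below are assumed to match the eager-write sketch earlier, and `"tree"` is the default tree name used by `dask_write`.

```python
import uproot

# Concatenate every partition written by dask_write into a single awkward Array.
# "/path/to/output" and the "data" prefix are the illustrative values from the sketch above.
combined = uproot.concatenate({"/path/to/output/data-part*.root": "tree"})
print(len(combined), combined.fields)
```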