Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dask_to_root #1085

Merged
merged 28 commits into from Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0b95eda
Dask write first push. Does have expected output with a simple example
zbilodea Jan 15, 2024
51eb7a9
dask to root, seems to work with simple examples
zbilodea Jan 15, 2024
26bccd2
Merge branch 'main' into feat-add-dask-to-root
zbilodea Jan 19, 2024
2a4807c
Merge branch 'main' into feat-add-dask-to-root
zbilodea Jan 19, 2024
c69a21c
Added dask dependency to fix webdocs build...will change back later
zbilodea Jan 19, 2024
660e923
switched dependency again
zbilodea Jan 19, 2024
eb2fc69
changing dependencies again for building webdocs.
zbilodea Jan 19, 2024
1d5b3e3
build issues
zbilodea Jan 19, 2024
2202d01
style: pre-commit fixes
pre-commit-ci[bot] Jan 19, 2024
ade66d8
added more tests
zbilodea Jan 19, 2024
8eb786f
Merge branch 'feat-add-dask-to-root' of https://github.com/scikit-hep…
zbilodea Jan 19, 2024
e633885
style: pre-commit fixes
pre-commit-ci[bot] Jan 19, 2024
d72f864
Fixed storage options not passing to uproot.recreate
zbilodea Jan 19, 2024
7db1363
Merge branch 'feat-add-dask-to-root' of https://github.com/scikit-hep…
zbilodea Jan 19, 2024
e35036f
style: pre-commit fixes
pre-commit-ci[bot] Jan 19, 2024
74fe866
removed dependencies and no longer tracking version.py, added test
zbilodea Jan 22, 2024
0610018
style: pre-commit fixes
pre-commit-ci[bot] Jan 22, 2024
e016a1f
fix
zbilodea Jan 22, 2024
098b7b4
Merge branch 'feat-add-dask-to-root' of https://github.com/scikit-hep…
zbilodea Jan 22, 2024
2ea4d6d
some fixes didn't work.
zbilodea Jan 22, 2024
3d77e18
left a test directrory in samples
zbilodea Jan 22, 2024
31f6e54
fixing pytest error 'PosixPath is not iterable'
zbilodea Jan 22, 2024
4585a84
trying to fix pytest
zbilodea Jan 22, 2024
271a15a
still pytest issue
zbilodea Jan 22, 2024
b9f003f
style: pre-commit fixes
pre-commit-ci[bot] Jan 22, 2024
a7bc155
Merge branch 'main' into feat-add-dask-to-root
zbilodea Jan 22, 2024
4bd8a29
Update tests/test_1085_dask_write.py
zbilodea Jan 22, 2024
335fc77
Update tests/test_1085_dask_write.py
zbilodea Jan 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions pyproject.toml
Expand Up @@ -39,9 +39,7 @@ dependencies = [
"numpy",
"fsspec",
"packaging",
"typing_extensions>=4.1.0; python_version < \"3.11\"",
"dask >=2023.04.0",
"dask-awkward>=2023.12.1"
"typing_extensions>=4.1.0; python_version < \"3.11\""
]
description = "ROOT I/O in pure Python and NumPy."
dynamic = [
Expand Down
Binary file removed src/uproot/my-output/compute-part0.root
Binary file not shown.
Binary file removed src/uproot/my-output/compute-part1.root
Binary file not shown.
Binary file removed src/uproot/my-output/data-part0.root
Binary file not shown.
Binary file removed src/uproot/my-output/data-part1.root
Binary file not shown.
Binary file removed src/uproot/my-output/data-part2.root
Binary file not shown.
19 changes: 0 additions & 19 deletions src/uproot/version.py

This file was deleted.

19 changes: 9 additions & 10 deletions src/uproot/writing/_dask_write.py
Expand Up @@ -3,11 +3,6 @@
import math
from typing import Any

from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.highlevelgraph import HighLevelGraph
from dask_awkward.layers.layers import AwkwardMaterializedLayer
from dask_awkward.lib.core import map_partitions, new_scalar_object
from fsspec import AbstractFileSystem
from fsspec.core import url_to_fs

Expand Down Expand Up @@ -42,7 +37,7 @@ def __call__(self, data, block_index):
if self.prefix is not None:
filename = f"{self.prefix}-{filename}"
filename = f"{self.protocol}://{self.path}/{filename}"
return to_root(
return ak_to_root(
filename, data, **self.kwargs, storage_options=self.storage_options
)

Expand Down Expand Up @@ -95,6 +90,12 @@ def dask_write(
uproot.dask_write(d)

"""
from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.highlevelgraph import HighLevelGraph
from dask_awkward.layers.layers import AwkwardMaterializedLayer
from dask_awkward.lib.core import map_partitions, new_scalar_object

fs, path = url_to_fs(destination, **(storage_options or {}))
name = f"write-root-{tokenize(fs, array, destination)}"

Expand All @@ -114,9 +115,7 @@ def dask_write(
initial_basket_capacity=initial_basket_capacity,
),
array,
BlockIndex(
(array.npartitions,)
), # class to provide current block index at each block of the operation...
BlockIndex((array.npartitions,)),
label="to-root",
meta=array._meta,
)
Expand All @@ -138,7 +137,7 @@ def dask_write(
return out


def to_root( # user-defined groups for ak.zip?
def ak_to_root(
destination,
array,
tree_name,
Expand Down
Binary file removed tests/samples/my-output/data-part0.root
Binary file not shown.
Binary file removed tests/samples/my-output/data-part1.root
Binary file not shown.
87 changes: 87 additions & 0 deletions tests/test_1085_dask_write.py
@@ -0,0 +1,87 @@
import math
import pytest
import awkward as ak
import skhep_testdata
import os

dask = pytest.importorskip("dask")
da = pytest.importorskip("dask.array")
dd = pytest.importorskip("dask.distributed")
dask_awkward = pytest.importorskip("dask_awkward")

from distributed import Client
import uproot


def simple_test(tmp_path):
partitions = 2
arr = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
dask_arr = dask_awkward.from_awkward(arr, npartitions=partitions)
uproot.dask_write(dask_arr, tmp_path, prefix="data")
file_1 = uproot.open(os.path.join(tmp_path, "data-part0.root"))
assert len(file_1["tree"]["a"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["a"].arrays()["a"][0] == arr[0]["a"])
file_2 = uproot.open(os.path.join(tmp_path, "data-part1.root"))
assert len(file_2["tree"]["a"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["a"].arrays()["a"][0] == arr[1]["a"])


def HZZ_test(tmp_path):
"""
Write data from HZZ with 3 partitions.
"""
partitions = 2
arr = uproot.open(skhep_testdata.data_path("uproot-HZZ.root"))["events"].arrays()
dask_arr = dask_awkward.from_awkward(ak.from_iter(arr), partitions)
uproot.dask_write(dask_arr, tmp_path, prefix="data")
file_1 = uproot.open(os.path.join(tmp_path, "data-part0.root"))
assert len(file_1["tree"]["Jet_Px"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])
file_2 = uproot.open(os.path.join(tmp_path, "data-part1.root"))
assert len(file_2["tree"]["Jet_Px"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])


def test_graph(tmp_path):
"""
Test compute parameter (want to return highlevelgraph)
"""
partitions = 2
arr = ak.Array([{"a": [1, 2, 3]}, {"a": [4, 5]}])
dask_arr = dask_awkward.from_awkward(arr, npartitions=partitions)
graph = uproot.dask_write(
dask_arr,
str(tmp_path),
prefix="compute",
compute=False,
)
graph.compute()
file_1 = uproot.open(os.path.join(tmp_path, "compute-part0.root"))
assert len(file_1["tree"]["a"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["a"].arrays()["a"][0] == arr[0]["a"])
file_2 = uproot.open(os.path.join(tmp_path, "compute-part1.root"))
assert len(file_2["tree"]["a"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["a"].arrays()["a"][0] == arr[1]["a"])


def test_compute(tmp_path):
print("here")
partitions = 2
arr = uproot.open(skhep_testdata.data_path("uproot-HZZ.root"))["events"].arrays()
dask_arr = dask_awkward.from_awkward(ak.from_iter(arr), partitions)
with Client() as _:
zbilodea marked this conversation as resolved.
Show resolved Hide resolved
graph = uproot.dask_write(
dask_arr, str(tmp_path), prefix="distribute", compute=False
)
dask.compute(graph)
dask_arr = dask_awkward.from_awkward(ak.from_iter(arr), partitions)
file_1 = uproot.open(os.path.join(tmp_path, "distribute-part0.root"))
assert len(file_1["tree"]["Jet_Px"].arrays()) == math.ceil(len(arr) / partitions)
assert ak.all(file_1["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])
file_2 = uproot.open(os.path.join(tmp_path, "distribute-part1.root"))
assert len(file_2["tree"]["Jet_Px"].arrays()) == int(len(arr) / partitions)
assert ak.all(file_2["tree"]["Jet_Px"].arrays()["Jet_Px"][0] == arr["Jet_Px"])


# if __name__ == "__main__":
# test_compute("\my-output")
zbilodea marked this conversation as resolved.
Show resolved Hide resolved
159 changes: 0 additions & 159 deletions tests/test_dask_to_root.py

This file was deleted.