Skip to content

Commit

Permalink
Add lena.flow.{ReadROOTFile,ReadROOTTree} and lena.output.WriteROOTTree.
Browse files Browse the repository at this point in the history
Mostly tested, to be done.
For Lena to become interactive.
  • Loading branch information
ynikitenko committed Mar 8, 2021
1 parent c6d17d3 commit be5f43f
Show file tree
Hide file tree
Showing 9 changed files with 715 additions and 1 deletion.
8 changes: 8 additions & 0 deletions lena/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from lena.flow.group_scale import GroupScale
from lena.flow.iterators import Chain, CountFrom, ISlice
from lena.flow.print_ import Print
from lena.flow.read_root_file import ReadROOTFile
from lena.flow.read_root_tree import ReadROOTTree
from lena.flow.selectors import Not, Selector
from lena.flow.split_into_bins import (
SplitIntoBins, ReduceBinContent, TransformBins,
Expand All @@ -16,14 +18,20 @@


__all__ = [
# elements
'Cache', 'Count', 'DropContext', 'End', 'Print',
'Chain', 'CountFrom', 'ISlice',
'ReadROOTFile',
'ReadROOTTree',
'Zip',
# functions
'get_context', 'get_data', 'get_data_context',
# groups
'GroupBy', 'GroupScale',
'Not', 'Selector',
'seq_map',
'TransformIf',
# split into bins
'SplitIntoBins',
'ReduceBinContent', 'TransformBins',
'get_example_bin',
Expand Down
142 changes: 142 additions & 0 deletions lena/flow/read_root_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import inspect
import copy
import sys

import ROOT

import lena


class ReadROOTFile():
    """Read ROOT files from flow."""

    def __init__(self, types=None, keys=None, selector=None):
        """Keyword arguments specify which objects should be read
        from ROOT files.

        *types* sets the list of possible objects types.

        *keys* specifies a list of allowed objects' names.
        Only simple keys are currently allowed (no regular expressions).

        If both *types* and *keys* are provided, then
        objects that satisfy any of *types* *or* *keys*
        are read.

        *selector* is a general function, which accepts
        an object from a ROOT file and returns a boolean.
        If *selector* is given, both *types* and *keys* must
        be omitted, or :exc:`.LenaValueError` is raised.

        If none of the arguments is given, all objects are read.

        :exc:`.LenaTypeError` is raised if *selector* is not callable,
        if *keys* or *types* is not a list,
        or if *types* contains something that is not a class.
        :exc:`.LenaValueError` is raised if *keys* contains
        a non-string.
        """
        if selector is not None:
            if keys or types:
                raise lena.core.LenaValueError(
                    "if selector is provided, keys and types "
                    "must not be passed"
                )
            if not callable(selector):
                raise lena.core.LenaTypeError(
                    "selector must be callable"
                )
            self._selector = selector
            return

        keys_selector = None
        types_selector = None

        if keys is not None:
            if not isinstance(keys, list):
                raise lena.core.LenaTypeError(
                    "keys must be a list of strings"
                )
            # ROOT keys can have unicode names.
            # sys.version_info (not the string sys.version)
            # must be used to compare with an integer.
            str_type = basestring if sys.version_info[0] == 2 else str
            if not all(isinstance(key, str_type) for key in keys):
                raise lena.core.LenaValueError(
                    "keys must contain only strings"
                )
            # todo: allow regular expressions
            # todo: allow ROOT object versions
            # One membership test instead of a lambda per key:
            # lambdas created in a loop would all see the last key.
            # A private copy protects from later changes of *keys*.
            allowed_names = tuple(keys)
            keys_selector = lambda obj: obj.GetName() in allowed_names

        if types is not None:
            if not isinstance(types, list):
                raise lena.core.LenaTypeError(
                    "types must be a list of types"
                )
            # maybe inspect is needed only for Python 2 types
            # not derived from object. Otherwise use isinstance(_, type)
            if not all(inspect.isclass(tp) for tp in types):
                raise lena.core.LenaTypeError(
                    "types must contain only types"
                )
            # In Python isinstance requires a tuple.
            allowed_types = tuple(types)
            types_selector = lambda obj: isinstance(obj, allowed_types)

        # self._selector is always set: None means "read everything".
        if types_selector is None:
            self._selector = keys_selector
        elif keys_selector is None:
            self._selector = types_selector
        else:
            # an object is read if it satisfies types *or* keys
            self._selector = lambda obj: (types_selector(obj)
                                          or keys_selector(obj))

    def run(self, flow):
        """Read ROOT files from *flow* and yield objects they contain.

        For file to be read,
        data part of the value must be a string and
        *context.data.read_root_file* must not be false
        (it is considered `True` if missing).
        Other values are yielded unchanged.

        *context.data.root_file_path* is updated
        with the path to the ROOT file.
        Each yielded object gets its own deep copy of the context.

        Warning
        =======
        After a ROOT file is closed,
        all its contained objects are destroyed.
        Make all processing within one flow:
        don't save yielded values to a list,
        or make proper copies of them in advance.
        """
        str_type = basestring if sys.version_info[0] == 2 else str

        for val in flow:
            data, context = lena.flow.get_data_context(val)

            # skip not ROOT files
            if not isinstance(data, str_type) or not \
                    lena.context.get_recursively(
                        context, "data.read_root_file", True
                    ):
                yield val
                continue

            root_file = ROOT.TFile(data, "read")
            try:
                # context of separate keys shall be updated
                # when they are transformed to other types
                # in other elements
                lena.context.update_recursively(
                    context, {"data": {"root_file_path": data}}
                )

                key_names = [key.GetName()
                             for key in root_file.GetListOfKeys()]

                for key_name in key_names:
                    # result of TFile.Get is not a TKey, but a proper type
                    obj = root_file.Get(key_name)
                    if (self._selector is not None
                            and not self._selector(obj)):
                        continue
                    yield (obj, copy.deepcopy(context))
            finally:
                # The file is closed after the following elements
                # have used its data, and also if this generator
                # is closed early or an exception is raised.
                root_file.Close()
127 changes: 127 additions & 0 deletions lena/flow/read_root_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# needs ROOT installed
import collections
import copy
import sys

import lena


class ReadROOTTree():
    """Read ROOT trees coming from *flow*."""

    def __init__(self, branches=None, get_entry=None):
        """There are two ways in which trees could be read.

        In the first variant, *branches* is a list of strings
        that enables to read the specified tree branches,
        and only them (thus to speed up data reading).
        Tree entries are yielded as named tuples
        with fields named after *branches*.

        In the second variant, the tree is set up elsewhere.
        It has an associated object, which is filled with tree entries
        and returned with *get_entry*
        (a callable that accepts no arguments).

        Exactly one of *branches* or *get_entry* (not both)
        must be provided, otherwise :exc:`.LenaTypeError` is raised.
        :exc:`.LenaTypeError` is also raised if *branches*
        is not a list of strings or *get_entry* is not callable.
        :exc:`.LenaValueError` is raised if a branch name
        contains a '*'.

        Note
        ====
        If you plan to collect the resulting values
        (not use them on the fly), make sure that you use
        e.g. *copy.deepcopy* in *get_entry*.
        Otherwise all items collected will be the last read value.
        """
        # todo: should this class belong
        # to lena.flow or lena.input, lena.readers?

        # This loads other classes faster,
        # and if ROOT is not installed,
        # still enables "from lena.flow import ReadROOTTree",
        # instead of "from lena.flow.read_root_tree import ReadROOTTree"
        import ROOT

        if branches is not None:
            err_msg = "branches must be a list of strings"
            # Fail before iterating: a non-list may be not iterable at all.
            if not isinstance(branches, list):
                raise lena.core.LenaTypeError(err_msg)
            # ROOT allows unicode names.
            str_type = basestring if sys.version_info.major == 2 else str
            if not all(isinstance(br, str_type) for br in branches):
                raise lena.core.LenaTypeError(err_msg)
            # todo: maybe allow regexps in the future.
            if any('*' in br for br in branches):
                raise lena.core.LenaValueError(
                    "branches must be strings without regular expressions"
                )
            if get_entry is not None:
                raise lena.core.LenaTypeError(
                    "either branches or get_entry should be supplied, "
                    "not both"
                )
            # NOTE(review): an empty list passes these checks,
            # and run() then yields no entries for any tree.
        elif get_entry is None:
            raise lena.core.LenaTypeError(
                "initialize branches or get_entry"
            )
        # todo: allow empty branches to signify all branches.
        # Use TTree:GetListOfBranches()
        # This would be not a particularly good design,
        # because it's suboptimal to read all data instead of needed,
        # but that would decouple data from code.

        if get_entry is not None and not callable(get_entry):
            raise lena.core.LenaTypeError("get_entry must be callable")

        self._branches = branches
        self._get_entry = get_entry

    def _read_branches(self, tree):
        """Yield entries of *tree* as named tuples.

        Only the branches given during initialisation are enabled
        (the branch status of *tree* is changed and not restored).
        The tuple type is named after the tree ("<tree name>_entry",
        or "tree_entry" if the tree has an empty name).
        """
        branches = self._branches
        # disable all branches
        tree.SetBranchStatus("*", 0)
        # enable allowed branches
        for br in branches:
            tree.SetBranchStatus(br, 1)
        # create output type
        tree_name = tree.GetName()
        tup_name = tree_name + "_entry" if tree_name else "tree_entry"
        entry_tuple = collections.namedtuple(tup_name, branches)
        # yield entries
        for entry in tree:
            yield entry_tuple(*(getattr(entry, br) for br in branches))

    def run(self, flow):
        """Read ROOT trees from *flow* and yield their entries.

        Values, whose data part is not a *ROOT.TTree*,
        are yielded unchanged.
        For trees *context.data.root_tree_name* is updated
        with the tree name, and each entry is yielded
        with its own deep copy of the context.
        """
        import ROOT

        for val in flow:
            # get tree
            tree, context = lena.flow.get_data_context(val)
            if not isinstance(tree, ROOT.TTree):
                yield val
                continue

            # add context.data
            # If a ROOT file was opened in a Sequence,
            # its path will be already in the context.
            # The path is not taken from the tree directory,
            # because a tree can exist outside of a file, in memory.
            data_c = {"root_tree_name": tree.GetName()}
            lena.context.update_recursively(context, {"data": data_c})

            # get entries
            if self._branches:
                for data in self._read_branches(tree):
                    yield (data, copy.deepcopy(context))
            elif self._get_entry:
                # iteration over the tree loads the next entry;
                # the entry itself is obtained through get_entry
                for _ in tree:
                    yield (self._get_entry(), copy.deepcopy(context))
2 changes: 1 addition & 1 deletion lena/flow/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, sequences, name="zip", fields=[]):
with these fields and *name* (by default "zip").
*Fields* in this case must have same length as *sequences*
(unless they are a string),
or :exc:`.LenaTypeError* is raised.
or :exc:`.LenaTypeError` is raised.
"""
if not sequences:
raise exceptions.LenaTypeError(
Expand Down

0 comments on commit be5f43f

Please sign in to comment.