From 92e44f69153338a72b766fa838887a28ba91f765 Mon Sep 17 00:00:00 2001 From: Victor Ananyev Date: Thu, 10 Sep 2020 17:18:10 +0200 Subject: [PATCH] add batch iter over tsv files --- mc_utils/io/lhe_io.py | 37 ++++++++++++++++++ mc_utils/io/reader.py | 44 ++++++++++++++++++++++ mc_utils/io/tsv_io.py | 66 ++++++++++++++++++++++++++++++++ mc_utils/lhe_io.py | 88 ------------------------------------------- 4 files changed, 147 insertions(+), 88 deletions(-) create mode 100644 mc_utils/io/lhe_io.py create mode 100644 mc_utils/io/reader.py create mode 100644 mc_utils/io/tsv_io.py delete mode 100644 mc_utils/lhe_io.py diff --git a/mc_utils/io/lhe_io.py b/mc_utils/io/lhe_io.py new file mode 100644 index 0000000..d8f512b --- /dev/null +++ b/mc_utils/io/lhe_io.py @@ -0,0 +1,37 @@ +import logging +import re + +from itertools import chain + +from mc_utils.fastlhe import parse_batch +from mc_utils.io.reader import iter_file_chunked + +logger = logging.getLogger("mc_utils.io.lhe_io") + +EVENT_START_MARKER = b"<event" +EVENT_END_MARKER = b"</event>\n" +EVENT_END_MARKER_LEN = len(EVENT_END_MARKER) + +RE_multispace = re.compile(br"\s+") + + +def _parse_event_batch(buf, event_batch, event_postprocess=None): + logger.debug("Start parsing event batch") + arr, buf = parse_batch(5, buf, event_batch) + if event_postprocess is not None: + arr = event_postprocess(arr) + logger.debug("Built array. Return.") + return arr, buf + + +def lhe_iter_files(paths, chunksize=None, gzipped=None, event_postprocess=None): + if not isinstance(paths, list): + yield from lhe_iter_files([paths], chunksize, gzipped, event_postprocess) + return + raw_event_chunks = (iter_file_chunked(path, chunksize, gzipped) for path in paths) + buf = "" + for raw_event_chunk in chain.from_iterable(raw_event_chunks): + logger.debug("Got chunk. Start parsing batch") + arr, buf = _parse_event_batch(buf, raw_event_chunk, event_postprocess) + yield arr + logger.debug("Yielded array.
Fetch next raw chunk") diff --git a/mc_utils/io/reader.py b/mc_utils/io/reader.py new file mode 100644 index 0000000..3844856 --- /dev/null +++ b/mc_utils/io/reader.py @@ -0,0 +1,44 @@ +import gzip +import zlib + +from io import BufferedIOBase + +try: + from XRootD import client + from XRootD.client.flags import OpenFlags + XROOTD_AVAILABLE = True +except ModuleNotFoundError: + XROOTD_AVAILABLE = False + + +def _iter_file(path, chunksize, gzipped): + opener = open if not gzipped else gzip.open + with opener(path, "rb") as raw_e: + while True: + chunk = raw_e.read(chunksize) + if chunk == b"": + break + yield chunk + + +def _iter_xrootd(filepath, chunksize, gzipped): + with client.File() as f: + f.open(filepath, OpenFlags.READ) + if gzipped: + dec = zlib.decompressobj(32 + zlib.MAX_WBITS) + for chunk in f.readchunks(offset=0, chunksize=chunksize): + yield dec.decompress(chunk) + else: + for chunk in f.readchunks(offset=0, chunksize=chunksize): + yield chunk + + +def iter_file_chunked(path, chunksize=None, gzipped=None): + chunksize = 1000 if chunksize is None else chunksize + + is_root = path.startswith("root://") + is_gzip = (path.endswith(".gz") and gzipped is None) or gzipped + + f_iterator = _iter_xrootd if is_root else _iter_file + raw_event_chunks = f_iterator(path, chunksize, is_gzip) + yield from raw_event_chunks diff --git a/mc_utils/io/tsv_io.py b/mc_utils/io/tsv_io.py new file mode 100644 index 0000000..9ac832e --- /dev/null +++ b/mc_utils/io/tsv_io.py @@ -0,0 +1,66 @@ +import logging +from io import StringIO +from itertools import chain + +import numpy as np + +from mc_utils.io.reader import iter_file_chunked + +logger = logging.getLogger("mc_utils.io.tsv_io") + +HEADER_MARKER = b"#" +NEWLINE_MARKER = b"\n" + + +def _skip_preamble(chunks_iter): + events_started = False + logger.debug("Start reading file") + for chunk in chunks_iter: + if not events_started: + logger.debug("Skiping preamble") + events_loc = chunk.rfind(HEADER_MARKER) + if events_loc == 
-1: + continue + events_loc = chunk.find(NEWLINE_MARKER, events_loc) + events_started = True + chunk = chunk[events_loc+1:] + logger.debug("Yield raw chunk") + yield chunk + + +def _parse_batch(buf, batch): + full_batch_idx = batch.rfind(NEWLINE_MARKER) + if buf: + batch = buf + batch + full_batch_idx = full_batch_idx + len(buf) if full_batch_idx != -1 else -1 + if full_batch_idx == -1: + return None, batch + newbuf = batch[full_batch_idx+1:] + batch = StringIO(batch[:full_batch_idx+1].decode("ascii")) + return np.genfromtxt(batch, dtype=np.complex), newbuf + + +def _parse_event_batch(buf, event_batch, event_postprocess=None): + logger.debug("Start parsing event batch") + arr, buf = _parse_batch(buf, event_batch) + if arr is None: + return None, buf + if event_postprocess is not None: + arr = event_postprocess(arr) + logger.debug("Built array. Return.") + return arr, buf + + +def tsv_iter_files(paths, chunksize=None, gzipped=None, event_postprocess=None): + if not isinstance(paths, list): + yield from tsv_iter_files([paths], chunksize, gzipped, event_postprocess) + return + raw_event_chunks = (_skip_preamble(iter_file_chunked(path, chunksize, gzipped)) for path in paths) + buf = b"" + for raw_event_chunk in chain.from_iterable(raw_event_chunks): + logger.debug("Got chunk. Start parsing batch") + arr, buf = _parse_event_batch(buf, raw_event_chunk, event_postprocess) + if arr is None: + continue + yield arr + logger.debug("Yielded array. 
Fetch next raw chunk") diff --git a/mc_utils/lhe_io.py b/mc_utils/lhe_io.py deleted file mode 100644 index 202bda8..0000000 --- a/mc_utils/lhe_io.py +++ /dev/null @@ -1,88 +0,0 @@ -import gzip -import logging -import re -import zlib - -from itertools import chain - -from mc_utils.fastlhe import parse_batch - -try: - from XRootD import client - from XRootD.client.flags import OpenFlags - XROOTD_AVAILABLE = True -except ModuleNotFoundError: - XROOTD_AVAILABLE = False - -logger = logging.getLogger("mc_utils.lhe_io") - -EVENT_START_MARKER = b"<event" -EVENT_END_MARKER = b"</event>\n" -EVENT_END_MARKER_LEN = len(EVENT_END_MARKER) - -RE_multispace = re.compile(br"\s+") - - -def _iter_file(path, chunksize, gzipped): - opener = open if not gzipped else gzip.open - with opener(path, "rb") as raw_e: - while True: - chunk = raw_e.read(chunksize) - if chunk == b"": - break - yield chunk - - -def _iter_xrootd(filepath, chunksize, gzipped): - with client.File() as f: - f.open(filepath, OpenFlags.READ) - if gzipped: - dec = zlib.decompressobj(32 + zlib.MAX_WBITS) - for chunk in f.readchunks(offset=0, chunksize=chunksize): - yield dec.decompress(chunk) - else: - for chunk in f.readchunks(offset=0, chunksize=chunksize): - yield chunk - - -def _skip_preamble(chunks_iter): - events_started = False - logger.debug("Start reading file") - for chunk in chunks_iter: - if not events_started: - logger.debug("Skiping preamble") - events_loc = chunk.find(EVENT_START_MARKER) - if events_loc == -1: - continue - events_started = True - chunk = chunk[events_loc:] - logger.debug("Yield raw chunk") - yield chunk - - -def _parse_event_batch(buf, event_batch, event_postprocess=None): - logger.debug("Start parsing event batch") - arr, buf = parse_batch(5, buf, event_batch) - if event_postprocess is not None: - arr = event_postprocess(arr) - logger.debug("Built array.
Return.") - return arr, buf - - -def lhe_iter_files(paths, chunksize=100000000, gzipped=None, event_postprocess=None): - if not isinstance(paths, list): - yield from lhe_iter_files([paths], chunksize, gzipped, event_postprocess) - return - - is_root = [path.startswith("root://") for path in paths] - is_gzip = [((path.endswith(".gz") and gzipped is None) or gzipped) for path in paths] - - f_iterator = [(_iter_xrootd if i_is_root else _iter_file) for i_is_root in is_root] - raw_event_chunks = (i_f_iterator(path, chunksize, i_is_gzip) for - i_f_iterator, path, i_is_gzip in zip(f_iterator, paths, is_gzip)) - buf = "" - for raw_event_chunk in chain.from_iterable(raw_event_chunks): - logger.debug("Got chunk. Start parsing batch") - arr, buf = _parse_event_batch(buf, raw_event_chunk, event_postprocess) - yield arr - logger.debug("Yielded array. Fetch next raw chunk")