add batch iter over tsv files
vindex10 committed Sep 10, 2020
1 parent 6427f1a commit 92e44f6
Showing 4 changed files with 147 additions and 88 deletions.
37 changes: 37 additions & 0 deletions mc_utils/io/lhe_io.py
@@ -0,0 +1,37 @@
import logging
import re

from itertools import chain

from mc_utils.fastlhe import parse_batch
from mc_utils.io.reader import iter_file_chunked

logger = logging.getLogger("mc_utils.io.lhe_io")

EVENT_START_MARKER = b"<event"
EVENT_END_MARKER = b"</event>\n"
EVENT_END_MARKER_LEN = len(EVENT_END_MARKER)

RE_multispace = re.compile(br"\s+")


def _parse_event_batch(buf, event_batch, event_postprocess=None):
logger.debug("Start parsing event batch")
arr, buf = parse_batch(5, buf, event_batch)
if event_postprocess is not None:
arr = event_postprocess(arr)
logger.debug("Built array. Return.")
return arr, buf


def lhe_iter_files(paths, chunksize=None, gzipped=None, event_postprocess=None):
if not isinstance(paths, list):
yield from lhe_iter_files([paths], chunksize, gzipped, event_postprocess)
return
raw_event_chunks = (iter_file_chunked(path, chunksize, gzipped) for path in paths)
buf = ""
for raw_event_chunk in chain.from_iterable(raw_event_chunks):
logger.debug("Got chunk. Start parsing batch")
arr, buf = _parse_event_batch(buf, raw_event_chunk, event_postprocess)
yield arr
logger.debug("Yielded array. Fetch next raw chunk")
44 changes: 44 additions & 0 deletions mc_utils/io/reader.py
@@ -0,0 +1,44 @@
import gzip
import zlib

from io import BufferedIOBase

try:
from XRootD import client
from XRootD.client.flags import OpenFlags
XROOTD_AVAILABLE = True
except ModuleNotFoundError:
XROOTD_AVAILABLE = False


def _iter_file(path, chunksize, gzipped):
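    # Read a local file, optionally gzip-compressed, in fixed-size chunks.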
opener = open if not gzipped else gzip.open
with opener(path, "rb") as raw_e:
while True:
chunk = raw_e.read(chunksize)
if chunk == b"":
break
yield chunk


def _iter_xrootd(filepath, chunksize, gzipped):
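    # Stream a remote file over XRootD, inflating gzip data on the fly.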
with client.File() as f:
f.open(filepath, OpenFlags.READ)
if gzipped:
dec = zlib.decompressobj(32 + zlib.MAX_WBITS)
for chunk in f.readchunks(offset=0, chunksize=chunksize):
yield dec.decompress(chunk)
else:
for chunk in f.readchunks(offset=0, chunksize=chunksize):
yield chunk


def iter_file_chunked(path, chunksize=None, gzipped=None):
chunksize = 1000 if chunksize is None else chunksize

is_root = path.startswith("root://")
    is_gzip = path.endswith(".gz") if gzipped is None else gzipped

f_iterator = _iter_xrootd if is_root else _iter_file
raw_event_chunks = f_iterator(path, chunksize, is_gzip)
yield from raw_event_chunks
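
A short sketch of how the chunked reader is driven (both paths and the server name are hypothetical; the XRootD branch requires the optional XRootD Python bindings to be installed):

from mc_utils.io.reader import iter_file_chunked

# Local file: gzip is inferred from the ".gz" suffix when gzipped is None.
for chunk in iter_file_chunked("data/events.tsv.gz", chunksize=4096):
    print(len(chunk))  # each chunk is a bytes object

# Remote file: the "root://" prefix selects the XRootD reader.
for chunk in iter_file_chunked("root://eos.example.org//store/events.tsv"):
    print(len(chunk))  # the default chunksize of 1000 bytes applies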
66 changes: 66 additions & 0 deletions mc_utils/io/tsv_io.py
@@ -0,0 +1,66 @@
import logging
from io import StringIO
from itertools import chain

import numpy as np

from mc_utils.io.reader import iter_file_chunked

logger = logging.getLogger("mc_utils.io.tsv_io")

HEADER_MARKER = b"#"
NEWLINE_MARKER = b"\n"


def _skip_preamble(chunks_iter):
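    # Drop everything up to the end of the last "#" header line in the
    # chunk; only the data rows that follow are passed through.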
events_started = False
logger.debug("Start reading file")
for chunk in chunks_iter:
if not events_started:
logger.debug("Skiping preamble")
events_loc = chunk.rfind(HEADER_MARKER)
if events_loc == -1:
continue
            events_loc = chunk.find(NEWLINE_MARKER, events_loc)
            if events_loc == -1:
                # Header line runs past this chunk; wait for its newline.
                continue
            events_started = True
            chunk = chunk[events_loc+1:]
logger.debug("Yield raw chunk")
yield chunk


def _parse_batch(buf, batch):
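    # Only rows terminated by a newline are parsed; the incomplete tail is
    # handed back as the new buffer and prepended to the next batch.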
full_batch_idx = batch.rfind(NEWLINE_MARKER)
if buf:
batch = buf + batch
        full_batch_idx = (full_batch_idx + len(buf)) if full_batch_idx != -1 else -1
if full_batch_idx == -1:
return None, batch
newbuf = batch[full_batch_idx+1:]
batch = StringIO(batch[:full_batch_idx+1].decode("ascii"))
    return np.genfromtxt(batch, dtype=complex), newbuf


def _parse_event_batch(buf, event_batch, event_postprocess=None):
logger.debug("Start parsing event batch")
arr, buf = _parse_batch(buf, event_batch)
if arr is None:
return None, buf
if event_postprocess is not None:
arr = event_postprocess(arr)
logger.debug("Built array. Return.")
return arr, buf


def tsv_iter_files(paths, chunksize=None, gzipped=None, event_postprocess=None):
if not isinstance(paths, list):
yield from tsv_iter_files([paths], chunksize, gzipped, event_postprocess)
return
raw_event_chunks = (_skip_preamble(iter_file_chunked(path, chunksize, gzipped)) for path in paths)
buf = b""
for raw_event_chunk in chain.from_iterable(raw_event_chunks):
logger.debug("Got chunk. Start parsing batch")
arr, buf = _parse_event_batch(buf, raw_event_chunk, event_postprocess)
if arr is None:
continue
yield arr
logger.debug("Yielded array. Fetch next raw chunk")
88 changes: 0 additions & 88 deletions mc_utils/lhe_io.py

This file was deleted.
