In [1]:
import os, sys
import multiprocessing as mp
import warnings
import h5py as h5
import numpy as np
import pandas as pd
import hepfile as hf

sys.path.append(os.path.abspath(os.pardir))
sys.path.insert(0, os.path.abspath("../src/hepfile"))

from src.hepfile.constants import *
from errors import *
from src.hepfile.read import *

In [32]:
def _load_subset(packed_args) -> dict:
    
    '''
    Reads a subset of the data, from the HDF5 file to fill a data dictionary.

    Args:
	packed_args (list): list with 1 x 4 dimensions in the order: infile, subset, verbose, desired_groups. 
                        subset, verbose, and desired_groups should be the same for all rows
    
    Returns:
	data (dict): Selected data from HDF5 file

    '''
    
    # unpack args    
    infile, subset, verbose, desired_groups = zip(*packed_args)
    
    # Create the initial data and bucket dictionary to hold the data
    data = {}
    bucket = {}

    # We'll fill the data dictionary with some extra fields, though we won't
    # need them all for the bucket
    data["_MAP_DATASETS_TO_COUNTERS_"] = {}
    data["_MAP_DATASETS_TO_INDEX_"] = {}
    data["_LIST_OF_COUNTERS_"] = []
    data["_LIST_OF_DATASETS_"] = []
    data["_META_"] = {}

    # Get the number of buckets.
    # In HEP (High Energy Physics), this would be the number of events
    data["_NUMBER_OF_BUCKETS_"] = infile.attrs["_NUMBER_OF_BUCKETS_"]

    if type(subset) is tuple:
        subset = list(subset)

    if type(subset) is int:
        print("Single subset value of {subset} being interpreted as a high range")
        print(f"subset being set to a range of (0,{subset})\n")
        subset = [0, subset]

    # If the user has specified `subset` incorrectly, then let's return
    # an empty data and bucket
    if subset[1]-subset[0]<=0:
        raise RangeSubsetError(f"The range in subset is either 0 or negative! {subset[1]} - {subset[0]} = {subset[1] - subset[0]}")

    # Make sure the user is not asking for something bigger than the file!
    nbuckets = data["_NUMBER_OF_BUCKETS_"]

    if subset[0] > nbuckets:
        raise RangeSubsetError(f'Range for subset starts greater than number of buckets in file! {subset[0]} > {nbuckets}')

    if subset[1] > nbuckets:
        warnings.warn(f'Range for subset is greater than number of buckets in file!\n{subset[1]} > {nbuckets}\nHigh range of subset will be set to {nbuckets}\n')
        subset[1] = nbuckets

    data["_NUMBER_OF_BUCKETS_"] = subset[1] - subset[0]

    nbuckets = data["_NUMBER_OF_BUCKETS_"]

    ############################################################################
    # Get the datasets and counters
    ############################################################################
    dc = infile["_MAP_DATASETS_TO_COUNTERS_"]
    for vals in dc:

        if verbose:
            print(f"Map datasets to counters: {vals}")

        # The decode is there because vals were stored as numpy.bytes
        counter = vals[1].decode()
        index = f"{counter}_INDEX"
        data["_MAP_DATASETS_TO_COUNTERS_"][vals[0].decode()] = counter
        data["_MAP_DATASETS_TO_INDEX_"][vals[0].decode()] = index
        data["_LIST_OF_COUNTERS_"].append(vals[1].decode())
        data["_LIST_OF_DATASETS_"].append(vals[0].decode())
        data["_LIST_OF_DATASETS_"].append(vals[1].decode())  # Get the counters as well

    # We may have added some counters and datasets multiple times.
    # So just to be sure, only keep the unique values
    data["_LIST_OF_COUNTERS_"] = np.unique(data["_LIST_OF_COUNTERS_"]).tolist()
    data["_LIST_OF_DATASETS_"] = np.unique(data["_LIST_OF_DATASETS_"]).tolist()
    ############################################################################            

    ############################################################################
    # Pull out the SINGLETON datasets
    ############################################################################
    sg = infile["_SINGLETONSGROUPFORSTORAGE_"][0]  # This is a numpy array of strings
    decoded_string = sg[1].decode()

    vals = decoded_string.split("__:__")
    vals.remove("COUNTER")

    data["_SINGLETONS_GROUP_"] = vals
    ############################################################################

    ############################################################################
    # Get the list of datasets and groups
    ############################################################################
    all_datasets = data["_LIST_OF_DATASETS_"]

    if verbose:
        print(f"all_datasets: {all_datasets}")
    ############################################################################

    ############################################################################
    # Only keep select data from file, if we have specified desired_groups
    ############################################################################
    if desired_groups is not None:
        if type(desired_groups) != list:
            desired_groups = list(desired_groups)

        # Count backwards because we'll be removing stuff as we go.
        i = len(all_datasets) - 1
        while i >= 0:
            entry = all_datasets[i]

            is_dropped = True
            # This is looking to see if the string is anywhere in the name
            # of the dataset
            for desdat in desired_groups:
                if desdat in entry:
                    is_dropped = False
                    break

            if is_dropped == True:
                print(f"Not reading out {entry} from the file....")
                all_datasets.remove(entry)

            i -= 1

        if verbose:
            print(f"After only selecting certain datasets ----- ")
            print(f"all_datasets: {all_datasets}")
    ###########################################################################

    # We might need the counter for SINGLETONS so let's pull it out
    data["_SINGLETONS_GROUP_/COUNTER"] = infile["_SINGLETONS_GROUP_"]['COUNTER']

    if verbose == True:
        print("\nDatasets and counters:")
        print(data["_MAP_DATASETS_TO_COUNTERS_"])
        print("\nList of counters:")
        print(data["_LIST_OF_COUNTERS_"])
        print("\n_SINGLETONS_GROUP_/COUNTER:")
        print(data["_SINGLETONS_GROUP_/COUNTER"])
        print("\n")

    ############################################################################
    # Pull out the counters and build the indices
    ############################################################################
    print("Building the indices...\n")

    if verbose:
        print("data.keys()")
        print(data.keys())
        print("\n")

    # We will need to keep track of the indices in the entire file
    # This way, if the user specifies a subset of the data, we have the full 
    # indices already calculated
    full_file_indices = {}

    for counter_name in data["_LIST_OF_COUNTERS_"]:

        if verbose:
            print(f"counter name: ------------ {counter_name}\n")

        full_file_counters = infile[counter_name]
        full_file_index = calculate_index_from_counters(full_file_counters)

        if verbose:
            print(f"full file counters: {full_file_counters}\n")
            print(f"full file index: {full_file_index}\n")

        # If we passed in subset, grab that slice of the data from the file
        if subset is not None and subset[1] <= subset[0]:
            raise RangeSubsetError(f"Unable to read anything in! High range of {subset[1]} is less than or equal to low range of {subset[0]}")

        elif subset is not None:
            # We tack on +1 to the high range of subset when we pull out the counters
            # and index because we want to get all of the entries for the last entry.
            data[counter_name] = infile[counter_name][subset[0] : subset[1]+1]
            index = full_file_index[subset[0] : subset[1]+1]
        else:
            data[counter_name] = infile[counter_name][:]
            index = full_file_index

        subset_index = index
        # If the file is *not* empty....
        # Just to make sure the "local" index of the data dictionary starts at 0
        if len(index)>0:
            subset_index = index - index[0]

        index_name = "%s_INDEX" % (counter_name)

        data[index_name] = subset_index
        full_file_indices[index_name] = index

    print("Built the indices!")

    if verbose:
        print("full_file_index: ")
        print(f"{full_file_indices}\n")

    # Loop over the all_datasets we want and pull out the data.
    for name in all_datasets:

        # If this is a counter, we're going to have to grab the indices
        # differently than for a "normal" dataset
        IS_COUNTER = True
        index_name = None
        if name not in data["_LIST_OF_COUNTERS_"]:
            index_name = data["_MAP_DATASETS_TO_INDEX_"][name]
            IS_COUNTER = False # We will use different indices for the counters

        if verbose == True:
            print(f"------ {name}")
            print(f"index_name: {index_name}\n")

        dataset = infile[name]

        if verbose:
            print(f"dataset type: {type(dataset)}")

        # This will ignore the groups
        if type(dataset) == h5.Dataset:
            dataset_name = name

            if subset is not None:
                if IS_COUNTER:
                    # If this is a counter, then the subset indices
                    # map on to the same locations for any counters
                    lo = subset[0]
                    hi = subset[1]
                else:
                    lo = full_file_indices[index_name][0]
                    hi = full_file_indices[index_name][-1]
                if verbose:
                    print(f"dataset name/lo/hi: {dataset_name},{lo},{hi}\n")
                data[dataset_name] = dataset[lo : hi]
            else:
                data[dataset_name] = dataset[:]

            bucket[dataset_name] = None  # This will be filled for individual bucket
            if verbose == True:
                print(dataset)

        # write the metadata for that group to data if it exists
        if name not in constants.protected_names and 'meta' in dataset.attrs.keys():
            data['_META_'][name] = dataset.attrs['meta']
                        
    print("Data is read in and input file is closed.")

    # edit data so it matches the format of the data dict that was saved to the file
    # this makes it so that data can be directly passed to write_to_file
    # 1) add back in _GROUP_
    datasets = np.array(data['_LIST_OF_DATASETS_'])

    allgroups = np.array([d.split('/')[0] for d in datasets])
    
    singletons_group = set(data['_SINGLETONS_GROUP_'])
    groups = {}
    
    groups['_SINGLETONS_GROUP_'] = data['_SINGLETONS_GROUP_'] # copy over the data
    
    for key in np.unique(allgroups):

        if key in singletons_group: continue
        if key in constants.protected_names: continue
        
        where_groups = np.where((key == allgroups) * (key != datasets))[0]
        groups[key] = [dataset.split('/')[-1] for dataset in datasets[where_groups]]
        
    data['_GROUPS_'] = groups

    # 2) add back in _MAP_DATASETS_TO_DATA_TYPES
    dtypes = {}
    for key in data['_LIST_OF_DATASETS_']:

        if key not in data.keys():
            continue
        
        if isinstance(data[key], list):
            data[key] = np.array(data[key])

        dtypes[key] = data[key].dtype
        
    data['_MAP_DATASETS_TO_DATA_TYPES_'] = dtypes

    return data, bucket


In [64]:
def load(filename:str, num_cores:int=mp.cpu_count(), verbose:bool=False, desired_groups:list[str]=None, subset:int=None, return_awkward:bool=False) -> tuple[dict, dict]:
    '''
    Reads all, or a subset of the data, from the HDF5 file to fill a data dictionary.
    Returns an empty dictionary to be filled later with data from individual buckets.

    Args:
	filename (string): Name of the input file
	
	verbose (boolean): True if debug output is required

	desired_groups (list): Groups to be read from input file, 

	subset (int): Number of buckets to be read from input file

        return_awkward (boolean): If True, returns an awkward array Record. Default is False

    Returns:
	data (dict): Selected data from HDF5 file
	
	bucket (dict): An empty bucket dictionary to be filled by data from select buckets

    '''
    
    with h5.File(filename, 'r+') as infile:
        
        nbuckets = infile.attrs["_NUMBER_OF_BUCKETS_"]
        if subset is None:
            subset = (0, nbuckets)
        
        # check number of cores
        if num_cores > nbuckets:
            warnings.warn('num_cores is greater than nbuckets, reducing number of cores used to the number of buckets!')
            num_cores = nbuckets
        
        # pack up the arguments to pass to multiprocessing pool
        per_core = np.ceil((subset[1] - subset[0]) / num_cores)
        all_subsets = []
        min_val = subset[0]
        max_val = -1
        while max_val < subset[1]:
            max_val = min_val + per_core
            if max_val > nbuckets:
                max_val = nbuckets
            all_subsets.append((int(min_val), int(max_val)))
            min_val += per_core
        
        n = len(all_subsets)
        packed_args = list(zip([infile]*n, all_subsets, [verbose]*n, [desired_groups]*n))
        
        with mp.Pool(num_cores) as p:
            out = p.apply(_load_subset, packed_args)
        
        #data, bucket = _load_subset(infile, verbose=verbose, desired_groups=desired_groups, subset=subset)
        print(out)
#     if return_awkward:
#         from hepfile.awkward_tools import hepfile_to_awkward
#         return hepfile_to_awkward(data), bucket
    
#     return data, bucket
    

In [66]:
filepath = os.path.abspath('../docs/example_nb/updated-awkward-array.h5')
data, bucket = load(filepath, num_cores=1)

TypeError: h5py objects cannot be pickled

In [28]:
data

{'_MAP_DATASETS_TO_COUNTERS_': {'_SINGLETONS_GROUP_': '_SINGLETONS_GROUP_/COUNTER',
  'nParticles': '_SINGLETONS_GROUP_/COUNTER',
  'jet': 'jet/njet',
  'jet/px': 'jet/njet',
  'jet/py': 'jet/njet',
  'muons': 'muons/nmuons',
  'muons/px': 'muons/nmuons',
  'muons/py': 'muons/nmuons'},
 '_MAP_DATASETS_TO_INDEX_': {'_SINGLETONS_GROUP_': '_SINGLETONS_GROUP_/COUNTER_INDEX',
  'nParticles': '_SINGLETONS_GROUP_/COUNTER_INDEX',
  'jet': 'jet/njet_INDEX',
  'jet/px': 'jet/njet_INDEX',
  'jet/py': 'jet/njet_INDEX',
  'muons': 'muons/nmuons_INDEX',
  'muons/px': 'muons/nmuons_INDEX',
  'muons/py': 'muons/nmuons_INDEX'},
 '_LIST_OF_COUNTERS_': ['_SINGLETONS_GROUP_/COUNTER',
  'jet/njet',
  'muons/nmuons'],
 '_LIST_OF_DATASETS_': ['_SINGLETONS_GROUP_',
  '_SINGLETONS_GROUP_/COUNTER',
  'jet',
  'jet/njet',
  'jet/px',
  'jet/py',
  'muons',
  'muons/nmuons',
  'muons/px',
  'muons/py',
  'nParticles'],
 '_META_': {},
 '_NUMBER_OF_BUCKETS_': 2,
 '_SINGLETONS_GROUP_': array(['nParticles'], dtype='<