Skip to content

Commit

Permalink
Add "direct" raw-to-packets conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
mjkramer committed Oct 7, 2023
1 parent 11fc1aa commit 51c1a3f
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 61 deletions.
126 changes: 71 additions & 55 deletions larpix/format/hdf5format.py
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,69 @@ def _encode_packet(packet, version, packet_dset_name):
return(tuple(encoded_packet))
return False

def init_file(f: h5py.File, version=None, chip_list=None):
    '''
    Prepare the bookkeeping structures of an open larpix HDF5 file.

    Creates the ``_header`` group if it is missing (stamping ``version`` and
    ``created``), otherwise checks the requested version against the one
    stored in the file. The ``modified`` attribute is refreshed in both
    cases. For every version other than ``'0.0'`` the ``messages`` dataset
    is created or opened, and for versions ``>= '2.4'`` the ``configs``
    dataset is created or opened and the configurations of ``chip_list``
    are appended to it.

    :param f: open, writable ``h5py.File``

    :param version: format version string. ``None`` selects the version
        stored in an existing header, or ``latest_version`` for a new file.

    :param chip_list: optional sequence of chip objects whose encoded
        configurations are appended to ``configs``. ``None`` or an empty
        sequence appends nothing.

    :returns: ``(version, message_dset, configs_dset)``. ``message_dset`` is
        ``None`` for version ``'0.0'``; ``configs_dset`` is ``None`` for
        versions below ``'2.4'``.

    :raises RuntimeError: if ``version`` is given and differs from the
        version recorded in an existing header
    '''
    message_dset, configs_dset = None, None

    if "_header" not in f.keys():
        header = f.create_group("_header")
        if version is None:
            version = latest_version
        header.attrs["version"] = version
        header.attrs["created"] = time.time()
    else:
        header = f["_header"]
        file_version = header.attrs["version"]
        if version is None:
            version = file_version
        elif file_version != version:
            raise RuntimeError(
                "Incompatible versions: existing: %s, "
                "specified: %s" % (file_version, version)
            )
    # Refreshed for new and existing files alike; also used below as the
    # timestamp of every config entry written by this call.
    header.attrs["modified"] = time.time()

    if version != "0.0":
        message_dset_name = "messages"
        message_dtype = dtypes[version][message_dset_name]
        if message_dset_name not in f.keys():
            message_dset = f.create_dataset(
                message_dset_name, shape=(0,), maxshape=(None,), dtype=message_dtype
            )
        else:
            message_dset = f[message_dset_name]

    # NOTE: versions are compared as strings, i.e. lexicographically.
    if version >= "2.4":
        configs_dset_name = "configs"
        configs_dtype = dtypes[version][configs_dset_name]
        if configs_dset_name not in f.keys():
            configs_dset = f.create_dataset(
                configs_dset_name, shape=(0,), maxshape=(None,), dtype=configs_dtype
            )
        else:
            configs_dset = f[configs_dset_name]
        # New entries are appended after whatever the dataset already holds
        # (zero rows for a freshly created dataset).
        configs_start_index = configs_dset.shape[0]

        # Guarding the whole encoding step keeps the default
        # ``chip_list=None`` (and empty sequences) from reaching enumerate().
        if chip_list:
            configs_dset.attrs["asic_version"] = str(chip_list[-1].asic_version)
            encoders = _format_method_lookup[version][configs_dset_name]
            timestamp = header.attrs["modified"]
            configs = [
                encoders[chip.__class__](
                    chip,
                    counter=configs_start_index + offset,
                    timestamp=timestamp,
                )
                for offset, chip in enumerate(chip_list)
            ]
            configs_dset.resize(configs_start_index + len(configs), axis=0)
            configs_dset[configs_start_index:] = np.concatenate(configs)

    return version, message_dset, configs_dset


def to_file(filename, packet_list=None, chip_list=None, mode='a', version=None, workers=None):
'''
Save the given packets to the given file.
Expand Down Expand Up @@ -889,22 +952,7 @@ def to_file(filename, packet_list=None, chip_list=None, mode='a', version=None,
workers = max(min(os.cpu_count(), int(len(packet_list)//10000)),1)

with h5py.File(filename, mode) as f:
# Create header
if '_header' not in f.keys():
header = f.create_group('_header')
if version is None:
version = latest_version
header.attrs['version'] = version
header.attrs['created'] = time.time()
else:
header = f['_header']
file_version = header.attrs['version']
if version is None:
version = file_version
elif header.attrs['version'] != version:
raise RuntimeError('Incompatible versions: existing: %s, '
'specified: %s' % (file_version, version))
header.attrs['modified'] = time.time()
version, message_dset, _configs_dset = init_file(f, version, chip_list)

# Create datasets
if version == '0.0':
Expand Down Expand Up @@ -958,36 +1006,9 @@ def to_file(filename, packet_list=None, chip_list=None, mode='a', version=None,
start_index = packet_dset.shape[0]
packet_dset.resize(start_index + len(packet_list), axis=0)

if version != '0.0':
message_dset_name = 'messages'
message_dtype = dtypes[version][message_dset_name]
if message_dset_name not in f.keys():
message_dset = f.create_dataset(message_dset_name,
shape=(0,), maxshape=(None,),
dtype=message_dtype)
message_start_index = 0
else:
message_dset = f[message_dset_name]
message_start_index = message_dset.shape[0]

if version >= '2.4':
configs_dset_name = 'configs'
configs_dtype = dtypes[version][configs_dset_name]
if configs_dset_name not in f.keys():
configs_dset = f.create_dataset(configs_dset_name,
shape=(0,), maxshape=(None,),
dtype=configs_dtype)
configs_start_index = 0
else:
configs_dset = f[configs_dset_name]
configs_start_index = configs_dset.shape[0]
if chip_list:
configs_dset.attrs['asic_version'] = str(chip_list[-1].asic_version)

# Fill dataset
encoded_packets = []
messages = []
configs = []

if workers > 1:
packet_args = zip(packet_list, [version]*len(packet_list), [packet_dset_name]*len(packet_list))
Expand All @@ -996,24 +1017,19 @@ def to_file(filename, packet_list=None, chip_list=None, mode='a', version=None,
else:
encoded_packets = list(filter(bool, [_encode_packet(packet, version, packet_dset_name) for packet in packet_list]))

for i, packet in enumerate(packet_list):
if version != '0.0' and packet.__class__ in _format_method_lookup[version].get(message_dset_name, tuple()):
encoded_message = _format_method_lookup[version][message_dset_name][packet.__class__](packet, counter=message_start_index + len(messages))
messages.append(encoded_message)

for i,chip in enumerate(chip_list):
if version >= '2.4':
encoded_config = _format_method_lookup[version][configs_dset_name][chip.__class__](chip, counter=configs_start_index + len(configs), timestamp=header.attrs['modified'])
configs.append(encoded_config)
if message_dset:
message_dset_name = message_dset.name.removeprefix('/')
for i, packet in enumerate(packet_list):
if packet.__class__ in _format_method_lookup[version].get(message_dset_name, tuple()):
encoded_message = _format_method_lookup[version][message_dset_name][packet.__class__](packet, counter=message_dset.shape[0] + len(messages))
messages.append(encoded_message)

if encoded_packets:
packet_dset[start_index:] = encoded_packets
if version != '0.0' and messages:
message_start_index = message_dset.shape[0]
message_dset.resize(message_start_index + len(messages), axis=0)
message_dset[message_start_index:] = messages
if version >= '2.4' and configs:
configs_dset.resize(configs_start_index + len(configs), axis=0)
configs_dset[configs_start_index:] = np.concatenate(configs)

def from_file(filename, version=None, start=None, end=None, load_configs=None):
'''
Expand Down
179 changes: 179 additions & 0 deletions larpix/format/hdf5format_direct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import time

import h5py
import numba
import numpy as np

from .hdf5format import dtypes, init_file

VERSION = "2.4"
DTYPE = dtypes[VERSION]["packets"]
BUFSIZE = 100000


@numba.njit
def calc_parity(data: np.ndarray):
    """
    Return the parity (XOR of all 64 bits) of an 8-byte word.

    :param data: array of 8 uint8 values

    :returns: 1 if an odd number of bits are set across the 8 bytes, else 0
    """
    # XOR-ing the eight bytes together preserves the overall bit parity, so
    # only a single byte has to be folded afterwards. Unlike assembling a
    # 64-bit word with shifts of up to 56 bits, this does not rely on the
    # runtime widening uint8 operands before shifting, so the result is the
    # same whether or not the function is JIT-compiled.
    x = np.uint64(
        data[0] ^ data[1] ^ data[2] ^ data[3]
        ^ data[4] ^ data[5] ^ data[6] ^ data[7]
    )
    # Fold the remaining 8 bits down into bit 0.
    x ^= x >> 4
    x ^= x >> 2
    x ^= x >> 1
    return x & 1


@numba.njit
def parse_msg(msg: np.ndarray, packets: np.ndarray, io_group=0) -> int:
    """
    Unpack one raw message into packet records, writing them into ``packets``.

    The message starts with an 8-byte header: byte 0 is the message type
    (not checked here), bytes 1-4 hold a little-endian 32-bit timestamp and
    bytes 6-7 a little-endian 16-bit word count. The header is followed by
    that many 16-byte words, each split into an 8-byte word header and
    8 bytes of data.

    One record is written for the message header itself (packet type 4,
    carrying the header timestamp) plus one record per word, so
    ``nwords + 1`` records are written starting at ``packets[0]``.

    :param msg: indexable sequence of byte values holding the raw message

    :param packets: output parameter; record array whose first 22 fields are
        assigned by position. NOTE(review): the positions are presumably the
        field order of the version 2.4 ``packets`` dtype -- confirm against
        ``hdf5format.dtypes``. The caller must provide room for
        ``nwords + 1`` records.

    :param io_group: value stored in field 0 of every record written

    :returns: the number of records written (``nwords + 1``)
    """
    # assert msg[0] == ord("D")
    pacman_timestamp = msg[1] | (msg[2] << 8) | (msg[3] << 16) | (msg[4] << 24)
    nwords = msg[6] | (msg[7] << 8)

    # One extra record for the timestamp taken from the message header.
    npackets = nwords + 1

    for i in range(npackets):
        # Every field starts at zero for each record; a field that the
        # current word type does not set is therefore stored as 0.
        io_channel = 0
        chip_id = 0
        packet_type = 0
        downstream_marker = 0
        parity = 0
        valid_parity = 0
        channel_id = 0
        timestamp = 0
        dataword = 0
        trigger_type = 0
        local_fifo = 0
        shared_fifo = 0
        register_address = 0
        register_data = 0
        direction = 0
        local_fifo_events = 0
        shared_fifo_events = 0
        counter = 0
        fifo_diagnostics_enabled = 0
        first_packet = 0
        receipt_timestamp = 0

        if i == 0:
            # Record 0 is synthesized from the message header.
            packet_type = 4  # timestamp
            timestamp = pacman_timestamp
        else:
            # Words are 16 bytes each and begin right after the 8-byte
            # message header; record i corresponds to word i - 1.
            wordstart = 8 + (16 * (i - 1))
            word = msg[wordstart : (wordstart + 16)]
            header, data = word[:8], word[8:]

            wordtype = header[0]

            # A word whose type byte is none of the three below still
            # yields a record, with every field except io_group left at 0.
            if wordtype == 0x44:  # 'D'
                # Data word: the 8 data bytes are a little-endian 64-bit
                # packet that is unpacked bit field by bit field.
                packet_type = data[0] & 3
                io_channel = header[1]
                receipt_timestamp = (
                    header[2] | (header[3] << 8) | (header[4] << 16) | (header[5] << 24)
                )
                # Bits 2-9: upper six bits of byte 0, lower two of byte 1.
                chip_id = (data[0] >> 2) | ((data[1] << 6) & 0xFF)
                # Bits 10-15.
                channel_id = data[1] >> 2
                # Bits 16-46 (31 bits); the top bit of byte 5 is excluded.
                timestamp = (
                    data[2]
                    | (data[3] << 8)
                    | (data[4] << 16)
                    | ((data[5] & 0x7F) << 24)
                )
                # Bit 47.
                first_packet = (data[5] >> 7) & 1
                # Bits 48-55.
                dataword = data[6]
                # Byte 7 packs the remaining flags, two bits each for the
                # first three and one bit each for the last two.
                trigger_type = data[7] & 0x03
                local_fifo = (data[7] >> 2) & 0x03
                shared_fifo = (data[7] >> 4) & 0x03
                downstream_marker = (data[7] >> 6) & 1
                parity = (data[7] >> 7) & 1
                # Intended as a stand-in for
                # np.unpackbits(data).sum() % 2 (left disabled by the
                # original author).
                valid_parity = calc_parity(data)

                # Bits 10-17 and 18-25. These overlap the channel_id and
                # timestamp bits above and are filled in regardless of
                # packet_type.
                register_address = (data[1] >> 2) | ((data[2] << 6) & 0xFF)
                register_data = (data[2] >> 2) | ((data[3] << 6) & 0xFF)

            elif wordtype == 0x53:  # 'S'
                packet_type = 6  # sync
                # sync_type / clk_source are never read again; they only
                # name what the header bytes stored in trigger_type and
                # dataword represent.
                trigger_type = sync_type = header[1]
                dataword = clk_source = header[2] & 0x01
                timestamp = (
                    header[4] | (header[5] << 8) | (header[6] << 16) | (header[7] << 24)
                )

            elif wordtype == 0x54:  # 'T'
                packet_type = 7  # trigger
                trigger_type = header[1]
                timestamp = (
                    header[4] | (header[5] << 8) | (header[6] << 16) | (header[7] << 24)
                )

        # NOTE: The following methods don't seem to work in Numba:
        # packets[i] = ( io_group, io_channel, ... )
        # packets[i] = ( np.uint8(io_group), np.uint8(io_channel), ... )

        # ...so instead we painfully write the indices
        packets[i][0] = io_group
        packets[i][1] = io_channel
        packets[i][2] = chip_id
        packets[i][3] = packet_type
        packets[i][4] = downstream_marker
        packets[i][5] = parity
        packets[i][6] = valid_parity
        packets[i][7] = channel_id
        packets[i][8] = timestamp
        packets[i][9] = dataword
        packets[i][10] = trigger_type
        packets[i][11] = local_fifo
        packets[i][12] = shared_fifo
        packets[i][13] = register_address
        packets[i][14] = register_data
        packets[i][15] = direction
        packets[i][16] = local_fifo_events
        packets[i][17] = shared_fifo_events
        packets[i][18] = counter
        packets[i][19] = fifo_diagnostics_enabled
        packets[i][20] = first_packet
        packets[i][21] = receipt_timestamp

    return npackets


def to_file_direct(filename, msg_list=(), io_groups=(), chip_list=(), mode="a"):
    '''
    Convert raw messages straight into the ``packets`` dataset of an HDF5
    file, without building intermediate packet objects.

    The file is opened with ``mode``, initialised through ``init_file`` for
    format version ``VERSION``, and every message in ``msg_list`` is decoded
    by ``parse_msg`` into a preallocated record buffer that is appended to
    the ``packets`` dataset whenever it holds more than ``BUFSIZE`` records
    and once more at the end.

    :param filename: path of the HDF5 file to create or append to

    :param msg_list: iterable of raw messages, each accepted by ``parse_msg``

    :param io_groups: iterable of io_group values, paired with ``msg_list``
        element by element (extra items of the longer iterable are ignored)

    :param chip_list: chips whose configurations ``init_file`` should store

    :param mode: file mode passed to ``h5py.File`` (default ``'a'``)
    '''
    with h5py.File(filename, mode) as f:
        init_file(f, VERSION, chip_list)

        packet_dset_name = "packets"
        if packet_dset_name not in f.keys():
            packet_dset = f.create_dataset(
                packet_dset_name,
                shape=(0,),
                maxshape=(None,),
                dtype=DTYPE,
            )
        else:
            packet_dset = f[packet_dset_name]
        # New records go after whatever is already stored (zero rows for a
        # freshly created dataset).
        start_index = packet_dset.shape[0]

        # The buffer is twice BUFSIZE: it is flushed as soon as it holds more
        # than BUFSIZE records, so at least BUFSIZE slots are always free
        # for the next message.
        packets = np.zeros(shape=(2 * BUFSIZE,), dtype=DTYPE)
        npackets = 0

        def write():
            # Append the buffered records to the dataset and reset the count.
            nonlocal npackets
            nonlocal start_index
            if not npackets:
                # Nothing buffered: leave the dataset untouched.
                return
            packet_dset.resize(packet_dset.shape[0] + npackets, axis=0)
            packet_dset[start_index:] = packets[:npackets]
            start_index += npackets
            npackets = 0

        for msg, io_group in zip(msg_list, io_groups):
            # parse_msg fills the free tail of the buffer in place and
            # reports how many records it wrote.
            npackets += parse_msg(msg, packets[npackets:], io_group)
            if npackets > BUFSIZE:
                write()

        # Flush whatever is left after the last message.
        write()
22 changes: 16 additions & 6 deletions scripts/convert_rawhdf5_to_hdf5.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env python3

import argparse
import time

Expand All @@ -8,10 +10,13 @@
from larpix.format.rawhdf5format import from_rawfile, len_rawfile
from larpix.format.pacman_msg_format import parse
from larpix.format.hdf5format import to_file
from larpix.format.hdf5format_direct import to_file_direct

def main(input_filename, output_filename, block_size):
def main(input_filename, output_filename, block_size, direct, max_blocks):
total_messages = len_rawfile(input_filename)
total_blocks = total_messages // block_size + 1
if max_blocks != -1:
total_blocks = min(max_blocks, total_blocks)
last = time.time()
for i_block in range(total_blocks):
start = i_block * block_size
Expand All @@ -22,11 +27,14 @@ def main(input_filename, output_filename, block_size):
print('reading block {} of {}...\r'.format(i_block+1,total_blocks),end='')
last = time.time()
rd = from_rawfile(input_filename, start=start, end=end)
pkts = list()
for i_msg,data in enumerate(zip(rd['msg_headers']['io_groups'], rd['msgs'])):
io_group,msg = data
pkts.extend(parse(msg, io_group=io_group))
to_file(output_filename, packet_list=pkts)
if direct:
to_file_direct(output_filename, rd['msgs'], rd['msg_headers']['io_groups'])
else:
pkts = list()
for i_msg,data in enumerate(zip(rd['msg_headers']['io_groups'], rd['msgs'])):
io_group,msg = data
pkts.extend(parse(msg, io_group=io_group))
to_file(output_filename, packet_list=pkts)
print()

if __name__ == '__main__':
Expand All @@ -35,5 +43,7 @@ def main(input_filename, output_filename, block_size):
parser.add_argument('--output_filename', '-o', type=str, help='''Output hdf5 file,
to be formatted with larpix.format.hdf5format''')
parser.add_argument('--block_size', default=10240, type=int, help='''Max number of messages to store in working memory (default=%(default)s)''')
parser.add_argument('--direct', action='store_true', help='Enable direct conversion (experimental)')
parser.add_argument('--max_blocks', type=int, default=-1)
args = parser.parse_args()
c = main(**vars(args))
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
'bitarray >=0.8',
'pyzmq >= 18.0',
'sphinx_rtd_theme >= 0.5',
'numba >= 0.56',
'numpy >= 1.16',
'h5py >= 3.1',
'bidict >= 0.18.0',
Expand Down
Loading

0 comments on commit 51c1a3f

Please sign in to comment.