Skip to content

Commit

Permalink
Merge pull request #355 from qutech/better_tabor_inspection
Browse files Browse the repository at this point in the history
Better tabor inspection
  • Loading branch information
terrorfisch committed Aug 23, 2018
2 parents ddb69a3 + 132aa85 commit 6c712fb
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 51 deletions.
73 changes: 46 additions & 27 deletions qctoolkit/hardware/awgs/tabor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import weakref
import itertools
import operator
from typing import List, Tuple, Set, NamedTuple, Callable, Optional, Any, Sequence, cast, Generator, Union, Dict
from enum import Enum
from collections import OrderedDict
Expand Down Expand Up @@ -41,8 +42,8 @@ def __init__(self,
if ch_a is not None and ch_b is not None and len(ch_a) != len(ch_b):
raise TaborException('Channel entries to have to have the same length')

self.ch_a = ch_a
self.ch_b = ch_b
self.ch_a = None if ch_a is None else np.asarray(ch_a, dtype=np.uint16)
self.ch_b = None if ch_b is None else np.asarray(ch_b, dtype=np.uint16)

self.marker_a = None if marker_a is None else np.asarray(marker_a, dtype=bool)
self.marker_b = None if marker_b is None else np.asarray(marker_b, dtype=bool)
Expand All @@ -52,10 +53,41 @@ def __init__(self,
if marker_b is not None and len(marker_b)*2 != self.num_points:
raise TaborException('Marker A has to have half of the channels length')

@classmethod
def from_binary_segment(cls, segment_data: np.ndarray) -> 'TaborSegment':
data_a = segment_data.reshape((-1, 16))[1::2, :].reshape((-1, ))
data_b = segment_data.reshape((-1, 16))[0::2, :].ravel()
return cls.from_binary_data(data_a, data_b)

@classmethod
def from_binary_data(cls, data_a: np.ndarray, data_b: np.ndarray) -> 'TaborSegment':
ch_b = data_b

channel_mask = np.uint16(2**14 - 1)
ch_a = np.bitwise_and(data_a, channel_mask)

marker_a_mask = np.uint16(2**14)
marker_b_mask = np.uint16(2**15)
marker_data = data_a.reshape(-1, 8)[1::2, :].reshape((-1, ))

marker_a = np.bitwise_and(marker_data, marker_a_mask)
marker_b = np.bitwise_and(marker_data, marker_b_mask)

return cls(ch_a=ch_a,
ch_b=ch_b,
marker_a=marker_a,
marker_b=marker_b)

def __hash__(self) -> int:
return hash(tuple(0 if data is None else bytes(data)
for data in (self.ch_a, self.ch_b, self.marker_a, self.marker_b)))

def __eq__(self, other: 'TaborSegment'):
return (np.array_equal(self.ch_a, other.ch_a) and
np.array_equal(self.ch_b, other.ch_b) and
np.array_equal(self.marker_a, other.marker_a) and
np.array_equal(self.marker_b, other.marker_b))

@property
def data_a(self) -> np.ndarray:
"""channel_data and marker data"""
Expand Down Expand Up @@ -604,18 +636,10 @@ class PlottableProgram:
('jump_flag', int)])

def __init__(self,
waveforms: Tuple[Tuple[np.ndarray, ...], Tuple[np.ndarray, ...]],
segments: List[TaborSegment],
sequence_tables: List[List[Tuple[int, int, int]]],
advanced_sequence_table: List[Tuple[int, int, int]]):
waveforms_0, waveforms_1 = waveforms
if len(waveforms_0) != len(waveforms_1):
raise ValueError('Different number of waveforms on channels')

for wf_0, wf_1 in zip(waveforms_0, waveforms_1):
if len(wf_0) != len(wf_1):
raise ValueError('Not all waveforms have the same length')

self._waveforms = (waveforms_0, waveforms_1)
self._segments = segments
self._sequence_tables = [[self.TableEntry(*sequence_table_entry)
for sequence_table_entry in sequence_table]
for sequence_table in sequence_tables]
Expand All @@ -626,16 +650,10 @@ def __init__(self,
def from_read_data(cls, waveforms: List[np.ndarray],
sequence_tables: List[Tuple[np.ndarray, np.ndarray, np.ndarray]],
advanced_sequence_table: Tuple[np.ndarray, np.ndarray, np.ndarray]) -> 'PlottableProgram':
return cls(cls._reformat_waveforms(waveforms),
return cls([TaborSegment.from_binary_segment(wf) for wf in waveforms],
[cls._reformat_rep_seg_jump(seq_table) for seq_table in sequence_tables],
cls._reformat_rep_seg_jump(advanced_sequence_table))

@staticmethod
def _reformat_waveforms(waveforms: List[np.ndarray]) -> Tuple[Tuple[np.ndarray], Tuple[np.ndarray]]:
"""De-interleave the individual channels' waveform data"""
return tuple(zip(*((waveform.reshape((-1, 16))[1::2, :].ravel(), waveform.reshape((-1, 16))[0::2, :].ravel())
for waveform in waveforms)))

@classmethod
def _reformat_rep_seg_jump(cls, rep_seg_jump_tuple: Tuple[np.ndarray, np.ndarray, np.ndarray]) -> List[TableEntry]:
return list(cls.TableEntry(int(rep), int(seg_no), int(jump))
Expand Down Expand Up @@ -666,9 +684,9 @@ def iter_waveforms_and_repetitions(self,
channel: int,
with_first_idle=False,
with_last_idles=False) -> Generator[Tuple[np.ndarray, int], None, None]:
ch_waveforms = self._waveforms[channel]
ch_getter = (operator.attrgetter('ch_a'), operator.attrgetter('ch_b'))[channel]
for segment_repeat, segment_no, _ in self._iter_segment_table_entry(with_first_idle, with_last_idles):
yield ch_waveforms[segment_no - 1], segment_repeat
yield ch_getter(self._segments[segment_no - 1]), segment_repeat

def iter_samples(self, channel: int,
with_first_idle=False,
Expand All @@ -678,7 +696,7 @@ def iter_samples(self, channel: int,
for _ in range(repetition):
yield from waveform

def get_as_single_waveform(self, channel: int, max_total_length: int=10**9) -> np.ndarray:
def get_as_single_waveform(self, channel: int, max_total_length: int=10**9) -> Optional[np.ndarray]:
waveforms = self.get_waveforms(channel)
repetitions = self.get_repetitions()
waveform_lengths = np.fromiter((wf.size for wf in waveforms), count=len(waveforms), dtype=np.uint64)
Expand All @@ -699,7 +717,8 @@ def get_as_single_waveform(self, channel: int, max_total_length: int=10**9) -> n
return result

def get_waveforms(self, channel: int) -> List[np.ndarray]:
return [self._waveforms[channel][segment_no - 1]
ch_getter = (operator.attrgetter('ch_a'), operator.attrgetter('ch_b'))[channel]
return [ch_getter(self._segments[segment_no - 1])
for _, segment_no, _ in self._iter_segment_table_entry()]

def get_repetitions(self) -> np.ndarray:
Expand All @@ -715,17 +734,17 @@ def __eq__(self, other):
return True

def to_builtin(self) -> dict:
waveforms = [[wf.tolist() for wf in self._waveforms[0]],
[wf.tolist() for wf in self._waveforms[1]]]
waveforms = [[wf.data_a.tolist() for wf in self._segments],
[wf.data_b.tolist() for wf in self._segments]]
return {'waveforms': waveforms,
'seq_tables': self._sequence_tables,
'adv_seq_table': self._advanced_sequence_table}

@classmethod
def from_builtin(cls, data: dict) -> 'PlottableProgram':
waveforms = data['waveforms']
waveforms = (tuple(np.array(wf, dtype=np.uint16) for wf in waveforms[0]),
tuple(np.array(wf, dtype=np.uint16) for wf in waveforms[1]))
waveforms = [TaborSegment.from_binary_data(np.array(data_a, dtype=np.uint16), np.array(data_b, dtype=np.uint16))
for data_a, data_b in zip(*waveforms)]
return cls(waveforms, data['seq_tables'], data['adv_seq_table'])


Expand Down
68 changes: 44 additions & 24 deletions tests/hardware/tabor_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def test_data_a(self):
with self.assertRaises(NotImplementedError):
TaborSegment(ch_a=None, ch_b=ch_b, marker_a=marker_a, marker_b=marker_b).data_a


def test_data_b(self):
ch_a = np.asarray(100 + np.arange(6), dtype=np.uint16)
ch_b = np.asarray(1000 + np.arange(6), dtype=np.uint16)
Expand All @@ -88,6 +87,37 @@ def test_data_b(self):

self.assertIs(ts.data_b, ch_b)

def test_from_binary_segment(self):
ch_a = np.asarray(100 + np.arange(32), dtype=np.uint16)
ch_b = np.asarray(1000 + np.arange(32), dtype=np.uint16)

marker_a = np.ones(16, dtype=bool)
marker_b = np.asarray(list(range(5)) + list(range(6)) + list(range(5)), dtype=np.uint16)

segment = TaborSegment(ch_a=ch_a, ch_b=ch_b, marker_a=marker_a, marker_b=marker_b)

binary = segment.get_as_binary()

reconstructed = TaborSegment.from_binary_segment(binary)

self.assertEqual(segment, reconstructed)

def test_from_binary_data(self):
ch_a = np.asarray(100 + np.arange(32), dtype=np.uint16)
ch_b = np.asarray(1000 + np.arange(32), dtype=np.uint16)

marker_a = np.ones(16, dtype=bool)
marker_b = np.asarray(list(range(5)) + list(range(6)) + list(range(5)), dtype=np.uint16)

segment = TaborSegment(ch_a=ch_a, ch_b=ch_b, marker_a=marker_a, marker_b=marker_b)

data_a = segment.data_a
data_b = segment.data_b

reconstructed = TaborSegment.from_binary_data(data_a, data_b)

self.assertEqual(segment, reconstructed)


class TaborProgramTests(unittest.TestCase):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -372,37 +402,27 @@ def make_read_waveform(data):

self.waveforms = ((np.arange(32, dtype=np.uint16), np.arange(32, 48, dtype=np.uint16)),
(1000+np.arange(32, dtype=np.uint16), 1000+np.arange(32, 48, dtype=np.uint16)))
self.segments = [TaborSegment.from_binary_data(a, b) for a, b in zip(*self.waveforms)]
self.sequencer_tables = [[(1, 1, 0), (1, 2, 0)],
[(1, 1, 0), (2, 2, 0), (1, 1, 0)]]
self.adv_sequencer_table = [(1, 1, 0), (1, 2, 0), (2, 1, 0)]

def test_init(self):
wrong_waveforms = self.waveforms[0], self.waveforms[1][:-1]
with self.assertRaises(ValueError):
PlottableProgram(wrong_waveforms, self.sequencer_tables, self.adv_sequencer_table)

wrong_waveforms = self.waveforms[0], (self.waveforms[1][0][1:], self.waveforms[1][1])
with self.assertRaises(ValueError):
PlottableProgram(wrong_waveforms, self.sequencer_tables, self.adv_sequencer_table)

prog = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
np.testing.assert_equal(self.waveforms, prog._waveforms)
prog = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)
np.testing.assert_equal(self.segments, prog._segments)
self.assertEqual(self.sequencer_tables, prog._sequence_tables)
self.assertEqual(self.adv_sequencer_table, prog._advanced_sequence_table)

def test_reformat_waveforms(self):
np.testing.assert_equal(self.waveforms, PlottableProgram._reformat_waveforms(self.read_waveforms))

def test_from_read_data(self):
prog = PlottableProgram.from_read_data(self.read_waveforms,
self.read_sequencer_tables,
self.read_adv_sequencer_table)
np.testing.assert_equal(self.waveforms, prog._waveforms)
self.assertEqual(self.segments, prog._segments)
self.assertEqual(self.sequencer_tables, prog._sequence_tables)
self.assertEqual(self.adv_sequencer_table, prog._advanced_sequence_table)

def test_iter(self):
prog = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

ch = itertools.chain(range(32), range(32, 48),
range(32), range(32, 48), range(32, 48), range(32),
Expand All @@ -418,7 +438,7 @@ def test_iter(self):

def test_get_advanced_sequence_table(self):
adv_seq = [(1, 1, 1)] + self.adv_sequencer_table + [(1, 1, 0)]
prog = PlottableProgram(self.waveforms, self.sequencer_tables, adv_seq)
prog = PlottableProgram(self.segments, self.sequencer_tables, adv_seq)

self.assertEqual(prog._get_advanced_sequence_table(), self.adv_sequencer_table)
self.assertEqual(prog._get_advanced_sequence_table(with_first_idle=True),
Expand All @@ -428,23 +448,23 @@ def test_get_advanced_sequence_table(self):
adv_seq)

def test_builtint_conversion(self):
prog = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

prog = PlottableProgram.from_builtin(prog.to_builtin())

np.testing.assert_equal(self.waveforms, prog._waveforms)
np.testing.assert_equal(self.segments, prog._segments)
self.assertEqual(self.sequencer_tables, prog._sequence_tables)
self.assertEqual(self.adv_sequencer_table, prog._advanced_sequence_table)

def test_eq(self):
prog1 = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog1 = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

prog2 = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog2 = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

self.assertEqual(prog1, prog2)

def test_get_waveforms(self):
prog = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

expected_waveforms_0 = [np.arange(32), np.arange(32, 48), np.arange(32),
np.arange(32, 48), np.arange(32), np.arange(32),
Expand All @@ -456,13 +476,13 @@ def test_get_waveforms(self):
np.testing.assert_equal(expected_waveforms_1, prog.get_waveforms(1))

def test_get_repetitions(self):
prog = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

expected_repetitions = [1, 1, 1, 2, 1, 1, 1, 1, 1]
np.testing.assert_equal(expected_repetitions, prog.get_repetitions())

def test_get_as_single_waveform(self):
prog = PlottableProgram(self.waveforms, self.sequencer_tables, self.adv_sequencer_table)
prog = PlottableProgram(self.segments, self.sequencer_tables, self.adv_sequencer_table)

expected_single_waveform_0 = np.fromiter(prog.iter_samples(0), dtype=np.uint16)
expected_single_waveform_1 = np.fromiter(prog.iter_samples(1), dtype=np.uint16)
Expand Down

0 comments on commit 6c712fb

Please sign in to comment.