Skip to content

Commit

Permalink
Merge pull request #76 from zakv/dataframe-subset
Browse files Browse the repository at this point in the history
Dataframe subset
  • Loading branch information
philipstarkey committed Nov 4, 2020
2 parents 0910447 + f80f182 commit 695c220
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 26 deletions.
141 changes: 123 additions & 18 deletions lyse/__init__.py
Expand Up @@ -76,28 +76,133 @@ class _RoutineStorage(object):
routine_storage = _RoutineStorage()


def data(filepath=None, host='localhost', port=_lyse_port, timeout=5):
def data(filepath=None, host='localhost', port=_lyse_port, timeout=5, n_sequences=None, filter_kwargs=None):
"""Get data from the lyse dataframe or a file.
This function allows for either extracting information from a run's hdf5
file, or retrieving data from lyse's dataframe. If `filepath` is provided
then data will be read from that file and returned as a pandas series. If
`filepath` is not provided then the dataframe in lyse, or a portion of it,
will be returned.
Often only part of the lyse dataframe is needed, so the `n_sequences` and
`filter_kwargs` arguments provide ways to restrict what parts of the lyse
dataframe are returned. The dataframe can be quite large, so only requesting
a small part of it can speed up the execution of `lyse.data()` noticeably.
Setting `n_sequences` makes this function return only the rows of the lyse
dataframe that correspond to the `n_sequences` most recent sequences, where
one sequence corresponds to one call to engage in runmanager. Additionally,
the `Dataframe.filter()` method can be called on the dataframe before it is
transmitted, and the arguments specified in `filter_kwargs` are passed to
that method.
Args:
filepath (str, optional): The path to a run's hdf5 file. If a value
other than `None` is provided, then this function will return a
pandas series containing data associated with the run. In particular
it will contain the globals, singleshot results, multishot results,
etc. that would appear in the run's row in the Lyse dataframe, but
the values will be read from the file rather than extracted from the
lyse dataframe. If `filepath` is `None`, then this function will
instead return a section of the lyse dataframe. Note that if
`filepath` is not None, then the other arguments will be ignored.
Defaults to `None`.
host (str, optional): The address of the computer running lyse. Defaults
to `'localhost'`.
port (int, optional): The port on which lyse is listening. Defaults to
the entry for lyse's port in the labconfig, with a fallback value of
42519 if the labconfig has no such entry.
timeout (float, optional): The timeout, in seconds, for the
communication with lyse. Defaults to 5.
n_sequences (int, optional): The maximum number of sequences to include
in the returned dataframe where one sequence corresponds to one call
to engage in runmanager. The dataframe rows for the most recent
`n_sequences` sequences are returned. If the dataframe contains
fewer than `n_sequences` sequences, then all rows will be returned.
If set to `None`, then all rows are returned. Defaults to `None`.
filter_kwargs (dict, optional): A dictionary of keyword arguments to
pass to the `Dataframe.filter()` method before the lyse dataframe is
returned. For example to call `filter()` with `like='temperature'`,
set `filter_kwargs` to `{'like':'temperature'}`. If set to `None`
then `Dataframe.filter()` will not be called. See
:meth:`pandas:pandas.DataFrame.filter` for more information.
Defaults to `None`.
Raises:
ValueError: If `n_sequences` isn't `None` or a nonnegative integer, then
a `ValueError` is raised. Note that no `ValueError` is raised if
`n_sequences` is greater than the number of sequences available. In
that case all available sequences are returned, i.e. the entire
lyse dataframe is returned.
Returns:
:obj:`pandas:pandas.DataFrame` or :obj:`pandas:pandas.Series`: If
`filepath` is provided, then a pandas series with the data read from
that file is returned. If `filepath` is omitted or set to `None` then
the lyse dataframe, or a subset of it, is returned.
"""
if filepath is not None:
return _get_singleshot(filepath)
else:
df = zmq_get(port, host, 'get dataframe', timeout)
try:
padding = ('',)*(df.columns.nlevels - 1)
try:
integer_indexing = _labconfig.getboolean('lyse', 'integer_indexing')
except (LabConfig.NoOptionError, LabConfig.NoSectionError):
integer_indexing = False
if integer_indexing:
df.set_index(['sequence_index', 'run number', 'run repeat'], inplace=True, drop=False)
else:
df.set_index([('sequence',) + padding,('run time',) + padding], inplace=True, drop=False)
df.index.names = ['sequence', 'run time']
except KeyError:
# Empty DataFrame or index column not found, so fall back to RangeIndex instead
pass
df.sort_index(inplace=True)
if n_sequences is not None:
if not (type(n_sequences) is int and n_sequences >= 0):
msg = """n_sequences must be None or an integer greater than 0 but
was {n_sequences}.""".format(n_sequences=n_sequences)
raise ValueError(dedent(msg))
if filter_kwargs is not None:
if type(filter_kwargs) is not dict:
msg = """filter must be None or a dictionary but was
{filter_kwargs}.""".format(filter_kwargs=filter_kwargs)
raise ValueError(dedent(msg))

# Allow sending 'get dataframe' (without the enclosing list) if
# n_sequences and filter_kwargs aren't provided. This is for backwards
# compatibility in case the server is running an outdated version of
# lyse.
if n_sequences is None and filter_kwargs is None:
command = 'get dataframe'
else:
command = ('get dataframe', n_sequences, filter_kwargs)
df = zmq_get(port, host, command, timeout)
if isinstance(df, str) and df.startswith('error: operation not supported'):
# Sending a tuple for command to an outdated lyse server causes it
# to reply with an error message.
msg = """The lyse server does not support n_sequences or filter_kwargs.
Call this function without providing those arguments to communicate
with this server, or upgrade the version of lyse running on the
server."""
raise ValueError(dedent(msg))
# Ensure conversion to multiindex is done, which needs to be done here
# if the server is running an outdated version of lyse.
_rangeindex_to_multiindex(df, inplace=True)
return df


def _rangeindex_to_multiindex(df, inplace):
    """Index the lyse dataframe by its identifying columns and sort it.

    If `df` already has a `pandas.MultiIndex` it is returned untouched.
    Otherwise the index is built from columns of `df` (which are kept as
    columns as well, since `drop=False` is used):

    * `'sequence_index'`, `'run number'` and `'run repeat'` if the labconfig
      option `integer_indexing` in section `lyse` is true;
    * `'sequence'` and `'run time'` otherwise (also the behavior when that
      option or section is missing). The column labels are padded with
      empty strings to match the number of column levels of `df`.

    If the required columns are missing (e.g. the dataframe is empty) the
    existing index is kept. In every non-early-return case the result is
    sorted by its index.

    Args:
        df (pandas.DataFrame): The dataframe to convert.
        inplace (bool): If true, `df` itself is modified and returned. If
            false, `df` is left unmodified and a new dataframe is returned
            (except when `df` already has a MultiIndex, in which case `df`
            itself is returned).

    Returns:
        pandas.DataFrame: The converted (or already converted) dataframe.
    """
    if isinstance(df.index, pandas.MultiIndex):
        # The dataframe has already been converted.
        return df
    try:
        try:
            integer_indexing = _labconfig.getboolean('lyse', 'integer_indexing')
        except (LabConfig.NoOptionError, LabConfig.NoSectionError):
            integer_indexing = False
        if integer_indexing:
            index_columns = ['sequence_index', 'run number', 'run repeat']
            # set_index() names the levels after these plain string labels.
            index_names = None
        else:
            padding = ('',) * (df.columns.nlevels - 1)
            index_columns = [('sequence',) + padding, ('run time',) + padding]
            # The labels are tuples here, so give the levels readable names.
            index_names = ['sequence', 'run time']
        if inplace:
            # set_index() returns None when inplace is True.
            df.set_index(index_columns, inplace=True, drop=False)
        else:
            df = df.set_index(index_columns, drop=False)
        if index_names is not None:
            df.index.names = index_names
    except KeyError:
        # Empty DataFrame or index column not found, so fall back to the
        # existing index instead. When not working in place, copy first so
        # that the in-place sort below cannot modify the caller's dataframe.
        if not inplace:
            df = df.copy()
    df.sort_index(inplace=True)
    return df

def globals_diff(run1, run2, group=None):
    """Return the difference between the globals of two runs.

    Args:
        run1: First run; must provide a `get_globals(group)` method.
        run2: Second run; must provide a `get_globals(group)` method.
        group (optional): Passed unchanged to `get_globals()` of both runs.
            Defaults to `None`.

    Returns:
        The result of `dict_diff()` applied to the globals of `run1` and the
        globals of `run2`, in that order.
    """
    first_globals = run1.get_globals(group)
    second_globals = run2.get_globals(group)
    return dict_diff(first_globals, second_globals)

Expand Down
73 changes: 65 additions & 8 deletions lyse/__main__.py
Expand Up @@ -49,7 +49,7 @@
from qtutils import inmain_decorator, inmain, UiLoader, DisconnectContextManager
from qtutils.auto_scroll_to_end import set_auto_scroll_to_end
import qtutils.icons
from lyse import LYSE_DIR
from lyse import LYSE_DIR, _rangeindex_to_multiindex

process_tree = ProcessTree.instance()

Expand Down Expand Up @@ -156,14 +156,20 @@ def handler(self, request_data):
logger.info('WebServer request: %s' % str(request_data))
if request_data == 'hello':
return 'hello'
elif isinstance(request_data, tuple) and request_data[0]=='get dataframe' and len(request_data)==3:
_, n_sequences, filter_kwargs = request_data
df = self._retrieve_dataframe()
df = _rangeindex_to_multiindex(df, inplace=True)
# Return only a subset of the dataframe if instructed to do so.
if n_sequences is not None:
df = self._extract_n_sequences_from_df(df, n_sequences)
if filter_kwargs is not None:
df = df.filter(**filter_kwargs)
return df
elif request_data == 'get dataframe':
# infer_objects() picks fixed datatypes for columns that are compatible with
# fixed datatypes, dramatically speeding up pickling. It is called here
# rather than when updating the dataframe as calling it during updating may
# call it needlessly often, whereas it only needs to be called prior to
# sending the dataframe to a client requesting it, as we're doing now.
app.filebox.shots_model.infer_objects()
return app.filebox.shots_model.dataframe
# Ensure backwards compatibility with clients using outdated
# versions of lyse.
return self._retrieve_dataframe()
elif isinstance(request_data, dict):
if 'filepath' in request_data:
h5_filepath = shared_drive.path_to_local(request_data['filepath'])
Expand All @@ -181,6 +187,57 @@ def handler(self, request_data):
return ("error: operation not supported. Recognised requests are:\n "
"'get dataframe'\n 'hello'\n {'filepath': <some_h5_filepath>}")

@inmain_decorator(wait_for_return=True)
def _retrieve_dataframe(self):
    """Return a deep copy of the shots model's dataframe.

    `infer_objects()` is called on the shots model first. It picks fixed
    datatypes for columns that are compatible with fixed datatypes, which
    dramatically speeds up pickling. Doing this here, right before the
    dataframe is handed to a requesting client, avoids calling it needlessly
    often, as could happen if it were called on every dataframe update.

    NOTE(review): the `inmain_decorator` presumably makes this run in the Qt
    main thread and wait for the result - confirm against qtutils.

    Returns:
        pandas.DataFrame: A deep copy of `app.filebox.shots_model.dataframe`.
    """
    shots_model = app.filebox.shots_model
    shots_model.infer_objects()
    return shots_model.dataframe.copy(deep=True)

def _extract_n_sequences_from_df(self, df, n_sequences):
    """Return the rows of `df` that belong to the last `n_sequences` sequences.

    A sequence corresponds to one call to engage in runmanager and may
    contain multiple runs. Two shots are considered part of the same
    sequence if they agree on all of:

    * `'sequence'` - makes sure the times at which engage was called are
      the same to within 1 second;
    * `'labscript'` - they were generated from the same labscript;
    * `'sequence_index'` - a counter which keeps track of how many times
      engage has been called and resets to 0 at the start of each day.

    Typically the value of `'sequence'` alone is enough, but it only
    records time down to the second, so two sequences engaged in quick
    succession can share it.

    Sequences are ranked by sorting their `(sequence, labscript,
    sequence_index)` identities, with every value converted to a string, and
    the `n_sequences` highest ranked ones are kept. The order of the rows in
    `df` does not influence which sequences are selected.

    Args:
        df (pandas.DataFrame): The dataframe to take rows from. Unless it is
            empty it must have the columns `'sequence'`, `'labscript'` and
            `'sequence_index'`.
        n_sequences (int): Maximum number of sequences to keep. If it
            exceeds the number of sequences in `df`, all rows are kept. If
            it is 0, no rows are kept.

    Returns:
        pandas.DataFrame: `df` itself if it is empty, otherwise the subset
        of its rows belonging to the selected sequences.
    """
    # If the dataframe is empty, just return it, otherwise accessing columns
    # below will raise a KeyError.
    if df.empty:
        return df

    # One identity per row. Tuples are used rather than one concatenated
    # string so that distinct sequences can never collide (e.g. 'ab' + 'c'
    # versus 'a' + 'bc').
    row_identities = list(
        zip(
            (str(sequence) for sequence in df['sequence']),
            (str(labscript) for labscript in df['labscript']),
            (str(index) for index in df['sequence_index']),
        )
    )

    # Distinct identities in sorted order.
    unique_identities = sorted(set(row_identities))

    # Keep only the last n_sequences identities. Slicing just returns all of
    # the entries if n_sequences is greater than their number; it doesn't
    # raise an error. Zero needs special treatment because a slice starting
    # at -0 would select everything rather than nothing.
    if n_sequences == 0:
        identities_included = set()
    else:
        identities_included = set(unique_identities[-n_sequences:])

    # Boolean mask selecting the rows of the included sequences.
    row_mask = [identity in identities_included for identity in row_identities]
    return df[row_mask]


class LyseMainWindow(QtWidgets.QMainWindow):
# A signal to show that the window is shown and painted.
Expand Down

0 comments on commit 695c220

Please sign in to comment.