Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions reframe/frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,9 +1083,7 @@ def restrict_logging():
lambda htype: htype != 'stream')
with exit_gracefully_on_error('failed to retrieve session data',
printer):
printer.info(jsonext.dumps(reporting.session_info(
options.describe_stored_sessions
), indent=2))
printer.info(reporting.session_info(options.describe_stored_sessions))
sys.exit(0)

if options.describe_stored_testcases:
Expand Down
6 changes: 4 additions & 2 deletions reframe/frontend/reporting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,10 @@ def testcase_data(spec, namepatt=None, test_filter=None):
@time_function
def session_info(query):
'''Retrieve session details as JSON'''

return StorageBackend.default().fetch_sessions(parse_query_spec(query))
sessions = StorageBackend.default().fetch_sessions(
parse_query_spec(query), False
)
return rf'[{",".join(sessions)}]'


@time_function
Expand Down
85 changes: 54 additions & 31 deletions reframe/frontend/reporting/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ def fetch_testcases(self, selector: QuerySelector, name_patt=None,
'''

@abc.abstractmethod
def fetch_sessions(self, selector: QuerySelector):
def fetch_sessions(self, selector: QuerySelector, decode=True):
'''Fetch sessions based on the specified query selector.

:arg selector: an instance of :class:`QuerySelector` that will specify
the actual type of query requested.
:returns: A list of matching sessions.
:arg decode: If set to :obj:`False`, do not decode the returned
sessions and leave them JSON-encoded.
:returns: A list of matching sessions, either decoded or not.
'''

@abc.abstractmethod
Expand Down Expand Up @@ -101,6 +103,7 @@ def _db_file(self):

self._db_create()

self._db_create_indexes()
self._db_schema_check()
return self.__db_file

Expand Down Expand Up @@ -161,12 +164,20 @@ def _db_create(self):
'uuid TEXT, '
'FOREIGN KEY(session_uuid) '
'REFERENCES sessions(uuid) ON DELETE CASCADE)')

# Update DB file mode
os.chmod(self.__db_file, self.__db_file_mode)

def _db_create_indexes(self):
clsname = type(self).__name__
getlogger().debug(f'{clsname}: creating database indexes if needed')
with self._db_connect(self.__db_file) as conn:
conn.execute('CREATE INDEX IF NOT EXISTS index_testcases_time '
'on testcases(job_completion_time_unix)')
conn.execute('CREATE TABLE IF NOT EXISTS metadata('
'schema_version TEXT)')
# Update DB file mode
os.chmod(self.__db_file, self.__db_file_mode)
conn.execute('CREATE INDEX IF NOT EXISTS index_sessions_time '
'on sessions(session_start_unix)')

def _db_schema_check(self):
with self._db_read(self.__db_file) as conn:
Expand Down Expand Up @@ -232,10 +243,16 @@ def store(self, report, report_file=None):
return self._db_store_report(conn, report, report_file)

@time_function
def _decode_sessions(self, results, sess_filter):
'''Decode sessions from the raw DB results.
def _mass_json_decode(self, *json_objs):
data = rf'[{",".join(json_objs)}]'
getlogger().debug(f'decoding JSON raw data of length {len(data)}')
return json.loads(data)

Return a map of session uuids to decoded session data
@time_function
def _fetch_sessions(self, results, sess_filter):
'''Fetch JSON-encoded sessions from the DB by applying a filter.

:returns: A list of the JSON-encoded valid sessions.
'''
sess_info_patt = re.compile(
r'\"session_info\":\s+(?P<sess_info>\{.*?\})'
Expand All @@ -245,34 +262,31 @@ def _decode_sessions(self, results, sess_filter):
def _extract_sess_info(s):
return sess_info_patt.search(s).group('sess_info')

@time_function
def _mass_json_decode(json_objs):
data = '[' + ','.join(json_objs) + ']'
getlogger().debug(
f'decoding JSON raw data of length {len(data)}'
)
return json.loads(data)

session_infos = {}
sessions = {}
for uuid, json_blob in results:
sessions.setdefault(uuid, json_blob)
session_infos.setdefault(uuid, _extract_sess_info(json_blob))

# Find the UUIDs to decode fully by inspecting only the session info
# Find the relevant sessions by inspecting only the session info
uuids = []
for sess_info in _mass_json_decode(session_infos.values()):
infos = self._mass_json_decode(*session_infos.values())
for sess_info in infos:
try:
if self._db_filter_json(sess_filter, sess_info):
uuids.append(sess_info['uuid'])
except Exception:
continue

# Decode selected sessions
reports = _mass_json_decode(sessions[uuid] for uuid in uuids)
return [sessions[uuid] for uuid in uuids]

# Return only the selected sessions
return {rpt['session_info']['uuid']: rpt for rpt in reports}
def _decode_and_index_sessions(self, json_blobs):
'''Decode the sessions and index them by their uuid.

:returns: A dictionary with uuids as keys and the sessions as values.
'''
return {sess['session_info']['uuid']: sess
for sess in self._mass_json_decode(*json_blobs)}

@time_function
def _fetch_testcases_raw(self, condition):
Expand All @@ -289,7 +303,11 @@ def _fetch_testcases_raw(self, condition):
results = conn.execute(query).fetchall()

getprofiler().exit_region()
sessions = self._decode_sessions(results, None)

# Fetch, decode and index the sessions by their uuid
sessions = self._decode_and_index_sessions(
self._fetch_sessions(results, None)
)

# Extract the test case data by extracting their UUIDs
getprofiler().enter_region('sqlite testcase query')
Expand Down Expand Up @@ -319,8 +337,8 @@ def _fetch_testcases_raw(self, condition):
return testcases

@time_function
def _fetch_testcases_from_session(self, selector,
name_patt=None, test_filter=None):
def _fetch_testcases_from_session(self, selector, name_patt=None,
test_filter=None):
query = 'SELECT uuid, json_blob from sessions'
if selector.by_session_uuid():
query += f' WHERE uuid == "{selector.uuid}"'
Expand All @@ -338,9 +356,11 @@ def _fetch_testcases_from_session(self, selector,
if not results:
return []

sessions = self._decode_sessions(
results,
selector.sess_filter if selector.by_session_filter() else None
sessions = self._decode_and_index_sessions(
self._fetch_sessions(
results,
selector.sess_filter if selector.by_session_filter() else None
)
)
return [tc for sess in sessions.values()
for run in sess['runs'] for tc in run['testcases']
Expand All @@ -366,15 +386,15 @@ def fetch_testcases(self, selector: QuerySelector,
name_patt=None, test_filter=None):
if selector.by_session():
return self._fetch_testcases_from_session(
selector, name_patt, test_filter
selector, name_patt, test_filter,
)
else:
return self._fetch_testcases_time_period(
*selector.time_period, name_patt, test_filter
)

@time_function
def fetch_sessions(self, selector: QuerySelector):
def fetch_sessions(self, selector: QuerySelector, decode=True):
query = 'SELECT uuid, json_blob FROM sessions'
if selector.by_time_period():
ts_start, ts_end = selector.time_period
Expand All @@ -389,11 +409,14 @@ def fetch_sessions(self, selector: QuerySelector):
results = conn.execute(query).fetchall()

getprofiler().exit_region()
session = self._decode_sessions(
raw_sessions = self._fetch_sessions(
results,
selector.sess_filter if selector.by_session_filter() else None
)
return [*session.values()]
if decode:
return [*self._decode_and_index_sessions(raw_sessions).values()]
else:
return raw_sessions

def _do_remove(self, conn, uuids):
'''Remove sessions'''
Expand Down
4 changes: 3 additions & 1 deletion reframe/frontend/reporting/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ def by_session_filter(self):

def __repr__(self):
clsname = type(self).__name__
return f'{clsname}(value={self.__value}, kind={self.__kind})'
return (f'{clsname}(uuid={self.__uuid!r}, '
f'time_period={self.__time_period!r}, '
f'sess_filter={self.__sess_filter!r})')


def parse_time_period(s):
Expand Down
Loading