Skip to content

Commit

Permalink
projector: fix RunPaths result aliasing (#4479)
Browse files Browse the repository at this point in the history
Summary:
The projector uses `multiplexer.RunPaths()` to find each run’s logdir on
disk, but it’s not clear from the code how this stays up to date as more
runs are loaded. The answer is insidious: `RunPaths` returns the raw
underlying dictionary, not a shallow copy of it, so changes to the
internal structures of the multiplexer are immediately reflected in the
projector plugin. This is a bit terrifying, since it means both that the
code is wildly non-threadsafe and that it’s hard to tell when the data
has actually changed. (The terror is somewhat abated by remembering that
concurrency safety around dicts in Python is all just pretend, anyway.)

This patch modifies the projector plugin to always shallow-copy the
result of `RunPaths`, and makes the necessary follow-up changes to
observe updates to the data. The result is that `RunPaths` is called
more times, up to once on each HTTP handler or `is_active` call. This is
fine: it’s expected that plugin routes may hit the multiplexer or data
provider, and since the projector plugin only really works locally, this
should be fast enough to not matter.

In doing so, we also improve the update semantics, since we can test
precisely for cache invalidation rather than just looking at the count
of runs. There are almost certainly still observable concurrency issues;
this patch doesn’t try to fix them all.

The eventual goal of this series of patches is to remove the dependency
on the multiplexer entirely, so the call to `RunPaths` will become a
call to a data provider’s `list_runs` and merging in `flags.logdir`.

Test Plan:
The projector plugin still works in the normal case (shows embeddings
with sprites, etc.). Edge cases to check:

  - Point TensorBoard at a logdir containing only non-projector data.
    Launch TensorBoard and wait a bit for the projector to finish
    determining that it has no data. Then add one projector run to the
    logdir and refresh TensorBoard (but do not restart the server). Note
    that the projector plugin properly picks up the new run. Repeat, and
    note that it picks up the newer run, too (i.e., the cache can be
    updated even when it’s non-empty).

  - Point TensorBoard at a logdir where the `projector_config.pbtxt` is
    inside a logdir’s plugin assets directory:

    ```
    logs/run/plugins/org_tensorflow_tensorboard_projector/projector_config.pbtxt
    ```

    Note that the config is still resolved properly. You can tell that
    it’s using the plugin assets because if you comment out the
    `_append_plugin_asset_directories` call, the embeddings will still
    be displayed but will lack sprite information.

wchargin-branch: projector-fix-aliasing
  • Loading branch information
wchargin committed Dec 17, 2020
1 parent 9e5adb3 commit 638014e
Showing 1 changed file with 43 additions and 37 deletions.
80 changes: 43 additions & 37 deletions tensorboard/plugins/projector/projector_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,8 @@ def __init__(self, context):
self.multiplexer = context.multiplexer
self.logdir = context.logdir
self.readers = {}
self.run_paths = None
self._run_paths = None
self._configs = {}
self.old_num_run_paths = None
self.config_fpaths = None
self.tensor_cache = LRUCache(_TENSOR_CACHE_CAPACITY)

Expand All @@ -260,8 +259,7 @@ def __init__(self, context):
# active. If such a thread exists, do not start a duplicate thread.
self._thread_for_determining_is_active = None

if self.multiplexer:
self.run_paths = self.multiplexer.RunPaths()
self._update_run_paths()

def get_plugin_apps(self):
asset_prefix = "tf_projector_plugin"
Expand Down Expand Up @@ -325,44 +323,52 @@ def frontend_metadata(self):
disable_reload=True,
)

def _update_run_paths(self):
"""Updates `self._run_paths`, testing for updates.
Returns:
`True` if this call changed the value of `self._run_paths`,
else `False`.
"""
if self.multiplexer:
run_paths = dict(self.multiplexer.RunPaths())
else:
run_paths = None
changed = run_paths != self._run_paths
self._run_paths = run_paths
return changed

def _determine_is_active(self):
"""Determines whether the plugin is active.
This method is run in a separate thread so that the plugin can
offer an immediate response to whether it is active and
determine whether it should be active in a separate thread.
"""
if self.configs:
self._update_configs()
if self._configs:
self._is_active = True
self._thread_for_determining_is_active = None

@property
def configs(self):
"""Returns a map of run paths to `ProjectorConfig` protos."""
run_path_pairs = list(self.run_paths.items())
def _update_configs(self):
"""Updates `self._configs`."""
run_paths_changed = self._update_run_paths()
run_path_pairs = list(self._run_paths.items())
self._append_plugin_asset_directories(run_path_pairs)
# Also accept the root logdir as a model checkpoint directory,
# so that the projector still works when there are no runs.
# (Case on `run` rather than `path` to avoid issues with
# absolute/relative paths on any filesystems.)
if not any(run == "." for (run, path) in run_path_pairs):
if "." not in self._run_paths:
run_path_pairs.append((".", self.logdir))
if self._run_paths_changed() or _latest_checkpoints_changed(
if run_paths_changed or _latest_checkpoints_changed(
self._configs, run_path_pairs
):
self.readers = {}
self._configs, self.config_fpaths = self._read_latest_config_files(
run_path_pairs
)
self._augment_configs_with_checkpoint_info()
return self._configs

def _run_paths_changed(self):
num_run_paths = len(list(self.run_paths.keys()))
if num_run_paths != self.old_num_run_paths:
self.old_num_run_paths = num_run_paths
return True
return False

def _augment_configs_with_checkpoint_info(self):
for run, config in self._configs.items():
Expand Down Expand Up @@ -527,7 +533,7 @@ def _append_plugin_asset_directories(self, run_path_pairs):
if metadata.PROJECTOR_FILENAME not in assets:
continue
assets_dir = os.path.join(
self.run_paths[run],
self._run_paths[run],
metadata.PLUGINS_DIR,
metadata.PLUGIN_ASSETS_NAME,
)
Expand All @@ -545,7 +551,8 @@ def _serve_file(self, file_path, request):
@wrappers.Request.application
def _serve_runs(self, request):
"""Returns a list of runs that have embeddings."""
return Respond(request, list(self.configs.keys()), "application/json")
self._update_configs()
return Respond(request, list(self._configs.keys()), "application/json")

@wrappers.Request.application
def _serve_config(self, request):
Expand All @@ -554,12 +561,12 @@ def _serve_config(self, request):
return Respond(
request, 'query parameter "run" is required', "text/plain", 400
)
if run not in self.configs:
self._update_configs()
config = self._configs.get(run)
if config is None:
return Respond(
request, 'Unknown run: "%s"' % run, "text/plain", 400
)

config = self.configs[run]
return Respond(
request, json_format.MessageToJson(config), "application/json"
)
Expand Down Expand Up @@ -587,12 +594,12 @@ def _serve_metadata(self, request):
400,
)

if run not in self.configs:
self._update_configs()
config = self._configs.get(run)
if config is None:
return Respond(
request, 'Unknown run: "%s"' % run, "text/plain", 400
)

config = self.configs[run]
fpath = self._get_metadata_file_for_tensor(name, config)
if not fpath:
return Respond(
Expand Down Expand Up @@ -647,13 +654,12 @@ def _serve_tensor(self, request):
400,
)

if run not in self.configs:
self._update_configs()
config = self._configs.get(run)
if config is None:
return Respond(
request, 'Unknown run: "%s"' % run, "text/plain", 400
)

config = self.configs[run]

tensor = self.tensor_cache.get((run, name))
if tensor is None:
# See if there is a tensor file in the config.
Expand Down Expand Up @@ -714,12 +720,12 @@ def _serve_bookmarks(self, request):
request, 'query parameter "name" is required', "text/plain", 400
)

if run not in self.configs:
self._update_configs()
config = self._configs.get(run)
if config is None:
return Respond(
request, 'Unknown run: "%s"' % run, "text/plain", 400
)

config = self.configs[run]
fpath = self._get_bookmarks_file_for_tensor(name, config)
if not fpath:
return Respond(
Expand Down Expand Up @@ -757,14 +763,14 @@ def _serve_sprite_image(self, request):
request, 'query parameter "name" is required', "text/plain", 400
)

if run not in self.configs:
self._update_configs()
config = self._configs.get(run)
if config is None:
return Respond(
request, 'Unknown run: "%s"' % run, "text/plain", 400
)

config = self.configs[run]
embedding_info = self._get_embedding(name, config)

if not embedding_info or not embedding_info.sprite.image_path:
return Respond(
request,
Expand Down

0 comments on commit 638014e

Please sign in to comment.