"""Test fixture: a tiny training loop that logs metrics to wandb.

Flags:
    --epochs N       number of loop iterations (default 2)
    --heavy          also log 50 histograms per epoch to stress the .wandb writer
    --sleep_every N  sleep ~1-2s and flush stdout every N epochs (0 = never);
                     gives an external `wandb sync` a chance to catch the file
                     mid-write
"""
import argparse
import os
import random
import signal  # kept for the commented-out SIGINT experiment below
import sys
import time

import numpy as np
import wandb

parser = argparse.ArgumentParser()
parser.add_argument("--epochs", type=int, default=2)
parser.add_argument("--heavy", action="store_true", default=False)
parser.add_argument("--sleep_every", type=int, default=0)
args = parser.parse_args()

# BUG FIX: the original passed two arguments to print() --
#     print("Calling init with args: {}", format(args))
# The builtin format(args) is just str(args), so this printed a literal "{}"
# followed by the args. Use str.format so the placeholder is substituted.
print("Calling init with args: {}".format(args))
print(
    "Environ: {}".format(
        {k: v for k, v in os.environ.items() if k.startswith("WANDB")}
    )
)
wandb.init(config=args)
print("Init called with config {}".format(wandb.config))

# raise ValueError()
# os.kill(os.getpid(), signal.SIGINT)
for i in range(0, wandb.config.epochs):
    loss = random.uniform(0, wandb.config.epochs - i)
    print("loss: %s" % loss)
    # commit=False accumulates metrics into the current step; the
    # wandb.log({"cool": True}) call below commits the whole step at once.
    wandb.log({"loss": loss}, commit=False)
    if wandb.config.heavy:
        for x in range(50):
            wandb.log(
                {
                    "hist_{}".format(x): wandb.Histogram(
                        np.random.randint(255, size=(1000))
                    )
                },
                commit=False,
            )
    wandb.log({"cool": True})
    if wandb.config.sleep_every > 0 and i % wandb.config.sleep_every == 0:
        print("sleeping")
        time.sleep(random.random() + 1)
        sys.stdout.flush()
print("Finished")
def test_sync_in_progress(live_mock_server, test_dir):
    """Launch an offline run, then `wandb sync` it while it is still writing.

    Verifies that:
      * syncing an unfinished run never drops a ``.synced`` marker,
      * a final sync after the run exits does mark the run synced,
      * the mock server received at least 3 upsert_bucket calls.
    """
    # Materialize the training fixture into the test's working directory.
    with open("train.py", "w") as f:
        f.write(fixture_open("train.py").read())
    env = dict(os.environ)
    env["WANDB_MODE"] = "offline"
    env["WANDB_DIR"] = test_dir
    env["WANDB_CONSOLE"] = "off"
    stdout = open("stdout.log", "w+")
    offline_run = subprocess.Popen(
        [
            sys.executable,
            "train.py",
            "--epochs",
            "50",
            "--sleep_every",
            "15",
            "--heavy",
        ],
        stdout=stdout,
        stderr=subprocess.STDOUT,
        bufsize=1,
        close_fds=True,
        env=env,
    )
    # Poll up to ~5 seconds (50 * 0.1s) for the run directory to appear.
    attempts = 0
    latest_run = os.path.join(test_dir, "wandb", "latest-run")
    while not os.path.exists(latest_run) and attempts < 50:
        time.sleep(0.1)
        # On windows we have no symlinks, so we grab the run dir
        if attempts > 0 and attempts % 10 == 0:
            if os.path.exists(os.path.join(test_dir, "wandb")):
                run_dir = os.listdir(os.path.join(test_dir, "wandb"))
                if len(run_dir) > 0:
                    latest_run = os.path.join(test_dir, "wandb", run_dir[0])
        attempts += 1
    if attempts == 50:
        # Launch timed out: dump everything we have to make CI failures debuggable.
        print("cur dir contents: ", os.listdir(test_dir))
        print("wandb dir contents: ", os.listdir(os.path.join(test_dir, "wandb")))
        stdout.seek(0)
        print("STDOUT")
        print(stdout.read())
        debug = os.path.join("wandb", "debug.log")
        debug_int = os.path.join("wandb", "debug-internal.log")
        if os.path.exists(debug):
            print("DEBUG")
            print(open(debug).read())
        if os.path.exists(debug_int):
            print("DEBUG INTERNAL")
            # BUG FIX: this branch previously printed debug.log a second time
            # instead of debug-internal.log (copy-paste error).
            print(open(debug_int).read())
        assert False, "train.py failed to launch :("
    else:
        print(
            "Starting live syncing after {} seconds from: {}".format(
                attempts * 0.1, latest_run
            )
        )
    for i in range(3):
        # Generally, the first sync will fail because the .wandb file is empty
        sync = subprocess.Popen(["wandb", "sync", latest_run], env=os.environ)
        assert sync.wait() == 0
        # Only confirm we don't have a .synced file if our offline run is still running
        if offline_run.poll() is None:
            assert len(glob.glob(os.path.join(latest_run, "*.synced"))) == 0
    assert offline_run.wait() == 0
    sync = subprocess.Popen(["wandb", "sync", latest_run], env=os.environ)
    assert sync.wait() == 0
    assert len(glob.glob(os.path.join(latest_run, "*.synced"))) == 1
    print("Number of upserts: ", live_mock_server.get_ctx()["upsert_bucket_count"])
    assert live_mock_server.get_ctx()["upsert_bucket_count"] >= 3
+ 1): self._crc[x] = zlib.crc32(strtobytes(chr(x))) & 0xFFFFFFFF - assert wandb._assert_is_internal_process + assert ( + wandb._assert_is_internal_process + ), "DataStore can only be used in the internal process" def open_for_write(self, fname): self._fname = fname @@ -95,24 +98,36 @@ def open_for_scan(self, fname): logger.info("open for scan: %s", fname) self._fp = open(fname, "rb") self._index = 0 + self._size_bytes = os.stat(fname).st_size self._opened_for_scan = True self._read_header() + def in_last_block(self): + """When reading, we want to know if we're in the last block to + handle in progress writes""" + return self._index > self._size_bytes - LEVELDBLOG_DATA_LEN + def scan_record(self): - assert self._opened_for_scan + assert self._opened_for_scan, "file not open for scanning" # TODO(jhr): handle some assertions as file corruption issues # assume we have enough room to read header, checked by caller? header = self._fp.read(LEVELDBLOG_HEADER_LEN) if len(header) == 0: return None - assert len(header) == LEVELDBLOG_HEADER_LEN + assert ( + len(header) == LEVELDBLOG_HEADER_LEN + ), "record header is {} bytes instead of the expected {}".format( + len(header), LEVELDBLOG_HEADER_LEN + ) fields = struct.unpack(" LEVELDBLOG_DATA_LEN: @@ -231,8 +259,10 @@ def _write_data(self, s): data_used += LEVELDBLOG_DATA_LEN data_left -= LEVELDBLOG_DATA_LEN - # write last + # write last and flush the entire block to disk self._write_record(s[data_used:], LEVELDBLOG_LAST) + self._fp.flush() + os.fsync(self._fp.fileno()) return file_offset, self._index - file_offset, flush_index, flush_offset @@ -249,7 +279,7 @@ def write(self, obj): """ raw_size = obj.ByteSize() s = obj.SerializeToString() - assert len(s) == raw_size + assert len(s) == raw_size, "invalid serialization" ret = self._write_data(s) return ret diff --git a/wandb/sdk_py27/internal/datastore.py b/wandb/sdk_py27/internal/datastore.py index 5e63a299441..3ab6b9d4e27 100644 --- a/wandb/sdk_py27/internal/datastore.py +++ 
    def in_last_block(self):
        """When reading, we want to know if we're in the last block to
        handle in progress writes"""
        # True when fewer than LEVELDBLOG_DATA_LEN bytes remain past the read
        # cursor: a short/failed record read here is likely an in-progress
        # write by the run, not corruption, so callers can warn instead of raise.
        return self._index > self._size_bytes - LEVELDBLOG_DATA_LEN
header = self._fp.read(LEVELDBLOG_HEADER_LEN) if len(header) == 0: return None - assert len(header) == LEVELDBLOG_HEADER_LEN + assert ( + len(header) == LEVELDBLOG_HEADER_LEN + ), "record header is {} bytes instead of the expected {}".format( + len(header), LEVELDBLOG_HEADER_LEN + ) fields = struct.unpack(" LEVELDBLOG_DATA_LEN: @@ -231,8 +259,10 @@ def _write_data(self, s): data_used += LEVELDBLOG_DATA_LEN data_left -= LEVELDBLOG_DATA_LEN - # write last + # write last and flush the entire block to disk self._write_record(s[data_used:], LEVELDBLOG_LAST) + self._fp.flush() + os.fsync(self._fp.fileno()) return file_offset, self._index - file_offset, flush_index, flush_offset @@ -249,7 +279,7 @@ def write(self, obj): """ raw_size = obj.ByteSize() s = obj.SerializeToString() - assert len(s) == raw_size + assert len(s) == raw_size, "invalid serialization" ret = self._write_data(s) return ret diff --git a/wandb/sync/sync.py b/wandb/sync/sync.py index 19c827ff0f1..bf5f456228d 100644 --- a/wandb/sync/sync.py +++ b/wandb/sync/sync.py @@ -131,6 +131,22 @@ def _find_tfevent_files(self, sync_item): tb_event_files = 1 return tb_event_files, tb_logdirs, tb_root + def _setup_tensorboard(self, tb_root, tb_logdirs, tb_event_files, sync_item): + """Returns true if this sync item can be synced as tensorboard""" + if tb_root is not None: + if tb_event_files > 0 and sync_item.endswith(WANDB_SUFFIX): + wandb.termwarn("Found .wandb file, not streaming tensorboard metrics.") + else: + print("Found {} tfevent files in {}".format(tb_event_files, tb_root)) + if len(tb_logdirs) > 3: + wandb.termwarn( + "Found {} directories containing tfevent files. " + "If these represent multiple experiments, sync them " + "individually or pass a list of paths." 
+ ) + return True + return False + def _send_tensorboard(self, tb_root, tb_logdirs, send_manager): if self._entity is None: viewer, server_info = send_manager._api.viewer_server_info() @@ -201,6 +217,21 @@ def _send_tensorboard(self, tb_root, tb_logdirs, send_manager): handle_manager.finish() send_manager.finish() + def _robust_scan(self, ds): + """Attempt to scan data, handling incomplete files""" + try: + return ds.scan_data() + except AssertionError as e: + if ds.in_last_block(): + wandb.termwarn( + ".wandb file is incomplete ({}), be sure to sync this run again once it's finished".format( + e + ) + ) + return None + else: + raise e + def run(self): for sync_item in self._sync_list: tb_event_files, tb_logdirs, tb_root = self._find_tfevent_files(sync_item) @@ -214,41 +245,34 @@ def run(self): continue if len(filtered_files) > 0: sync_item = os.path.join(sync_item, filtered_files[0]) - root_dir = os.path.dirname(sync_item) - # If we're syncing tensorboard, let's use a tmpdir - if tb_event_files > 0 and not sync_item.endswith(WANDB_SUFFIX): - root_dir = TMPDIR.name + sync_tb = self._setup_tensorboard( + tb_root, tb_logdirs, tb_event_files, sync_item + ) + # If we're syncing tensorboard, let's use a tmp dir for images etc. + root_dir = TMPDIR.name if sync_tb else os.path.dirname(sync_item) sm = sender.SendManager.setup(root_dir) + if sync_tb: + self._send_tensorboard(tb_root, tb_logdirs, sm) + continue - if tb_root is not None: - if tb_event_files > 0 and sync_item.endswith(WANDB_SUFFIX): - wandb.termwarn( - "Found .wandb file, not streaming tensorboard metrics." - ) - else: - print( - "Found {} tfevent files in {}".format(tb_event_files, tb_root) - ) - if len(tb_logdirs) > 3: - wandb.termwarn( - "Found {} directories containing tfevent files. " - "If these represent multiple experiments, sync them " - "individually or pass a list of paths." 
- ) - self._send_tensorboard(tb_root, tb_logdirs, sm) - continue ds = datastore.DataStore() - ds.open_for_scan(sync_item) + try: + ds.open_for_scan(sync_item) + except AssertionError as e: + print(".wandb file is empty ({}), skipping: {}".format(e, sync_item)) + continue # save exit for final send exit_pb = None + finished = False shown = False - while True: - data = ds.scan_data() + data = self._robust_scan(ds) if data is None: break pb, exit_pb, cont = self._parse_pb(data, exit_pb) + if exit_pb is not None: + finished = True if cont: continue sm.send(pb) @@ -273,7 +297,8 @@ def run(self): sys.stdout.flush() shown = True sm.finish() - if self._mark_synced and not self._view: + # Only mark synced if the run actually finished + if self._mark_synced and not self._view and finished: synced_file = "{}{}".format(sync_item, SYNCED_SUFFIX) with open(synced_file, "w"): pass