[CLI-881][CLI-880][CLI-451] Improve wandb sync to handle errors #2199
Conversation
Codecov Report
@@ Coverage Diff @@
## master #2199 +/- ##
==========================================
+ Coverage 78.18% 78.31% +0.12%
==========================================
Files 243 244 +1
Lines 32416 32491 +75
==========================================
+ Hits 25344 25444 +100
+ Misses 7072 7047 -25
@@ -6,7 +6,7 @@
   },
   "git.ignoreLimitWarning": true,

-  "editor.formatOnSave": false,
+  "editor.formatOnSave": true,
I was getting sick of forgetting to format my files with black. I know there are a few files that aren't formatted, but I figure the vast majority are, and you can always save without formatting if needed.
But make sure you are pinned to the right version of black, otherwise you will get problems. I'm fine with it if other people don't mind.
ident, magic, version = struct.unpack("<4sHB", header)
if ident != strtobytes(LEVELDBLOG_HEADER_IDENT):
    raise Exception("Invalid header")
if magic != LEVELDBLOG_HEADER_MAGIC:
    raise Exception("Invalid header")
if version != LEVELDBLOG_HEADER_VERSION:
    raise Exception("Invalid header")
assert len(header) == header_length
This was one bug: we didn't verify we had a valid header before attempting to unpack it.
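As a self-contained sketch of that pattern (only the `LEVELDBLOG_*` names and the `"<4sHB"` layout come from the diff; the constant values and the `parse_header` wrapper here are illustrative, not wandb's actual code):

```python
import struct

# Illustrative constants -- only the LEVELDBLOG_* names and the "<4sHB"
# layout appear in the diff above; these values are made up.
LEVELDBLOG_HEADER_IDENT = b":W&B"
LEVELDBLOG_HEADER_MAGIC = 0xBEE1
LEVELDBLOG_HEADER_VERSION = 0
HEADER_LENGTH = struct.calcsize("<4sHB")  # 4s + H + B = 7 bytes

def parse_header(header):
    # The fix: confirm a complete header was read *before* unpacking,
    # so a short read fails with a clear assertion instead of a
    # confusing struct.error from unpack.
    assert len(header) == HEADER_LENGTH, "incomplete header (short read)"
    ident, magic, version = struct.unpack("<4sHB", header)
    if ident != LEVELDBLOG_HEADER_IDENT:
        raise Exception("Invalid header ident")
    if magic != LEVELDBLOG_HEADER_MAGIC:
        raise Exception("Invalid header magic")
    if version != LEVELDBLOG_HEADER_VERSION:
        raise Exception("Invalid header version")
    return ident, magic, version
```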
self._write_record(s[data_used:], LEVELDBLOG_LAST)
self._fp.flush()
os.fsync(self._fp.fileno())
@raubitsj I'm not positive this is the right thing to do, but I figured we should at least flush every block to disk explicitly. I don't think this actually fixes any syncing issues, and I can take it out if you think it will have negative performance consequences.
This isn't quite doing what you think it is doing.
I had planned on making flushes time-based, which is why I left this out. Writing on every block is fine, but this only syncs on a record that spanned multiple blocks. It will be fine, but it won't actually guarantee that things are flushed regularly -- although in most cases it will happen.
Let's keep it for now since it was tested to work OK.
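To illustrate the distinction being discussed -- `flush()` only moves data from Python's buffer into the OS page cache, while `os.fsync()` forces the page cache to disk -- here is a sketch of the time-based approach mentioned above. `TimedFlusher` and its interval are hypothetical helpers, not wandb's implementation:

```python
import os
import time

class TimedFlusher:
    # Hypothetical helper (not wandb's code): flush() on every write so
    # data reaches the OS page cache, but only pay the os.fsync() cost
    # every `interval` seconds.
    def __init__(self, fp, interval=5.0):
        self._fp = fp
        self._interval = interval
        self._last_sync = 0.0

    def write(self, data):
        self._fp.write(data)
        self._fp.flush()  # Python buffer -> OS page cache
        now = time.monotonic()
        if now - self._last_sync >= self._interval:
            os.fsync(self._fp.fileno())  # page cache -> disk
            self._last_sync = now
```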
try:
    return ds.scan_data()
except AssertionError as e:
    if ds.in_last_block():
This is the fix for in-progress syncing: if we get an assertion error while in the last block of the leveldb file, we just print a warning instead of raising an exception.
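A sketch of that tolerant scan, under the assumption that `ds` is the datastore reader (`scan_data` and `in_last_block` are the names from the diff; the `scan_record` wrapper is hypothetical):

```python
import warnings

def scan_record(ds):
    try:
        return ds.scan_data()
    except AssertionError:
        if ds.in_last_block():
            # The writer may still be mid-record at the tail of a live
            # file: warn and treat it as end-of-data rather than failing
            # the whole sync.
            warnings.warn("truncated final record; run may be in progress")
            return None
        raise  # corruption anywhere else is still fatal
```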
"individually or pass a list of paths." | ||
) | ||
self._send_tensorboard(tb_root, tb_logdirs, sm) | ||
continue |
The linter was complaining about the complexity of this method, so I moved this logic into a helper method.
-        ds.open_for_scan(sync_item)
+        try:
+            ds.open_for_scan(sync_item)
+        except AssertionError as e:
This is the other fix: if we get an assertion error when we open the file, it means the file exists but hasn't been written to yet.
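Sketched under the same assumptions (`ds` and `open_for_scan` are the names from the diff; the `try_open` wrapper is hypothetical):

```python
def try_open(ds, sync_item):
    # An AssertionError here means the .wandb file exists but no header
    # has been written yet, so skip the item instead of crashing.
    try:
        ds.open_for_scan(sync_item)
    except AssertionError as e:
        print("Skipping {}: file has no header yet ({})".format(sync_item, e))
        return False
    return True
```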
@@ -273,7 +297,8 @@ def run(self):
             sys.stdout.flush()
             shown = True
         sm.finish()
-        if self._mark_synced and not self._view:
+        # Only mark synced if the run actually finished
+        if self._mark_synced and not self._view and finished:
The other big change is we don't mark "live" runs as synced.
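A minimal sketch of that guard; the `.synced` marker filename and this standalone function are assumptions for illustration, not necessarily wandb's actual mechanism:

```python
import os

def maybe_mark_synced(path, mark_synced, view, finished):
    # Only write the marker when the run's exit record was seen, so
    # live (unfinished) runs are left unmarked and can be re-synced
    # later once they finish.
    if mark_synced and not view and finished:
        open(path + ".synced", "w").close()
        return True
    return False
```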
good.
nice, thanks for adding all the assert messages :)
This is how far I got to address offline / running sync. Not sure if this is the right approach, but there are 2 scenarios I'm sure fail today:
- the .wandb file is empty or doesn't have a valid header yet
- the .wandb file is being written to while someone is running wandb sync, so the final record could be corrupt

#2132
#2130
#1297
https://wandb.atlassian.net/browse/CLI-881
https://wandb.atlassian.net/browse/CLI-880
https://wandb.atlassian.net/browse/CLI-451
Description
What does the PR do?
Testing
How was this PR tested?