
[Storage] Replace gsutil with an aio library #631

Merged: 7 commits merged into master from radovan/aio-gcs on Sep 14, 2023

Conversation

@rzvoncek (Contributor) commented Aug 31, 2023

There is a bit of duplicated code, but that will go into the AbstractStorage once we do #628.
The S3 integration tests are failing because I enforced KMS, and there is a KMS test that does not work because #612 is still open.

Oh, and I forgot to remove some unused code; I'll handle that asap.

Fixes #627.
Fixes #637.

@codecov (codecov bot) commented Aug 31, 2023

Codecov Report

Merging #631 (f799451) into master (22612d3) will increase coverage by 9.09%.
The diff coverage is 90.79%.


@@            Coverage Diff             @@
##           master     #631      +/-   ##
==========================================
+ Coverage   71.89%   80.98%   +9.09%     
==========================================
  Files          56       55       -1     
  Lines        4639     4669      +30     
  Branches      675      671       -4     
==========================================
+ Hits         3335     3781     +446     
+ Misses       1250      860     -390     
+ Partials       54       28      -26     
Files Changed Coverage Δ
medusa/storage/abstract_storage.py 89.37% <50.00%> (-3.10%) ⬇️
medusa/storage/google_storage.py 94.88% <94.44%> (+42.38%) ⬆️
medusa/backup_node.py 87.43% <100.00%> (+1.79%) ⬆️
medusa/download.py 88.88% <100.00%> (-0.59%) ⬇️
medusa/storage/s3_base_storage.py 90.80% <100.00%> (+2.03%) ⬆️

... and 20 files with indirect coverage changes

@rzvoncek rzvoncek marked this pull request as ready for review August 31, 2023 18:49
with GSUtil(self.config) as gsutil:
    for parent, src_paths in _group_by_parent(srcs):
        yield self._upload_paths(gsutil, parent, src_paths, dest)
@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
adejanovski (Contributor):

Issue: we should probably retry individual blob uploads (_upload_blob()) instead of this one.
It will work nicely with the resumable transfer mode.

rzvoncek (Contributor, Author):

Done.
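A minimal sketch of what a per-blob retry could look like, assuming a single-object upload coroutine named _upload_blob() (that name and the loop below are illustrative, not the code from this PR; the retry bounds mirror the decorator shown above):

import asyncio

async def _upload_blob_with_retries(self, src: str, dest: str):
    # Retry one blob at a time, so a failure only repeats that single
    # transfer; this pairs well with resumable uploads.
    for attempt in range(MAX_UP_DOWN_LOAD_RETRIES):
        try:
            return await self._upload_blob(src, dest)
        except Exception:
            if attempt == MAX_UP_DOWN_LOAD_RETRIES - 1:
                raise
            await asyncio.sleep(5)  # mirrors wait_fixed=5000 ms from the decorator above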

)
resp = resp['resource']
else:
resp = await self.gcs_storage.upload_from_filename(
adejanovski (Contributor):

Issue: upload_from_filename() will read the whole file into memory, which will obviously not work for us.
Let's use upload() instead and pass it a file object as file_data arg. Let's also set force_resumable_upload to true and disable the timeout for now.
Multipart uploads in this lib don't seem to work as we would want them to anyway (seems like all parts are read at once, they're not uploaded concurrently and there's no retry when a part fails).

rzvoncek (Contributor, Author):

Done.
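A rough sketch of the suggested call shape, assuming gcloud-aio-storage's Storage.upload() accepts a file object as file_data (the bucket/object names and the large timeout value are illustrative):

# Assumes self.gcs_storage is a gcloud.aio.storage.Storage instance;
# bucket_name and object_key are placeholder names.
with open(src, 'rb') as src_file:
    resp = await self.gcs_storage.upload(
        bucket_name,
        object_key,
        file_data=src_file,            # stream from a file object instead of loading the file
        force_resumable_upload=True,   # force the resumable upload protocol
        timeout=24 * 60 * 60,          # effectively disable the client-side timeout for now
    )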

with GSUtil(self.config) as gsutil:
    for parent, src_paths in _group_by_parent(srcs):
        yield self._download_paths(gsutil, parent, src_paths, dest)
@retry(stop_max_attempt_number=MAX_UP_DOWN_LOAD_RETRIES, wait_fixed=5000)
adejanovski (Contributor):

Issue: I think we should move the retries on individual downloads instead.
Since the retry swallows the exceptions, I'd also catch it, log it and then re-raise it for better observability of these failed attempts.

rzvoncek (Contributor, Author):

Done.
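A short sketch of the catch-log-re-raise idea for a single download attempt (the helper names are hypothetical; a per-blob retry would wrap this the same way as in the upload sketch above):

import logging

async def _download_blob(self, src: str, dest: str):
    try:
        await self._download_blob_once(src, dest)  # hypothetical single-attempt helper
    except Exception as e:
        # Log before re-raising so each failed attempt stays visible,
        # even though the surrounding retry will swallow the exception.
        logging.warning('Downloading %s failed: %s', src, e)
        raise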

@@ -31,7 +31,7 @@
 from medusa.index import add_backup_start_to_index, add_backup_finish_to_index, set_latest_backup_in_index
 from medusa.monitoring import Monitoring
 from medusa.storage import Storage, format_bytes_str, ManifestObject, divide_chunks
-from medusa.storage.google_storage import GSUTIL_MAX_FILES_PER_CHUNK
+from medusa.storage.google_storage import GOOGLE_MAX_FILES_PER_CHUNK
adejanovski (Contributor):

Question: Do we still need this? I have a feeling this was related to gsutil.

rzvoncek (Contributor, Author):

You're correct, we don't need this. The new storage driver implementations handle this themselves (eventually we'll do this in the AbstractStorage anyway).

@adejanovski (Contributor) left a comment:

Large file downloads are failing due to the download method being invoked.

logging.debug("Blob {} last modification time is {}".format(blob.name, blob.extra["last_modified"]))
return parser.parse(blob.extra["last_modified"])
try:
await self.gcs_storage.download_to_filename(
adejanovski (Contributor):

Issue: This method reads the files all at once and puts their content into memory. We need to use download_stream() instead, which returns a BufferedStream that should be writeable to a file without loading all the data into memory.

rzvoncek (Contributor, Author):

Done. Pushed a new commit, together with a rebase on a recent master.
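A sketch of streaming a download to disk, assuming gcloud-aio-storage's download_stream() returns a stream that can be read in chunks (the chunk size and names are illustrative):

# Write the object to disk chunk by chunk instead of buffering it all in memory.
stream = await self.gcs_storage.download_stream(bucket_name, object_key)
with open(dest_path, 'wb') as dest_file:
    while True:
        chunk = await stream.read(1024 * 1024)  # 1 MiB at a time
        if not chunk:
            break
        dest_file.write(chunk)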

setup.py (Outdated)
@@ -70,7 +70,8 @@
     'dnspython>=2.2.1',
     'asyncio==3.4.3',
     'aiohttp==3.8.5',
-    'aiohttp-s3-client==0.8.17'
+    'aiohttp-s3-client==0.8.17',
+    'gcloud-aio-storage==8.3.0'
adejanovski (Contributor):

Note: I added this dependency which was missing from setup.py

@adejanovski (Contributor) left a comment:

Looks great, but I'm wondering about the concurrency level of some operations.

Comment on lines 233 to 235
async def _delete_objects(self, objects: t.List[AbstractBlob]):
    coros = [self._delete_object(obj) for obj in objects]
    await asyncio.gather(*coros)
adejanovski (Contributor):

Question: Is this going to delete all objects concurrently? Maybe our concurrency setting should apply here to avoid sending too many concurrent requests?

@rzvoncek (Contributor, Author) commented Sep 14, 2023:

Yes, it's all at once. Chunking might be a good idea. Do I chunk it in chunks of config.max_concurrent_transfers size?

adejanovski (Contributor):

> Do I chunk it in chunks of config.max_concurrent_transfers size?

Yes, sounds good
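A minimal sketch of chunked deletes, reusing the divide_chunks helper that medusa.storage already exposes (its exact signature and the config attribute name are assumptions based on the discussion above):

async def _delete_objects(self, objects: t.List[AbstractBlob]):
    # Delete in batches bounded by the configured concurrency instead of all at once.
    chunk_size = int(self.config.max_concurrent_transfers)
    for chunk in divide_chunks(objects, chunk_size):
        await asyncio.gather(*[self._delete_object(obj) for obj in chunk])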

Comment on lines 248 to 250
async def _download_blobs(self, srcs: t.List[t.Union[Path, str]], dest: t.Union[Path, str]):
    coros = [self._download_blob(src, dest) for src in map(str, srcs)]
    await asyncio.gather(*coros)
adejanovski (Contributor):

Question: is this downloading all files concurrently?

rzvoncek (Contributor, Author):

Yes, the same as with the deletes. Do we chunk by concurrent transfers?

adejanovski (Contributor):

I think we should chunk, otherwise we may overwhelm the network which could have unforeseen impacts.
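The downloads could be chunked the same way (a sketch, with the same assumptions as the delete sketch above):

async def _download_blobs(self, srcs: t.List[t.Union[Path, str]], dest: t.Union[Path, str]):
    # Download in batches bounded by the configured concurrency.
    chunk_size = int(self.config.max_concurrent_transfers)
    for chunk in divide_chunks(list(map(str, srcs)), chunk_size):
        await asyncio.gather(*[self._download_blob(src, dest) for src in chunk])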

@sonarcloud (sonarcloud bot) commented Sep 14, 2023

SonarCloud Quality Gate failed.

0 Bugs (rated A)
0 Vulnerabilities (rated A)
0 Security Hotspots (rated A)
13 Code Smells (rated A)

No coverage information
3.8% Duplication


@adejanovski (Contributor) left a comment:

Awesome stuff! :shipit:

@rzvoncek rzvoncek merged commit e02afe1 into master Sep 14, 2023
27 of 28 checks passed
adejanovski pushed a commit that referenced this pull request Sep 15, 2023
* [Storage] Replace gsutil with an aio library

* [Storage] Dont chunk files outside of storage drivers

* [Storage/GCS] Move retries to uploads of individual blobs

* [Storage/GCS] Use timeouts everywhere and force resumable where relevant

* [Storage/GCS] Move retries for downloads to individual blobs

* [GCS Storage] Download via stream instead of all into memory

* [GCS Storage] Chunk deletes/downloads by config.concurrent_transfers
@rzvoncek rzvoncek deleted the radovan/aio-gcs branch September 22, 2023 11:42