Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ayon_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
create_dependency_package_basename,
)
from .server_api import (
RequestTypes,
ServerAPI,
)

Expand Down Expand Up @@ -60,7 +61,9 @@
update_event,
dispatch_event,
enroll_event_job,
download_file_to_stream,
download_file,
upload_file_from_stream,
upload_file,
trigger_server_restart,
query_graphql,
Expand Down Expand Up @@ -223,6 +226,7 @@
"slugify_string",
"create_dependency_package_basename",

"RequestTypes",
"ServerAPI",

"GlobalServerAPI",
Expand Down Expand Up @@ -276,7 +280,9 @@
"update_event",
"dispatch_event",
"enroll_event_job",
"download_file_to_stream",
"download_file",
"upload_file_from_stream",
"upload_file",
"trigger_server_restart",
"query_graphql",
Expand Down
51 changes: 51 additions & 0 deletions ayon_api/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,32 @@ def enroll_event_job(*args, **kwargs):
return con.enroll_event_job(*args, **kwargs)


def download_file_to_stream(*args, **kwargs):
"""Download file from AYON server to IOStream.

Endpoint can be full url (must start with 'base_url' of api object).

Progress object can be used to track download. Can be used when
download happens in thread and other thread want to catch changes over
time.

Todos:
Use retries and timeout.
Return RestApiResponse.

Args:
endpoint (str): Endpoint or URL to file that should be downloaded.
stream (Union[io.BytesIO, BinaryIO]): Stream where output will be stored.
chunk_size (Optional[int]): Size of chunks that are received
in single loop.
progress (Optional[TransferProgress]): Object that gives ability
to track download progress.

"""
con = get_server_api_connection()
return con.download_file_to_stream(*args, **kwargs)


def download_file(*args, **kwargs):
"""Download file from AYON server.

Expand All @@ -814,6 +840,31 @@ def download_file(*args, **kwargs):
return con.download_file(*args, **kwargs)


def upload_file_from_stream(*args, **kwargs):
"""Upload file to server from bytes.

Todos:
Use retries and timeout.
Return RestApiResponse.

Args:
endpoint (str): Endpoint or url where file will be uploaded.
stream (Union[io.BytesIO, BinaryIO]): File content stream.
progress (Optional[TransferProgress]): Object that gives ability
to track upload progress.
request_type (Optional[RequestType]): Type of request that will
be used to upload file.
**kwargs (Any): Additional arguments that will be passed
to request function.

Returns:
requests.Response: Response object

"""
con = get_server_api_connection()
return con.upload_file_from_stream(*args, **kwargs)


def upload_file(*args, **kwargs):
"""Upload file to server.

Expand Down
147 changes: 109 additions & 38 deletions ayon_api/server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1602,30 +1602,25 @@ def enroll_event_job(

return response.data

def _download_file(self, url, filepath, chunk_size, progress):
dst_directory = os.path.dirname(filepath)
if not os.path.exists(dst_directory):
os.makedirs(dst_directory)

def _download_file_to_stream(self, url, stream, chunk_size, progress):
kwargs = {"stream": True}
if self._session is None:
kwargs["headers"] = self.get_headers()
get_func = self._base_functions_mapping[RequestTypes.get]
else:
get_func = self._session_functions_mapping[RequestTypes.get]

with open(filepath, "wb") as f_stream:
with get_func(url, **kwargs) as response:
response.raise_for_status()
progress.set_content_size(response.headers["Content-length"])
for chunk in response.iter_content(chunk_size=chunk_size):
f_stream.write(chunk)
progress.add_transferred_chunk(len(chunk))
with get_func(url, **kwargs) as response:
response.raise_for_status()
progress.set_content_size(response.headers["Content-length"])
for chunk in response.iter_content(chunk_size=chunk_size):
stream.write(chunk)
progress.add_transferred_chunk(len(chunk))

def download_file(
self, endpoint, filepath, chunk_size=None, progress=None
def download_file_to_stream(
self, endpoint, stream, chunk_size=None, progress=None
):
"""Download file from AYON server.
"""Download file from AYON server to IOStream.

Endpoint can be full url (must start with 'base_url' of api object).

Expand All @@ -1639,7 +1634,7 @@ def download_file(

Args:
endpoint (str): Endpoint or URL to file that should be downloaded.
filepath (str): Path where file will be downloaded.
stream (Union[io.BytesIO, BinaryIO]): Stream where output will be stored.
chunk_size (Optional[int]): Size of chunks that are received
in single loop.
progress (Optional[TransferProgress]): Object that gives ability
Expand All @@ -1655,16 +1650,16 @@ def download_file(
endpoint = endpoint.lstrip("/").rstrip("/")
url = "{}/{}".format(self._rest_url, endpoint)

# Create dummy object so the function does not have to check
# 'progress' variable everywhere
if progress is None:
progress = TransferProgress()

progress.set_source_url(url)
progress.set_destination_url(filepath)
progress.set_started()

try:
self._download_file(url, filepath, chunk_size, progress)
self._download_file_to_stream(
url, stream, chunk_size, progress
)

except Exception as exc:
progress.set_failed(str(exc))
Expand All @@ -1674,12 +1669,58 @@ def download_file(
progress.set_transfer_done()
return progress

def download_file(
self, endpoint, filepath, chunk_size=None, progress=None
):
"""Download file from AYON server.

Endpoint can be full url (must start with 'base_url' of api object).

Progress object can be used to track download. Can be used when
download happens in thread and other thread want to catch changes over
time.

Todos:
Use retries and timeout.
Return RestApiResponse.

Args:
endpoint (str): Endpoint or URL to file that should be downloaded.
filepath (str): Path where file will be downloaded.
chunk_size (Optional[int]): Size of chunks that are received
in single loop.
progress (Optional[TransferProgress]): Object that gives ability
to track download progress.

"""
# Create dummy object so the function does not have to check
# 'progress' variable everywhere
if progress is None:
progress = TransferProgress()

progress.set_destination_url(filepath)

dst_directory = os.path.dirname(filepath)
os.makedirs(dst_directory, exist_ok=True)

try:
with open(filepath, "wb") as stream:
self.download_file_to_stream(
endpoint, stream, chunk_size, progress
)

except Exception as exc:
progress.set_failed(str(exc))
raise

return progress

@staticmethod
def _upload_chunks_iter(file_stream, progress, chunk_size):
"""Generator that yields chunks of file.

Args:
file_stream (io.BinaryIO): Byte stream.
file_stream (Union[io.BytesIO, BinaryIO]): Byte stream.
progress (TransferProgress): Object to track upload progress.
chunk_size (int): Size of chunks that are uploaded at once.

Expand All @@ -1704,7 +1745,7 @@ def _upload_chunks_iter(file_stream, progress, chunk_size):
def _upload_file(
self,
url,
filepath,
stream,
progress,
request_type=None,
chunk_size=None,
Expand All @@ -1714,7 +1755,7 @@ def _upload_file(

Args:
url (str): Url where file will be uploaded.
filepath (str): Source filepath.
stream (Union[io.BytesIO, BinaryIO]): File stream.
progress (TransferProgress): Object that gives ability to track
progress.
request_type (Optional[RequestType]): Type of request that will
Expand Down Expand Up @@ -1743,28 +1784,27 @@ def _upload_file(
if not chunk_size:
chunk_size = self.default_upload_chunk_size

with open(filepath, "rb") as stream:
response = post_func(
url,
data=self._upload_chunks_iter(stream, progress, chunk_size),
**kwargs
)
response = post_func(
url,
data=self._upload_chunks_iter(stream, progress, chunk_size),
**kwargs
)

response.raise_for_status()
return response

def upload_file(
self, endpoint, filepath, progress=None, request_type=None, **kwargs
def upload_file_from_stream(
self, endpoint, stream, progress, request_type, **kwargs
):
"""Upload file to server.
"""Upload file to server from bytes.

Todos:
Use retries and timeout.
Return RestApiResponse.

Args:
endpoint (str): Endpoint or url where file will be uploaded.
filepath (str): Source filepath.
stream (Union[io.BytesIO, BinaryIO]): File content stream.
progress (Optional[TransferProgress]): Object that gives ability
to track upload progress.
request_type (Optional[RequestType]): Type of request that will
Expand All @@ -1774,7 +1814,7 @@ def upload_file(

Returns:
requests.Response: Response object

"""
if endpoint.startswith(self._base_url):
url = endpoint
Expand All @@ -1787,13 +1827,12 @@ def upload_file(
if progress is None:
progress = TransferProgress()

progress.set_source_url(filepath)
progress.set_destination_url(url)
progress.set_started()

try:
return self._upload_file(
url, filepath, progress, request_type, **kwargs
url, stream, progress, request_type, **kwargs
)

except Exception as exc:
Expand All @@ -1803,6 +1842,39 @@ def upload_file(
finally:
progress.set_transfer_done()

def upload_file(
self, endpoint, filepath, progress=None, request_type=None, **kwargs
):
"""Upload file to server.

Todos:
Use retries and timeout.
Return RestApiResponse.

Args:
endpoint (str): Endpoint or url where file will be uploaded.
filepath (str): Source filepath.
progress (Optional[TransferProgress]): Object that gives ability
to track upload progress.
request_type (Optional[RequestType]): Type of request that will
be used to upload file.
**kwargs (Any): Additional arguments that will be passed
to request function.

Returns:
requests.Response: Response object

"""
if progress is None:
progress = TransferProgress()

progress.set_source_url(filepath)

with open(filepath, "rb") as stream:
return self.upload_file_from_stream(
endpoint, stream, progress, request_type, **kwargs
)

def trigger_server_restart(self):
"""Trigger server restart.

Expand Down Expand Up @@ -2153,8 +2225,7 @@ def download_addon_private_file(
dst_filepath = os.path.join(destination_dir, destination_filename)
# Filename can contain "subfolders"
dst_dirpath = os.path.dirname(dst_filepath)
if not os.path.exists(dst_dirpath):
os.makedirs(dst_dirpath)
os.makedirs(dst_dirpath, exist_ok=True)

url = self.get_addon_url(
addon_name,
Expand Down