2 changes: 1 addition & 1 deletion pytest.ini
@@ -2,4 +2,4 @@
minversion = 3.0
log_cli=true
python_files = test_*.py
addopts = -n32 --dist=loadscope
addopts = -n 32 --dist=loadscope
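
For context on the flags: -n 32 asks pytest-xdist for 32 worker processes, and --dist=loadscope groups collected tests by module (by class for test methods) so that each group runs on a single worker and scope-wide fixtures are not rebuilt for every test. The edit itself is cosmetic, since argparse accepts -n32 and -n 32 interchangeably; the spaced form simply matches pytest-xdist's documented usage.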
41 changes: 14 additions & 27 deletions src/superannotate/lib/app/interface/sdk_interface.py
@@ -28,6 +28,7 @@
from lib.app.helpers import get_annotation_paths
from lib.app.helpers import get_paths_and_duplicated_from_csv
from lib.app.helpers import reformat_metrics_json
from lib.app.interface.types import AnnotationStatuses
from lib.app.interface.types import AnnotationType
from lib.app.interface.types import NotEmptyStr
from lib.app.interface.types import Status
@@ -1646,7 +1647,7 @@ def upload_images_from_s3_bucket_to_project(
def prepare_export(
project: Union[NotEmptyStr, dict],
folder_names: Optional[List[NotEmptyStr]] = None,
annotation_statuses: Optional[List[NotEmptyStr]] = None,
annotation_statuses: Optional[List[AnnotationStatuses]] = None,
include_fuse: Optional[StrictBool] = False,
only_pinned=False,
):
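
With AnnotationStatuses in the signature, an invalid status name is now rejected when prepare_export is called, with an error listing the valid titles, instead of being forwarded to the backend as an opaque string. An illustrative call, assuming "Completed" is a valid status title and "Done" is not (the project name is hypothetical):

prepare_export("Example Project", annotation_statuses=["Completed"])  # accepted; the check is case-insensitive
prepare_export("Example Project", annotation_statuses=["Done"])  # rejected by AnnotationStatuses.validate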
@@ -2152,38 +2153,24 @@ def download_export(
"""
project_name, folder_name = extract_project_folder(project)
export_name = export["name"] if isinstance(export, dict) else export
response = controller.download_export(

use_case = controller.download_export(
project_name=project_name,
export_name=export_name,
folder_path=folder_path,
extract_zip_contents=extract_zip_contents,
to_s3_bucket=to_s3_bucket,
)
downloaded_folder_path = response.data

if to_s3_bucket:
to_s3_bucket = boto3.Session().resource("s3").Bucket(to_s3_bucket)

files_to_upload = []
for file in Path(downloaded_folder_path).rglob("*.*"):
files_to_upload.append(file)

def _upload_file_to_s3(to_s3_bucket, path, s3_key) -> None:
controller.upload_file_to_s3(
to_s3_bucket=to_s3_bucket, path=path, s3_key=s3_key
)

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
results = []
for path in files_to_upload:
s3_key = f"{path.as_posix()}"
results.append(
executor.submit(_upload_file_to_s3, to_s3_bucket, str(path), s3_key)
)

for future in concurrent.futures.as_completed(results):
future.result()
logger.info("Exported to AWS %s/%s", to_s3_bucket, str(path))
if use_case.is_valid():
if to_s3_bucket:
with tqdm(
total=use_case.get_upload_files_count(), desc="Uploading"
) as progress_bar:
for _ in use_case.execute():
progress_bar.update(1)
else:
for _ in use_case.execute():
continue


@Trackable
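Taken together, the new flow is: the controller returns an unexecuted use case (see the controller.py change below), the interface checks is_valid(), and execute() is a generator that yields once per unit of work. That moves progress reporting into the SDK layer, where tqdm is attached only on the S3 path and sized via use_case.get_upload_files_count(); the plain local download just drains the generator. Note that get_upload_files_count() downloads the export into a temporary directory as a side effect, which is why the count is exact rather than estimated.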
10 changes: 10 additions & 0 deletions src/superannotate/lib/app/interface/types.py
@@ -34,6 +34,16 @@ def validate(cls, value: Union[str]) -> Union[str]:
return value


class AnnotationStatuses(StrictStr):
@classmethod
def validate(cls, value: Union[str]) -> Union[str]:
if value.lower() not in AnnotationStatus.values():
raise TypeError(
f"Available annotation_statuses are {', '.join(AnnotationStatus.titles())}. "
)
return value


def to_chunks(t, size=2):
it = iter(t)
return zip(*[it] * size)
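The new type follows the same pattern as the Status validator above it: a StrictStr subclass whose validate hook rejects unknown values. Roughly, assuming "Completed" is an AnnotationStatus member and "Done" is not:

AnnotationStatuses.validate("Completed")  # returns "Completed"; the membership check is case-insensitive
AnnotationStatuses.validate("Done")  # raises TypeError listing the valid titles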
4 changes: 4 additions & 0 deletions src/superannotate/lib/core/enums.py
@@ -27,6 +27,10 @@ def get_value(cls, name):
def values(cls):
return [enum.name.lower() for enum in list(cls)]

@classmethod
def titles(cls):
return [enum.name for enum in list(cls)]


class ProjectType(BaseTitledEnum):
VECTOR = "Vector", 1
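titles() complements the values() helper directly above it: values() lower-cases member names for case-insensitive matching, while titles() preserves the original casing for human-readable error messages, as used in AnnotationStatuses.validate. A sketch of the pairing (the member declarations are illustrative assumptions, not the actual enum body):

class AnnotationStatus(BaseTitledEnum):
    InProgress = "InProgress", 2
    Completed = "Completed", 5

AnnotationStatus.values()  # ["inprogress", "completed"], used for the membership check
AnnotationStatus.titles()  # ["InProgress", "Completed"], used in the error message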
117 changes: 76 additions & 41 deletions src/superannotate/lib/core/usecases.py
@@ -5,6 +5,7 @@
import logging
import os.path
import random
import tempfile
import time
import uuid
import zipfile
@@ -4012,58 +4013,92 @@ def __init__(
self._folder_path = folder_path
self._extract_zip_contents = extract_zip_contents
self._to_s3_bucket = to_s3_bucket
self._temp_dir = None

def validate_project_type(self):
if self._project.project_type in constances.LIMITED_FUNCTIONS:
raise AppValidationException(
constances.LIMITED_FUNCTIONS[self._project.project_type]
)

def execute(self):
if self.is_valid():
exports = self._service.get_exports(
team_id=self._project.team_id, project_id=self._project.uuid
)
export_id = None
for export in exports:
if export["name"] == self._export_name:
export_id = export["id"]
break
if not export_id:
raise AppException("Export not found.")
def upload_to_s3_from_folder(self, folder_path: str):
to_s3_bucket = boto3.Session().resource("s3").Bucket(self._to_s3_bucket)
files_to_upload = list(Path(folder_path).rglob("*.*"))

while True:
export = self._service.get_export(
team_id=self._project.team_id,
project_id=self._project.uuid,
export_id=export_id,
def _upload_file_to_s3(_to_s3_bucket, _path, _s3_key) -> None:
_to_s3_bucket.upload_file(_path, _s3_key)

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
results = []
for path in files_to_upload:
s3_key = f"{self._folder_path}/{path.name}"
results.append(
executor.submit(_upload_file_to_s3, to_s3_bucket, str(path), s3_key)
)
yield

if export["status"] == ExportStatus.IN_PROGRESS.value:
logger.info("Waiting 5 seconds for export to finish on server.")
time.sleep(5)
continue
if export["status"] == ExportStatus.ERROR.value:
raise AppException("Couldn't download export.")
pass
break

filename = Path(export["path"]).name
filepath = Path(self._folder_path) / filename
with requests.get(export["download"], stream=True) as r:
r.raise_for_status()
with open(filepath, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
if self._extract_zip_contents:
with zipfile.ZipFile(filepath, "r") as f:
f.extractall(self._folder_path)
Path.unlink(filepath)
logger.info(f"Extracted {filepath} to folder {self._folder_path}")
else:
logger.info(f"Downloaded export ID {export['id']} to {filepath}")
def download_to_local_storage(self, destination: str):
exports = self._service.get_exports(
team_id=self._project.team_id, project_id=self._project.uuid
)
export = next(filter(lambda i: i["name"] == self._export_name, exports), None)
if not export:  # guard before export["id"] is dereferenced below
raise AppException("Export not found.")
export = self._service.get_export(
team_id=self._project.team_id,
project_id=self._project.uuid,
export_id=export["id"],
)
export_status = export["status"]

while export_status != ExportStatus.COMPLETE.value:
logger.info("Waiting 5 seconds for export to finish on server.")
time.sleep(5)

export = self._service.get_export(
team_id=self._project.team_id,
project_id=self._project.uuid,
export_id=export["id"],
)
export_status = export["status"]
if export_status in (ExportStatus.ERROR.value, ExportStatus.CANCELED.value):
raise AppException("Couldn't download export.")

filename = Path(export["path"]).name
filepath = Path(destination) / filename
with requests.get(export["download"], stream=True) as response:
response.raise_for_status()
with open(filepath, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
if self._extract_zip_contents:
with zipfile.ZipFile(filepath, "r") as f:
f.extractall(destination)
Path.unlink(filepath)
return export["id"], filepath, destination

def get_upload_files_count(self):
if not self._temp_dir:
self._temp_dir = tempfile.TemporaryDirectory()
self.download_to_local_storage(self._temp_dir.name)
return len(list(Path(self._temp_dir.name).rglob("*.*")))

self._response.data = self._folder_path
def execute(self):
if self.is_valid():
if self._to_s3_bucket:
self.get_upload_files_count()
yield from self.upload_to_s3_from_folder(self._temp_dir.name)
logger.info(f"Exported to AWS {self._to_s3_bucket}/{self._folder_path}")
self._temp_dir.cleanup()
else:
export_id, filepath, destination = self.download_to_local_storage(
self._folder_path
)
if self._extract_zip_contents:
logger.info(f"Extracted {filepath} to folder {destination}")
else:
logger.info(f"Downloaded export ID {export_id} to {filepath}")
yield
return self._response


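A minimal sketch of the pattern the rewritten use case adopts (the class and method names below are hypothetical, not part of the PR): execute() yields once per unit of work, so a caller can layer a progress bar on top without the core layer importing any UI dependency. One subtlety in the PR code above: upload_to_s3_from_folder yields right after executor.submit(...), so the interface's bar advances as files are queued rather than as uploads complete, and completion is only guaranteed when the executor's with-block exits; yielding on as_completed, as in the sketch, ties progress to finished uploads. It may also be worth noting that the new s3_key is built from path.name alone, so files found in nested subfolders land flat under folder_path and same-named files would overwrite one another.

import concurrent.futures
from typing import Iterator, List

class DemoUploadUseCase:
    # Hypothetical stand-in for DownloadExportUseCase: yields once per
    # finished upload so callers can render progress however they like.
    def __init__(self, files: List[str]):
        self._files = files

    def _upload(self, path: str) -> None:
        pass  # the real transfer (e.g. an S3 upload_file call) would go here

    def execute(self) -> Iterator[None]:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(self._upload, f) for f in self._files]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # re-raise any exception from the worker thread
                yield  # one tick per completed upload

Driven exactly the way sdk_interface.py drives the real use case:

from tqdm import tqdm

use_case = DemoUploadUseCase(["a.json", "b.json"])
with tqdm(total=2, desc="Uploading") as progress_bar:
    for _ in use_case.execute():
        progress_bar.update(1)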
3 changes: 1 addition & 2 deletions src/superannotate/lib/infrastructure/controller.py
@@ -1386,15 +1386,14 @@ def download_export(
to_s3_bucket: bool,
):
project = self._get_project(project_name)
use_case = usecases.DownloadExportUseCase(
return usecases.DownloadExportUseCase(
service=self._backend_client,
project=project,
export_name=export_name,
folder_path=folder_path,
extract_zip_contents=extract_zip_contents,
to_s3_bucket=to_s3_bucket,
)
return use_case.execute()

def download_ml_model(self, model_data: dict, download_path: str):
model = MLModelEntity(
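Returning the use case rather than its result keeps the controller thin, and because execute() is now a generator, constructing the use case does no work at all; nothing runs until the caller starts iterating. The calling convention, matching sdk_interface.py above (arguments elided for brevity):

use_case = controller.download_export(...)  # builds the use case; no work happens yet
if use_case.is_valid():
    for _ in use_case.execute():  # download/upload proceeds as the generator is drained
        pass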
8 changes: 1 addition & 7 deletions tests/integration/test_depricated_functions_video.py
@@ -22,24 +22,18 @@ class TestDeprecatedFunctionsVideo(BaseTestCase):
EXCEPTION_MESSAGE = LIMITED_FUNCTIONS[ProjectType.VIDEO.value]
EXCEPTION_MESSAGE_DOCUMENT_VIDEO = DEPRICATED_DOCUMENT_VIDEO_MESSAGE


def setUp(self, *args, **kwargs):
self.tearDown()

sa.create_project(
self.PROJECT_NAME, self.PROJECT_DESCRIPTION, self.PROJECT_TYPE
)

sa.create_project(
self.PROJECT_NAME_2, self.PROJECT_DESCRIPTION_2, self.PROJECT_TYPE_2
)

def tearDown(self) -> None:
projects = sa.search_projects(self.PROJECT_NAME, return_metadata=True)
for project in projects:
sa.delete_project(project)

projects = sa.search_projects(self.PROJECT_NAME_2, return_metadata=True)
projects.extend(sa.search_projects(self.PROJECT_NAME_2, return_metadata=True))
for project in projects:
sa.delete_project(project)

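The tearDown rewrite folds two near-identical delete loops into one by extending a single list with both search results before deleting; behavior is unchanged. The setUp edits only remove blank lines.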