Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/superannotate.sdk.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ ________
.. autofunction:: superannotate.upload_annotations_from_folder_to_project
.. autofunction:: superannotate.upload_preannotations_from_folder_to_project
.. autofunction:: superannotate.share_project
.. autofunction:: superannotate.add_contributors_to_project
.. autofunction:: superannotate.get_project_settings
.. autofunction:: superannotate.set_project_default_image_quality_in_editor
.. autofunction:: superannotate.get_project_workflow
Expand Down
5 changes: 3 additions & 2 deletions src/superannotate/lib/app/interface/sdk_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from lib.core.types import MLModel
from lib.core.types import Project
from lib.infrastructure.controller import Controller
from pydantic import conlist
from pydantic import parse_obj_as
from pydantic import StrictBool
from tqdm import tqdm
Expand Down Expand Up @@ -2904,7 +2905,7 @@ def validate_annotations(
@Trackable
@validate_arguments
def add_contributors_to_project(
project: NotEmptyStr, emails: List[EmailStr], role: AnnotatorRole
project: NotEmptyStr, emails: conlist(EmailStr, min_items=1), role: AnnotatorRole
) -> Tuple[List[str], List[str]]:
"""Add contributors to project.

Expand All @@ -2930,7 +2931,7 @@ def add_contributors_to_project(

@Trackable
@validate_arguments
def invite_contributors_to_team(emails: List[EmailStr], admin: StrictBool = False) -> Tuple[List[str], List[str]]:
def invite_contributors_to_team(emails: conlist(EmailStr, min_items=1), admin: StrictBool = False) -> Tuple[List[str], List[str]]:
"""Invites contributors to the team.

:param emails: list of contributor emails
Expand Down
2 changes: 1 addition & 1 deletion src/superannotate/lib/app/interface/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def validate(cls, value: Union[str]) -> Union[str]:
value = value[: cls.curtail_length]
if value.lower() not in [role.lower() for role in cls.ANNOTATOR_ROLES]:
raise TypeError(
f"Available statuses is {', '.join(AnnotatorRole)}. "
f"Invalid user role provided. Please specify one of {', '.join(cls.ANNOTATOR_ROLES)}. "
)
return value

Expand Down
24 changes: 23 additions & 1 deletion src/superannotate/lib/core/usecases/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from abc import ABC
from abc import ABCMeta
from abc import abstractmethod
from typing import Iterable
from typing import List

from lib.core.exceptions import AppValidationException
from lib.core.reporter import Reporter
Expand Down Expand Up @@ -56,7 +58,27 @@ def execute(self) -> Iterable:
raise NotImplementedError


class BaseReportableUseCae(BaseUseCase):
class BaseReportableUseCae(BaseUseCase, metaclass=ABCMeta):
def __init__(self, reporter: Reporter):
super().__init__()
self.reporter = reporter


class BaseUserBasedUseCase(BaseReportableUseCae, metaclass=ABCMeta):
"""
class contain validation of unique emails
"""
def __init__(self, reporter: Reporter, emails: List[str]):
super().__init__(reporter)
self._emails = emails

def validate_emails(self):
emails_to_add = set()
duplicated_emails = [
email for email in self._emails if email not in emails_to_add and not emails_to_add.add(email)
]
if duplicated_emails:
self.reporter.log_info(
f"Dropping duplicates. Found {len(duplicated_emails)}/{len(self._emails)} unique users."
)
self._emails = emails_to_add
141 changes: 71 additions & 70 deletions src/superannotate/lib/core/usecases/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from lib.core.serviceproviders import SuerannotateServiceProvider
from lib.core.usecases.base import BaseReportableUseCae
from lib.core.usecases.base import BaseUseCase
from lib.core.usecases.base import BaseUserBasedUseCase
from requests.exceptions import RequestException

logger = logging.getLogger("root")
Expand Down Expand Up @@ -941,7 +942,7 @@ def execute(self):
return self._response


class AddContributorsToProject(BaseReportableUseCae):
class AddContributorsToProject(BaseUserBasedUseCase):
"""
Returns tuple of lists (added, skipped)
"""
Expand All @@ -955,10 +956,9 @@ def __init__(
role: str,
service: SuerannotateServiceProvider,
):
super().__init__(reporter)
super().__init__(reporter, emails)
self._team = team
self._project = project
self._emails = emails
self._role = role
self._service = service

Expand All @@ -967,47 +967,48 @@ def user_role(self):
return constances.UserRole.get_value(self._role)

def execute(self):
team_users = set()
project_users = {user["user_id"] for user in self._project.users}
for user in self._team.users:
if user.user_role > constances.UserRole.ADMIN.value:
team_users.add(user.email)
# collecting pending team users which is not admin
for user in self._team.pending_invitations:
if user["user_role"] > constances.UserRole.ADMIN.value:
team_users.add(user["email"])
# collecting pending project users which is not admin
for user in self._project.unverified_users:
if user["user_role"] > constances.UserRole.ADMIN.value:
project_users.add(user["email"])

to_add = list(team_users.intersection(self._emails) - project_users)
to_skip = list(set(self._emails).difference(to_add))

if to_skip:
self.reporter.log_warning(
f"Skipped {len(to_skip)}/{len(self._emails)} "
"contributors that are out of the team scope or already have access to the project."
)
if to_add:
response = self._service.share_project_bulk(
team_id=self._team.uuid,
project_id=self._project.uuid,
users=[
dict(user_id=user_id, user_role=self.user_role)
for user_id in to_add
],
)
if response and not response.get("invalidUsers"):
self.reporter.log_info(
f"Added {len(to_add)}/{len(self._emails)} "
f"contributors to the project {self._project.name} with the {self.user_role} role."
if self.is_valid():
team_users = set()
project_users = {user["user_id"] for user in self._project.users}
for user in self._team.users:
if user.user_role > constances.UserRole.ADMIN.value:
team_users.add(user.email)
# collecting pending team users which is not admin
for user in self._team.pending_invitations:
if user["user_role"] > constances.UserRole.ADMIN.value:
team_users.add(user["email"])
# collecting pending project users which is not admin
for user in self._project.unverified_users:
if user["user_role"] > constances.UserRole.ADMIN.value:
project_users.add(user["email"])

to_add = list(team_users.intersection(self._emails) - project_users)
to_skip = list(set(self._emails).difference(to_add))

if to_skip:
self.reporter.log_warning(
f"Skipped {len(to_skip)}/{len(self._emails)} "
"contributors that are out of the team scope or already have access to the project."
)
self._response.data = to_add, to_skip
return self._response
if to_add:
response = self._service.share_project_bulk(
team_id=self._team.uuid,
project_id=self._project.uuid,
users=[
dict(user_id=user_id, user_role=self.user_role)
for user_id in to_add
],
)
if response and not response.get("invalidUsers"):
self.reporter.log_info(
f"Added {len(to_add)}/{len(self._emails)} "
f"contributors to the project {self._project.name} with the {self._role} role."
)
self._response.data = to_add, to_skip
return self._response


class InviteContributorsToTeam(BaseReportableUseCae):
class InviteContributorsToTeam(BaseUserBasedUseCase):
"""
Returns tuple of lists (added, skipped)
"""
Expand All @@ -1020,42 +1021,42 @@ def __init__(
set_admin: bool,
service: SuerannotateServiceProvider,
):
super().__init__(reporter)
super().__init__(reporter, emails)
self._team = team
self._emails = emails
self._set_admin = set_admin
self._service = service

def execute(self):
team_users = {user.email for user in self._team.users}
# collecting pending team users
team_users.update({user["email"] for user in self._team.pending_invitations})
if self.is_valid():
team_users = {user.email for user in self._team.users}
# collecting pending team users
team_users.update({user["email"] for user in self._team.pending_invitations})

emails = set(self._emails)
emails = set(self._emails)

to_skip = list(emails.intersection(team_users))
to_add = list(emails.difference(to_skip))
to_skip = list(emails.intersection(team_users))
to_add = list(emails.difference(to_skip))

if to_skip:
self.reporter.log_warning(
f"Found {len(to_skip)}/{len(self._emails)} existing members of the team."
)
if to_add:
invited, failed = self._service.invite_contributors(
team_id=self._team.uuid,
# REMINDER UserRole.VIEWER is the contributor for the teams
team_role=constances.UserRole.ADMIN.value if self._set_admin else constances.UserRole.VIEWER.value,
emails=to_add
)
if invited:
self.reporter.log_info(
f"Sent team {'admin' if self._set_admin else 'contributor'} invitations"
f" to {len(to_add)}/{len(self._emails)} users."
if to_skip:
self.reporter.log_warning(
f"Found {len(to_skip)}/{len(self._emails)} existing members of the team."
)
if failed:
self.reporter.log_info(
f"Skipped team {'admin' if self._set_admin else 'contributor'} "
f"invitations for {len(failed)}/{len(self._emails)} users."
if to_add:
invited, failed = self._service.invite_contributors(
team_id=self._team.uuid,
# REMINDER UserRole.VIEWER is the contributor for the teams
team_role=constances.UserRole.ADMIN.value if self._set_admin else constances.UserRole.VIEWER.value,
emails=to_add
)
self._response.data = to_add, to_skip
return self._response
if invited:
self.reporter.log_info(
f"Sent team {'admin' if self._set_admin else 'contributor'} invitations"
f" to {len(to_add)}/{len(self._emails)} users."
)
if failed:
self.reporter.log_info(
f"Skipped team {'admin' if self._set_admin else 'contributor'} "
f"invitations for {len(failed)}/{len(self._emails)} users."
)
self._response.data = to_add, to_skip
return self._response