Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,5 @@ dmypy.json
# Pyre type checker
.pyre/

# End of https://www.gitignore.io/api/python
AGENTS.md
PROMPTS.md
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ profile = "black"
[tool.black]
line-length = 119
target-version = ["py310"]

[tool.pytest.ini_options]
minversion = "6.0"
required_plugins = "pytest-cov"
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "session"
87 changes: 57 additions & 30 deletions reportportal_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@
# limitations under the License

"""This package is the base package for ReportPortal client."""
import typing
import warnings

import sys
from typing import Optional, Tuple, TypedDict, Union

# noinspection PyUnreachableCode
if sys.version_info >= (3, 11):
from typing import Unpack
else:
from typing_extensions import Unpack

import aenum

Expand All @@ -34,74 +41,94 @@ class ClientType(aenum.Enum):
ASYNC_BATCHED = aenum.auto()


class _ClientOptions(TypedDict, total=False):
client_type: ClientType
endpoint: str
project: str
api_key: Optional[str]
# OAuth 2.0 parameters
oauth_uri: Optional[str]
oauth_username: Optional[str]
oauth_password: Optional[str]
oauth_client_id: Optional[str]
oauth_client_secret: Optional[str]
oauth_scope: Optional[str]
# Common client parameters
launch_uuid: Optional[str]
is_skipped_an_issue: bool
verify_ssl: Union[bool, str]
retries: int
max_pool_size: int
http_timeout: Union[float, Tuple[float, float]]
mode: str
launch_uuid_print: bool
print_output: OutputType
truncate_attributes: bool
log_batch_size: int
log_batch_payload_limit: int
# Async client specific parameters
keepalive_timeout: float
# Async threaded/batched client specific parameters
task_timeout: float
shutdown_timeout: float
# Async batched client specific parameters
trigger_num: int
trigger_interval: float


# noinspection PyIncorrectDocstring
def create_client(
client_type: ClientType, endpoint: str, project: str, *, api_key: str = None, **kwargs: typing.Any
) -> typing.Optional[RP]:
client_type: ClientType, endpoint: str, project: str, **kwargs: Unpack[_ClientOptions]
) -> Optional[RP]:
"""Create and ReportPortal Client based on the type and arguments provided.

:param client_type: Type of the Client to create.
:type client_type: ClientType
:param endpoint: Endpoint of the ReportPortal service.
:type endpoint: str
:param project: Project name to report to.
:type project: str
:param api_key: Authorization API key.
:type api_key: str
:param oauth_uri: OAuth 2.0 token endpoint URI (for OAuth authentication).
:param oauth_username: Username for OAuth 2.0 authentication.
:param oauth_password: Password for OAuth 2.0 authentication.
:param oauth_client_id: OAuth 2.0 client ID.
:param oauth_client_secret: OAuth 2.0 client secret (optional).
:param oauth_scope: OAuth 2.0 scope (optional).
:param launch_uuid: A launch UUID to use instead of starting own one.
:type launch_uuid: str
:param is_skipped_an_issue: Option to mark skipped tests as not 'To Investigate' items on the server
side.
:type is_skipped_an_issue: bool
:param verify_ssl: Option to skip ssl verification.
:type verify_ssl: typing.Union[bool, str]
:param retries: Number of retry attempts to make in case of connection / server
errors.
:type retries: int
:param max_pool_size: Option to set the maximum number of connections to save the pool.
:type max_pool_size: int
:param http_timeout : A float in seconds for connect and read timeout. Use a Tuple to
specific connect and read separately.
:type http_timeout: Tuple[float, float]
:param mode: Launch mode, all Launches started by the client will be in that mode.
:type mode: str
:param launch_uuid_print: Print Launch UUID into passed TextIO or by default to stdout.
:type launch_uuid_print: bool
:param print_output: Set output stream for Launch UUID printing.
:type print_output: OutputType
:param truncate_attributes: Truncate test item attributes to default maximum length.
:type truncate_attributes: bool
:param log_batch_size: Option to set the maximum number of logs that can be processed in one
batch.
:type log_batch_size: int
:param log_batch_payload_limit: Maximum size in bytes of logs that can be processed in one batch.
:type log_batch_payload_limit: int
:param keepalive_timeout: For Async Clients only. Maximum amount of idle time in seconds before
force connection closing.
:type keepalive_timeout: int
:param task_timeout: For Async Threaded and Batched Clients only. Time limit in seconds for a
Task processing.
:type task_timeout: float
:param shutdown_timeout: For Async Threaded and Batched Clients only. Time limit in seconds for
shutting down internal Tasks.
:type shutdown_timeout: float
:param trigger_num: For Async Batched Client only. Number of tasks which triggers Task batch
execution.
:type trigger_num: int
:param trigger_interval: For Async Batched Client only. Time limit which triggers Task batch
execution.
:type trigger_interval: float
:return: ReportPortal Client instance.
"""
if client_type is ClientType.SYNC:
return RPClient(endpoint, project, api_key=api_key, **kwargs)
return RPClient(endpoint, project, **kwargs)
if client_type is ClientType.ASYNC:
return AsyncRPClient(endpoint, project, api_key=api_key, **kwargs)
return AsyncRPClient(endpoint, project, **kwargs)
if client_type is ClientType.ASYNC_THREAD:
return ThreadedRPClient(endpoint, project, api_key=api_key, **kwargs)
return ThreadedRPClient(endpoint, project, **kwargs)
if client_type is ClientType.ASYNC_BATCHED:
return BatchedRPClient(endpoint, project, api_key=api_key, **kwargs)
warnings.warn(f"Unknown ReportPortal Client type requested: {client_type}", RuntimeWarning, stacklevel=2)
return BatchedRPClient(endpoint, project, **kwargs)
raise ValueError(f"Unknown ReportPortal Client type requested: {client_type}")


__all__ = [
Expand Down
98 changes: 94 additions & 4 deletions reportportal_client/_internal/aio/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,20 @@
import asyncio
import sys
from types import TracebackType
from typing import Any, Callable, Coroutine, Optional, Type
from typing import Any, Callable, Coroutine, Optional, Type, Union

from aenum import Enum
from aiohttp import ClientResponse, ClientResponseError, ClientSession, ServerConnectionError
from aiohttp import ClientResponse, ClientResponseError
from aiohttp import ClientSession as AioHttpClientSession
from aiohttp import ServerConnectionError

from reportportal_client._internal.services.auth import AuthAsync

DEFAULT_RETRY_NUMBER: int = 5
DEFAULT_RETRY_DELAY: float = 0.005
THROTTLING_STATUSES: set = {425, 429}
RETRY_STATUSES: set = {408, 500, 502, 503, 507}.union(THROTTLING_STATUSES)
AUTH_PROBLEM_STATUSES: set = {401, 403}


class RetryClass(int, Enum):
Expand All @@ -46,7 +51,7 @@ class RetryClass(int, Enum):
class RetryingClientSession:
"""Class uses aiohttp.ClientSession.request method and adds request retry logic."""

_client: ClientSession
_client: AioHttpClientSession
__retry_number: int
__retry_delay: float

Expand All @@ -68,7 +73,7 @@ def __init__(
an error. Real value highly depends on Retry Class and Retry attempt number,
since retries are performed in exponential delay manner
"""
self._client = ClientSession(*args, **kwargs)
self._client = AioHttpClientSession(*args, **kwargs)
self.__retry_number = max_retry_number
self.__retry_delay = base_retry_delay

Expand All @@ -91,8 +96,12 @@ async def __request(self, method: Callable, url, **kwargs: Any) -> ClientRespons
"""
result = None
exceptions = []

for i in range(self.__retry_number + 1): # add one for the first attempt, which is not a retry
retry_factor = None
if result is not None:
# Release previous result to return connection to pool
await result.release()
try:
result = await method(url, **kwargs)
except Exception as exc:
Expand Down Expand Up @@ -157,3 +166,84 @@ async def __aexit__(
) -> None:
"""Auxiliary method which controls what `async with` construction does on block exit."""
await self.close()


class ClientSession:
"""Class wraps aiohttp.ClientSession or RetryingClientSession and adds authentication support."""

_client: Union[AioHttpClientSession, RetryingClientSession]
__auth: Optional[AuthAsync]

def __init__(
self,
wrapped: Union[AioHttpClientSession, RetryingClientSession],
auth: Optional[AuthAsync] = None,
):
"""Initialize an instance of the session with arguments.

:param wrapped: aiohttp.ClientSession or RetryingClientSession instance to wrap
:param auth: authentication instance to use for requests
"""
self._client = wrapped
self.__auth = auth

async def __request(self, method: Callable, url: str, **kwargs: Any) -> ClientResponse:
"""Make a request with authentication support.

The method adds Authorization header if auth is configured and handles auth refresh
on 401/403 responses.
"""
# Clone kwargs and add Authorization header if auth is configured
request_kwargs = kwargs.copy()
if self.__auth:
auth_header = await self.__auth.get()
if auth_header:
if "headers" not in request_kwargs:
request_kwargs["headers"] = {}
else:
request_kwargs["headers"] = request_kwargs["headers"].copy()
request_kwargs["headers"]["Authorization"] = auth_header

result = await method(url, **request_kwargs)

# Check for authentication errors
if result.status in AUTH_PROBLEM_STATUSES and self.__auth:
refreshed_header = await self.__auth.refresh()
if refreshed_header:
# Release previous result to return connection to pool
await result.release()
# Retry with new auth header
request_kwargs["headers"] = request_kwargs.get("headers", {}).copy()
request_kwargs["headers"]["Authorization"] = refreshed_header
result = await method(url, **request_kwargs)

return result

def get(self, url: str, *, allow_redirects: bool = True, **kwargs: Any) -> Coroutine[Any, Any, ClientResponse]:
"""Perform HTTP GET request."""
return self.__request(self._client.get, url, allow_redirects=allow_redirects, **kwargs)

def post(self, url: str, *, data: Any = None, **kwargs: Any) -> Coroutine[Any, Any, ClientResponse]:
"""Perform HTTP POST request."""
return self.__request(self._client.post, url, data=data, **kwargs)

def put(self, url: str, *, data: Any = None, **kwargs: Any) -> Coroutine[Any, Any, ClientResponse]:
"""Perform HTTP PUT request."""
return self.__request(self._client.put, url, data=data, **kwargs)

def close(self) -> Coroutine:
"""Gracefully close internal session instance."""
return self._client.close()

async def __aenter__(self) -> "ClientSession":
"""Auxiliary method which controls what `async with` construction does on block enter."""
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""Auxiliary method which controls what `async with` construction does on block exit."""
await self.close()
30 changes: 18 additions & 12 deletions reportportal_client/_internal/aio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,24 @@ def append(self, value: _T) -> Optional[List[_T]]:
:return: a batch or None
"""
self.__task_list.append(value)
if self.__ready_to_run():
tasks = self.__task_list
self.__task_list = []
return tasks
if not self.__ready_to_run():
return None

tasks = self.__task_list
self.__task_list = []
return tasks

def flush(self) -> Optional[List[_T]]:
"""Immediately return everything what's left in the internal batch.

:return: a batch or None
"""
if len(self.__task_list) > 0:
tasks = self.__task_list
self.__task_list = []
return tasks
if len(self.__task_list) <= 0:
return None

tasks = self.__task_list
self.__task_list = []
return tasks


class BackgroundTaskList(Generic[_T]):
Expand Down Expand Up @@ -224,7 +228,9 @@ def flush(self) -> Optional[List[_T]]:
:return: a batch or None
"""
self.__remove_finished()
if len(self.__task_list) > 0:
tasks = self.__task_list
self.__task_list = []
return tasks
if len(self.__task_list) <= 0:
return None

tasks = self.__task_list
self.__task_list = []
return tasks
Loading