Skip to content

Commit

Permalink
Merge branch 'feature/multithread_optimization' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
filiptubic committed Aug 8, 2019
2 parents 7412e35 + f7949c2 commit c1d0b7a
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 15 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
0.23.0 (2019-08-08)
==================

- Improvements:
    - Added optimization for multi-threaded applications
    - Bulk operations in file and task models are now logged at debug level

0.22.0 (2019-08-01)
==================

Expand Down
27 changes: 27 additions & 0 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,33 @@ Proxy configuration can be supplied in three different ways.
.. note:: Once you set the proxy, all calls including upload and download will use the proxy settings.


Low level configuration
---------------------------------
For low-level library tuning, such as the number of cached connections and the number of parallel requests (useful only for multi-threaded applications),
there are a few useful arguments that can be passed when instantiating Api.

- Three arguments are exposed directly from the requests library. The `pool_connections` argument is the number of urllib3 connection pools to cache.
  The `pool_maxsize` argument constrains the maximum number of connections to save in the pool, while the `pool_block` flag tells
  whether the connection pool should block for connections. A more detailed explanation of these arguments can be found
  `here <http://2.python-requests.org/en/master/api/?highlight=pool#requests.adapters.HTTPAdapter>`_.

.. code:: python

    api = sb.Api(
        pool_connections=<CONN_NUMBER>,
        pool_maxsize=<POOL_SIZE>,
        pool_block=<BLOCK_FLAG>
    )

- It is also possible to throttle the number of parallel requests across all connection pools. This is done using the `max_parallel_requests` argument.

.. code:: python

    api = sb.Api(max_parallel_requests=<MAX_PARALLEL>)

.. note:: Changing these values from their defaults could affect performance.


Rate limit
----------

Expand Down
2 changes: 1 addition & 1 deletion sevenbridges/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import ssl
import logging

__version__ = "0.22.0"
__version__ = "0.23.0"

from sevenbridges.api import Api
from sevenbridges.config import Config
Expand Down
19 changes: 16 additions & 3 deletions sevenbridges/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from concurrent.futures import ThreadPoolExecutor

from sevenbridges.http.client import HttpClient
from requests.adapters import DEFAULT_POOLSIZE

from sevenbridges.http.client import HttpClient
from sevenbridges.models.app import App
from sevenbridges.models.file import File
from sevenbridges.models.task import Task
Expand Down Expand Up @@ -49,7 +50,9 @@ class Api(HttpClient):

def __init__(self, url=None, token=None, oauth_token=None, config=None,
timeout=None, download_max_workers=16, upload_max_workers=16,
proxies=None, error_handlers=None, advance_access=False):
proxies=None, error_handlers=None, advance_access=False,
pool_connections=DEFAULT_POOLSIZE, pool_maxsize=100,
pool_block=True, max_parallel_requests=100):
"""
Initializes api object.
Expand All @@ -63,12 +66,22 @@ def __init__(self, url=None, token=None, oauth_token=None, config=None,
:param proxies: Proxy settings if any.
:param error_handlers: List of error handlers - callables.
:param advance_access: If True advance access features will be enabled.
:param pool_connections: The number of urllib3 connection pools to
cache.
:param pool_maxsize: The maximum number of connections to save in the
pool.
:param pool_block: Whether the connection pool should block for
connections.
:param max_parallel_requests: Number which indicates number of parallel
requests, only useful for multi thread applications.
:return: Api object instance.
"""
super(Api, self).__init__(
url=url, token=token, oauth_token=oauth_token, config=config,
timeout=timeout, proxies=proxies, error_handlers=error_handlers,
advance_access=advance_access
advance_access=advance_access, pool_connections=pool_connections,
pool_maxsize=pool_maxsize, pool_block=pool_block,
max_parallel_requests=max_parallel_requests
)

self.download_pool = ThreadPoolExecutor(
Expand Down
15 changes: 15 additions & 0 deletions sevenbridges/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ def wrapper(*args, **kwargs):
return func


def throttle(func):
    """Throttle the number of parallel calls made through one client.

    The wrapped callable must receive the client as its first positional
    argument. If the client's ``_throttle_limit`` attribute is truthy it is
    entered as a context manager around the call, so it is acquired before
    ``func`` runs and released when ``func`` returns or raises. If the
    attribute is falsy (for example ``None``) the call is passed straight
    through without any throttling.

    :param func: Callable whose first positional argument is the client
        object carrying ``_throttle_limit``.
    :return: Wrapper preserving the name and docstring of ``func``.
    """

    # noinspection PyProtectedMember
    @functools.wraps(func)
    def wrapper(http_client, *args, **kwargs):
        # Read the limiter once so that the check and the acquisition are
        # guaranteed to operate on the same object.
        limiter = http_client._throttle_limit
        if not limiter:
            # Throttling disabled for this client.
            return func(http_client, *args, **kwargs)
        # NOTE(review): if the limiter is a plain semaphore it is not
        # reentrant; should ``func`` re-enter a throttled method on the same
        # client from the same thread, a small limit could block forever --
        # confirm against the error handlers that run inside ``func``.
        with limiter:
            return func(http_client, *args, **kwargs)
    return wrapper


def check_for_error(func):
"""
Executes the wrapped function and inspects the response object
Expand Down
38 changes: 34 additions & 4 deletions sevenbridges/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import json
import logging
import platform
import threading
from datetime import datetime as dt

import requests

import sevenbridges
from sevenbridges.decorators import check_for_error
from sevenbridges.decorators import check_for_error, throttle
from sevenbridges.errors import SbgError, URITooLong
from sevenbridges.config import Config, format_proxies
from sevenbridges.http.error_handlers import maintenance_sleeper
Expand Down Expand Up @@ -48,13 +49,26 @@ def send(self, request, **kwargs):
return super(RequestSession, self).send(request, **kwargs)


def generate_session(proxies=None):
def generate_session(pool_connections, pool_maxsize, pool_block, proxies=None):
    """
    Utility method to generate request sessions.

    One HTTPAdapter configured with the given pool settings is created and
    mounted for both the ``http://`` and ``https://`` prefixes of a new
    RequestSession.

    :param pool_connections: The number of urllib3 connection pools to
        cache. ``None`` falls back to ``requests.adapters.DEFAULT_POOLSIZE``.
    :param pool_maxsize: The maximum number of connections to save in the
        pool. ``None`` falls back to ``requests.adapters.DEFAULT_POOLSIZE``.
    :param pool_block: Whether the connection pool should block for
        connections.
    :param proxies: Proxies dictionary.
    :return: requests.Session object.
    """
    # Callers may forward ``None`` for the pool sizes (HttpClient defaults
    # both to None), while HTTPAdapter expects integers; substitute the
    # requests default instead of handing None through.
    default_pool_size = requests.adapters.DEFAULT_POOLSIZE
    if pool_connections is None:
        pool_connections = default_pool_size
    if pool_maxsize is None:
        pool_maxsize = default_pool_size

    session = RequestSession()
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=pool_connections,
        pool_maxsize=pool_maxsize,
        pool_block=pool_block
    )
    # The same adapter (and therefore the same pools) serves both schemes.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, adapter)
    session.proxies = proxies
    return session

Expand Down Expand Up @@ -95,7 +109,9 @@ class HttpClient(object):

def __init__(self, url=None, token=None, oauth_token=None, config=None,
timeout=None, proxies=None, error_handlers=None,
advance_access=False):
advance_access=False, pool_connections=None,
pool_maxsize=None, pool_block=True,
max_parallel_requests=None):

if (url, token, config) == (None, None, None):
url, token, proxies, advance_access = config_vars(
Expand All @@ -122,8 +138,21 @@ def __init__(self, url=None, token=None, oauth_token=None, config=None,
)

self.url = url.rstrip('/')
self._session = generate_session(proxies)
self.pool_connections = pool_connections
self.pool_maxsize = pool_maxsize
self.pool_block = pool_block
self._session = generate_session(
pool_connections,
pool_maxsize,
pool_block,
proxies=proxies
)
self.timeout = timeout
self._throttle_limit = (
threading.Semaphore(max_parallel_requests)
if max_parallel_requests
else None
)
self._limit = None
self._remaining = None
self._reset = None
Expand Down Expand Up @@ -195,6 +224,7 @@ def remove_error_handler(self, handler):
def _rate_limit(self):
self._request('GET', url='/rate_limit', append_base=True)

@throttle
@check_for_error
def _request(self, verb, url, headers=None, params=None, data=None,
append_base=False, stream=False):
Expand Down
8 changes: 4 additions & 4 deletions sevenbridges/models/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def bulk_get(cls, files, api=None):
file_ids = [Transform.to_file(file_) for file_ in files]
data = {'file_ids': file_ids}

logger.info('Getting files in bulk.')
logger.debug('Getting files in bulk.')
response = api.post(url=cls._URL['bulk_get'], data=data)
return FileBulkRecord.parse_records(response=response, api=api)

Expand All @@ -422,7 +422,7 @@ def bulk_delete(cls, files, api=None):
file_ids = [Transform.to_file(file_) for file_ in files]
data = {'file_ids': file_ids}

logger.info('Deleting files in bulk.')
logger.debug('Deleting files in bulk.')
response = api.post(url=cls._URL['bulk_delete'], data=data)
return FileBulkRecord.parse_records(response=response, api=api)

Expand Down Expand Up @@ -454,7 +454,7 @@ def bulk_update(cls, files, api=None):
]
}

logger.info('Updating files in bulk.')
logger.debug('Updating files in bulk.')
response = api.post(url=cls._URL['bulk_update'], data=data)
return FileBulkRecord.parse_records(response=response, api=api)

Expand Down Expand Up @@ -486,7 +486,7 @@ def bulk_edit(cls, files, api=None):
]
}

logger.info('Editing files in bulk.')
logger.debug('Editing files in bulk.')
response = api.post(url=cls._URL['bulk_edit'], data=data)
return FileBulkRecord.parse_records(response=response, api=api)

Expand Down
2 changes: 1 addition & 1 deletion sevenbridges/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def bulk_get(cls, tasks, api=None):
task_ids = [Transform.to_task(task) for task in tasks]
data = {'task_ids': task_ids}

logger.info('Getting tasks in bulk.')
logger.debug('Getting tasks in bulk.')
response = api.post(url=cls._URL['bulk_get'], data=data)
return TaskBulkRecord.parse_records(response=response, api=api)

Expand Down
7 changes: 6 additions & 1 deletion sevenbridges/transfer/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ def __init__(self, url, file_path,
self._progress_callback = None
self._time_started = 0

self._session = generate_session(self._api.session.proxies)
self._session = generate_session(
self._api.pool_connections,
self._api.pool_maxsize,
self._api.pool_block,
self._api.session.proxies
)

try:
self._file_size = self._get_file_size()
Expand Down
7 changes: 6 additions & 1 deletion sevenbridges/transfer/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,12 @@ def __init__(self, file_path, project=None, parent=None, file_name=None,
self._stop_signal = False
self._result = None

self.session = generate_session(self._api.session.proxies)
self.session = generate_session(
self._api.pool_connections,
self._api.pool_maxsize,
self._api.pool_block,
self._api.session.proxies
)

def __repr__(self):
return six.text_type(
Expand Down

0 comments on commit c1d0b7a

Please sign in to comment.