Change how Nectar configures requests to be thread-safe
Previously, Nectar set the configuration at a session level. This didn't
cause problems because Nectar is only ever used to download with one set
of settings. However, the pulp_streamer has to change the configuration
on a request-by-request basis _and_ it needs to use a session to pool
connections. This leads to trouble when updating the configuration,
since Nectar changes it for all requests rather than for a specific
one.

This patch instead adds a method to translate a Nectar configuration
into a set of requests kwargs that work with the ``requests.request``
API. This allows the streamer to reuse its session instance across
instances of a Nectar HTTPThreadedDownloader without one download's
configuration corrupting another's.

fixes #2111
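
To make the thread-safety point concrete, here is a minimal sketch (an editor's illustration, not part of the commit) of the pattern this change enables: one pooled requests.Session shared across downloads, with each download's Nectar settings passed as per-request keyword arguments. The URL and configuration values are hypothetical; HTTPThreadedDownloader, DownloaderConfig, and requests_kwargs_from_nectar_config are the names touched by this change.

# Editor's illustration of the pattern described above (not part of the commit).
# A single pooled requests.Session is shared, and each download receives its own
# settings as plain keyword arguments instead of mutated session attributes.
# The URL and config values here are hypothetical.
import requests

from nectar.config import DownloaderConfig
from nectar.downloaders.threaded import HTTPThreadedDownloader

session = requests.Session()  # shared connection pool; never reconfigured in place

configs = [
    DownloaderConfig(basic_auth_username='repo-user', basic_auth_password='secret'),
    DownloaderConfig(proxy_url='http://proxy.example.com', proxy_port=3128),
]

for config in configs:
    # Translate the Nectar config into per-request kwargs. Because nothing is
    # written onto the shared session, downloads with different configurations
    # cannot clobber one another.
    kwargs = HTTPThreadedDownloader.requests_kwargs_from_nectar_config(config)
    response = session.get('https://repos.example.com/repodata/repomd.xml',
                           stream=True, **kwargs)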
jeremycline committed Aug 16, 2016
1 parent 7699227 commit 4fe7327
Showing 2 changed files with 139 additions and 15 deletions.
76 changes: 64 additions & 12 deletions nectar/downloaders/threaded.py
@@ -8,7 +8,7 @@
 from logging import getLogger

 import requests
-from requests.packages.urllib3.util import retry
+from requests.packages.urllib3.util import retry, url as urllib3_url

 from nectar.config import HTTPBasicWithProxyAuth
 from nectar.downloaders.base import Downloader
@@ -88,15 +88,16 @@ def __init__(self, config, event_listener=None, tries=DEFAULT_TRIES, session=None):
         # set of locations that produced a connection error
         self.failed_netlocs = set([])

-        self.session = session or build_session(config)
+        if not session:
+            session = requests.Session()
+            retry_conf = retry.Retry(total=tries, connect=tries, read=tries, backoff_factor=1)
+            retry_conf.BACKOFF_MAX = 8
+            adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf)
+            session.mount('http://', adapter)
+            session.mount('https://', adapter)

-        # Configure an adapter to retry failed requests. See urllib3's documentation
-        # for details on each argument.
-        retry_conf = retry.Retry(total=tries, connect=tries, read=tries, backoff_factor=1)
-        retry_conf.BACKOFF_MAX = 8
-        adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf)
-        self.session.mount('http://', adapter)
-        self.session.mount('https://', adapter)
+        self.session = session
+        self.session.stream = True

     @property
     def buffer_size(self):
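
For readers unfamiliar with urllib3's retry support, the session construction above can be reproduced standalone. A minimal sketch, assuming the urllib3 bundled with requests at the time; build_retrying_session and its default of five tries are illustrative, not Nectar API.

# Editor's sketch of the retry wiring the constructor performs when it has to build
# its own session. ``build_retrying_session`` is an illustrative helper name. With
# backoff_factor=1, urllib3 sleeps roughly backoff_factor * 2 ** (retries - 1)
# seconds between attempts, capped here at 8 seconds.
import requests
from requests.packages.urllib3.util import retry


def build_retrying_session(tries=5):
    session = requests.Session()
    retry_conf = retry.Retry(total=tries, connect=tries, read=tries, backoff_factor=1)
    retry_conf.BACKOFF_MAX = 8  # never wait more than 8 seconds between attempts
    adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf)
    # Mount the adapter for both schemes so every request made through this
    # session is retried on connection and read failures.
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session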
@@ -111,6 +112,56 @@ def progress_interval(self):
         seconds = self.config.progress_interval or DEFAULT_PROGRESS_INTERVAL
         return datetime.timedelta(seconds=seconds)

+    @staticmethod
+    def requests_kwargs_from_nectar_config(config):
+        """
+        Take a Nectar configuration and map it to a set of requests keyword arguments.
+
+        These keyword arguments can be used with the Python requests ``requests.request``
+        API. In the future when Nectar is just a memory, this can be adapted to map a
+        Pulp importer configuration to requests kwargs.
+
+        :param config: A nectar configuration instance
+        :type config: nectar.config.DownloaderConfig
+
+        :return: A dictionary of keyword arguments for the requests API.
+        :rtype: dict
+        """
+        requests_kwargs = {}
+
+        # Configure basic authentication
+        if config.basic_auth_username and config.basic_auth_password:
+            requests_kwargs['auth'] = (config.basic_auth_username, config.basic_auth_password)
+
+        # Configure verification of the server's TLS certificates; defaults to the system trust store.
+        if config.ssl_validation is not False:
+            if config.ssl_ca_cert_path:
+                requests_kwargs['verify'] = config.ssl_ca_cert_path
+            else:
+                requests_kwargs['verify'] = True
+        else:
+            requests_kwargs['verify'] = False
+
+        # Configure client-side certificate authentication
+        if config.ssl_client_cert_path and config.ssl_client_key_path:
+            requests_kwargs['cert'] = (config.ssl_client_cert_path, config.ssl_client_key_path)
+
+        # Configure proxy servers and proxy authentication.
+        #
+        # Annoyingly, although the config is called 'proxy_url', the port and basic auth
+        # credentials are defined separately, so we have to build the url.
+        if config.proxy_url and config.proxy_port:
+            parsed_url = urllib3_url.parse_url(config.proxy_url)
+            proxy_auth = None
+            if config.proxy_username and config.proxy_password:
+                proxy_auth = '{user}:{password}'.format(user=config.proxy_username,
+                                                        password=config.proxy_password)
+            parsed_url = urllib3_url.Url(scheme=parsed_url.scheme, auth=proxy_auth,
+                                         host=parsed_url.host, port=config.proxy_port)
+            requests_kwargs['proxies'] = {'http': parsed_url.url, 'https': parsed_url.url}
+
+        return requests_kwargs
+
     def worker(self, queue):
         """
         :param queue: queue of DownloadRequest instances
@@ -133,7 +184,6 @@ def worker(self, queue):
     def download(self, request_list):
         worker_threads = []
         queue = WorkerQueue(request_list)
-        self.session = build_session(self.config, self.session)

         _logger.debug('starting workers')
         for i in range(self.max_concurrent):
@@ -188,7 +238,6 @@ def _download_one(self, request):
         :return: download report
         :rtype: nectar.report.DownloadReport
         """
-        self.session = build_session(self.config, self.session)
         return self._fetch(request)

     def _fetch(self, request):
@@ -216,9 +265,11 @@ def _fetch(self, request):
             raise SkipLocation()

         _logger.debug("Attempting to connect to {url}.".format(url=request.url))
+        requests_kwargs = self.requests_kwargs_from_nectar_config(self.config)
         response = self.session.get(request.url, headers=headers,
                                     timeout=(self.config.connect_timeout,
-                                             self.config.read_timeout))
+                                             self.config.read_timeout),
+                                    **requests_kwargs)
         report.headers = response.headers
         self.fire_download_headers(report)

@@ -361,6 +412,7 @@ def _fire_event_to_listener(self, event_listener_callback, *args, **kwargs):


 def build_session(config, session=None):
+    """This method is deprecated: it is not thread-safe."""
     if session is None:
         session = requests.Session()
     session.stream = True  # required for reading the download in chunks
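The proxy handling added above rebuilds a single proxy URL from the separately configured port and credentials. A standalone sketch of that assembly with urllib3's URL helpers (the values are hypothetical; the unencoded credential format matches what the new tests below assert):

# Editor's sketch of the proxy URL assembly performed above; values are hypothetical.
from requests.packages.urllib3.util import url as urllib3_url

parsed = urllib3_url.parse_url('http://proxy.example.com')
rebuilt = urllib3_url.Url(scheme=parsed.scheme, auth='proxy_user:secret',
                          host=parsed.host, port=3128)
# The credentials are spliced in verbatim (not URL-encoded), giving:
# http://proxy_user:secret@proxy.example.com:3128
proxies = {'http': rebuilt.url, 'https': rebuilt.url}
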
78 changes: 75 additions & 3 deletions test/unit/test_threaded_downloader.py
@@ -39,6 +39,76 @@ def test_instantiation(self):
         self.assertEqual(downloader.progress_interval,
                          datetime.timedelta(seconds=threaded.DEFAULT_PROGRESS_INTERVAL))

+    @mock.patch('nectar.config.DownloaderConfig._process_ssl_settings', mock.Mock())
+    def test_requests_kwargs_from_config(self):
+        """Assert that a Nectar config is translated to a requests config correctly."""
+        nectar_config = DownloaderConfig(
+            ssl_ca_cert_path='/tmp/CA.pem',
+            ssl_client_cert_path='/tmp/cert.pem',
+            ssl_client_key_path='/tmp/key.pem',
+        )
+        expected_kwargs = {'verify': '/tmp/CA.pem', 'cert': ('/tmp/cert.pem', '/tmp/key.pem')}
+
+        actual = threaded.HTTPThreadedDownloader.requests_kwargs_from_nectar_config(nectar_config)
+        self.assertEqual(expected_kwargs, actual)
+
+    def test_requests_kwargs_defaults_secure(self):
+        """
+        Test that requests_kwargs_from_nectar_config defaults to verifying the
+        server's TLS certificate when no settings are provided.
+        """
+        nectar_config = DownloaderConfig()
+        expected_kwargs = {'verify': True}
+
+        actual = threaded.HTTPThreadedDownloader.requests_kwargs_from_nectar_config(nectar_config)
+        self.assertEqual(expected_kwargs, actual)
+
+    def test_requests_kwargs_basic_auth(self):
+        """
+        Test that requests_kwargs_from_nectar_config creates an `auth` kwarg for basic auth.
+        """
+        nectar_config = DownloaderConfig(basic_auth_username='test', basic_auth_password='hunter2')
+        expected_kwargs = {'verify': True, 'auth': ('test', 'hunter2')}
+
+        actual = threaded.HTTPThreadedDownloader.requests_kwargs_from_nectar_config(nectar_config)
+        self.assertEqual(expected_kwargs, actual)
+
+    def test_requests_kwargs_proxy(self):
+        """
+        Test that requests_kwargs_from_nectar_config creates a `proxies` kwarg.
+        """
+        nectar_config = DownloaderConfig(proxy_url='http://proxy.example.com', proxy_port=3128)
+        expected_kwargs = {
+            'verify': True,
+            'proxies': {
+                'http': 'http://proxy.example.com:3128',
+                'https': 'http://proxy.example.com:3128',
+            }
+        }
+
+        actual = threaded.HTTPThreadedDownloader.requests_kwargs_from_nectar_config(nectar_config)
+        self.assertEqual(expected_kwargs, actual)
+
+    def test_requests_kwargs_basic_auth_proxy(self):
+        """
+        Test that requests_kwargs_from_nectar_config creates a `proxies` kwarg with
+        basic auth credentials embedded in the proxy URL (not URL-encoded).
+        """
+        nectar_config = DownloaderConfig(basic_auth_username='test', basic_auth_password='hunter2',
+                                         proxy_url='http://proxy.example.com', proxy_port=3128,
+                                         proxy_username='proxy_user', proxy_password='test@123')
+        expected_kwargs = {
+            'verify': True,
+            'auth': ('test', 'hunter2'),
+            'proxies': {
+                'http': 'http://proxy_user:test@123@proxy.example.com:3128',
+                'https': 'http://proxy_user:test@123@proxy.example.com:3128',
+            }
+        }
+
+        actual = threaded.HTTPThreadedDownloader.requests_kwargs_from_nectar_config(nectar_config)
+        self.assertEqual(expected_kwargs, actual)
+
     def test_configure_session(self):
         kwargs = {'basic_auth_username': 'admin',
                   'basic_auth_password': 'admin',
@@ -255,7 +325,8 @@ def test_request_headers(self):
         self.session.get.assert_called_once_with(
             URL,
             headers={'pulp_header': 'awesome!'},
-            timeout=(self.config.connect_timeout, self.config.read_timeout)
+            timeout=(self.config.connect_timeout, self.config.read_timeout),
+            verify=True,
         )

     @mock.patch('nectar.downloaders.threaded.DownloadReport.from_download_request')
@@ -298,7 +369,8 @@ def test_wrong_content_encoding(self):
         self.assertEqual(report.bytes_downloaded, 3)
         self.session.get.assert_called_once_with(URL, headers={'accept-encoding': ''},
                                                  timeout=(self.config.connect_timeout,
-                                                          self.config.read_timeout))
+                                                          self.config.read_timeout),
+                                                 verify=True)

     def test_normal_content_encoding(self):
         URL = 'http://fakeurl/primary.xml'
@@ -315,7 +387,7 @@ def test_normal_content_encoding(self):
         # passing "None" for headers lets the requests library add whatever
         # headers it thinks are appropriate.
         self.session.get.assert_called_once_with(
-            URL, headers={}, timeout=(self.config.connect_timeout, self.config.read_timeout))
+            URL, headers={}, timeout=(self.config.connect_timeout, self.config.read_timeout), verify=True)

     def test_fetch_with_connection_error(self):
         """