Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

moving utility functions into utils module, general clean up

  • Loading branch information...
commit b3837a3c76a9f4e41cf431e6efe7ee063f03d7db 1 parent 7da8591
@maxcountryman maxcountryman authored
View
1  rauth/hook.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
'''
rauth.hook
----------
View
1  rauth/oauth.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
'''
rauth.oauth
-----------
View
69 rauth/service.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
'''
rauth.service
-------------
@@ -5,39 +6,20 @@
Provides OAuth 1.0/a, 2.0 and Ofly service containers.
'''
-import requests
-import json
import hashlib
+import json
+import requests
from rauth.hook import OAuth1Hook
+from rauth.utils import absolute_url, parse_utf8_qsl
-from urllib import quote, urlencode
-from urlparse import parse_qsl, urlsplit
from datetime import datetime
+from urllib import quote, urlencode
+from urlparse import urlsplit
DEFAULT_TIMEOUT = 300
-def parse_utf8_qsl(s):
- d = dict(parse_qsl(s))
-
- for k, v in d.items():
- if isinstance(k, unicode) and isinstance(v, unicode):
- # skip this iteration if we have no keys or values to update
- continue
- d.pop(k)
- if not isinstance(k, unicode):
- k = unicode(k, 'utf-8')
- if not isinstance(v, unicode):
- v = unicode(v, 'utf-8')
- d[k] = v
- return d
-
-
-def is_absolute_url(url):
- return url.startswith(('http://', 'https://'))
-
-
class Request(object):
'''A container for common HTTP request methods.'''
def head(self, url, **kwargs):
@@ -243,7 +225,7 @@ def request(self, method, uri, **kwargs):
kwargs.setdefault('timeout', DEFAULT_TIMEOUT)
- if self.base_url is not None and not is_absolute_url(uri):
+ if self.base_url is not None and not absolute_url(uri):
uri = self.base_url + uri
header_auth = kwargs.pop('header_auth', False)
@@ -316,7 +298,7 @@ def __init__(self, client_id=None, client_secret=None, name=None,
self.consumer_secret = kwargs.get('consumer_secret', client_secret)
if None in (self.consumer_key, self.consumer_secret):
- raise ValueError('client_id and client_secret must not be None')
+ raise TypeError('client_id and client_secret must not be None')
self.access_token_url = access_token_url
@@ -359,7 +341,7 @@ def get_access_token(self, method='POST', **kwargs):
key = 'params'
else:
# raise an error because credentials must be sent in this method
- raise ValueError('Either params or data dict missing.')
+ raise NameError('Either params or data dict missing')
grant_type = kwargs[key].get('grant_type', 'authorization_code')
@@ -383,17 +365,17 @@ def request(self, method, uri, access_token=None, **kwargs):
:param method: A string representation of the HTTP method to be used.
:param uri: The resource to be requested.
- :param access_token: Overrides self.access_token. Defaults to None
+ :param access_token: Overrides self.access_token. Defaults to None.
:param \*\*kwargs: Optional arguments. Same as Requests.
'''
# see if we can prepend base_url
- if self.base_url is not None and not is_absolute_url(uri):
+ if self.base_url is not None and not absolute_url(uri):
uri = self.base_url + uri
# see if we can use a stored access_token
if access_token is None and self.access_token is None:
- raise ValueError('access_token must not be None')
+ raise TypeError('access_token must not be None')
if access_token is None:
access_token = self.access_token
@@ -504,7 +486,7 @@ def get_raw_request_token(self, method='GET', oauth_callback='oob',
'''
# ensure we've set the request_token_url
if self.request_token_url is None:
- raise ValueError('request_token_url must not be None')
+ raise TypeError('request_token_url must not be None')
auth_session = \
self._construct_session(header_auth=self.header_auth,
@@ -556,7 +538,7 @@ def get_access_token(self, method='GET', **kwargs):
'''
# ensure we've set the access_token_url
if self.access_token_url is None:
- raise ValueError('access_token_url must not be None')
+ raise TypeError('access_token_url must not be None')
request_token = kwargs.pop('request_token')
request_token_secret = kwargs.pop('request_token_secret')
@@ -602,8 +584,8 @@ def request(self, method, uri, access_token=None,
:param \*\*kwargs: Optional arguments. Same as Requests.
'''
header_auth = kwargs.pop('header_auth', self.header_auth)
- allow_redirects = kwargs.pop('allow_redirects', True)
+ kwargs.setdefault('allow_redirects', True)
kwargs.setdefault('headers', {})
kwargs.setdefault('params', {})
kwargs.setdefault('timeout', DEFAULT_TIMEOUT)
@@ -614,29 +596,28 @@ def request(self, method, uri, access_token=None,
'application/x-www-form-urlencoded')
# prepend a base_url to the uri if we can
- if self.base_url is not None and not is_absolute_url(uri):
+ if self.base_url is not None and not absolute_url(uri):
uri = self.base_url + uri
# check user supplied tokens
tokens = (access_token, access_token_secret)
- tokens_found = len([t for t in tokens if t is not None])
- if tokens_found and tokens_found != 2:
- raise ValueError('Either both or neither access_token and '
- 'access_token_secret must be supplied')
+ all_tokens_none = all(v is None for v in tokens)
+ if None in tokens and not all_tokens_none:
+ raise TypeError('Either both or neither access_token and '
+ 'access_token_secret must be supplied')
# use default tokens if user supplied tokens are not present
- if not tokens_found:
+ if all_tokens_none:
access_token = self.access_token
access_token_secret = self.access_token_secret
- auth_session = self._construct_session(
- access_token=access_token,
- access_token_secret=access_token_secret,
- header_auth=header_auth)
+ session_params = dict(access_token=access_token,
+ access_token_secret=access_token_secret,
+ header_auth=header_auth)
+ auth_session = self._construct_session(**session_params)
response = auth_session.request(method,
uri,
- allow_redirects=allow_redirects,
**kwargs)
return Response(response)
View
1  tests/base.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
'''
rauth.base
----------
View
1  tests/test_hook.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
'''
rauth.test_hook
---------------
View
1  tests/test_oauth.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
'''
rauth.test_oauth
------------------
View
52 tests/test_service.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
'''
- rauth.test_oauth
- ----------------
+ rauth.test_service
+ ------------------
Test suite for rauth.service.
'''
@@ -179,7 +179,7 @@ def test_get_with_access_token(self, mock_request):
self.assertEqual(response['status'], 'ok')
def test_missing_client_creds(self):
- with self.assertRaises(ValueError) as e:
+ with self.assertRaises(TypeError) as e:
OAuth2Service()
self.assertEqual(str(e.exception),
'client_id and client_secret must not be None')
@@ -217,9 +217,9 @@ def test_get_access_token_bad_response(self, mock_request):
self.response.raise_for_status = self.raise_for_status
mock_request.return_value = self.response
- with self.assertRaises(Exception) as e:
+ with self.assertRaises(NameError) as e:
self.service.get_access_token(code='4242')
- self.assertEqual('Either params or data dict missing.',
+ self.assertEqual('Either params or data dict missing',
str(e.exception))
@patch.object(requests.Session, 'request')
@@ -245,14 +245,15 @@ def test_request_with_access_token_override(self, mock_request):
url = 'http://example.com/endpoint'
response = self.service.request(method, url, access_token='420')
self.assertEqual(response.content['status'], 'ok')
- mock_request.assert_called_with(method, url,
+ mock_request.assert_called_with(method,
+ url,
params=dict(access_token='420'),
timeout=DEFAULT_TIMEOUT)
@patch.object(requests.Session, 'request')
def test_request_with_no_access_token(self, mock_request):
self.service.access_token = None
- with self.assertRaises(Exception) as e:
+ with self.assertRaises(TypeError) as e:
self.service.request('GET', 'http://example.com/endpoint')
self.assertEqual('access_token must not be None',
str(e.exception))
@@ -266,7 +267,8 @@ def test_request(self, mock_request):
url = 'http://example.com/endpoint'
response = self.service.request(method, url).content
self.assertEqual(response['status'], 'ok')
- mock_request.assert_called_with(method, url,
+ mock_request.assert_called_with(method,
+ url,
params=dict(access_token='987'),
timeout=DEFAULT_TIMEOUT)
@@ -368,7 +370,7 @@ def test_init_with_base_url(self):
def test_request_access_token_missing(self, mock_request):
mock_request.return_value = self.response
- with self.assertRaises(ValueError) as e:
+ with self.assertRaises(TypeError) as e:
self.service.get('http://example.com/some/method',
access_token_secret='666').content
self.assertEqual('Either both or neither access_token and '
@@ -379,7 +381,7 @@ def test_request_access_token_missing(self, mock_request):
def test_request_access_token_secret_missing(self, mock_request):
mock_request.return_value = self.response
- with self.assertRaises(ValueError) as e:
+ with self.assertRaises(TypeError) as e:
self.service.get('http://example.com/some/method',
access_token='666').content
self.assertEqual('Either both or neither access_token and '
@@ -389,24 +391,24 @@ def test_request_access_token_secret_missing(self, mock_request):
@patch.object(requests.Session, 'request')
def test_request_with_access_token_override(self, mock_request):
mock_request.return_value = self.response
- response = self.service.get(
- 'http://example.com/some/method',
- access_token_secret='777',
- access_token='666').content
+ response = self.service.request('GET',
+ 'http://example.com/some/method',
+ access_token_secret='777',
+ access_token='666').content
self.assertIsNotNone(response)
self.assertEqual('123', response['oauth_token'])
self.assertEqual('456', response['oauth_token_secret'])
@patch.object(OAuth1Service, '_construct_session')
- def test_request_with_access_token_override2(self, _construct_session):
- self.service.get(
- 'http://example.com/some/method',
- access_token_secret='777',
- access_token='666').content
- _construct_session.assert_called_with(
- access_token='666',
- access_token_secret='777',
- header_auth=self.service.header_auth)
+ def test_request_with_access_token_session(self, _construct_session):
+ self.service.request('GET',
+ 'http://example.com/some/method',
+ access_token_secret='777',
+ access_token='666')
+ session_params = dict(access_token='666',
+ access_token_secret='777',
+ header_auth=self.service.header_auth)
+ _construct_session.assert_called_with(**session_params)
@patch.object(requests.Session, 'request')
def test_get_raw_request_token(self, mock_request):
@@ -662,14 +664,14 @@ def test_parse_utf8_qsl_dup_keys(self, mock_request):
def test_missing_request_token_url(self):
service = OAuth1Service(None, None)
- with self.assertRaises(Exception) as e:
+ with self.assertRaises(TypeError) as e:
service.get_request_token()
self.assertEqual(str(e.exception),
'request_token_url must not be None')
def test_missing_access_token_url(self):
service = OAuth1Service(None, None)
- with self.assertRaises(Exception) as e:
+ with self.assertRaises(TypeError) as e:
service.get_access_token()
self.assertEqual(str(e.exception),
'access_token_url must not be None')
Please sign in to comment.
Something went wrong with that request. Please try again.