Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial commit :cake:

  • Loading branch information...
commit c4f1995bb5f1ed488a76456eeb7947d37cde947c 0 parents
@maxcountryman maxcountryman authored
6 .gitignore
@@ -0,0 +1,6 @@
+build/*
+dist/*
+*.egg-info
+*.pyc
+*.un~
+.coverage
32 setup.py
@@ -0,0 +1,32 @@
+import os
+import sys
+import webauth
+
+from setuptools import setup, find_packages
+
+if sys.argv[-1] == 'test':
+ os.system('nosetests --with-coverage --cover-package=webauth')
+ sys.exit()
+
+setup(
+ name='webauth',
+ version=webauth.__version__,
+ description='A Python Requests hook providing OAuth 1.0/a support.',
+ long_description=open('README.markdown'),
+ author='Max Countryman', # this is just a stand-in don't know what's preferred
+ author_email='max@litl.com', # ditto
+ url='https://github.com/litl/webauth',
+ packages=find_packages(),
+ install_requires=['requests>=0.10.0'],
+ license='BSD', # i don't know what litl will prefer for this
+ classifiers=(
+ 'Development Status :: 4 - Beta',
+ 'Intended Audience :: Developers',
+ 'Programming Language :: Python',
+ 'License :: OSI Approved :: BSD License',
+ 'Operating System :: OS Independent',
+ 'Topic :: Internet :: WWW/HTTP',
+ 'Topic :: Software Development :: Libraries :: Python Modules',
+ ),
+ zip_safe=False,
+)
35 tests/base.py
@@ -0,0 +1,35 @@
+'''
+ webauth.test_hook
+ -----------------
+
+ Test suite for webauth.hook.
+'''
+
+import unittest
+
+from mock import Mock
+
+
+class WebauthTestCase(unittest.TestCase):
+ def setUp(self):
+ # mock request object
+ request = Mock()
+ request.method = 'GET'
+ request.url = 'http://example.com/'
+ request.headers = {}
+ request.params = {}
+ request.data = {}
+ request.data_and_params = {}
+ self.request = request
+
+ # mock consumer object
+ consumer = Mock()
+ consumer.key = '123'
+ consumer.secret = '456'
+ self.consumer = consumer
+
+ # mock token object
+ token = Mock()
+ token.key = '321'
+ token.secret = '456'
+ self.token = token
104 tests/test_hook.py
@@ -0,0 +1,104 @@
+'''
+ webauth.test_hook
+ -----------------
+
+ Test suite for webauth.hook.
+'''
+
+from base import WebauthTestCase
+from webauth.hook import OAuthHook
+
+
+class OAuthHookTestCase(WebauthTestCase):
+ def test_intialize_oauthhook(self):
+ # without token
+ oauth = OAuthHook('123', '345')
+ self.assertTrue(hasattr(oauth, 'consumer'))
+
+ # with token
+ oauth = OAuthHook('123', '345', '321', '654')
+ self.assertTrue(hasattr(oauth, 'consumer'))
+ self.assertTrue(oauth.token is not None)
+
+ def test_oauth_header_auth(self):
+ oauth = OAuthHook('123', '345', header_auth=True)
+ self.assertTrue(oauth.header_auth)
+ oauth(self.request)
+ auth_header = self.request.headers['Authorization']
+ self.assertTrue(auth_header is not None)
+ self.assertTrue('oauth_timestamp' in auth_header)
+ self.assertTrue('oauth_consumer_key="123"' in auth_header)
+ self.assertTrue('oauth_nonce' in auth_header)
+ self.assertTrue('oauth_version="1.0"' in auth_header)
+ self.assertTrue('oauth_signature_method="HMAC-SHA1"' in auth_header)
+
+ def test_oauth_post(self):
+ oauth = OAuthHook('123', '345')
+
+ # call the instance (this would be a POST)
+ self.request.method = 'POST'
+ oauth(self.request)
+ self.assertTrue('oauth_timestamp' in self.request.data)
+ self.assertEqual('123', self.request.data['oauth_consumer_key'])
+ self.assertTrue('oauth_nonce' in self.request.data)
+ self.assertTrue('oauth_version' in self.request.data)
+ self.assertEqual('1.0', self.request.data['oauth_version'])
+ self.assertTrue('oauth_signature_method' in self.request.data)
+ self.assertEqual('HMAC-SHA1',
+ self.request.data['oauth_signature_method'])
+
+ def test_oauth_get(self):
+ oauth = OAuthHook('123', '345')
+
+ # call the instance (this would be a GET)
+ oauth(self.request)
+ self.assertTrue('oauth_timestamp' in self.request.url)
+ self.assertTrue('oauth_consumer_key' in self.request.url)
+ self.assertTrue('oauth_nonce' in self.request.url)
+ self.assertTrue('oauth_version=1.0' in self.request.url)
+ self.assertTrue('oauth_signature_method=HMAC-SHA1' in self.request.url)
+
+ def test_oauth_callback(self):
+ oauth = OAuthHook('123', '345')
+
+ self.request.params = {'oauth_callback': 'http://example.com/callback'}
+ oauth(self.request)
+ oauth_callback = self.request.oauth_params['oauth_callback']
+ self.assertEqual('http://example.com/callback', oauth_callback)
+
+ self.request.data = {'oauth_callback': 'http://example.com/callback'}
+ oauth(self.request)
+ oauth_callback = self.request.oauth_params['oauth_callback']
+ self.assertEqual('http://example.com/callback', oauth_callback)
+
+ def test_oauth_with_token(self):
+ oauth = OAuthHook('123', '345', '321', '654')
+ oauth(self.request)
+ self.assertTrue(oauth.token.key is not None)
+ self.assertTrue('oauth_token' in self.request.url)
+ self.assertEqual('321', self.request.oauth_params['oauth_token'])
+ self.assertTrue('oauth_verifier' in self.request.url)
+ self.assertEqual('', self.request.oauth_params['oauth_verifier'])
+
+ # test with a verifier
+ oauth.token.verifier = '4242'
+ oauth(self.request)
+ self.assertEqual('4242', self.request.oauth_params['oauth_verifier'])
+
+ def test_unique_nonce(self):
+ oauth = OAuthHook('123', '345')
+ oauth(self.request)
+ first_nonce = self.request.oauth_params['oauth_nonce']
+ oauth(self.request)
+ second_nonce = self.request.oauth_params['oauth_nonce']
+ self.assertTrue(first_nonce != second_nonce)
+
+ def test_params_or_data_as_lists(self):
+ oauth = OAuthHook('123', '345')
+ self.request.params = [('foo', 'bar')]
+ self.request.data = [('foo', 'bar')]
+ self.assertTrue(isinstance(self.request.params, list))
+ self.assertTrue(isinstance(self.request.params, list))
+ oauth(self.request)
+ self.assertTrue(isinstance(self.request.params, dict))
+ self.assertTrue(isinstance(self.request.data, dict))
111 tests/test_oauth.py
@@ -0,0 +1,111 @@
+'''
+ webauth.test_oauth
+ ------------------
+
+ Test suite for webauth.oauth.
+'''
+
+from base import WebauthTestCase
+from webauth.oauth import HmacSha1Signature
+
+from urllib import urlencode
+
+
+class OAuthTestCase(WebauthTestCase):
+ def test_hamcsha1_signature(self):
+ self.request.params = {'foo': 'bar'}
+ HmacSha1Signature().sign(self.request, self.consumer, self.token)
+ oauth_signature = self.request.data_and_params['oauth_signature']
+ self.assertTrue(oauth_signature is not None)
+
+ def test_normalize_request_parameters_params(self):
+ # params as a dict
+ self.request.params = {'foo': 'bar'}
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo=bar', normalized)
+
+ # params as a dict with URL encodable chars
+ self.request.data_and_params = {}
+ self.request.params = {'foo+bar': 'baz'}
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo%2Bbar=baz', normalized)
+ self.assertTrue('+' not in normalized)
+
+ # params as a string
+ self.request.data_and_params = {}
+ self.request.params = urlencode({'foo': 'bar'})
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo=bar', normalized)
+
+ # params as a string with URL encodable chars
+ self.request.data_and_params = {}
+ self.request.params = urlencode({'foo+bar': 'baz'})
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo%2Bbar=baz', normalized)
+ self.assertTrue('+' not in normalized)
+
+ # params and dict as dicts
+ self.request.data_and_params = {}
+ self.request.params = {'a': 'b'}
+ self.request.data = {'foo': 'bar'}
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('a=b&foo=bar', normalized)
+
+ def test_normalize_request_parameters_data(self):
+ # data as a dict
+ self.request.data = {'foo': 'bar'}
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo=bar', normalized)
+
+ # data as a dict with URL encodable chars
+ self.request.data_and_params = {}
+ self.request.data = {'foo+bar': 'baz'}
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo%2Bbar=baz', normalized)
+ self.assertTrue('+' not in normalized)
+
+ # data as a string with URL encodable chars
+ self.request.data = urlencode({'foo+bar': 'baz'})
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('foo%2Bbar=baz', normalized)
+ self.assertTrue('+' not in normalized)
+
+ def test_normalize_request_parameters_both_string(self):
+ # params and data both as a string
+ self.request.params = urlencode({'a': 'b'})
+ self.request.data = urlencode({'foo': 'bar'})
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ # this also demonstrates sorting
+ self.assertEqual('a=b&foo=bar', normalized)
+
+ def test_normalize_request_parameters_params_string(self):
+ # params is a string but data is a dict
+ self.request.params = urlencode({'a': 'b'})
+ self.request.data = {'foo': 'bar'}
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('a=b&foo=bar', normalized)
+
+ def test_normalize_request_parameters_data_string(self):
+ # params is a dict but data is a string
+ self.request.params = {'a': 'b'}
+ self.request.data = urlencode({'foo': 'bar'})
+ normalized = \
+ HmacSha1Signature()._normalize_request_parameters(self.request)
+ self.assertEqual('a=b&foo=bar', normalized)
+
+ def test_utf8_encoded_string(self):
+ # in the event a string is already UTF-8
+ self.request.params = {u'foo': u'bar'}
+ self.request.url = u'http://example.com/'
+ HmacSha1Signature().sign(self.request, self.consumer)
+ self.assertEqual({'foo': 'bar'}, self.request.params)
41 webauth/README.markdown
@@ -0,0 +1,41 @@
+# Webauth: OAuth 1.0/a for Python
+
+Webauth is a package providing OAuth 1.0/a consumer support. The package is
+written as a hook over the superb Python Requests package.
+
+
+## Installation
+
+Install the package with one of the following commands:
+
+ $ python setup.py install
+
+or
+
+ $ pip install webauth (not yet!)
+
+
+## Usage
+
+Using the package is quite simple. Ensure that Python Requests is installed.
+Import the relavent module and start utilizing OAuth endpoints!
+
+ from webauth import OAuthHook
+ import requests
+
+ # setup the OAuth Hook
+ oauth = OAuthHook(consumer_key='123', consumer_secret='456')
+ # attach it to a pre-request hook
+ oauth_requests = requests.session(hooks={'prehook': oauth})
+
+ # begin by getting a request token
+ oauth_requests.get('http://example.com/request_token').content
+
+Once the request token is acquired you'll want to update the OAuth Hook and
+request session accordingly, providing the `token` and `token_key` parameters
+to `OAuthHook`.
+
+
+## Documentation
+
+The Sphinx-compiled documentation is available here: (not yet!)
10 webauth/__init__.py
@@ -0,0 +1,10 @@
+''''
+ webauth
+ -------
+
+ OAuth 1.0/a wrapped around Python Requests.
+'''
+
+from webauth.hook import OAuthHook
+
+__version__ = 0.1
141 webauth/hook.py
@@ -0,0 +1,141 @@
+'''
+ webauth.hook
+ ------------
+
+ A hook for the Python Requests package that provides OAuth 1.0/a client
+ support.
+'''
+
+import time
+import random
+
+from hashlib import sha1
+from urllib import quote, urlencode
+
+from oauth import HmacSha1Signature, Token, Consumer
+
+
+class OAuthHook(object):
+ '''Primary hook object providing the interface through which a request is
+ hooked into and patched.
+
+ This package is built on the excellent Python Requests package. It
+ functions by "hooking" into a request and appending various attributes to
+ it which allow a client to interact with a standardized OAuth 1.0/a
+ provider.
+
+ You might intialize :class:`OAuthHook` something like this::
+
+ oauth = OAuthHook(consumer_key=1234,
+ consumer_secret=5678)
+ oauth_session = requests.session(hooks={'pre_request': oauth})
+
+ This establishes a requests session that is wrapped if the OAuth-capable
+ hook. Using this session, an OAuth provider may be interacted with and
+ will receive the proper formatting for requests.
+
+ Note that this is normally used as a starting from which a request token
+ would be generated whereupon an access token is received. Once such a token
+ has been received, the wrapper should be reinitalized with this token::
+
+ # we provide our consumer pair as well as the access pair as returned
+ # by the provider endpoint
+ oauth = OAuthHook(consumer_key=1234,
+ consumer_secret=5678,
+ access_token=4321,
+ access_token_secret=8765)
+ oauth_session = requests.session(hooks={'pre_request': oauth})
+
+ The session is now ready to make calls to the endpoints made available by
+ the provider.
+
+ Additionally some services will make use of header authentication. This is
+ provided by passing :class:`__init__` the `auth_header` parameter as
+ `True`.
+ '''
+ OAUTH_VERSION = '1.0'
+ signature = HmacSha1Signature()
+ token = None
+
+ def __init__(self, consumer_key, consumer_secret, access_token=None,
+ access_token_secret=None, header_auth=False):
+ # construct a token if the access token is available
+ if not None in (access_token, access_token_secret):
+ self.token = Token(access_token, access_token_secret)
+
+ self.consumer = Consumer(consumer_key, consumer_secret)
+ self.header_auth = header_auth
+
+ def __call__(self, request):
+ # apparently it's possible for these not to be set?
+ if not request.params:
+ request.params = {}
+ if not request.data:
+ request.data = {}
+
+ # this is a workaround for a known bug that will be patched
+ if isinstance(request.params, list):
+ request.params = dict(request.params)
+ if isinstance(request.data, list):
+ request.data = dict(request.data)
+
+ # generate the necessary request params
+ request.oauth_params = self.generate_oauth_params()
+
+ # here we append an oauth_callback parameter if any
+ if 'oauth_callback' in request.data:
+ request.oauth_params['oauth_callback'] = \
+ request.data.pop('oauth_callback')
+ if 'oauth_callback' in request.params:
+ request.oauth_params['oauth_callback'] = \
+ request.params.pop('oauth_callback')
+
+ # this is used in the Normalize Request Parameters step
+ request.data_and_params = request.oauth_params.copy()
+
+ # sign and add the signature to the request params
+ self.signature.sign(request, self.consumer, self.token)
+
+ if self.header_auth:
+ # authenticate in the header
+ request.headers['Authorization'] = \
+ self.generate_authorization_header(request.data_and_params)
+ elif request.method == 'POST':
+ # add data_and_params to the body of the POST
+ request.data = request.data_and_params
+ request.headers['content-type'] = \
+ 'application/x-www-form-urlencoded'
+ else:
+ # add data_and_params to the URL parameters
+ request.url = request.url + '?' + \
+ urlencode(request.data_and_params)
+
+ # we're done with these now
+ del request.data_and_params
+
+ def generate_oauth_params(self):
+ '''This method handles generating the necessary URL parameters the
+ OAuth provider will expect.'''
+ oauth_params = {}
+
+ oauth_params['oauth_consumer_key'] = self.consumer.key
+ oauth_params['oauth_timestamp'] = int(time.time())
+ oauth_params['oauth_nonce'] = sha1(str(random.random())).hexdigest()
+ oauth_params['oauth_version'] = self.OAUTH_VERSION
+
+ if self.token:
+ oauth_params['oauth_token'] = self.token.key
+ # this must be set upon recieving a verifier
+ oauth_params['oauth_verifier'] = self.token.verifier or ''
+
+ oauth_params['oauth_signature_method'] = self.signature.NAME
+ return oauth_params
+
+ def generate_authorization_header(self, oauth_params):
+ '''This method constructs an authorization header.'''
+ auth_header = 'OAuth realm=""'
+ params = ''
+ for k, v in oauth_params.items():
+ params += ',{0}="{1}"'.format(k, quote(str(v)))
+ auth_header += params
+ return auth_header
143 webauth/oauth.py
@@ -0,0 +1,143 @@
+'''
+ webauth.oauth
+ -------------
+
+ A module providing various OAuth related containers.
+'''
+
+import base64
+import hmac
+import urlparse
+
+from hashlib import sha1
+from urllib import quote, urlencode
+
+
+class OAuthObject(object):
+ '''A base class for OAuth token objects.'''
+ verifier = None
+
+ def __init__(self, key, secret):
+ self.key = key
+ self.secret = secret
+
+
+class Consumer(OAuthObject):
+ '''The consumer token object.'''
+ pass
+
+
+class Token(OAuthObject):
+ '''The access token object.'''
+ pass
+
+
+class SignatureMethod(object):
+ '''A base class for signature methods providing a set of common methods.'''
+ def _encode_utf8(self, s):
+ if isinstance(s, unicode):
+ return s.encode('utf-8')
+ return unicode(s, 'utf-8').encode('utf-8')
+
+ def _escape(self, s):
+ return quote(self._encode_utf8(s), safe='~')
+
+ def _normalize_request_parameters(self, request):
+ '''The OAuth 1.0/a specs indicate that parameter and body data must be
+ normalized. The specifics of this operation are detailed in the
+ respective specs.
+
+ Here we have to ensure that parameter and body data is properly
+ handled. This means that the case of params or data being strings is
+ taken care of.
+
+ Essentially this is achieved by checking that `request.data` and
+ `request.params` are not strings. This being the case we can then
+ construct a unified list of tuples from them.
+
+ Otherwise we build a series intermediary lists of tuples depending on
+ the type of `request.params` and `request.data`.
+ '''
+ if type(request.params) != str and type(request.data) != str:
+ # if neither params nor data are a string, i.e. both are dicts
+
+ # we concatenate the respective dicts
+ data_and_params = \
+ dict(request.data.items() + request.params.items())
+
+ normalized = []
+ for k, v in data_and_params.items():
+ normalized += [(k, v)]
+ elif type(request.params) == str and type(request.data) == str:
+ # if both params and data are strings
+ params = urlparse.parse_qsl(request.params)
+ data = urlparse.parse_qsl(request.data)
+ normalized = params + data
+ elif type(request.params) == str:
+ # parse the string into a list of tuples
+ normalized = urlparse.parse_qsl(request.params)
+
+ # extract any data
+ for k, v in request.data.items():
+ normalized += [(k, v)]
+ elif type(request.data) == str:
+ # and we do the same if data
+ normalized = urlparse.parse_qsl(request.data)
+
+ # extract any params
+ for k, v in request.params.items():
+ normalized += [(k, v)]
+
+ # extract values from our list of tuples
+ all_normalized = []
+ for t in normalized:
+ k, v = t
+
+ # save key/value pairs to the request and our list
+ request.data_and_params[k] = v
+ all_normalized += [(k, v)]
+
+ # add in the params from data_and_params for signing
+ for k, v in request.data_and_params.items():
+ if (k, v) in all_normalized:
+ continue
+ all_normalized += [(k, v)]
+
+ # sort the params as per the OAuth 1.0/a spec
+ all_normalized.sort()
+
+ # finally encode the params as a string
+ return urlencode(all_normalized)
+
+
+class HmacSha1Signature(SignatureMethod):
+ '''HMAC-SHA1 Signature Method.
+
+ This is a signature method, as per the OAuth 1.0/a and 2.0 specs. As the
+ name might suggest, this method signs parameters with HMAC using SHA1.
+ '''
+ NAME = 'HMAC-SHA1'
+
+ def sign(self, request, consumer, token=None):
+ '''Sign request parameters.'''
+
+ # the necessary parameters we'll sign
+ params_and_data = self._normalize_request_parameters(request)
+ parameters = [self._escape(request.method),
+ self._escape(request.url),
+ self._escape(params_and_data)]
+
+ # set our key
+ key = self._escape(consumer.secret) + '&'
+ if token is not None:
+ key += self._escape(token.secret)
+
+ # build a Signature Base String
+ signature_base_string = '&'.join(parameters)
+
+ # hash the string with HMAC-SHA1
+ hashed = hmac.new(key, signature_base_string, sha1)
+
+ # add the signature to the request
+ request.data_and_params['oauth_signature'] = \
+ base64.b64encode(hashed.digest())
Please sign in to comment.
Something went wrong with that request. Please try again.