Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

update to revision 1138 of http://oauth.googlecode.com/svn/code/pytho…

  • Loading branch information...
commit 45fc51d99eb6d575026650058b5d2530609ce168 1 parent 5e03112
@n8han n8han authored
Showing with 384 additions and 274 deletions.
  1. +384 −274 oauth.py
View
658 oauth.py
@@ -1,85 +1,161 @@
+"""
+The MIT License
+
+Copyright (c) 2007 Leah Culver
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+"""
+
import cgi
import urllib
import time
import random
import urlparse
import hmac
-import base64
+import binascii
+
VERSION = '1.0' # Hi Blaine!
HTTP_METHOD = 'GET'
SIGNATURE_METHOD = 'PLAINTEXT'
-# Generic exception class
+
class OAuthError(RuntimeError):
+ """Generic exception class."""
def __init__(self, message='OAuth error occured.'):
self.message = message
-# optional WWW-Authenticate header (401 error)
def build_authenticate_header(realm=''):
+ """Optional WWW-Authenticate header (401 error)"""
return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
-# url escape
def escape(s):
- # escape '/' too
+ """Escape a URL including any /."""
return urllib.quote(s, safe='~')
-# util function: current timestamp
-# seconds since epoch (UTC)
+def _utf8_str(s):
+ """Convert unicode to utf-8."""
+ if isinstance(s, unicode):
+ return s.encode("utf-8")
+ else:
+ return str(s)
+
def generate_timestamp():
+ """Get seconds since epoch (UTC)."""
return int(time.time())
-# util function: nonce
-# pseudorandom number
def generate_nonce(length=8):
+ """Generate pseudorandom number."""
+ return ''.join([str(random.randint(0, 9)) for i in range(length)])
+
+def generate_verifier(length=8):
+ """Generate pseudorandom number."""
return ''.join([str(random.randint(0, 9)) for i in range(length)])
-# OAuthConsumer is a data type that represents the identity of the Consumer
-# via its shared secret with the Service Provider.
class OAuthConsumer(object):
+ """Consumer of OAuth authentication.
+
+ OAuthConsumer is a data type that represents the identity of the Consumer
+ via its shared secret with the Service Provider.
+
+ """
key = None
secret = None
- def __init__(self, key, secret):#{{{
+ def __init__(self, key, secret):
self.key = key
- self.secret = secret#}}}
+ self.secret = secret
+
-# OAuthToken is a data type that represents an End User via either an access
-# or request token.
class OAuthToken(object):
- # access tokens and request tokens
+ """OAuthToken is a data type that represents an End User via either an access
+ or request token.
+
+ key -- the token
+ secret -- the token secret
+
+ """
key = None
secret = None
+ callback = None
+ callback_confirmed = None
+ verifier = None
- '''
- key = the token
- secret = the token secret
- '''
- def __init__(self, key, secret, authorized_user_id=None):#{{{
- self.authorized_user_id = authorized_user_id
+ def __init__(self, key, secret):
self.key = key
- self.secret = secret#}}}
-
- def to_string(self):#{{{
- return urllib.urlencode({'oauth_token': self.key, 'oauth_token_secret': self.secret})#}}}
-
- # return a token from something like:
- # oauth_token_secret=digg&oauth_token=digg
- #@staticmethod
- def from_string(s):#{{{
+ self.secret = secret
+
+ def set_callback(self, callback):
+ self.callback = callback
+ self.callback_confirmed = 'true'
+
+ def set_verifier(self, verifier=None):
+ if verifier is not None:
+ self.verifier = verifier
+ else:
+ self.verifier = generate_verifier()
+
+ def get_callback_url(self):
+ if self.callback and self.verifier:
+ # Append the oauth_verifier.
+ parts = urlparse.urlparse(self.callback)
+ scheme, netloc, path, params, query, fragment = parts[:6]
+ if query:
+ query = '%s&oauth_verifier=%s' % (query, self.verifier)
+ else:
+ query = 'oauth_verifier=%s' % self.verifier
+ return urlparse.urlunparse((scheme, netloc, path, params,
+ query, fragment))
+ return self.callback
+
+ def to_string(self):
+ data = {
+ 'oauth_token': self.key,
+ 'oauth_token_secret': self.secret,
+ }
+ if self.callback_confirmed is not None:
+ data['oauth_callback_confirmed'] = self.callback_confirmed
+ return urllib.urlencode(data)
+
+ def from_string(s):
+ """ Returns a token from something like:
+ oauth_token_secret=xxx&oauth_token=xxx
+ """
params = cgi.parse_qs(s, keep_blank_values=False)
key = params['oauth_token'][0]
secret = params['oauth_token_secret'][0]
- return OAuthToken(key, secret)
- from_string = staticmethod(from_string)#}}}
+ token = OAuthToken(key, secret)
+ try:
+ token.callback_confirmed = params['oauth_callback_confirmed'][0]
+ except KeyError:
+ pass # 1.0, no callback confirmed.
+ return token
+ from_string = staticmethod(from_string)
+
+ def __str__(self):
+ return self.to_string()
- def __str__(self):#{{{
- return self.to_string()#}}}
-# OAuthRequest represents the request and can be serialized
class OAuthRequest(object):
- '''
+ """OAuthRequest represents the request and can be serialized.
+
OAuth parameters:
- oauth_consumer_key
- oauth_token
@@ -88,138 +164,143 @@ class OAuthRequest(object):
- oauth_timestamp
- oauth_nonce
- oauth_version
+ - oauth_verifier
... any additional parameters, as defined by the Service Provider.
- '''
- parameters = None # oauth parameters
+ """
+ parameters = None # OAuth parameters.
http_method = HTTP_METHOD
http_url = None
version = VERSION
- def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None):#{{{
+ def __init__(self, http_method=HTTP_METHOD, http_url=None, parameters=None):
self.http_method = http_method
self.http_url = http_url
- self.parameters = parameters or {}#}}}
+ self.parameters = parameters or {}
- def set_parameter(self, parameter, value):#{{{
- self.parameters[parameter] = value#}}}
+ def set_parameter(self, parameter, value):
+ self.parameters[parameter] = value
- def get_parameter(self, parameter):#{{{
+ def get_parameter(self, parameter):
try:
return self.parameters[parameter]
except:
- raise OAuthError('Parameter not found: %s' % parameter)#}}}
-
- def _get_timestamp_nonce(self):#{{{
- return self.get_parameter('oauth_timestamp'), self.get_parameter('oauth_nonce')#}}}
+ raise OAuthError('Parameter not found: %s' % parameter)
+ def _get_timestamp_nonce(self):
+ return self.get_parameter('oauth_timestamp'), self.get_parameter(
+ 'oauth_nonce')
- def get_oauth_parameters(self):#{{{
+ def get_nonoauth_parameters(self):
+ """Get any non-OAuth parameters."""
parameters = {}
for k, v in self.parameters.iteritems():
- # ignore oauth parameters
- if k.find('oauth_') == 0:
- parameters[k] = v
- return parameters#}}}
-
- # get any non-oauth parameters
- def get_nonoauth_parameters(self):#{{{
- parameters = {}
- for k, v in self.parameters.iteritems():
- # ignore oauth parameters
+ # Ignore oauth parameters.
if k.find('oauth_') < 0:
parameters[k] = v
- return parameters#}}}
+ return parameters
- # serialize as a header for an HTTPAuth request
- def to_header(self, realm=''):#{{{
+ def to_header(self, realm=''):
+ """Serialize as a header for an HTTPAuth request."""
auth_header = 'OAuth realm="%s"' % realm
- # add the oauth parameters
+ # Add the oauth parameters.
if self.parameters:
for k, v in self.parameters.iteritems():
- auth_header += ', %s="%s"' % (k, escape(str(v)))
- return {'Authorization': auth_header}#}}}
+ if k[:6] == 'oauth_':
+ auth_header += ', %s="%s"' % (k, escape(str(v)))
+ return {'Authorization': auth_header}
- # serialize as post data for a POST request
- def to_postdata(self):#{{{
- return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) for k, v in self.parameters.iteritems()])#}}}
+ def to_postdata(self):
+ """Serialize as post data for a POST request."""
+ return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) \
+ for k, v in self.parameters.iteritems()])
- # serialize as a url for a GET request
- def to_url(self):#{{{
- return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata())#}}}
+ def to_url(self):
+ """Serialize as a URL for a GET request."""
+ return '%s?%s' % (self.get_normalized_http_url(), self.to_postdata())
- # return a string that consists of all the parameters that need to be signed
- def get_normalized_parameters(self):#{{{
+ def get_normalized_parameters(self):
+ """Return a string that contains the parameters that must be signed."""
params = self.parameters
try:
- # exclude the signature if it exists
+ # Exclude the signature if it exists.
del params['oauth_signature']
except:
pass
- key_values = params.items()
- # sort lexicographically, first after key, then after value
+ # Escape key values before sorting.
+ key_values = [(escape(_utf8_str(k)), escape(_utf8_str(v))) \
+ for k,v in params.items()]
+ # Sort lexicographically, first after key, then after value.
key_values.sort()
- # combine key value pairs in string and escape
- return '&'.join(['%s=%s' % (escape(str(k)), escape(str(v))) for k, v in key_values])#}}}
+ # Combine key value pairs into a string.
+ return '&'.join(['%s=%s' % (k, v) for k, v in key_values])
- # just uppercases the http method
- def get_normalized_http_method(self):#{{{
- return self.http_method.upper()#}}}
+ def get_normalized_http_method(self):
+ """Uppercases the http method."""
+ return self.http_method.upper()
- # parses the url and rebuilds it to be scheme://host/path
- def get_normalized_http_url(self):#{{{
+ def get_normalized_http_url(self):
+ """Parses the URL and rebuilds it to be scheme://host/path."""
parts = urlparse.urlparse(self.http_url)
- url_string = '%s://%s%s' % (parts[0], parts[1], parts[2]) # scheme, netloc, path
- return url_string#}}}
-
- # set the signature parameter to the result of build_signature
- def sign_request(self, signature_method, consumer, token):#{{{
- # set the signature method
- self.set_parameter('oauth_signature_method', signature_method.get_name())
- # set the signature
- self.set_parameter('oauth_signature', self.build_signature(signature_method, consumer, token))#}}}
-
- def build_signature(self, signature_method, consumer, token):#{{{
- # call the build signature method within the signature method
- return signature_method.build_signature(self, consumer, token)#}}}
-
- #@staticmethod
- def from_request(http_method, http_url, headers=None, parameters=None, query_string=None):#{{{
- # combine multiple parameter sources
+ scheme, netloc, path = parts[:3]
+ # Exclude default port numbers.
+ if scheme == 'http' and netloc[-3:] == ':80':
+ netloc = netloc[:-3]
+ elif scheme == 'https' and netloc[-4:] == ':443':
+ netloc = netloc[:-4]
+ return '%s://%s%s' % (scheme, netloc, path)
+
+ def sign_request(self, signature_method, consumer, token):
+ """Set the signature parameter to the result of build_signature."""
+ # Set the signature method.
+ self.set_parameter('oauth_signature_method',
+ signature_method.get_name())
+ # Set the signature.
+ self.set_parameter('oauth_signature',
+ self.build_signature(signature_method, consumer, token))
+
+ def build_signature(self, signature_method, consumer, token):
+ """Calls the build signature method within the signature method."""
+ return signature_method.build_signature(self, consumer, token)
+
+ def from_request(http_method, http_url, headers=None, parameters=None,
+ query_string=None):
+ """Combines multiple parameter sources."""
if parameters is None:
parameters = {}
- # headers
+ # Headers
if headers and 'Authorization' in headers:
auth_header = headers['Authorization']
- # check that the authorization header is OAuth
- if auth_header.index('OAuth') > -1:
+ # Check that the authorization header is OAuth.
+ if auth_header[:6] == 'OAuth ':
+ auth_header = auth_header[6:]
try:
- # get the parameters from the header
+ # Get the parameters from the header.
header_params = OAuthRequest._split_header(auth_header)
parameters.update(header_params)
except:
- raise OAuthError('Unable to parse OAuth parameters from Authorization header.')
+ raise OAuthError('Unable to parse OAuth parameters from '
+ 'Authorization header.')
- # GET or POST query string
+ # GET or POST query string.
if query_string:
query_params = OAuthRequest._split_url_string(query_string)
parameters.update(query_params)
- # URL parameters
-
- if http_url:
- param_str = urlparse.urlparse(http_url)[4] # query
- url_params = OAuthRequest._split_url_string(param_str)
- parameters.update(url_params)
+ # URL parameters.
+ param_str = urlparse.urlparse(http_url)[4] # query
+ url_params = OAuthRequest._split_url_string(param_str)
+ parameters.update(url_params)
if parameters:
return OAuthRequest(http_method, http_url, parameters)
return None
- from_request = staticmethod(from_request)#}}}
+ from_request = staticmethod(from_request)
- #@staticmethod
- def from_consumer_and_token(oauth_consumer, token=None, http_method=HTTP_METHOD, http_url=None, parameters=None):#{{{
+ def from_consumer_and_token(oauth_consumer, token=None,
+ callback=None, verifier=None, http_method=HTTP_METHOD,
+ http_url=None, parameters=None):
if not parameters:
parameters = {}
@@ -235,164 +316,182 @@ def from_consumer_and_token(oauth_consumer, token=None, http_method=HTTP_METHOD,
if token:
parameters['oauth_token'] = token.key
+ if token.callback:
+ parameters['oauth_callback'] = token.callback
+ # 1.0a support for verifier.
+ if verifier:
+ parameters['oauth_verifier'] = verifier
+ elif callback:
+ # 1.0a support for callback in the request token request.
+ parameters['oauth_callback'] = callback
return OAuthRequest(http_method, http_url, parameters)
- from_consumer_and_token = staticmethod(from_consumer_and_token)#}}}
+ from_consumer_and_token = staticmethod(from_consumer_and_token)
- #@staticmethod
- def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD, http_url=None, parameters=None):#{{{
+ def from_token_and_callback(token, callback=None, http_method=HTTP_METHOD,
+ http_url=None, parameters=None):
if not parameters:
parameters = {}
parameters['oauth_token'] = token.key
if callback:
- parameters['oauth_callback'] = escape(callback)
+ parameters['oauth_callback'] = callback
return OAuthRequest(http_method, http_url, parameters)
- from_token_and_callback = staticmethod( from_token_and_callback)#}}}
+ from_token_and_callback = staticmethod(from_token_and_callback)
- # util function: turn Authorization: header into parameters, has to do some unescaping
- #@staticmethod
- def _split_header(header):#{{{
+ def _split_header(header):
+ """Turn Authorization: header into parameters."""
params = {}
parts = header.split(',')
for param in parts:
- # ignore realm parameter
- if param.find('OAuth realm') > -1:
+ # Ignore realm parameter.
+ if param.find('realm') > -1:
continue
- # remove whitespace
+ # Remove whitespace.
param = param.strip()
- # split key-value
+ # Split key-value.
param_parts = param.split('=', 1)
- # remove quotes and unescape the value
+ # Remove quotes and unescape the value.
params[param_parts[0]] = urllib.unquote(param_parts[1].strip('\"'))
return params
- _split_header = staticmethod( _split_header)#}}}
+ _split_header = staticmethod(_split_header)
- # util function: turn url string into parameters, has to do some unescaping
- #@staticmethod
- def _split_url_string(param_str):#{{{
+ def _split_url_string(param_str):
+ """Turn URL string into parameters."""
parameters = cgi.parse_qs(param_str, keep_blank_values=False)
for k, v in parameters.iteritems():
parameters[k] = urllib.unquote(v[0])
return parameters
- _split_url_string = staticmethod( _split_url_string)#}}}
-
+ _split_url_string = staticmethod(_split_url_string)
-# OAuthServer is a worker to check a requests validity against a data store
class OAuthServer(object):
- timestamp_threshold = 300 # in seconds, five minutes
+ """A worker to check the validity of a request against a data store."""
+ timestamp_threshold = 300 # In seconds, five minutes.
version = VERSION
signature_methods = None
data_store = None
- def __init__(self, data_store=None, signature_methods=None):#{{{
+ def __init__(self, data_store=None, signature_methods=None):
self.data_store = data_store
- self.signature_methods = signature_methods or {}#}}}
+ self.signature_methods = signature_methods or {}
- def set_data_store(self, oauth_data_store):#{{{
- self.data_store = data_store#}}}
+ def set_data_store(self, data_store):
+ self.data_store = data_store
- def get_data_store(self):#{{{
- return self.data_store#}}}
+ def get_data_store(self):
+ return self.data_store
- def add_signature_method(self, signature_method):#{{{
+ def add_signature_method(self, signature_method):
self.signature_methods[signature_method.get_name()] = signature_method
- return self.signature_methods#}}}
+ return self.signature_methods
- # process a request_token request
- # returns the request token on success
- def fetch_request_token(self, oauth_request):#{{{
+ def fetch_request_token(self, oauth_request):
+ """Processes a request_token request and returns the
+ request token on success.
+ """
try:
- # get the request token for authorization
+ # Get the request token for authorization.
token = self._get_token(oauth_request, 'request')
except OAuthError:
- # no token required for the initial token request
+ # No token required for the initial token request.
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
+ try:
+ callback = self.get_callback(oauth_request)
+ except OAuthError:
+ callback = None # 1.0, no callback specified.
self._check_signature(oauth_request, consumer, None)
- # fetch a new token
- token = self.data_store.fetch_request_token(consumer)
- return token#}}}
-
- # process an access_token request
- # returns the access token on success
- def fetch_access_token(self, oauth_request):#{{{
+ # Fetch a new token.
+ token = self.data_store.fetch_request_token(consumer, callback)
+ return token
+
+ def fetch_access_token(self, oauth_request):
+ """Processes an access_token request and returns the
+ access token on success.
+ """
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
- # get the request token
+ try:
+ verifier = self._get_verifier(oauth_request)
+ except OAuthError:
+ verifier = None
+ # Get the request token.
token = self._get_token(oauth_request, 'request')
self._check_signature(oauth_request, consumer, token)
- new_token = self.data_store.fetch_access_token(consumer, token)
- return new_token#}}}
+ new_token = self.data_store.fetch_access_token(consumer, token, verifier)
+ return new_token
- # verify an api call, checks all the parameters
- def verify_request(self, oauth_request):#{{{
+ def verify_request(self, oauth_request):
+ """Verifies an api call and checks all the parameters."""
# -> consumer and token
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
- # get the access token
+ # Get the access token.
token = self._get_token(oauth_request, 'access')
self._check_signature(oauth_request, consumer, token)
parameters = oauth_request.get_nonoauth_parameters()
- return consumer, token, parameters#}}}
-
- # authorize a request token
- def authorize_token(self, token, user):#{{{
- return self.data_store.authorize_request_token(token, user)#}}}
-
- # get the callback url
- def get_callback(self, oauth_request):#{{{
- return oauth_request.get_parameter('oauth_callback')#}}}
-
- # optional support for the authenticate header
- def build_authenticate_header(self, realm=''):#{{{
- return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}#}}}
-
- # verify the correct version request for this server
- def _get_version(self, oauth_request):#{{{
+ return consumer, token, parameters
+
+ def authorize_token(self, token, user):
+ """Authorize a request token."""
+ return self.data_store.authorize_request_token(token, user)
+
+ def get_callback(self, oauth_request):
+ """Get the callback URL."""
+ return oauth_request.get_parameter('oauth_callback')
+
+ def build_authenticate_header(self, realm=''):
+ """Optional support for the authenticate header."""
+ return {'WWW-Authenticate': 'OAuth realm="%s"' % realm}
+
+ def _get_version(self, oauth_request):
+ """Verify the correct version request for this server."""
try:
version = oauth_request.get_parameter('oauth_version')
except:
version = VERSION
if version and version != self.version:
raise OAuthError('OAuth version %s not supported.' % str(version))
- return version#}}}
+ return version
- # figure out the signature with some defaults
- def _get_signature_method(self, oauth_request):#{{{
+ def _get_signature_method(self, oauth_request):
+ """Figure out the signature with some defaults."""
try:
- signature_method = oauth_request.get_parameter('oauth_signature_method')
+ signature_method = oauth_request.get_parameter(
+ 'oauth_signature_method')
except:
signature_method = SIGNATURE_METHOD
try:
- # get the signature method object
+ # Get the signature method object.
signature_method = self.signature_methods[signature_method]
except:
signature_method_names = ', '.join(self.signature_methods.keys())
- raise OAuthError('Signature method %s not supported try one of the following: %s' % (signature_method, signature_method_names))
+ raise OAuthError('Signature method %s not supported try one of the '
+ 'following: %s' % (signature_method, signature_method_names))
- return signature_method#}}}
+ return signature_method
- def _get_consumer(self, oauth_request):#{{{
+ def _get_consumer(self, oauth_request):
consumer_key = oauth_request.get_parameter('oauth_consumer_key')
- if not consumer_key:
- raise OAuthError('Invalid consumer key.')
consumer = self.data_store.lookup_consumer(consumer_key)
if not consumer:
raise OAuthError('Invalid consumer.')
- return consumer#}}}
+ return consumer
- # try to find the token for the provided request token key
- def _get_token(self, oauth_request, token_type='access'):#{{{
+ def _get_token(self, oauth_request, token_type='access'):
+ """Try to find the token for the provided request token key."""
token_field = oauth_request.get_parameter('oauth_token')
token = self.data_store.lookup_token(token_type, token_field)
if not token:
raise OAuthError('Invalid %s token: %s' % (token_type, token_field))
- return token#}}}
+ return token
+
+ def _get_verifier(self, oauth_request):
+ return oauth_request.get_parameter('oauth_verifier')
- def _check_signature(self, oauth_request, consumer, token):#{{{
+ def _check_signature(self, oauth_request, consumer, token):
timestamp, nonce = oauth_request._get_timestamp_nonce()
self._check_timestamp(timestamp)
self._check_nonce(consumer, token, nonce)
@@ -401,105 +500,114 @@ def _check_signature(self, oauth_request, consumer, token):#{{{
signature = oauth_request.get_parameter('oauth_signature')
except:
raise OAuthError('Missing signature.')
- # validate the signature
- valid_sig = signature_method.check_signature(oauth_request, consumer, token, signature)
+ # Validate the signature.
+ valid_sig = signature_method.check_signature(oauth_request, consumer,
+ token, signature)
if not valid_sig:
- key, base = signature_method.build_signature_base_string(oauth_request, consumer, token)
- raise OAuthError('Invalid signature. Expected signature base string: %s' % base)
- built = signature_method.build_signature(oauth_request, consumer, token)#}}}
-
- def _check_timestamp(self, timestamp):#{{{
- # verify that timestamp is recentish
+ key, base = signature_method.build_signature_base_string(
+ oauth_request, consumer, token)
+ raise OAuthError('Invalid signature. Expected signature base '
+ 'string: %s' % base)
+ built = signature_method.build_signature(oauth_request, consumer, token)
+
+ def _check_timestamp(self, timestamp):
+ """Verify that timestamp is recentish."""
timestamp = int(timestamp)
now = int(time.time())
- lapsed = now - timestamp
+ lapsed = abs(now - timestamp)
if lapsed > self.timestamp_threshold:
- raise OAuthError('Expired timestamp: given %d and now %s has a greater difference than threshold %d' % (timestamp, now, self.timestamp_threshold))#}}}
+ raise OAuthError('Expired timestamp: given %d and now %s has a '
+ 'greater difference than threshold %d' %
+ (timestamp, now, self.timestamp_threshold))
- def _check_nonce(self, consumer, token, nonce):#{{{
- # verify that the nonce is uniqueish
+ def _check_nonce(self, consumer, token, nonce):
+ """Verify that the nonce is uniqueish."""
nonce = self.data_store.lookup_nonce(consumer, token, nonce)
if nonce:
- raise OAuthError('Nonce already used: %s' % str(nonce))#}}}
+ raise OAuthError('Nonce already used: %s' % str(nonce))
+
-# OAuthClient is a worker to attempt to execute a request
class OAuthClient(object):
+ """OAuthClient is a worker to attempt to execute a request."""
consumer = None
token = None
- def __init__(self, oauth_consumer, oauth_token):#{{{
+ def __init__(self, oauth_consumer, oauth_token):
self.consumer = oauth_consumer
- self.token = oauth_token#}}}
+ self.token = oauth_token
+
+ def get_consumer(self):
+ return self.consumer
- def get_consumer(self):#{{{
- return self.consumer#}}}
+ def get_token(self):
+ return self.token
- def get_token(self):#{{{
- return self.token#}}}
+ def fetch_request_token(self, oauth_request):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def fetch_request_token(self, oauth_request):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
+ def fetch_access_token(self, oauth_request):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def fetch_access_token(self, oauth_request):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
+ def access_resource(self, oauth_request):
+ """-> Some protected resource."""
+ raise NotImplementedError
- def access_resource(self, oauth_request):#{{{
- # -> some protected resource
- raise NotImplementedError#}}}
-# OAuthDataStore is a database abstraction used to lookup consumers and tokens
class OAuthDataStore(object):
+ """A database abstraction used to lookup consumers and tokens."""
+
+ def lookup_consumer(self, key):
+ """-> OAuthConsumer."""
+ raise NotImplementedError
- def lookup_consumer(self, key):#{{{
- # -> OAuthConsumer
- raise NotImplementedError#}}}
+ def lookup_token(self, oauth_consumer, token_type, token_token):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def lookup_token(self, oauth_consumer, token_type, token_token):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
+ def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def lookup_nonce(self, oauth_consumer, oauth_token, nonce, timestamp):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
+ def fetch_request_token(self, oauth_consumer, oauth_callback):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def fetch_request_token(self, oauth_consumer):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
+ def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def fetch_access_token(self, oauth_consumer, oauth_token):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
+ def authorize_request_token(self, oauth_token, user):
+ """-> OAuthToken."""
+ raise NotImplementedError
- def authorize_request_token(self, oauth_token, user):#{{{
- # -> OAuthToken
- raise NotImplementedError#}}}
-# OAuthSignatureMethod is a strategy class that implements a signature method
class OAuthSignatureMethod(object):
- def get_name(self):#{{{
- # -> str
- raise NotImplementedError#}}}
+ """A strategy class that implements a signature method."""
+ def get_name(self):
+ """-> str."""
+ raise NotImplementedError
- def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token):#{{{
- # -> str key, str raw
- raise NotImplementedError#}}}
+ def build_signature_base_string(self, oauth_request, oauth_consumer, oauth_token):
+ """-> str key, str raw."""
+ raise NotImplementedError
- def build_signature(self, oauth_request, oauth_consumer, oauth_token):#{{{
- # -> str
- raise NotImplementedError#}}}
+ def build_signature(self, oauth_request, oauth_consumer, oauth_token):
+ """-> str."""
+ raise NotImplementedError
- def check_signature(self, oauth_request, consumer, token, signature):#{{{
+ def check_signature(self, oauth_request, consumer, token, signature):
built = self.build_signature(oauth_request, consumer, token)
- return built == signature#}}}
+ return built == signature
+
class OAuthSignatureMethod_HMAC_SHA1(OAuthSignatureMethod):
- def get_name(self):#{{{
- return 'HMAC-SHA1'#}}}
+ def get_name(self):
+ return 'HMAC-SHA1'
- def build_signature_base_string(self, oauth_request, consumer, token):#{{{
+ def build_signature_base_string(self, oauth_request, consumer, token):
sig = (
escape(oauth_request.get_normalized_http_method()),
escape(oauth_request.get_normalized_http_url()),
@@ -510,36 +618,38 @@ def build_signature_base_string(self, oauth_request, consumer, token):#{{{
if token:
key += escape(token.secret)
raw = '&'.join(sig)
- return key, raw#}}}
+ return key, raw
- def build_signature(self, oauth_request, consumer, token):#{{{
- # build the base signature string
- key, raw = self.build_signature_base_string(oauth_request, consumer, token)
+ def build_signature(self, oauth_request, consumer, token):
+ """Builds the base signature string."""
+ key, raw = self.build_signature_base_string(oauth_request, consumer,
+ token)
- # hmac object
+ # HMAC object.
try:
import hashlib # 2.5
hashed = hmac.new(key, raw, hashlib.sha1)
except:
- import sha # deprecated
+ import sha # Deprecated
hashed = hmac.new(key, raw, sha)
- #BROKEN IN JYTHON 2.2:
- #return base64.b64encode(hashed.digest())
- # workaround:
- return base64.encodestring(hashed.digest()).strip() #}}}
+ # Calculate the digest base 64.
+ return binascii.b2a_base64(hashed.digest())[:-1]
+
class OAuthSignatureMethod_PLAINTEXT(OAuthSignatureMethod):
- def get_name(self):#{{{
- return 'PLAINTEXT'#}}}
+ def get_name(self):
+ return 'PLAINTEXT'
- def build_signature_base_string(self, oauth_request, consumer, token):#{{{
- # concatenate the consumer key and secret
- sig = escape(consumer.secret) + '&'
+ def build_signature_base_string(self, oauth_request, consumer, token):
+ """Concatenates the consumer key and secret."""
+ sig = '%s&' % escape(consumer.secret)
if token:
sig = sig + escape(token.secret)
- return sig,sig#}}}
+ return sig, sig
- def build_signature(self, oauth_request, consumer, token):#{{{
- return self.build_signature_base_string(oauth_request, consumer, token)[0]#}}}
+ def build_signature(self, oauth_request, consumer, token):
+ key, raw = self.build_signature_base_string(oauth_request, consumer,
+ token)
+ return key
Please sign in to comment.
Something went wrong with that request. Please try again.