Skip to content

Commit

Permalink
Changed from the oauth library to oauth2 library.
Browse files Browse the repository at this point in the history
  • Loading branch information
flashingpumpkin committed Apr 12, 2010
1 parent f19f847 commit 669713d
Showing 1 changed file with 113 additions and 156 deletions.
269 changes: 113 additions & 156 deletions socialregistration/utils.py
Expand Up @@ -11,9 +11,10 @@
import base64
import urllib
import urllib2
from urlparse import parse_qsl
from xml.dom import minidom

from oauth import oauth
import oauth2 as oauth
from openid.consumer import consumer as openid
from openid.store.interface import OpenIDStore as OIDStore
from openid.association import Association as OIDAssociation
Expand All @@ -29,6 +30,7 @@


from socialregistration.models import OpenIDStore as OpenIDStoreModel, OpenIDNonce
from urlparse import urlparse

USE_HTTPS = bool(getattr(settings, 'SOCIALREGISTRATION_USE_HTTPS', False))

Expand Down Expand Up @@ -128,192 +130,147 @@ def is_valid(self):

return self.result.status == openid.SUCCESS

class OAuthClient(oauth.OAuthClient):
"""
Simple OAuth client to perform OAuth requests
( primarily connect accounts - for other requests see class OAuth below )

def get_token_prefix(url):
"""
Returns a prefix for the token to store in the session so we can hold
more than one single oauth provider's access key in the session.
def __init__(self, request, consumer_key, consumer_secret,
request_token_url, access_token_url, authorization_url, callback_url, parameters=None):
Example:
self.request = request
The request token url ``http://twitter.com/oauth/request_token``
returns ``twitter.com``
"""
return urllib2.urlparse.urlparse(url).netloc


class OAuthError(Exception):
pass

class OAuthClient(object):

def __init__(self, request, consumer_key, consumer_secret, request_token_url,
access_token_url, authorization_url, callback_url, parameters=None):

self.request = request

self.request_token_url = request_token_url
self.access_token_url = access_token_url
self.authorization_url = authorization_url

self.consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
self.signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()

self.parameters = parameters

self.errors = []
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret

self.consumer = oauth.Consumer(consumer_key, consumer_secret)
self.client = oauth.Client(self.consumer)

self.signature_method = oauth.SignatureMethod_HMAC_SHA1()

self.parameters = parameters

self.callback_url = callback_url

def _get_response(self, oauth_request):
try:
return urllib2.urlopen(oauth_request.to_url()).read()
except urllib2.HTTPError, e:
raise Exception('%s on %s' % (e, oauth_request.to_url()))

def get_request_token(self):
"""
Get a request token
"""
oauth_request = oauth.OAuthRequest.from_consumer_and_token(
self.consumer, http_url=self.request_token_url,
parameters=self.parameters
)
oauth_request.sign_request(self.signature_method, self.consumer, None)
response = self._get_response(oauth_request)
self.errors = []
self.request_token = None
self.access_token = None

if response.startswith('{'):
# Response is in json convert to string
oauth_token = simplejson.loads(response)['oauth_token']
oauth_token_secret = simplejson.loads(response)['oauth_token_secret']
def _get_request_token(self):
"""
Obtain a temporary request token to authorize an access token and to
sign the request to obtain the access token
"""
if self.request_token is None:
response, content = self.client.request(self.request_token_url, "GET")
if response['status'] != '200':
raise OAuthError(
_('Invalid response while obtaining request token from "%s".') % get_token_prefix(self.request_token_url))
self.request_token = dict(parse_qsl(content))
self.request.session['oauth_%s_request_token' % get_token_prefix(self.request_token_url)] = self.request_token
return self.request_token

response = 'oauth_token=' + oauth_token + '&oauth_token_secret=' + oauth_token_secret
def _get_access_token(self):
if self.access_token is None:
request_token = self._get_rt_from_session()
token = oauth.Token(request_token['oauth_token'], request_token['oauth_token_secret'])
self.client = oauth.Client(self.consumer, token)
response, content = self.client.request(self.access_token_url, "GET")
if response['status'] != '200':
raise OAuthError(
_('Invalid response while obtaining access token from "%s".') % get_token_prefix(self.request_token_url))
self.access_token = dict(parse_qsl(content))

return oauth.OAuthToken.from_string(response)

def get_access_token(self):
"""
Get an access token
"""
oauth_request = oauth.OAuthRequest.from_consumer_and_token(
self.consumer, http_url=self.access_token_url, token=self.token,
parameters=self.parameters
)
oauth_request.sign_request(self.signature_method, self.consumer, self.token)
response = self._get_response(oauth_request)
self.request.session['oauth_%s_access_token' % get_token_prefix(self.request_token_url)] = self.access_token
return self.access_token

if response.startswith('<?xml'):
# Response is in xml convert to string
xml = minidom.parseString(response)
oauth_token = xml.getElementsByTagName('oauth_token')[0].childNodes[0].nodeValue
oauth_token_secret = xml.getElementsByTagName('oauth_token_secret')[0].childNodes[0].nodeValue

response = 'oauth_token=' + oauth_token + '&oauth_token_secret=' + oauth_token_secret

return oauth.OAuthToken.from_string(response)

def token_prefix(self):
def _get_rt_from_session(self):
"""
Returns a prefix for the token to store in the session so we can hold
more than one single oauth provider's access key in the session
Returns the request token cached in the session by ``_get_request_token``
"""
if getattr(self, '_prefix', None) is None:
self._prefix = urllib2.urlparse.urlparse(self.request_token_url).netloc
return self._prefix

@property
def token(self):
""" Short wrapper around get_request_token to cache the token """
if getattr(self, '_token', None) is None:
self._token = self.get_request_token()
return self._token
try:
return self.request.session['oauth_%s_request_token' % get_token_prefix(self.request_token_url)]
except KeyError:
raise OAuthError(_('No request token saved for "%s".') % get_token_prefix(self.request_token_url))

def _get_authorization_url(self):
request_token = self._get_request_token()
return '%s?oauth_token=%s&oauth_callback=%s' % (self.authorization_url,
request_token['oauth_token'], '%s%s' % (Site.objects.get_current().domain,
reverse(self.callback_url)))

def session_token(self):
""" Short wrapper around the token we've stored in the session """
return self.request.session.get(
'oauth_%s_unauthed_token' % self.token_prefix(),
None
)
def is_valid(self):
try:
self._get_rt_from_session()
self._get_access_token()
except OAuthError, e:
self.errors.append(e.args[0])
return False
return True


def get_authorization_url(self):
def get_redirect(self):
"""
Returns the url to redirect the user to
Returns a ``HttpResponseRedirect`` object to redirect the user to the
URL the OAuth provider handles authorization.
"""
oauth_request = oauth.OAuthRequest.from_consumer_and_token(
self.consumer,
http_url=self.authorization_url,
token=self.token,
)
if self.callback_url:
oauth_request.parameters['oauth_callback'] = Site.objects.get_current().domain + reverse(self.callback_url)
return HttpResponseRedirect(self._get_authorization_url())

oauth_request.sign_request(self.signature_method, self.consumer, self.token)
return oauth_request.to_url()
class OAuth(object):
def __init__(self, request, consumer_key, secret_key, request_token_url):
self.request = request

self.consumer_key = consumer_key
self.secret_key = secret_key
self.consumer = oauth.Consumer(consumer_key, secret_key)

self.request_token_url = request_token_url

def get_redirect(self):
def _get_at_from_session(self):
"""
Returns a HttpResponseRedirect object to redirect the user to the url
where authorization of the current application is handled.
Get the saved access token for private resources from the session.
"""
self.request.session['oauth_%s_unauthed_token' % self.token_prefix()] = self.token.to_string()
return HttpResponseRedirect(self.get_authorization_url())
try:
return self.request.session['oauth_%s_access_token' % get_token_prefix(self.request_token_url)]
except KeyError:
raise OAuthError(
_('No access token saved for "%s".') % get_token_prefix(self.request_token_url))

def is_valid(self):
"""
Check if everything is valid after the user got redirected back to our
site.
"""
if not self.session_token():
self.errors.append(_('No un-authorized token given.'))
return False
def query(self, url, method="GET", params=dict()):
# TODO: Params

self._token = oauth.OAuthToken.from_string(self.session_token())

if not self.token.key == self.request.GET.get('oauth_token', 'no-token-given'):
self.errors.append(_('The given authorization tokens do not match.'))
return False
at = self._get_at_from_session()

self._token = self.get_access_token()
self.request.session['oauth_%s_access_token' % self.token_prefix()] = self.token.to_string()
token = oauth.Token(at['oauth_token'], at['oauth_token_secret'])

return True

class OAuth(object):
"""
Base object to perform OAuth signed requests to a service provider
"""
def __init__(self, request, consumer_key, secret_key, request_token_url):
self.request = request
self.consumer = oauth.OAuthConsumer(consumer_key, secret_key)
self.signature_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
client = oauth.Client(self.consumer, token)

self.request_token_url = request_token_url
response, content = client.request(url, method=method)

def token_prefix(self):
"""
Create a prefix for the token so we can hold multiple different oauth
tokens in the session
"""
return urllib2.urlparse.urlparse(self.request_token_url).netloc

@property
def access_token(self):
if getattr(self, '_access_token', None) is None:
self._access_token = oauth.OAuthToken.from_string(
self.request.session['oauth_%s_access_token' % self.token_prefix()]
)
return self._access_token

def get_request(self, url, parameters=None):
""" Build a request object """
oauth_request = oauth.OAuthRequest.from_consumer_and_token(
self.consumer, http_url=url, token=self.access_token,
parameters=parameters
)
oauth_request.sign_request(
self.signature_method, self.consumer, self.access_token
)
return oauth_request

def get_response(self, oauth_request):
""" Submit the request and fetch the response body
TODO: Add POST support"""
try:
return urllib2.urlopen(oauth_request.to_url()).read()
except urllib2.HTTPError, e:
raise Exception('%s on %s' % (e, oauth_request.to_url()))

def query(self, url, parameters=None):
return self.get_response(
self.get_request(url, parameters)
)
if response['status'] != '200':
raise OAuthError(
_('No access to private resources at "%s".') % get_token_prefix(self.request_token_url))

return content

class OAuthTwitter(OAuth):
"""
Expand Down

0 comments on commit 669713d

Please sign in to comment.