Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

adding an example protected resource and decorator to protect with oa…

…uth. also started nosetests to replace rspec tests
  • Loading branch information...
commit fc89a8456ef65a49db3d7ebd9fe99c2272b7fc18 1 parent bd7eab8
@progrium authored
View
26 main.py
@@ -7,6 +7,7 @@
from oauth.handlers import AuthorizationHandler, AccessTokenHandler
from oauth.models import OAuth_Client
+from oauth.utils import oauth_required
# Notes:
# Access tokens usually live shorter than access grant
@@ -19,8 +20,7 @@ def get(self):
class ClientsHandler(webapp.RequestHandler):
""" This is only indirectly necessary since the spec
- calls for clients, but managing them is out of scope
- """
+ calls for clients, but managing them is out of scope """
def get(self):
clients = OAuth_Client.all()
@@ -34,14 +34,28 @@ def post(self):
client.put()
self.redirect(self.request.path)
+class ProtectedResourceHandler(webapp.RequestHandler):
+ """ This is an example of a resource protected by OAuth
+ and requires the 'read' scope """
+
+ SECRET_PAYLOAD = 'bananabread'
+
+ @oauth_required(scope='read')
+ def get(self, token):
+ self.response.headers['Content-Type'] = "application/json"
+ self.response.out.write(
+ simplejson.dumps({'is_protected': True, 'secret_payload': self.SECRET_PAYLOAD}))
-def main():
- application = webapp.WSGIApplication([
+def application():
+ return webapp.WSGIApplication([
('/', MainHandler),
('/oauth/authorize', AuthorizationHandler),
('/oauth/token', AccessTokenHandler),
- ('/app/clients', ClientsHandler), ],debug=True)
- util.run_wsgi_app(application)
+ ('/protected/resource', ProtectedResourceHandler),
+ ('/admin/clients', ClientsHandler), ],debug=True)
+
+def main():
+ util.run_wsgi_app(application())
if __name__ == '__main__':
main()
View
5 oauth/handlers.py
@@ -6,7 +6,10 @@
import urllib
from oauth.models import OAuth_Authorization, OAuth_Token, OAuth_Client
-from oauth.utils import extract
+
+def extract(keys, d):
+ """ Extracts subset of a dict into new dict """
+ return dict((k, d[k]) for k in keys if k in d)
class AuthorizationHandler(webapp.RequestHandler):
SUPPORTED_RESPONSE_TYPES = [
View
14 oauth/models.py
@@ -1,5 +1,13 @@
from google.appengine.ext import db
-from oauth.utils import random_str, now
+import time
+import hashlib
+import random
+
+def now():
+ return int(time.mktime(time.gmtime()))
+
+def random_str():
+ return hashlib.sha1(str(random.random())).hexdigest()
class OAuth_Token(db.Model):
EXPIRY_TIME = 3600*24
@@ -15,6 +23,10 @@ class OAuth_Token(db.Model):
def get_by_refresh_token(cls, refresh_token):
return cls.all().filter('refresh_token =', refresh_token).get()
+ @classmethod
+ def get_by_access_token(cls, access_token):
+ return cls.all().filter('access_token =', access_token).get()
+
def put(self, can_refresh=True):
if can_refresh:
self.refresh_token = random_str()
View
65 oauth/utils.py
@@ -1,13 +1,54 @@
-import time
-import hashlib
-import random
+from oauth.models import OAuth_Token
-def now():
- return int(time.mktime(time.gmtime()))
-
-def random_str():
- return hashlib.sha1(str(random.random())).hexdigest()
-
-def extract(keys, d):
- """ Extracts subset of a dict into new dict """
- return dict((k, d[k]) for k in keys if k in d)
+def oauth_required(scope=None, realm='Example OAuth Service'):
+ """ This is a decorator to be used with RequestHandler methods
+ that accepts/requires OAuth to access a protected resource
+ in accordance with Section 5 of the spec.
+
+ If the token is valid, it's passed as a named parameter to
+ the request handler. The request handler is responsible for
+ validating the user associated with the token. """
+ def decorator(f):
+ def wrapped_f(*args):
+ request = args[0].request
+ response = args[0].response
+
+ def render_error(error, error_desc, error_uri=None):
+ response.set_status({
+ 'invalid_request': 400,
+ 'invalid_token': 401,
+ 'expired_token': 401,
+ 'insufficient_scope': 403, }[error])
+ authenticate_header = 'OAuth realm="%s", error="%s", error_description="%s"' % \
+ (realm, error, error_desc)
+ if error_uri:
+ authenticate_header += ', error_uri="%s"' % error_uri
+ if scope:
+ authenticate_header += ', scope="%s"' % scope
+ response.headers['WWW-Authenticate'] = authenticate_header
+ response.out.write(error_desc)
+
+ if request.headers.get('Authorization', '').startswith('OAuth'):
+ token = request.headers['Authorization'].split(' ')[1]
+ else:
+ token = request.get('oauth_token', None)
+
+ if not token:
+ render_error('invalid_request', "Not a valid request for an OAuth protected resource")
+ return
+
+ token = OAuth_Token.get_by_access_token(token)
+ if token.is_expired():
+ if token.refresh_token:
+ render_error('expired_token', "This token has expired")
+ else:
+ render_error('invalid_token', "This token is no longer valid")
+ return
+
+ if scope != token.scope:
+ render_error('insufficient_scope', "This resource requires higher priveleges")
+ return
+
+ f(*args, token=token)
+ return wrapped_f
+ return decorator
View
31 tests/main_test.py
@@ -0,0 +1,31 @@
+from webtest import TestApp
+from main import application, ProtectedResourceHandler
+from oauth.models import OAuth_Client
+from google.appengine.api import apiproxy_stub_map, datastore_file_stub
+
+app = TestApp(application())
+
+# clear datastore
+apiproxy_stub_map.apiproxy._APIProxyStubMap__stub_map['datastore_v3'].Clear()
+
+# set up test client
+client = OAuth_Client(name='test')
+client.put()
+
+def test_protected_resource_fail_naked():
+ response = app.get('/protected/resource', status=400)
+ assert not ProtectedResourceHandler.SECRET_PAYLOAD in str(response)
+
+def test_protected_resource_success_flow():
+ response = app.post('/oauth/token', dict(
+ grant_type = 'password',
+ username = 'user',
+ password = 'pass',
+ client_id = client.client_id,
+ client_secret = client.client_secret,
+ scope = 'read',
+ ))
+ assert 'access_token' in response.json.keys()
+ token = response.json['access_token']
+ response = app.get('/protected/resource', dict(oauth_token=token))
+ assert ProtectedResourceHandler.SECRET_PAYLOAD in str(response)
Please sign in to comment.
Something went wrong with that request. Please try again.