Skip to content

Commit

Permalink
Added MissingSignature exception. Finished removing data_store. 100% …
Browse files Browse the repository at this point in the history
…test coverage.
  • Loading branch information
joestump committed Oct 13, 2009
1 parent 143fb34 commit 8a25b66
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 50 deletions.
37 changes: 18 additions & 19 deletions oauth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def message(self):
"""A hack to get around the deprecation errors in 2.6."""
return self._message

def __str__(self):
return self._message

class MissingSignature(Error):
pass

def build_authenticate_header(realm=''):
"""Optional WWW-Authenticate header (401 error)"""
Expand Down Expand Up @@ -296,6 +301,8 @@ def get_parameter(self, parameter):
ret = self.get(parameter)
if ret is None:
raise Error('Parameter not found: %s' % parameter)

return ret

def get_normalized_parameters(self):
"""Return a string that contains the parameters that must be signed."""
Expand Down Expand Up @@ -437,13 +444,10 @@ def add_signature_method(self, signature_method):
def verify_request(self, request, consumer, token):
"""Verifies an api call and checks all the parameters."""

version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
# Get the access token.
token = self._get_token(oauth_request, 'access')
self._check_signature(oauth_request, consumer, token)
parameters = oauth_request.get_nonoauth_parameters()
return consumer, token, parameters
version = self._get_version(request)
self._check_signature(request, consumer, token)
parameters = request.get_nonoauth_parameters()
return parameters

def build_authenticate_header(self, realm=''):
"""Optional support for the authenticate header."""
Expand All @@ -467,13 +471,13 @@ def _get_signature_method(self, request):
signature_method = request.get_parameter('oauth_signature_method')
except:
signature_method = SIGNATURE_METHOD

try:
# Get the signature method object.
signature_method = self.signature_methods[signature_method]
except:
signature_method_names = ', '.join(self.signature_methods.keys())
raise Error('Signature method %s not supported try one of the '
'following: %s' % (signature_method, signature_method_names))
raise Error('Signature method %s not supported try one of the following: %s' % (signature_method, signature_method_names))

return signature_method

Expand All @@ -488,7 +492,7 @@ def _check_signature(self, request, consumer, token):
try:
signature = request.get_parameter('oauth_signature')
except:
raise Error('Missing signature.')
raise MissingSignature('Missing oauth_signature.')

# Validate the signature.
valid = signature_method.check(request, consumer, token, signature)
Expand All @@ -508,8 +512,7 @@ def _check_timestamp(self, timestamp):
lapsed = now - timestamp
if lapsed > self.timestamp_threshold:
raise Error('Expired timestamp: given %d and now %s has a '
'greater difference than threshold %d' %
(timestamp, now, self.timestamp_threshold))
'greater difference than threshold %d' % (timestamp, now, self.timestamp_threshold))


class Client(object):
Expand All @@ -527,15 +530,15 @@ def get_consumer(self):
def get_token(self):
return self.token

def fetch_request_token(self, oauth_request):
def fetch_request_token(self, request):
"""-> OAuthToken."""
raise NotImplementedError

def fetch_access_token(self, oauth_request):
def fetch_access_token(self, request):
"""-> OAuthToken."""
raise NotImplementedError

def access_resource(self, oauth_request):
def access_resource(self, request):
"""-> Some protected resource."""
raise NotImplementedError

Expand Down Expand Up @@ -575,10 +578,6 @@ def check(self, request, consumer, token, signature):
built = self.sign(request, consumer, token)
return built == signature

build_signature_base_string = signing_base
build_signature = sign
check_signature = check


class SignatureMethod_HMAC_SHA1(SignatureMethod):
name = 'HMAC-SHA1'
Expand Down
200 changes: 169 additions & 31 deletions tests/test_oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ def test_url(self):
req = oauth.Request(method, url2)
self.assertEquals(req.url, exp2)

def test_get_parameter(self):
url = "http://example.com"
method = "GET"
params = {'oauth_consumer' : 'asdf'}
req = oauth.Request(method, url, parameters=params)

self.assertEquals(req.get_parameter('oauth_consumer'), 'asdf')
self.assertRaises(oauth.Error, req.get_parameter, 'blah')

def test_get_nonoauth_parameters(self):

oauth_params = {
Expand Down Expand Up @@ -443,33 +452,58 @@ def test_from_token_and_callback(self):
self.assertTrue('oauth_callback' in req)
self.assertEquals(req['oauth_callback'], url)

class TestServer(unittest.TestCase):
def test_init(self):
server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()})
self.assertTrue('HMAC-SHA1' in server.signature_methods)
self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
oauth.SignatureMethod_HMAC_SHA1))
def test_from_consumer_and_token(self):
url = "http://sp.example.com/"

server = oauth.Server()
self.assertEquals(server.signature_methods, {})
tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
con = oauth.Consumer(key="con-test-key", secret="con-test-secret")
req = oauth.Request.from_consumer_and_token(con, token=tok,
http_method="GET", http_url=url)

self.assertEquals(req['oauth_token'], tok.key)
self.assertEquals(req['oauth_consumer_key'], con.key)

def _req(self):
ds = MyDataStore()
class SignatureMethod_Bad(oauth.SignatureMethod):
name = "BAD"

def signing_base(self, request, consumer, token):
return ""

def sign(self, request, consumer, token):
return "invalid-signature"


class TestServer(unittest.TestCase):
def setUp(self):
url = "http://sp.example.com/"

params = {
'oauth_version': "1.0",
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': "137131200"
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'foo': 59
}

con = ds.lookup_consumer("test-consumer-key")
tok = ds.lookup_token(con, "request", "test-request-token-key")
self.consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
self.token = oauth.Token(key="token-key", secret="token-secret")

params['oauth_token'] = tok.key
params['oauth_consumer_key'] = con.key
return oauth.Request(method="GET", url=url, parameters=params)
params['oauth_token'] = self.token.key
params['oauth_consumer_key'] = self.consumer.key
self.request = oauth.Request(method="GET", url=url, parameters=params)

signature_method = oauth.SignatureMethod_HMAC_SHA1()
self.request.sign_request(signature_method, self.consumer, self.token)

def test_init(self):
server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()})
self.assertTrue('HMAC-SHA1' in server.signature_methods)
self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'],
oauth.SignatureMethod_HMAC_SHA1))

server = oauth.Server()
self.assertEquals(server.signature_methods, {})

def test_add_signature_method(self):
server = oauth.Server()
Expand All @@ -485,27 +519,131 @@ def test_add_signature_method(self):
self.assertTrue(isinstance(res['PLAINTEXT'],
oauth.SignatureMethod_PLAINTEXT))

def test_fetch_request_token(self):
pass
def test_verify_request(self):
server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

# server = oauth.Server(data_store=MyDataStore())
# token = server.fetch_request_token(self._req())
parameters = server.verify_request(self.request, self.consumer,
self.token)

def test_bad_token_fetch_request_token(self):
pass
self.assertTrue('bar' in parameters)
self.assertTrue('foo' in parameters)
self.assertEquals(parameters['bar'], 'blerg')
self.assertEquals(parameters['foo'], 59)

class TestClient(unittest.TestCase):
pass
def test_no_version(self):
url = "http://sp.example.com/"

class TestDataStore(unittest.TestCase):
pass
params = {
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'foo': 59
}

class TestSignatureMethod(unittest.TestCase):
pass
self.consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
self.token = oauth.Token(key="token-key", secret="token-secret")

class TestSignatureMethod_HMAC_SHA1(unittest.TestCase):
pass
params['oauth_token'] = self.token.key
params['oauth_consumer_key'] = self.consumer.key
self.request = oauth.Request(method="GET", url=url, parameters=params)

signature_method = oauth.SignatureMethod_HMAC_SHA1()
self.request.sign_request(signature_method, self.consumer, self.token)

server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

parameters = server.verify_request(self.request, self.consumer,
self.token)

def test_invalid_version(self):
url = "http://sp.example.com/"

params = {
'oauth_version': '222.9922',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'foo': 59
}

class TestSignatureMethod_PLAINTEXT(unittest.TestCase):
consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")

params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)

signature_method = oauth.SignatureMethod_HMAC_SHA1()
request.sign_request(signature_method, consumer, token)

server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

self.assertRaises(oauth.Error, server.verify_request, request,
consumer, token)

def test_invalid_signature_method(self):
url = "http://sp.example.com/"

params = {
'oauth_version': '1.0',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'foo': 59
}

consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")

params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)

signature_method = SignatureMethod_Bad()
request.sign_request(signature_method, consumer, token)

server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

self.assertRaises(oauth.Error, server.verify_request, request,
consumer, token)

def test_missing_signature(self):
url = "http://sp.example.com/"

params = {
'oauth_version': '1.0',
'oauth_nonce': "4572616e48616d6d65724c61686176",
'oauth_timestamp': int(time.time()),
'bar': 'blerg',
'foo': 59
}

consumer = oauth.Consumer(key="consumer-key",
secret="consumer-secret")
token = oauth.Token(key="token-key", secret="token-secret")

params['oauth_token'] = token.key
params['oauth_consumer_key'] = consumer.key
request = oauth.Request(method="GET", url=url, parameters=params)

signature_method = oauth.SignatureMethod_HMAC_SHA1()
request.sign_request(signature_method, consumer, token)
del request['oauth_signature']

server = oauth.Server()
server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

self.assertRaises(oauth.MissingSignature, server.verify_request,
request, consumer, token)


class TestClient(unittest.TestCase):
pass

0 comments on commit 8a25b66

Please sign in to comment.