Skip to content
Permalink
Browse files
Fix Issue #8797: Raise HTTPError on failed Basic Authentication immed…
…iately. Initial patch by Sam Bull.
  • Loading branch information
orsenthil committed Aug 20, 2014
1 parent c1a723a commit 783737625d1f06f78b53eee6331d2f428ffe4d27
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 18 deletions.
@@ -1,3 +1,4 @@
import base64
import os
import email
import urllib.parse
@@ -197,6 +198,50 @@ def handle_request(self, request_handler):
return self._return_auth_challenge(request_handler)
return True


class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
"""Handler for performing basic authentication."""
# Server side values
USER = 'testUser'
PASSWD = 'testPass'
REALM = 'Test'
USER_PASSWD = "%s:%s" % (USER, PASSWD)
ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

def __init__(self, *args, **kwargs):
http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

def log_message(self, format, *args):
# Suppress console log message
pass

def do_HEAD(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()

def do_AUTHHEAD(self):
self.send_response(401)
self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
self.send_header("Content-type", "text/html")
self.end_headers()

def do_GET(self):
if not self.headers.get("Authorization", ""):
self.do_AUTHHEAD()
self.wfile.write(b"No Auth header received")
elif self.headers.get(
"Authorization", "") == "Basic " + self.ENCODED_AUTH:
self.send_response(200)
self.end_headers()
self.wfile.write(b"It works")
else:
# Request Unauthorized
self.do_AUTHHEAD()
self.wfile.close()



# Proxy test infrastructure

class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
@@ -232,6 +277,43 @@ def do_GET(self):

# Test cases

@unittest.skipUnless(threading, "Threading required for this test.")
class BasicAuthTests(unittest.TestCase):
USER = "testUser"
PASSWD = "testPass"
INCORRECT_PASSWD = "Incorrect"
REALM = "Test"

def setUp(self):
super(BasicAuthTests, self).setUp()
# With Basic Authentication
def http_server_with_basic_auth_handler(*args, **kwargs):
return BasicAuthHandler(*args, **kwargs)
self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
self.server_url = 'http://127.0.0.1:%s' % self.server.port
self.server.start()
self.server.ready.wait()

def tearDown(self):
self.server.stop()
super(BasicAuthTests, self).tearDown()

def test_basic_auth_success(self):
ah = urllib.request.HTTPBasicAuthHandler()
ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
urllib.request.install_opener(urllib.request.build_opener(ah))
try:
self.assertTrue(urllib.request.urlopen(self.server_url))
except urllib.error.HTTPError:
self.fail("Basic auth failed for the url: %s", self.server_url)

def test_basic_auth_httperror(self):
ah = urllib.request.HTTPBasicAuthHandler()
ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
urllib.request.install_opener(urllib.request.build_opener(ah))
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)


@unittest.skipUnless(threading, "Threading required for this test.")
class ProxyAuthTests(unittest.TestCase):
URL = "http://localhost"
@@ -245,6 +327,7 @@ def setUp(self):
self.digest_auth_handler = DigestAuthHandler()
self.digest_auth_handler.set_users({self.USER: self.PASSWD})
self.digest_auth_handler.set_realm(self.REALM)
# With Digest Authentication.
def create_fake_proxy_handler(*args, **kwargs):
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

@@ -846,24 +846,13 @@ def __init__(self, password_mgr=None):
password_mgr = HTTPPasswordMgr()
self.passwd = password_mgr
self.add_password = self.passwd.add_password
self.retried = 0

def reset_retry_count(self):
self.retried = 0

def http_error_auth_reqed(self, authreq, host, req, headers):
# host may be an authority (without userinfo) or a URL with an
# authority
# XXX could be multiple headers
authreq = headers.get(authreq, None)

if self.retried > 5:
# retry sending the username:password 5 times before failing.
raise HTTPError(req.get_full_url(), 401, "basic auth failed",
headers, None)
else:
self.retried += 1

if authreq:
scheme = authreq.split()[0]
if scheme.lower() != 'basic':
@@ -878,17 +867,14 @@ def http_error_auth_reqed(self, authreq, host, req, headers):
warnings.warn("Basic Auth Realm was unquoted",
UserWarning, 2)
if scheme.lower() == 'basic':
response = self.retry_http_basic_auth(host, req, realm)
if response and response.code != 401:
self.retried = 0
return response
return self.retry_http_basic_auth(host, req, realm)

def retry_http_basic_auth(self, host, req, realm):
user, pw = self.passwd.find_user_password(realm, host)
if pw is not None:
raw = "%s:%s" % (user, pw)
auth = "Basic " + base64.b64encode(raw.encode()).decode("ascii")
if req.headers.get(self.auth_header, None) == auth:
if req.get_header(self.auth_header, None) == auth:
return None
req.add_unredirected_header(self.auth_header, auth)
return self.parent.open(req, timeout=req.timeout)
@@ -904,7 +890,6 @@ def http_error_401(self, req, fp, code, msg, headers):
url = req.full_url
response = self.http_error_auth_reqed('www-authenticate',
url, req, headers)
self.reset_retry_count()
return response


@@ -920,7 +905,6 @@ def http_error_407(self, req, fp, code, msg, headers):
authority = req.host
response = self.http_error_auth_reqed('proxy-authenticate',
authority, req, headers)
self.reset_retry_count()
return response


@@ -33,6 +33,9 @@ Library

- Issue #22165: SimpleHTTPRequestHandler now supports undecodable file names.

- Issue #8797: Raise HTTPError on failed Basic Authentication immediately.
Initial patch by Sam Bull.

- Issue #20729: Restored the use of lazy iterkeys()/itervalues()/iteritems()
in the mailbox module.

0 comments on commit 7837376

Please sign in to comment.