Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-28867: Warn when NamedTemporaryFile is not explicitly closed #1936

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions Lib/distutils/tests/test_upload.py
Expand Up @@ -198,6 +198,8 @@ def test_wrong_exception_order(self):
results = self.get_logs(ERROR)
self.assertIn(expected, results[-1])
self.clear_logs()
if hasattr(exception, 'close'):
exception.close()


def test_suite():
Expand Down
30 changes: 18 additions & 12 deletions Lib/tempfile.py
Expand Up @@ -428,23 +428,29 @@ def __init__(self, file, name, delete=True):
# __del__ is called.

def close(self, unlink=_os.unlink):
if not self.close_called and self.file is not None:
if not self.close_called:
self.close_called = True
try:
self.file.close()
finally:
if self.delete:
unlink(self.name)

# Need to ensure the file is deleted on __del__
def __del__(self):
self.close()

if self.file is not None:
try:
self.file.close()
finally:
if self.delete:
unlink(self.name)
else:
def close(self):
if not self.close_called:
self.close_called = True
self.file.close()
if self.file is not None:
self.file.close()

# Need to ensure the file is deleted on __del__
def __del__(self):
if not self.close_called:
_warnings.warn(
'unclosed file %r' % self, ResourceWarning,
stacklevel=2,
source=self)
self.close()


class _TemporaryFileWrapper:
Expand Down
9 changes: 9 additions & 0 deletions Lib/test/test_tempfile.py
Expand Up @@ -965,6 +965,15 @@ def test_bad_mode(self):
tempfile.NamedTemporaryFile(mode=2, dir=dir)
self.assertEqual(os.listdir(dir), [])

def test_resource_warning_on_del(self):
def f():
tempfile.NamedTemporaryFile()
with warnings.catch_warnings(record=True) as w:
f()
self.assertEqual(len(w), 1)
self.assertEqual(w[-1].category, ResourceWarning)
self.assertIn('unclosed file', str(w[-1].message))

# How to test the mode and bufsize parameters?

class TestSpooledTemporaryFile(BaseTestCase):
Expand Down
78 changes: 45 additions & 33 deletions Lib/test/test_urllib.py
Expand Up @@ -343,11 +343,11 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
def check_read(self, ver):
self.fakehttp(b"HTTP/" + ver + b" 200 OK\r\n\r\nHello!")
try:
fp = urlopen("http://python.org/")
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
self.assertEqual(fp.geturl(), 'http://python.org/')
self.assertEqual(fp.getcode(), 200)
with urlopen("http://python.org/") as fp:
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
self.assertEqual(fp.geturl(), 'http://python.org/')
self.assertEqual(fp.getcode(), 200)
finally:
self.unfakehttp()

Expand All @@ -364,8 +364,8 @@ def test_url_fragment(self):
def test_willclose(self):
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
try:
resp = urlopen("http://www.python.org")
self.assertTrue(resp.fp.will_close)
with urlopen("http://www.python.org") as resp:
self.assertTrue(resp.fp.will_close)
finally:
self.unfakehttp()

Expand All @@ -391,8 +391,8 @@ def test_url_with_control_char_rejected(self):
InvalidURL, f"contain control.*{escaped_char_repr}"):
urllib.request.urlopen(f"https:{schemeless_url}")
# This code path quotes the URL so there is no injection.
resp = urlopen(f"http:{schemeless_url}")
self.assertNotIn(char, resp.geturl())
with urlopen(f"http:{schemeless_url}") as resp:
self.assertNotIn(char, resp.geturl())
finally:
self.unfakehttp()

Expand All @@ -415,10 +415,10 @@ def test_url_with_newline_header_injection_rejected(self):
with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"):
urllib.request.urlopen(f"https:{schemeless_url}")
# This code path quotes the URL so there is no injection.
resp = urlopen(f"http:{schemeless_url}")
self.assertNotIn(' ', resp.geturl())
self.assertNotIn('\r', resp.geturl())
self.assertNotIn('\n', resp.geturl())
with urlopen(f"http:{schemeless_url}") as resp:
self.assertNotIn(' ', resp.geturl())
self.assertNotIn('\r', resp.geturl())
self.assertNotIn('\n', resp.geturl())
finally:
self.unfakehttp()

Expand All @@ -442,7 +442,10 @@ def test_read_bogus(self):
Content-Type: text/html; charset=iso-8859-1
''', mock_close=True)
try:
self.assertRaises(OSError, urlopen, "http://python.org/")
with self.assertRaises(OSError) as cm:
with urlopen("http://python.org/"):
pass
cm.exception.close()
finally:
self.unfakehttp()

Expand All @@ -457,8 +460,9 @@ def test_invalid_redirect(self):
''', mock_close=True)
try:
msg = "Redirection to url 'file:"
with self.assertRaisesRegex(urllib.error.HTTPError, msg):
with self.assertRaisesRegex(urllib.error.HTTPError, msg) as cm:
urlopen("http://python.org/")
cm.exception.close()
finally:
self.unfakehttp()

Expand All @@ -471,8 +475,9 @@ def test_redirect_limit_independent(self):
Connection: close
''', mock_close=True)
try:
self.assertRaises(urllib.error.HTTPError, urlopen,
"http://something")
with self.assertRaises(urllib.error.HTTPError) as cm:
urlopen("http://something")
cm.exception.close()
finally:
self.unfakehttp()

Expand Down Expand Up @@ -524,18 +529,19 @@ def test_ftp_cache_pruning(self):
self.fakeftp()
try:
urllib.request.ftpcache['test'] = urllib.request.ftpwrapper('user', 'pass', 'localhost', 21, [])
urlopen('ftp://localhost')
with urlopen('ftp://localhost'):
pass
finally:
self.unfakeftp()

def test_userpass_inurl(self):
self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
try:
fp = urlopen("http://user:pass@python.org/")
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
self.assertEqual(fp.getcode(), 200)
with urlopen("http://user:pass@python.org/") as fp:
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
self.assertEqual(fp.getcode(), 200)
finally:
self.unfakehttp()

Expand All @@ -547,14 +553,14 @@ def test_userpass_inurl_w_spaces(self):
fakehttp_wrapper = http.client.HTTPConnection
authorization = ("Authorization: Basic %s\r\n" %
b64encode(userpass.encode("ASCII")).decode("ASCII"))
fp = urlopen(url)
# The authorization header must be in place
self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8"))
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
# the spaces are quoted in URL so no match
self.assertNotEqual(fp.geturl(), url)
self.assertEqual(fp.getcode(), 200)
with urlopen(url) as fp:
# The authorization header must be in place
self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8"))
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
# the spaces are quoted in URL so no match
self.assertNotEqual(fp.geturl(), url)
self.assertEqual(fp.getcode(), 200)
finally:
self.unfakehttp()

Expand Down Expand Up @@ -606,6 +612,11 @@ def setUp(self):
self.text_url_base64)
self.image_url_resp = urllib.request.urlopen(self.image_url)

def tearDown(self):
self.text_url_resp.close()
self.text_url_base64_resp.close()
self.image_url_resp.close()

def test_interface(self):
# Make sure object returned by urlopen() has the specified methods
for attr in ("read", "readline", "readlines",
Expand All @@ -620,8 +631,9 @@ def test_info(self):
[('text/plain', ''), ('charset', 'ISO-8859-1')])
self.assertEqual(self.image_url_resp.info()['content-length'],
str(len(self.image)))
self.assertEqual(urllib.request.urlopen("data:,").info().get_params(),
[('text/plain', ''), ('charset', 'US-ASCII')])
with urllib.request.urlopen("data:,") as r:
self.assertEqual(r.info().get_params(),
[('text/plain', ''), ('charset', 'US-ASCII')])

def test_geturl(self):
self.assertEqual(self.text_url_resp.geturl(), self.text_url)
Expand Down
56 changes: 32 additions & 24 deletions Lib/test/test_urllib2.py
Expand Up @@ -746,18 +746,18 @@ def connect_ftp(self, user, passwd, host, port, dirs,
]:
req = Request(url)
req.timeout = None
r = h.ftp_open(req)
# ftp authentication not yet implemented by FTPHandler
self.assertEqual(h.user, user)
self.assertEqual(h.passwd, passwd)
self.assertEqual(h.host, socket.gethostbyname(host))
self.assertEqual(h.port, port)
self.assertEqual(h.dirs, dirs)
self.assertEqual(h.ftpwrapper.filename, filename)
self.assertEqual(h.ftpwrapper.filetype, type_)
headers = r.info()
self.assertEqual(headers.get("Content-type"), mimetype)
self.assertEqual(int(headers["Content-length"]), len(data))
with h.ftp_open(req) as r:
# ftp authentication not yet implemented by FTPHandler
self.assertEqual(h.user, user)
self.assertEqual(h.passwd, passwd)
self.assertEqual(h.host, socket.gethostbyname(host))
self.assertEqual(h.port, port)
self.assertEqual(h.dirs, dirs)
self.assertEqual(h.ftpwrapper.filename, filename)
self.assertEqual(h.ftpwrapper.filetype, type_)
headers = r.info()
self.assertEqual(headers.get("Content-type"), mimetype)
self.assertEqual(int(headers["Content-length"]), len(data))

def test_file(self):
import email.utils
Expand Down Expand Up @@ -1172,7 +1172,8 @@ def test_redirect(self):
try:
method(req, MockFile(), code, "Blah",
MockHeaders({"location": to_url}))
except urllib.error.HTTPError:
except urllib.error.HTTPError as e:
e.close()
# 307 in response to POST requires user OK
self.assertEqual(code, 307)
self.assertIsNotNone(data)
Expand Down Expand Up @@ -1211,7 +1212,8 @@ def redirect(h, req, url=to_url):
while 1:
redirect(h, req, "http://example.com/")
count = count + 1
except urllib.error.HTTPError:
except urllib.error.HTTPError as e:
e.close()
# don't stop until max_repeats, because cookies may introduce state
self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats)

Expand All @@ -1223,7 +1225,8 @@ def redirect(h, req, url=to_url):
while 1:
redirect(h, req, "http://example.com/%d" % count)
count = count + 1
except urllib.error.HTTPError:
except urllib.error.HTTPError as e:
e.close()
self.assertEqual(count,
urllib.request.HTTPRedirectHandler.max_redirections)

Expand All @@ -1239,9 +1242,11 @@ def test_invalid_redirect(self):

for scheme in invalid_schemes:
invalid_url = scheme + '://' + schemeless_url
self.assertRaises(urllib.error.HTTPError, h.http_error_302,
with self.assertRaises(urllib.error.HTTPError) as cm:
h.http_error_302(
req, MockFile(), 302, "Security Loophole",
MockHeaders({"location": invalid_url}))
cm.exception.close()

for scheme in valid_schemes:
valid_url = scheme + '://' + schemeless_url
Expand Down Expand Up @@ -1774,14 +1779,17 @@ def test_HTTPError_interface(self):
url = code = fp = None
hdrs = 'Content-Length: 42'
err = urllib.error.HTTPError(url, code, msg, hdrs, fp)
self.assertTrue(hasattr(err, 'reason'))
self.assertEqual(err.reason, 'something bad happened')
self.assertTrue(hasattr(err, 'headers'))
self.assertEqual(err.headers, 'Content-Length: 42')
expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg)
self.assertEqual(str(err), expected_errmsg)
expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg)
self.assertEqual(repr(err), expected_errmsg)
try:
self.assertTrue(hasattr(err, 'reason'))
self.assertEqual(err.reason, 'something bad happened')
self.assertTrue(hasattr(err, 'headers'))
self.assertEqual(err.headers, 'Content-Length: 42')
expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg)
self.assertEqual(str(err), expected_errmsg)
expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg)
self.assertEqual(repr(err), expected_errmsg)
finally:
err.close()

def test_parse_proxy(self):
parse_proxy_test_cases = [
Expand Down
16 changes: 9 additions & 7 deletions Lib/test/test_urllib2_localnet.py
Expand Up @@ -312,7 +312,9 @@ def test_basic_auth_httperror(self):
ah = urllib.request.HTTPBasicAuthHandler()
ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
urllib.request.install_opener(urllib.request.build_opener(ah))
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
with self.assertRaises(urllib.error.HTTPError) as cm:
urllib.request.urlopen(self.server_url)
cm.exception.close()


class ProxyAuthTests(unittest.TestCase):
Expand Down Expand Up @@ -358,15 +360,15 @@ def test_proxy_with_bad_password_raises_httperror(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD+"bad")
self.digest_auth_handler.set_qop("auth")
self.assertRaises(urllib.error.HTTPError,
self.opener.open,
self.URL)
with self.assertRaises(urllib.error.HTTPError) as cm:
self.opener.open(self.URL)
cm.exception.close()

def test_proxy_with_no_password_raises_httperror(self):
self.digest_auth_handler.set_qop("auth")
self.assertRaises(urllib.error.HTTPError,
self.opener.open,
self.URL)
with self.assertRaises(urllib.error.HTTPError) as cm:
self.opener.open(self.URL)
cm.exception.close()

def test_proxy_qop_auth_works(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
Expand Down
21 changes: 10 additions & 11 deletions Lib/test/test_urllib_response.py
Expand Up @@ -40,21 +40,20 @@ def closehook():
self.assertTrue(closehook_called)

def test_addinfo(self):
info = urllib.response.addinfo(self.fp, self.test_headers)
self.assertEqual(info.info(), self.test_headers)
self.assertEqual(info.headers, self.test_headers)
with urllib.response.addinfo(self.fp, self.test_headers) as info:
self.assertEqual(info.info(), self.test_headers)
self.assertEqual(info.headers, self.test_headers)

def test_addinfourl(self):
url = "http://www.python.org"
code = 200
infourl = urllib.response.addinfourl(self.fp, self.test_headers,
url, code)
self.assertEqual(infourl.info(), self.test_headers)
self.assertEqual(infourl.geturl(), url)
self.assertEqual(infourl.getcode(), code)
self.assertEqual(infourl.headers, self.test_headers)
self.assertEqual(infourl.url, url)
self.assertEqual(infourl.status, code)
with urllib.response.addinfourl(self.fp, self.test_headers, url, code) as infourl:
self.assertEqual(infourl.info(), self.test_headers)
self.assertEqual(infourl.geturl(), url)
self.assertEqual(infourl.getcode(), code)
self.assertEqual(infourl.headers, self.test_headers)
self.assertEqual(infourl.url, url)
self.assertEqual(infourl.status, code)

def tearDown(self):
self.sock.close()
Expand Down