Skip to content

Commit

Permalink
Close file handle after upload job
Browse files Browse the repository at this point in the history
The opened file for upload is not closed.
This fix prevents possible file handle leak.

Closes-Bug: #1559079
Change-Id: Ibc58667789e8f54c74ae2bbd32717a45f7b30550
  • Loading branch information
notok committed Mar 16, 2017
1 parent ee8620d commit 809e4cf
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 70 deletions.
57 changes: 34 additions & 23 deletions swiftclient/service.py
Expand Up @@ -1714,6 +1714,7 @@ def _upload_segment_job(conn, path, container, segment_name, segment_start,
segment_name),
'log_line': '%s segment %s' % (obj_name, segment_index),
}
fp = None
try:
fp = open(path, 'rb')
fp.seek(segment_start)
Expand Down Expand Up @@ -1761,6 +1762,9 @@ def _upload_segment_job(conn, path, container, segment_name, segment_start,
if results_queue is not None:
results_queue.put(res)
return res
finally:
if fp is not None:
fp.close()

def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
chunks = []
Expand Down Expand Up @@ -2008,29 +2012,36 @@ def _upload_object_job(self, conn, container, source, obj, options,
else:
res['large_object'] = False
obr = {}
if path is not None:
content_length = getsize(path)
contents = LengthWrapper(open(path, 'rb'),
content_length,
md5=options['checksum'])
else:
content_length = None
contents = ReadableToIterable(stream,
md5=options['checksum'])

etag = conn.put_object(
container, obj, contents,
content_length=content_length, headers=put_headers,
response_dict=obr
)
res['response_dict'] = obr

if (options['checksum'] and
etag and etag != contents.get_md5sum()):
raise SwiftError('Object upload verification failed: '
'md5 mismatch, local {0} != remote {1} '
'(remote object has not been removed)'
.format(contents.get_md5sum(), etag))
fp = None
try:
if path is not None:
content_length = getsize(path)
fp = open(path, 'rb')
contents = LengthWrapper(fp,
content_length,
md5=options['checksum'])
else:
content_length = None
contents = ReadableToIterable(stream,
md5=options['checksum'])

etag = conn.put_object(
container, obj, contents,
content_length=content_length, headers=put_headers,
response_dict=obr
)
res['response_dict'] = obr

if (options['checksum'] and
etag and etag != contents.get_md5sum()):
raise SwiftError(
'Object upload verification failed: '
'md5 mismatch, local {0} != remote {1} '
'(remote object has not been removed)'
.format(contents.get_md5sum(), etag))
finally:
if fp is not None:
fp.close()

if old_manifest or old_slo_manifest_paths:
drs = []
Expand Down
119 changes: 72 additions & 47 deletions tests/unit/test_service.py
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
import contextlib
import mock
import os
import six
Expand Down Expand Up @@ -1002,7 +1003,6 @@ def test_upload_with_bad_segment_size(self):
@mock.patch('swiftclient.service.stat')
@mock.patch('swiftclient.service.getmtime', return_value=1.0)
@mock.patch('swiftclient.service.getsize', return_value=4)
@mock.patch.object(builtins, 'open', return_value=six.StringIO('asdf'))
def test_upload_with_relative_path(self, *args, **kwargs):
service = SwiftService({})
objects = [{'path': "./test",
Expand All @@ -1012,7 +1012,9 @@ def test_upload_with_relative_path(self, *args, **kwargs):
{'path': ".\\test",
'strt_indx': 2}]
for obj in objects:
with mock.patch('swiftclient.service.Connection') as mock_conn:
with mock.patch('swiftclient.service.Connection') as mock_conn, \
mock.patch.object(builtins, 'open') as mock_open:
mock_open.return_value = six.StringIO('asdf')
mock_conn.return_value.head_object.side_effect = \
ClientException('Not Found', http_status=404)
mock_conn.return_value.put_object.return_value =\
Expand All @@ -1032,10 +1034,29 @@ def test_upload_with_relative_path(self, *args, **kwargs):
self.assertEqual(upload_obj_resp['object'],
obj['path'][obj['strt_indx']:])
self.assertEqual(upload_obj_resp['path'], obj['path'])
self.assertTrue(mock_open.return_value.closed)


class TestServiceUpload(_TestServiceBase):

@contextlib.contextmanager
def assert_open_results_are_closed(self):
opened_files = []
builtin_open = builtins.open

def open_wrapper(*a, **kw):
opened_files.append((builtin_open(*a, **kw), a, kw))
return opened_files[-1][0]

with mock.patch.object(builtins, 'open', open_wrapper):
yield
for fp, args, kwargs in opened_files:
formatted_args = [repr(a) for a in args]
formatted_args.extend('%s=%r' % kv for kv in kwargs.items())
formatted_args = ', '.join(formatted_args)
self.assertTrue(fp.closed,
'Failed to close open(%s)' % formatted_args)

def test_upload_object_job_file_with_unicode_path(self):
# Uploading a file results in the file object being wrapped in a
# LengthWrapper. This test sets the options in such a way that much
Expand Down Expand Up @@ -1109,11 +1130,14 @@ def test_upload_segment_job(self):
f.write(b'c' * 10)
f.flush()

# Mock the connection to return an empty etag. This
# skips etag validation which would fail as the LengthWrapper
# isn't read from.
# run read() when put_object is called to calculate md5sum
def _consuming_conn(*a, **kw):
contents = a[2]
contents.read() # Force md5 calculation
return contents.get_md5sum()

mock_conn = mock.Mock()
mock_conn.put_object.return_value = ''
mock_conn.put_object.side_effect = _consuming_conn
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
expected_r = {
'action': 'upload_segment',
Expand All @@ -1125,21 +1149,22 @@ def test_upload_segment_job(self):
'log_line': 'test_o segment 2',
'success': True,
'response_dict': {},
'segment_etag': '',
'segment_etag': md5(b'b' * 10).hexdigest(),
'attempts': 2,
}

s = SwiftService()
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})
with self.assert_open_results_are_closed():
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})

self.assertEqual(r, expected_r)

Expand All @@ -1153,10 +1178,6 @@ def test_upload_segment_job(self):
contents = mock_conn.put_object.call_args[0][2]
self.assertIsInstance(contents, utils.LengthWrapper)
self.assertEqual(len(contents), 10)
# This read forces the LengthWrapper to calculate the md5
# for the read content.
self.assertEqual(contents.read(), b'b' * 10)
self.assertEqual(contents.get_md5sum(), md5(b'b' * 10).hexdigest())

def test_etag_mismatch_with_ignore_checksum(self):
def _consuming_conn(*a, **kw):
Expand Down Expand Up @@ -1215,16 +1236,17 @@ def _consuming_conn(*a, **kw):
type(mock_conn).attempts = mock.PropertyMock(return_value=2)

s = SwiftService()
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})
with self.assert_open_results_are_closed():
r = s._upload_segment_job(conn=mock_conn,
path=f.name,
container='test_c',
segment_name='test_s_1',
segment_start=10,
segment_size=10,
segment_index=2,
obj_name='test_o',
options={'segment_container': None,
'checksum': True})

self.assertIn('md5 mismatch', str(r.get('error')))

Expand Down Expand Up @@ -1259,21 +1281,29 @@ def test_upload_object_job_file(self):
}
expected_mtime = '%f' % os.path.getmtime(f.name)

# run read() when put_object is called to calculate md5sum
# md5sum is verified in _upload_object_job.
def _consuming_conn(*a, **kw):
contents = a[2]
contents.read() # Force md5 calculation
return contents.get_md5sum()

mock_conn = mock.Mock()
mock_conn.put_object.return_value = ''
mock_conn.put_object.side_effect = _consuming_conn
type(mock_conn).attempts = mock.PropertyMock(return_value=2)

s = SwiftService()
r = s._upload_object_job(conn=mock_conn,
container='test_c',
source=f.name,
obj='test_o',
options={'changed': False,
'skip_identical': False,
'leave_segments': True,
'header': '',
'segment_size': 0,
'checksum': True})
with self.assert_open_results_are_closed():
r = s._upload_object_job(conn=mock_conn,
container='test_c',
source=f.name,
obj='test_o',
options={'changed': False,
'skip_identical': False,
'leave_segments': True,
'header': '',
'segment_size': 0,
'checksum': True})

mtime = r['headers']['x-object-meta-mtime']
self.assertEqual(expected_mtime, mtime)
Expand All @@ -1292,11 +1322,6 @@ def test_upload_object_job_file(self):
contents = mock_conn.put_object.call_args[0][2]
self.assertIsInstance(contents, utils.LengthWrapper)
self.assertEqual(len(contents), 30)
# This read forces the LengthWrapper to calculate the md5
# for the read content. This also checks that LengthWrapper was
# initialized with md5=True
self.assertEqual(contents.read(), b'a' * 30)
self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())

@mock.patch('swiftclient.service.time', return_value=1400000000)
def test_upload_object_job_stream(self, time_mock):
Expand Down

0 comments on commit 809e4cf

Please sign in to comment.