Skip to content

Commit

Permalink
Completing dataset uploader
Browse files Browse the repository at this point in the history
Fixing a bug where it wouldn't wait until concurrent uploads were <= 5
Adding a wait until all files are fully uploaded
  • Loading branch information
mtlynch committed Feb 11, 2018
1 parent b20c31a commit 0b09519
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 11 deletions.
31 changes: 20 additions & 11 deletions sia_load_tester/dataset_uploader.py
Expand Up @@ -37,15 +37,20 @@ def upload(self):
uploaded to Sia.
"""
while not self._upload_queue.empty():
if self._too_many_uploads_in_progress():
logger.info('Too many uploads in progress: %d >= %d',
self._count_uploads_in_progress(),
_MAX_CONCURRENT_UPLOADS)
self._wait()
logger.info('%d files left to upload', self._upload_queue.qsize())
self._wait_until_next_upload()
job = self._upload_queue.get()
logger.info('Uploading next file to Sia: %s', job.local_path)
self._process_upload_job_async(job)
# TODO(mtlynch): Wait for uploadprogress to reach 100 for all files.
if not self._process_upload_job_async(job):
self._upload_queue.put(job)
self._wait_until_zero_uploads_in_progress()

def _wait_until_next_upload(self):
    """Blocks until another upload is allowed to start.

    Repeatedly checks _too_many_uploads_in_progress() and, while it
    reports True, logs the current count and sleeps for _SLEEP_SECONDS
    before checking again.
    """
    while True:
        if not self._too_many_uploads_in_progress():
            return
        in_progress = self._count_uploads_in_progress()
        logger.info(
            'Too many uploads in progress: %d >= %d.'
            ' Sleeping for %d seconds', in_progress,
            _MAX_CONCURRENT_UPLOADS, _SLEEP_SECONDS)
        self._sleep_fn(_SLEEP_SECONDS)

def _too_many_uploads_in_progress(self):
    """Returns True when the in-progress count has reached the maximum."""
    in_progress = self._count_uploads_in_progress()
    return in_progress >= _MAX_CONCURRENT_UPLOADS
Expand All @@ -57,14 +62,18 @@ def _count_uploads_in_progress(self):
n += 1
return n

def _wait(self):
    """Logs that there is nothing to do, then sleeps for _SLEEP_SECONDS."""
    delay_seconds = _SLEEP_SECONDS
    logger.info('Nothing to do. Sleeping for %d seconds', delay_seconds)
    self._sleep_fn(delay_seconds)
def _wait_until_zero_uploads_in_progress(self):
    """Blocks until no uploads remain in progress.

    Polls _count_uploads_in_progress() and, while it is above zero, logs
    the remaining count and sleeps for _SLEEP_SECONDS before polling
    again.
    """
    while self._count_uploads_in_progress() > 0:
        logger.info(
            ('Waiting for remaining uploads to complete.'
             ' %d uploads still in progress. Sleeping for %d seconds'),
            self._count_uploads_in_progress(), _SLEEP_SECONDS)
        # Actually sleep between polls. Without this the loop spins,
        # re-querying the upload count continuously, and the log message
        # above claims a sleep that never happens.
        self._sleep_fn(_SLEEP_SECONDS)

def _process_upload_job_async(self, job):
    """Kicks off the upload of a single file to Sia.

    Args:
        job: Sia upload job to process. Its local_path and sia_path are
            handed to the Sia client.

    Returns:
        The value returned by the Sia client's upload_file_async call.
    """
    local_path = job.local_path
    logger.info('Uploading file to Sia: %s', local_path)
    return self._sia_client.upload_file_async(local_path, job.sia_path)
181 changes: 181 additions & 0 deletions tests/test_dataset_uploader.py
Expand Up @@ -106,3 +106,184 @@ def test_uploads_file_when_one_is_missing_from_sia(self):

self.mock_sia_api_impl.set_renter_upload.assert_called_once_with(
'c.txt', source='/dummy-path/c.txt')

def test_does_not_start_new_uploads_when_too_many_uploads_are_in_progress(
        self):
    # Scenario: 1.txt through 5.txt are already uploading when the
    # uploader starts, and the test checks that uploads of 6.txt and
    # 7.txt are requested, in that order.
    dummy_dataset = dataset.Dataset(
        '/dummy-path',
        ['/dummy-path/%d.txt' % number for number in range(1, 8)])

    def make_files_state(progress_values):
        # Builds one mock renter-files response. The N-th value
        # (1-based) is the upload progress of N.txt.
        return {
            u'files': [{
                u'siapath': u'%d.txt' % number,
                u'localpath': u'/dummy-path/%d.txt' % number,
                u'uploadprogress': progress,
            } for number, progress in enumerate(progress_values, 1)]
        }

    files_state = [
        # Initial mock file state.
        make_files_state([15, 18, 19, 16, 5]),
        # File state after one upload completes.
        make_files_state([100, 88, 79, 56, 95]),
        # File state after two uploads complete.
        make_files_state([100, 100, 85, 84, 92, 12]),
        # File state after all uploads complete.
        make_files_state([100, 100, 100, 100, 100, 100, 100]),
    ]

    # Use a list so that mock_sleep will capture the variable. In Python 3,
    # the better solution is to use the nonlocal keyword.
    total_seconds_elapsed = [0]

    def mock_sleep(sleep_seconds):
        total_seconds_elapsed[0] += sleep_seconds

    self.mock_sleep_fn = mock_sleep

    # Simulate passage of time by returning a later state depending on how
    # many seconds the uploader has slept. The two middle states also
    # advance the clock themselves each time they are returned.
    def mock_renter_files():
        elapsed = total_seconds_elapsed[0]
        if elapsed < 40:
            return files_state[0]
        if elapsed < 60:
            total_seconds_elapsed[0] += 25
            return files_state[1]
        if elapsed < 95:
            total_seconds_elapsed[0] += 5
            return files_state[2]
        return files_state[3]

    self.mock_sia_api_impl.get_renter_files.side_effect = mock_renter_files
    self.mock_sia_api_impl.set_renter_upload.return_value = True

    queue = upload_queue.from_dataset_and_sia_client(
        dummy_dataset, self.mock_sia_client)
    uploader = dataset_uploader.DatasetUploader(queue, self.mock_sia_client,
                                                self.mock_sleep_fn)

    uploader.upload()

    expected_upload_calls = [
        mock.call('6.txt', source='/dummy-path/6.txt'),
        mock.call('7.txt', source='/dummy-path/7.txt'),
    ]
    self.mock_sia_api_impl.set_renter_upload.assert_has_calls(
        expected_upload_calls)

0 comments on commit 0b09519

Please sign in to comment.