Skip to content

Commit

Permalink
Merge pull request #19 from sdss/tqdm
Browse files Browse the repository at this point in the history
Adds progress bar to downloads
  • Loading branch information
havok2063 committed Jul 7, 2020
2 parents 3dd8a10 + 31ae4ab commit 67db72c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 20 deletions.
42 changes: 32 additions & 10 deletions python/sdss_access/sync/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from datetime import datetime
from sdss_access import is_posix
from tempfile import gettempdir
from tqdm import tqdm


class Cli(object):
Expand Down Expand Up @@ -77,17 +78,38 @@ def get_background_process(self, command=None, logfile=None, errfile=None, pause
background_process = None
return background_process

def wait_for_processes(self, processes, pause=60):
def wait_for_processes(self, processes, pause=5, n_tasks=None, tasks_per_stream=None):
running_processes = [process.poll() is None for process in processes]
pause_count = 0
while any(running_processes):
running_count = sum(running_processes)
if self.verbose:
print("SDSS_ACCESS> syncing... please wait for %r rsync streams to complete [running for %r seconds]" % (running_count, pause_count * pause))
sleep(pause)
running_processes = [process.poll() is None for process in processes]
pause_count += 1
#print("SDSS_ACCESS> Done!")
postfix = {'n_files': n_tasks, 'n_streams': len(processes)} if n_tasks else {}

# set a progress bar to monitor files/streams
with tqdm(total=n_tasks, unit='files', desc='Progress', postfix=postfix) as pbar:
while any(running_processes):
running_count = sum(running_processes)
finished_files = sum([tasks_per_stream[i]
for i, r in enumerate(running_processes) if r is False])
running_files = n_tasks - finished_files

if self.verbose:
tqdm.write("SDSS_ACCESS> syncing... please wait for {0} rsync streams ({1} "
"files) to complete [running for {2} seconds]".format(running_count,
running_files,
pause_count * pause))

# update the process polling
sleep(pause)
running_processes = [process.poll() is None for process in processes]
pause_count += 1

# count the number of downloaded files
done_files = sum([tasks_per_stream[i]
for i, r in enumerate(running_processes) if r is False])
new_files = done_files - finished_files

# update the progress bar
pbar.update(new_files)

self.returncode = tuple([process.returncode for process in processes])

def foreground_run(self, command, test=False, logger=None, logall=False, message=None, outname=None, errname=None):
Expand Down Expand Up @@ -133,7 +155,7 @@ def foreground_run(self, command, test=False, logger=None, logall=False, message
if test:
return (status, out, err)

# Perform the system call
# Perform the system call
if outname is None:
outfile = TemporaryFile()
else:
Expand Down
16 changes: 10 additions & 6 deletions python/sdss_access/sync/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from os.path import isfile, exists, dirname
from sdss_access import SDSSPath
from sdss_access.sync.auth import Auth, AuthMixin
from tqdm import tqdm


class HttpAccess(AuthMixin, SDSSPath):
Expand Down Expand Up @@ -121,12 +122,15 @@ def download_url_to_path(self, url, path, force=False):

file_size_dl = 0
block_sz = 8192
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
file.write(buffer)
# set up progress bar
with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024, desc='Progress') as pbar:
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
pbar.update(len(buffer))
file.write(buffer)

if self.verbose:
if path_exists:
Expand Down
14 changes: 10 additions & 4 deletions python/sdss_access/sync/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ def set_streamlet(self, index=None, location=None, source=None, destination=None
try:
n = len(location)
ok = n == len(source) and n == len(destination)
streamlet['location'], streamlet['source'], streamlet['destination'] = (location, source, destination) if ok else (None, None, None)
streamlet['location'], streamlet['source'], streamlet['destination'] = (
location, source, destination) if ok else (None, None, None)
except Exception:
streamlet['location'], streamlet['source'], streamlet['destination'] = (None, None, None)
streamlet['location'], streamlet['source'], streamlet['destination'] = (
None, None, None)

def get_streamlet(self, index=None, increment=1):
if index is None:
Expand Down Expand Up @@ -122,7 +124,7 @@ def commit_streamlet(self, streamlet=None):
streamlet['path'] = self.cli.get_path(index=streamlet['index'])
path_txt = "{0}.txt".format(streamlet['path'])
streamlet['command'] = self.command.format(path=path_txt, source=self.source, destination=self.destination)

if 'rsync -' in self.command:
self.cli.write_lines(path=path_txt, lines=[location for location in streamlet['location']])
else:
Expand All @@ -139,7 +141,11 @@ def run_streamlets(self):
if self.verbose:
print("SDSS_ACCESS> rsync stream %s logging to %s" % (streamlet['index'],streamlet['logfile'].name))

self.cli.wait_for_processes(list(streamlet['process'] for streamlet in self.streamlet))
# get the number of tasks per stream
tasks_per_stream = [len(streamlet['location']) for streamlet in self.streamlet]
# submit the stream subprocesses to the background
self.cli.wait_for_processes(list(streamlet['process'] for streamlet in self.streamlet),
n_tasks=len(self.task), tasks_per_stream=tasks_per_stream)

if any(self.cli.returncode):
path = self.streamlet[0]['path'][:-3]
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ install_requires =
requests>=2.10.0
sdss-tree>=3.0.0
sdsstools>=0.1.7
tqdm>=4.46.0

scripts =

Expand Down
7 changes: 7 additions & 0 deletions tests/sync/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ def test_svn_get(self, monkeyprod):

assert os.path.exists(full)

def test_http_public_dl(self, monkeyprod):
http = HttpAccess(release='DR14')
http.remote()
full = http.full('spec-lite', run2d='v5_10_0', plateid=3606, mjd=55182, fiberid=537)
http.get('spec-lite', run2d='v5_10_0', plateid=3606, mjd=55182, fiberid=537)
assert os.path.exists(full)

def test_nonetrc_fails(self, monkeyhome):
''' test raise error when no netrc present '''
with pytest.raises(AccessError) as cm:
Expand Down

0 comments on commit 67db72c

Please sign in to comment.