Skip to content

Commit

Permalink
refactor fixes, force flag, logging updates
Browse files Browse the repository at this point in the history
- Added the new --force flag to auto-accept prompts
- Fixed a few import errors after refactor
- added initial changelog
- stubbed out some printing fixes that will be completed once ingest status services are live
  • Loading branch information
dkleissa committed Feb 6, 2017
1 parent d15fd5f commit 1b69f08
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 83 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Boss Ingest Client Changelog


## 0.9.0

### Implemented Enhancements:

* Added CHANGELOG.md
* **IMPORTANT** Refactored client for pip installation capability
  - refactored `ingest` library to `bossingest`
  - refactored `client.py` to `boss-ingest`
  - Note: existing ingest job configuration files that were loading built-in plugins from the boss-ingest package will need to be updated from `ingest` to `bossingest`
* Added pip install capability (`pip install boss-ingest`)
* Added `--force` `-f` flag to automatically accept all prompts (useful for launching on remote nodes)
* Added initial feedback for job progress
* Added `--log_level` `-v` flag to indicate desired log level. Defaults to `warning`
* Added additional feedback on the job that is about to be created when starting a new job

### Fixed Bugs:
* Reduced excessive printing

### Merged Pull Requests:
52 changes: 29 additions & 23 deletions boss-ingest
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import multiprocessing as mp
import os
import time
import logging
from ingest.utils.log import always_log_info
from ingest.utils.console import print_estimated_job
from bossingest.utils.log import always_log_info
from bossingest.utils.console import print_estimated_job



def get_confirmation(prompt):
def get_confirmation(prompt, force=False):
"""Method to confirm decisions
Args:
Expand All @@ -39,19 +38,22 @@ def get_confirmation(prompt):
Returns:
(bool): True indicating yes, False indicating no
"""
decision = False
while True:
confirm = input("{} (y/n): ".format(prompt))
if confirm.lower() == "y":
decision = True
break
elif confirm.lower() == "n":
decision = False
break
else:
print("Enter 'y' or 'n' for 'yes' or 'no'")

return decision
if not force:
decision = False
while True:
confirm = input("{} (y/n): ".format(prompt))
if confirm.lower() == "y":
decision = True
break
elif confirm.lower() == "n":
decision = False
break
else:
print("Enter 'y' or 'n' for 'yes' or 'no'")

return decision
else:
return True


def worker_process_run(config_file, api_token, job_id, pipe):
Expand Down Expand Up @@ -111,6 +113,10 @@ def main():
action="store_true",
default=None,
help="Flag indicating if you'd like to cancel (and remove) an ingest job. This will not delete data already ingested, but will prevent continuing this ingest job.")
parser.add_argument("--force", "-f",
action="store_true",
default=False,
help="Flag indicating if you'd like ignore all confirmation prompts.")
parser.add_argument("--processes_nb", "-p", type=int,
default=1,
help="The number of client processes that will upload the images of the ingest job.")
Expand All @@ -137,7 +143,7 @@ def main():
print("Error: You must provide an ingest job ID to cancel")
sys.exit(1)

if not get_confirmation("Are you sure you want to cancel ingest job {}? ".format(args.job_id)):
if not get_confirmation("Are you sure you want to cancel ingest job {}? ".format(args.job_id), args.force):
print("Command ignored. Job not cancelled")
sys.exit(0)

Expand Down Expand Up @@ -185,7 +191,7 @@ def main():
print("Error: You must provide an ingest job ID to cancel")
sys.exit(1)

if not get_confirmation("Are you sure you want to cancel ingest job {}? ".format(args.job_id)):
if not get_confirmation("Are you sure you want to cancel ingest job {}? ".format(args.job_id), args.force):
print("Command ignored. Job not cancelled")
sys.exit(0)

Expand All @@ -200,13 +206,13 @@ def main():
# Creating a new session - make sure the user wants to do this.
print_estimated_job(args.config_file)
print("\n")
if not get_confirmation("Would you like to create a NEW ingest job?"):
if not get_confirmation("Would you like to create a NEW ingest job?", args.force):
# Don't want to create a new job
print("Exiting")
sys.exit(0)
else:
# Resuming a session - make sure the user wants to do this.
if not get_confirmation("Are you sure you want to resume ingest job {}?".format(args.job_id)):
if not get_confirmation("Are you sure you want to resume ingest job {}?".format(args.job_id), args.force):
# Don't want to resume
print("Exiting")
sys.exit(0)
Expand All @@ -215,7 +221,7 @@ def main():
question_msgs = engine.setup()
if question_msgs:
for msg in question_msgs:
if not get_confirmation(msg):
if not get_confirmation(msg, args.force):
print("Ingest job cancelled")
sys.exit(0)

Expand All @@ -225,7 +231,7 @@ def main():
always_log_info("Successfully Created Ingest Job ID: {}".format(engine.ingest_job_id))
always_log_info("Note: You need this ID to continue this job later!")

if not get_confirmation("Do you want to start uploading now?"):
if not get_confirmation("Do you want to start uploading now?", args.force):
print("OK - Your job is waiting for you. You can resume by providing Ingest Job ID '{}' to the client".format(engine.ingest_job_id))
sys.exit(0)

Expand Down
10 changes: 4 additions & 6 deletions bossingest/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,11 @@ def join(self, ingest_job_id):
else:
result = r.json()
job_status = int(result['ingest_job']["status"])
wp.print_msg("Waiting for ingest job to be created")
wp.print_msg("(pid={}) Waiting for ingest job to be created".format(os.getpid()))
if job_status == 0:
time.sleep(5)
else:
wp.finished()
always_log_info("Ingest Job ready for uploading")

creds = result["credentials"]
queue = result["ingest_job"]["upload_queue"]
tile_bucket = result["tile_bucket_name"]
Expand Down Expand Up @@ -433,12 +431,12 @@ def get_task(self, num_messages=1):
msg = self.queue.receive_messages(MaxNumberOfMessages=1, WaitTimeSeconds=1)
break
except botocore.exceptions.ClientError as e:
print("Waiting for credentials to be valid")
print("(pid={}) Waiting for credentials to be valid".format(os.getpid()))
try_cnt += 1
time.sleep(5)
time.sleep(15)

if try_cnt >= 20:
raise Exception("Credentials failed to be come valid")
raise Exception("(pid={}) Credentials failed to be come valid".format(os.getpid()))

if msg:
return msg[0].message_id, msg[0].receipt_handle, json.loads(msg[0].body)
Expand Down
88 changes: 37 additions & 51 deletions bossingest/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,10 @@
from ..utils.log import always_log_info
import os
from math import floor
from tqdm import tqdm
import random
import contextlib
import sys


class DummyTqdmFile(object):
"""Dummy file-like that will write to tqdm"""
file = None

def __init__(self, file):
self.file = file

def write(self, x):
# Avoid print() second call (useless \n)
if len(x.rstrip()) > 0:
tqdm.write(x, file=self.file)


@contextlib.contextmanager
def stdout_redirect_to_tqdm():
save_stdout = sys.stdout
try:
sys.stdout = DummyTqdmFile(sys.stdout)
yield save_stdout
# Relay exceptions
except Exception as exc:
raise exc
# Always restore sys.stdout if necessary
finally:
sys.stdout = save_stdout

from .config import Configuration, ConfigFileError
from collections import deque


class Engine(object):
def __init__(self, config_file=None, backend_api_token=None, ingest_job_id=None):
Expand Down Expand Up @@ -199,27 +170,42 @@ def monitor(self):
Returns:
None
"""
with stdout_redirect_to_tqdm() as save_stdout:
# tqdm call need to specify sys.stdout, not sys.stderr (default)
# and dynamic_ncols=True to autodetect console width
pbar = tqdm(desc="Upload Progress", unit='tiles', unit_scale=True, miniters=1, total=self.tile_count,
file=save_stdout, dynamic_ncols=True)
while True:
logger = logging.getLogger('ingest-client')

if (datetime.datetime.now() - self.credential_create_time).total_seconds() > self.backend.credential_timeout:
logger.warning("(pid={}) Credentials are expiring soon, attempting to renew credentials".format(os.getpid()))
self.join()
always_log_info("(pid={}) Credentials refreshed successfully".format(os.getpid()))

# monitor and loop
num_tasks = self.backend.get_num_tasks()
if not num_tasks:
num_tasks = random.randint(2, self.tile_count)
logger = logging.getLogger('ingest-client')
tile_rate_samples = deque(maxlen=6)
last_task_count = None
start_time = time.time()
print_time = time.time()
avg_tile_rate = 0
while True:
if (datetime.datetime.now() - self.credential_create_time).total_seconds() > self.backend.credential_timeout:
logger.warning("(pid={}) Credentials are expiring soon, attempting to renew credentials".format(os.getpid()))
self.join()
always_log_info("(pid={}) Credentials refreshed successfully".format(os.getpid()))

# monitor and loop
num_tasks = self.backend.get_num_tasks()
if num_tasks:
if not last_task_count:
last_task_count = num_tasks
continue

tile_rate_samples.append(last_task_count - num_tasks)

avg_tile_rate = sum(tile_rate_samples) / float(len(tile_rate_samples))

if (time.time() - print_time) > 30:
print_time = time.time()
# Print an update every 30 seconds
if num_tasks:
# Update bar
pbar.update(num_tasks)
time.sleep(3)
always_log_info("Uploading {:.2f} tiles/min - Approx {:d} tiles remaining - Elapsed time {:.2f} minutes".format(avg_tile_rate * 5,
num_tasks,
(time.time() - start_time) / 60))

else:
always_log_info("Uploading in progress: Elapsed time {:.2f} minutes".format((time.time() - start_time) / 60))

# Wait to loop
time.sleep(5)

def run(self):
"""Method to run the upload loop
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ moto==0.4.25
Pillow==3.3.1
numpy==1.11.1
nose2==0.6.5
tqdm
#tqdm
8 changes: 6 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# twine upload dist/*


__version__ = '0.9.2'
__version__ = '0.9.0'

here = path.abspath(path.dirname(__file__))

Expand Down Expand Up @@ -47,6 +47,8 @@
classifiers=[
'Development Status :: 4 - Beta',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 2.7',
],
keywords=[
'brain',
Expand All @@ -60,6 +62,8 @@
'calcium',
'database',
'boss',
'microns'
'microns',
'iarpa',
'jhu'
]
)

0 comments on commit 1b69f08

Please sign in to comment.