Skip to content

Commit

Permalink
Merge master into integration
Browse files Browse the repository at this point in the history
  • Loading branch information
movestill committed Jun 28, 2018
2 parents 672ffcc + 1816b85 commit c17d43d
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 79 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,7 @@ ENV/
.DS_Store
/token.json
/.pypirc

# Vim files
*.swp

24 changes: 24 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
# Boss Ingest Client Changelog

## 0.9.8

### Fixed Bug:

* Updated requirements.txt to be compatible with boss-manage and intern by removing pinned versions of libraries.

## 0.9.7

### Fixed Bug:

* Fixed an off-by-one error in the credentials renewal loop.


## 0.9.6

### Fixed Bug:

* Updated to catch AccessDenied and InvalidAccessKeyId errors and then request new credentials when this occurs.

## 0.9.5

### Fixed Bug:

* The try-catch in Engine.run() was just around the ingest client’s IO to S3, but user developed plugins can be doing IO to anything and have random, transient errors as well. Moved the try statement to the top of the loop.

## 0.9.4

Expand Down
2 changes: 1 addition & 1 deletion ingestclient/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.9.6'
__version__ = '0.9.8'


def check_version():
Expand Down
162 changes: 98 additions & 64 deletions ingestclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,100 @@ def get_parser():
return parser


def start_workers(ingest_job_id, args, configuration):
    """Spawn the upload worker processes for an ingest job.

    Each worker gets the receive end of its own one-way pipe; the caller
    keeps the send end so it can later tell the worker to stop.

    Args:
        ingest_job_id (int): ID of the ingest job the workers upload for.
        args (Namespace): Command line arguments (reads api_token,
            processes_nb, and config_file).
        configuration (ingestclient.core.config.Configuration): A pre-loaded
            configuration instance forwarded to every worker.

    Returns:
        (list[(Process, Connection)]): One (process, send-end of pipe) tuple
            per started worker.
    """
    workers = []
    for _ in range(args.processes_nb):
        # Duplex=False: worker reads from recv_end, caller writes to send_end.
        recv_end, send_end = mp.Pipe(False)
        proc = mp.Process(
            target=worker_process_run,
            args=(args.api_token, ingest_job_id, recv_end),
            kwargs={'config_file': args.config_file,
                    'configuration': configuration})
        workers.append((proc, send_end))
        proc.start()

        # Stagger start-up to slowly ramp up load on lambda.
        time.sleep(.5)

    return workers


def upload(engine, args, configuration, start_time):
    """
    Kick off upload processes and monitor them.

    Blocks until the engine reports there is no more work, or until the user
    confirms a Ctrl-C abort.  Worker processes are always told to stop and
    joined before this function returns.

    Args:
        engine (ingestclient.core.Engine): Engine already joined to the ingest job.
        args (Namespace): Command line arguments.
        configuration(ingestclient.core.config.Configuration): A pre-loaded configuration instance.
        start_time (float): When ingest client started in seconds since 1/1/1970 00:00.

    Returns:
        (bool): True if uploading not finished (caller should call upload() again);
            False when the job is complete, auto-complete is disabled, or the
            user chose to quit.
    """
    workers = start_workers(engine.ingest_job_id, args, configuration)

    # Start the main process engine
    should_run = True
    job_complete = False
    while should_run:
        try:
            engine.monitor(workers)
            # run will end if no more jobs are available, join other processes
            should_run = False
            job_complete = True
        except KeyboardInterrupt:
            # Make sure they want to stop this client
            while True:
                quit_uploading = input("Are you sure you want to quit uploading? (y/n)")
                if quit_uploading.lower() == "y":
                    always_log_info("Stopping upload engine.")
                    should_run = False
                    break
                elif quit_uploading.lower() == "n":
                    print("Continuing...")
                    break
                else:
                    print("Enter 'y' or 'n' for 'yes' or 'no'")

    # notify the worker processes that they should stop execution
    # NOTE: should_run is False on every path that reaches this point, so the
    # message sent through the pipe always means "stop".
    for _, worker_pipe in workers:
        worker_pipe.send(should_run)

    always_log_info("Waiting for worker processes to close...\n")
    time.sleep(1)  # Make sure workers have cleaned up
    for worker_process, worker_pipe in workers:
        worker_process.join()
        worker_pipe.close()

    if job_complete:
        # If auto-complete, mark the job as complete and cleanup
        always_log_info("All upload tasks completed in {:.2f} minutes.".format((time.time() - start_time) / 60))
        if not args.manual_complete:
            always_log_info(" - Marking Ingest Job as complete and cleaning up. Please wait.")
            # engine.complete() returns False while chunks are still being
            # ingested server-side; returning True asks the caller to retry.
            if not engine.complete():
                always_log_info("Unable to complete, still have chunks that aren't ingested")
                return True
            always_log_info(" - Cleanup Done")
        else:
            always_log_info(" - Auto-complete disabled. This ingest job will remain in the 'Uploading' state until you manually mark it as complete")
        return False
    else:
        # User aborted via KeyboardInterrupt; job stays in its current state.
        always_log_info("Client exiting")
        always_log_info("Run time: {:.2f} minutes.".format((time.time() - start_time) / 60))
        return False

def main(configuration=None, parser_args=None):
"""Client UI main
Expand Down Expand Up @@ -278,73 +372,13 @@ def main(configuration=None, parser_args=None):
print("OK - Your job is waiting for you. You can resume by providing Ingest Job ID '{}' to the client".format(engine.ingest_job_id))
sys.exit(0)

# Join job
engine.join()

else:
# Join job
engine.join()

# Create worker processes
workers = []
for i in range(args.processes_nb):
new_pipe = mp.Pipe(False)
new_process = mp.Process(target=worker_process_run,
args=(args.api_token, engine.ingest_job_id, new_pipe[0]),
kwargs={'config_file': args.config_file, 'configuration': configuration}
)
workers.append((new_process, new_pipe[1]))
new_process.start()

# Sleep to slowly ramp up load on lambda
time.sleep(.5)
# Join job
engine.join()

# Start the main process engine
start_time = time.time()
should_run = True
job_complete = False
while should_run:
try:
engine.monitor(workers)
# run will end if no more jobs are available, join other processes
should_run = False
job_complete = True
except KeyboardInterrupt:
# Make sure they want to stop this client
while True:
quit_uploading = input("Are you sure you want to quit uploading? (y/n)")
if quit_uploading.lower() == "y":
always_log_info("Stopping upload engine.")
should_run = False
break
elif quit_uploading.lower() == "n":
print("Continuing...")
break
else:
print("Enter 'y' or 'n' for 'yes' or 'no'")

# notify the worker processes that they should stop execution
for _, worker_pipe in workers:
worker_pipe.send(should_run)

always_log_info("Waiting for worker processes to close...\n")
time.sleep(1) # Make sure workers have cleaned up
for worker_process, worker_pipe in workers:
worker_process.join()
worker_pipe.close()
while upload(engine, args, configuration, start_time):
pass

if job_complete:
# If auto-complete, mark the job as complete and cleanup
always_log_info("All upload tasks completed in {:.2f} minutes.".format((time.time() - start_time) / 60))
if not args.manual_complete:
always_log_info(" - Marking Ingest Job as complete and cleaning up. Please wait.")
engine.complete()
always_log_info(" - Cleanup Done")
else:
always_log_info(" - Auto-complete disabled. This ingest job will remain in the 'Uploading' state until you manually mark it as complete")
else:
always_log_info("Client exiting")
always_log_info("Run time: {:.2f} minutes.".format((time.time() - start_time) / 60))


if __name__ == '__main__':
Expand Down
38 changes: 34 additions & 4 deletions ingestclient/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def complete(self, ingest_job_id):
ingest_job_id(int): The ID of the job you'd like to complete
Returns:
None
(bool): True on successful completion
"""
Expand Down Expand Up @@ -428,15 +428,45 @@ def complete(self, ingest_job_id):
ingest_job_id(int): The ID of the job you'd like to complete
Returns:
None
(bool): True if ingest complete
"""
r = requests.post('{}/{}/ingest/{}/complete'.format(self.host, self.api_version, ingest_job_id),
headers=self.api_headers, verify=self.validate_ssl)

if r.status_code != 204:
raise Exception("Failed to complete ingest job: {}".format(r.json()))
if r.status_code == 204:
return True
if r.status_code == 202:
# Not all chunks ingested.
return False

raise Exception("Failed to complete ingest job: {}".format(r.json()))

def verify(self, ingest_job_id):
    """Start verification of an ingest job.

    Posts to the ingest service's verify endpoint and interprets the
    HTTP status code of the response.

    Args:
        ingest_job_id (int): The ID of the job to verify.

    Returns:
        (bool): True if verified, False if there are outstanding tiles.

    Raises:
        Exception: If the service returns any status other than 204 or 202.
    """
    url = '{}/{}/ingest/{}/verify'.format(self.host, self.api_version, ingest_job_id)
    resp = requests.post(url, headers=self.api_headers, verify=self.validate_ssl)

    # 204: verification finished; 202: verification ran but work remains.
    if resp.status_code in (204, 202):
        return resp.status_code == 204

    # Something went wrong.
    raise Exception("Failed to verify ingest job: {}".format(resp.json()))

def get_task(self, num_messages=1):
"""
Expand Down
4 changes: 2 additions & 2 deletions ingestclient/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@ def complete(self):
Args:
Returns:
None
(bool): True if successfully completed job.
"""
self.backend.complete(self.ingest_job_id)
return self.backend.complete(self.ingest_job_id)

def monitor(self, workers):
"""Method to monitor the progress of the ingest job
Expand Down
10 changes: 7 additions & 3 deletions ingestclient/plugins/catmaid.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ def process(self, x_index, y_index, z_index, t_index=None):
raise IndexError("CATMAID File Image Stack format does not support non-zero time index")

if z_index < self.parameters["ingest_job"]["extent"]["z"][0] or z_index >= self.parameters["ingest_job"]["extent"]["z"][1]:
raise IndexError("Invalid Tile Z-Index: {}".format(z_index))
raise IndexError("Invalid Tile Z-Index: {} Z-Extent: {}".format(z_index, self.parameters["ingest_job"]["extent"]["z"][1]))

if x_index > self.parameters["ingest_job"]["extent"]["x"][1] / self.parameters["ingest_job"]["tile_size"]["x"] - 1:
raise IndexError("Invalid Tile X-Index: {}".format(x_index))
raise IndexError("Invalid Tile X-Index: {} X-Extent: {} X-TileSize: {}".format(x_index,
self.parameters["ingest_job"]["extent"]["x"][1],
self.parameters["ingest_job"]["tile_size"]["x"]))

if y_index > self.parameters["ingest_job"]["extent"]["y"][1] / self.parameters["ingest_job"]["tile_size"]["y"] - 1:
raise IndexError("Invalid Tile Y-Index: {}".format(y_index))
raise IndexError("Invalid Tile Y-Index: {} Y-Extent: {} Y-TileSize: {}".format(y_index,
self.parameters["ingest_job"]["extent"]["y"][1],
self.parameters["ingest_job"]["tile_size"]["y"]))

filename = "{}_{}.{}".format(y_index, x_index, self.parameters["filetype"])
return os.path.join(self.parameters["root_dir"], "{}".format(self.parameters["ingest_job"]["resolution"]), "{}".format(z_index), filename)
Expand Down
12 changes: 7 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
jsonschema==2.5.1
six==1.10.0
requests==2.11.1
jsonschema>=2.5.1
six>=1.10.0
requests>=2.11.1
responses==0.5.1
boto3>=1.4.0
moto==0.4.25
Pillow>=3.3.1
numpy==1.11.1
nose2==0.6.5
numpy>=1.11.1
nose2>=0.6.5
intern>=0.9.6

0 comments on commit c17d43d

Please sign in to comment.