Skip to content

Commit

Permalink
Merge 5c331fc into 629e680
Browse files Browse the repository at this point in the history
  • Loading branch information
kindly committed Apr 15, 2020
2 parents 629e680 + 5c331fc commit b4e9528
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions ocdskingfisherprocess/cli/commands/transform_collections.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import datetime
import logging
import os
Expand All @@ -12,12 +13,38 @@


class TransformCollectionsCLICommand(ocdskingfisherprocess.cli.commands.base.CLICommand):
command = 'transform-collections'
command = "transform-collections"

def configure_subparser(self, subparser):
    """Register this command's command-line options on ``subparser``.

    Two options are added:

    * ``--runforseconds`` -- no type or default is declared, so the value
      is stored exactly as the argument parser delivers it.
    * ``--threads`` -- parsed as ``int``; defaults to ``1``.

    :param subparser: parser object exposing ``add_argument``.
    """
    # One (flag, keyword-arguments) entry per option, registered in order.
    option_specs = (
        ("--runforseconds", {"help": "Run for this many seconds only."}),
        (
            "--threads",
            {"help": "Amount of threads to use", "type": int, "default": 1},
        ),
    )
    for flag, options in option_specs:
        subparser.add_argument(flag, **options)

def run_collection(self, collection, run_until_timestamp, args):
    """Run the configured transform for a single collection.

    ``run_command`` submits this method to a thread pool and never reads
    the results of the returned futures, so an exception escaping from
    here would be stored on the future and never reported.  For that
    reason the transform is *built* inside the same ``try`` that runs it:
    any failure is printed and sent to Sentry, tagged with the
    collection's database id.

    :param collection: collection object; ``transform_type`` and
        ``database_id`` are read from it.
    :param run_until_timestamp: deadline compared against
        ``datetime.datetime.utcnow().timestamp()``; a falsy value disables
        the deadline check.
    :param args: parsed command-line arguments; only ``quiet`` is read.
    :returns: ``None`` in every case.
    """
    # Deadline already passed: do not start any new work.
    # NOTE(review): utcnow() is naive, so .timestamp() interprets it as
    # local time. This is only consistent if run_until_timestamp is
    # computed the same way by the caller -- confirm before changing.
    if run_until_timestamp and run_until_timestamp < datetime.datetime.utcnow().timestamp():
        return

    # A collection without a transform type has nothing to run.
    if not collection.transform_type:
        return

    if not args.quiet:
        print("Collection " + str(collection.database_id))

    try:
        # Construction is inside the try so that a failure here is
        # reported instead of vanishing inside the executor.
        transform = get_transform_instance(
            collection.transform_type,
            self.config,
            self.database,
            collection,
            run_until_timestamp=run_until_timestamp,
        )
        transform.process()
    except Exception as e:
        # Best effort: report and carry on so other collections still run.
        traceback.print_tb(e.__traceback__)
        with sentry_sdk.push_scope() as scope:
            scope.set_tag("transform_collection", collection.database_id)
            sentry_sdk.capture_exception(e)

def run_command(self, args):
logger = logging.getLogger('ocdskingfisher.cli.transform-collections')
logger.info("Starting command")
Expand All @@ -32,24 +59,24 @@ def exitfunc():

Timer(run_for_seconds + 60, exitfunc).start()

for collection in self.database.get_all_collections():
if collection.transform_type:
if not args.quiet:
print("Collection " + str(collection.database_id))
logger.info("Starting to transform collection " + str(collection.database_id))
transform = get_transform_instance(collection.transform_type, self.config, self.database,
collection, run_until_timestamp=run_until_timestamp)
try:
transform.process()
except Exception as e:
traceback.print_tb(e.__traceback__)
with sentry_sdk.push_scope() as scope:
scope.set_tag('transform_collection', collection.database_id)
sentry_sdk.capture_exception(e)

# Early return?
if run_until_timestamp and run_until_timestamp < datetime.datetime.utcnow().timestamp():
break
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = [
executor.submit(
self.run_collection, collection, run_until_timestamp, args
)
for collection in self.database.get_all_collections()
# Let's keep the number of possible threads low!
# Only include a collection if it is a transform and work needs doing.
# [ There are more things than just "not collection.store_end_at" to check
# to work out if "work needs doing", but:
# A) We don't want to duplicate lots of them here.
# B) They vary by type and are complex.
# C) "not collection.store_end_at" should catch a lot of collections; that will do us for now. ]
if collection.transform_type and not collection.store_end_at
]

for future in concurrent.futures.as_completed(futures):
continue

# If the code above took less than 60 seconds the process will stay open, waiting for the Timer to execute.
# So just kill it to make sure.
Expand Down

0 comments on commit b4e9528

Please sign in to comment.