Commit b7f65df

Merge 7662e87 into a26e758 (2 parents: a26e758 + 7662e87)
richmahn committed Apr 8, 2021
Showing 4 changed files with 46 additions and 130 deletions.
5 changes: 5 additions & 0 deletions docker-compose-tXenqueue.yaml
@@ -21,3 +21,8 @@ services:
     build:
       context: ./tXenqueue
       dockerfile: Dockerfile-developBranch
+
+networks:
+  default:
+    external:
+      name: tx-net
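
Note: docker-compose treats a network marked "external" as one it must find rather than create, so the tx-net network has to exist on the host before "docker-compose up" is run (a one-time "docker network create tx-net" would do it); otherwise compose aborts with a missing-network error. Presumably this shared network is what lets these containers reach the door43-enqueue-job stack by container name, as the Redis hostname change in the next file suggests.
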
2 changes: 1 addition & 1 deletion tXenqueue/Dockerfile-developBranch
@@ -17,7 +17,7 @@ RUN pip3 install --requirement requirements.txt
 EXPOSE 8000
 
 # Define environment variables
-ENV REDIS_HOSTNAME=172.21.0.2
+ENV REDIS_HOSTNAME=door43-enqueue-job_redis_1
 ENV QUEUE_PREFIX dev-
 ENV DEBUG_MODE True
 # NOTE: The following environment variables are expected to be set for testing:
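
Replacing the hard-coded bridge IP (172.21.0.2, which varies between container runs) with a container name works because Docker's embedded DNS resolves container names on a shared user-defined network such as tx-net. A minimal sketch of how the enqueue service presumably consumes this variable, using the standard redis-py client (the fallback value is illustrative):

    import os
    from redis import StrictRedis

    # The container name is resolved by Docker's embedded DNS on the
    # shared network, so no fixed IP needs to be baked into the image.
    redis_hostname = os.getenv('REDIS_HOSTNAME', 'redis')  # illustrative fallback
    redis_connection = StrictRedis(host=redis_hostname, port=6379)
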
2 changes: 1 addition & 1 deletion tXenqueue/check_posted_tx_payload.py
@@ -109,7 +109,7 @@ def check_posted_tx_payload(request, logger) -> Tuple[bool, Dict[str,Any]]:
             or request.headers['Host'].endswith('.door43.org'):
         logger.info(f"Accepted request from {request.headers['Host']}")
     elif debug_mode_flag \
-            and request.headers['Host'] == '127.0.0.1:80':
+            and request.headers['Host'] in ['127.0.0.1:80', 'tx-enqueue-job_proxy_1:80', 'txproxy:80']:
         logger.info(f"Accepted DEBUG request from {request.headers['Host']}")
     else:
         logger.error(f"No Gitea user token; rejected request from {request.headers['Host']}")
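
The widened allowlist means that in DEBUG mode the payload checker now also accepts requests arriving via the Docker-internal proxy names, not just plain localhost. A hypothetical smoke test (endpoint path, port, and payload are made up; the requests library does let the Host header be overridden explicitly):

    import requests

    # Impersonate the in-network proxy; only accepted when DEBUG_MODE is set.
    response = requests.post('http://127.0.0.1:8000/',
                             headers={'Host': 'txproxy:80'},
                             json={'job_id': 'smoke-test'})
    print(response.status_code)
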
167 changes: 39 additions & 128 deletions tXenqueue/tx_enqueue_main.py
@@ -28,9 +28,9 @@
     Response JSON: Contains all of the given fields from the payload, plus
         success: True to indicate that the job was queued
         status: 'queued'
-        queue_name: '(dev-)tX_webhook' or '(dev-)tX_OBS_PDF_webhook' or '(dev-)tX_other_PDF_webhook'
+        queue_name: '(dev-)tX_webhook'
         tx_job_queued_at: current date & time
-        output: URL of zipfile or PDF where converted output will be able to be downloaded from
+        output: URL of zipfile where converted output will be able to be downloaded from
         expires_at: date & time when above output link may become invalid (one day later)
         eta: date & time when output is expected (5 minutes later)
         tx_retry_count: 0
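
Concretely, a successful enqueue response on the dev deployment would now carry something like the following (an illustration assembled from the docstring fields above; every value is made up):

    {
        'success': True,
        'status': 'queued',
        'queue_name': 'dev-tX_webhook',
        'tx_job_queued_at': '2021-04-08T12:00:00Z',
        'output': '<TX_JOB_CDN_BUCKET>/<job_id>.zip',
        'expires_at': '2021-04-09T12:00:00Z',
        'eta': '2021-04-08T12:05:00Z',
        'tx_retry_count': 0,
    }
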
@@ -56,10 +56,8 @@
 from tx_enqueue_helpers import get_unique_job_id
 
 
-OUR_NAME = 'tX_webhook' # Becomes the (perhaps prefixed) HTML queue name (and graphite name)
+OUR_NAME = 'tX_webhook' # Becomes the (perhaps prefixed) queue name (and graphite name)
 # -- MUST match setup.py in tx-job-handler
-OUR_OBS_PDF_NAME = 'tX_OBS_PDF_webhook'
-OUR_OTHER_PDF_NAME = 'tX_other_PDF_webhook'
 #CALLBACK_SUFFIX = '_callback'
 DEV_PREFIX = 'dev-'
 
@@ -113,22 +111,14 @@
 if prefix not in ('', DEV_PREFIX):
     logger.critical(f"Unexpected prefix: '{prefix}' — expected '' or '{DEV_PREFIX}'")
 if prefix:
-    our_adjusted_convertHTML_queue_name = prefix + OUR_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
-    our_adjusted_convertOBSPDF_queue_name = prefix + OUR_OBS_PDF_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
-    our_adjusted_convertOtherPDF_queue_name = prefix + OUR_OTHER_PDF_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
-    our_other_adjusted_convertHTML_queue_name = OUR_NAME + QUEUE_NAME_SUFFIX # The other queue name
-    our_other_adjusted_convertOBSPDF_queue_name = OUR_OBS_PDF_NAME + QUEUE_NAME_SUFFIX # The other queue name
-    our_other_adjusted_convertOtherPDF_queue_name = OUR_OTHER_PDF_NAME + QUEUE_NAME_SUFFIX # The other queue name
+    our_adjusted_convert_queue_name = prefix + OUR_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
+    our_other_adjusted_convert_queue_name = OUR_NAME + QUEUE_NAME_SUFFIX # The other queue name
 else:
-    our_adjusted_convertHTML_queue_name = OUR_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
-    our_adjusted_convertOBSPDF_queue_name = OUR_OBS_PDF_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
-    our_adjusted_convertOtherPDF_queue_name = OUR_OTHER_PDF_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
-    our_other_adjusted_convertHTML_queue_name = DEV_PREFIX + OUR_NAME + QUEUE_NAME_SUFFIX # The other queue name
-    our_other_adjusted_convertOBSPDF_queue_name = DEV_PREFIX + OUR_OBS_PDF_NAME + QUEUE_NAME_SUFFIX # The other queue name
-    our_other_adjusted_convertOtherPDF_queue_name = DEV_PREFIX + OUR_OTHER_PDF_NAME + QUEUE_NAME_SUFFIX # The other queue name
+    our_adjusted_convert_queue_name = OUR_NAME + QUEUE_NAME_SUFFIX # Will become our main queue name
+    our_other_adjusted_convert_queue_name = DEV_PREFIX + OUR_NAME + QUEUE_NAME_SUFFIX # The other queue name
 # NOTE: The prefixed version must also listen at a different port (specified in gunicorn run command)
-#our_callback_name = our_adjusted_convertHTML_queue_name + CALLBACK_SUFFIX
-#our_other_adjusted_callback_name = our_other_adjusted_convertHTML_queue_name + CALLBACK_SUFFIX
+#our_callback_name = our_adjusted_convert_queue_name + CALLBACK_SUFFIX
+#our_other_adjusted_callback_name = our_other_adjusted_convert_queue_name + CALLBACK_SUFFIX
 
 
 prefix_string = f" with prefix '{prefix}'" if prefix else ""
@@ -197,51 +187,29 @@ def job_receiver():
     Accepts POST requests and checks the (json) payload
     Queues the approved jobs at redis instance at global redis_hostname:6379.
-    Queue name is our_adjusted_convertHTML_queue_name (may have been prefixed).
+    Queue name is our_adjusted_convert_queue_name (may have been prefixed).
     """
     #assert request.method == 'POST'
     stats_client.incr('posts.attempted')
     logger.info(f"tX {'('+prefix+')' if prefix else ''} enqueue received request: {request}")
 
     # Collect and log some helpful information for all three queues
-    HTML_queue = Queue(our_adjusted_convertHTML_queue_name, connection=redis_connection)
-    len_HTML_queue = len(HTML_queue)
-    stats_client.gauge('queue.HTML.length.current', len_HTML_queue)
-    len_HTML_failed_queue = handle_failed_queue(our_adjusted_convertHTML_queue_name)
-    stats_client.gauge('queue.HTML.length.failed', len_HTML_failed_queue)
-    OBSPDF_queue = Queue(our_adjusted_convertOBSPDF_queue_name, connection=redis_connection)
-    len_OBSPDF_queue = len(OBSPDF_queue)
-    stats_client.gauge('queue.OBSPDF.length.current', len_OBSPDF_queue)
-    len_OBSPDF_failed_queue = handle_failed_queue(our_adjusted_convertOBSPDF_queue_name)
-    stats_client.gauge('queue.OBSPDF.length.failed', len_OBSPDF_failed_queue)
-    otherPDF_queue = Queue(our_adjusted_convertOtherPDF_queue_name, connection=redis_connection)
-    len_otherPDF_queue = len(otherPDF_queue)
-    stats_client.gauge('queue.OBSPDF.length.current', len_otherPDF_queue)
-    len_otherPDF_failed_queue = handle_failed_queue(our_adjusted_convertOtherPDF_queue_name)
-    stats_client.gauge('queue.OBSPDF.length.failed', len_otherPDF_failed_queue)
+    queue = Queue(our_adjusted_convert_queue_name, connection=redis_connection)
+    len_queue = len(queue)
+    stats_client.gauge('queue.HTML.length.current', len_queue)
+    len_failed_queue = handle_failed_queue(our_adjusted_convert_queue_name)
+    stats_client.gauge('queue.HTML.length.failed', len_failed_queue)
 
     # Find out how many workers we have
     total_worker_count = Worker.count(connection=redis_connection)
     logger.debug(f"Total rq workers = {total_worker_count}")
-    queue1_worker_count = Worker.count(queue=HTML_queue)
-    logger.debug(f"Our {our_adjusted_convertHTML_queue_name} queue workers = {queue1_worker_count}")
-    stats_client.gauge('workers.HTML.available', queue1_worker_count)
-    if queue1_worker_count < 1:
-        logger.critical(f"{prefixed_our_name} has no HTML job handler workers running!")
+    queue_worker_count = Worker.count(queue=queue)
+    logger.debug(f"Our {our_adjusted_convert_queue_name} queue workers = {queue_worker_count}")
+    stats_client.gauge('workers.HTML.available', queue_worker_count)
+    if queue_worker_count < 1:
+        logger.critical(f"{prefixed_our_name} has no job handler workers running!")
         # Go ahead and queue the job anyway for when a worker is restarted
-    queue2_worker_count = Worker.count(queue=OBSPDF_queue)
-    logger.debug(f"Our {our_adjusted_convertOBSPDF_queue_name} queue workers = {queue2_worker_count}")
-    stats_client.gauge('workers.OBSPDF.available', queue2_worker_count)
-    if queue2_worker_count < 1:
-        logger.critical(f"{prefixed_our_name} has no OBSPDF job handler workers running!")
-        # Go ahead and queue the job anyway for when a worker is restarted
-    queue3_worker_count = Worker.count(queue=otherPDF_queue)
-    logger.debug(f"Our {our_adjusted_convertOtherPDF_queue_name} queue workers = {queue3_worker_count}")
-    stats_client.gauge('workers.otherPDF.available', queue3_worker_count)
-    if queue3_worker_count < 1:
-        logger.critical(f"{prefixed_our_name} has no otherPDF job handler workers running!")
-        # Go ahead and queue the job anyway for when a worker is restarted
 
 
     response_ok_flag, response_dict = check_posted_tx_payload(request, logger)
     # response_dict is json payload if successful, else error info
     if response_ok_flag:
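
For readers unfamiliar with RQ, the consolidated bookkeeping above reduces to one Queue object plus worker counts taken from the same Redis connection. A self-contained sketch of those calls (hostname and queue name are illustrative; Queue, Worker.count, and len(queue) are standard rq/redis-py API, as used in the code itself):

    from redis import StrictRedis
    from rq import Queue, Worker

    connection = StrictRedis(host='door43-enqueue-job_redis_1', port=6379)
    queue = Queue('dev-tX_webhook', connection=connection)

    print(len(queue))                           # jobs currently waiting
    print(Worker.count(queue=queue))            # workers listening on this queue
    print(Worker.count(connection=connection))  # all workers on this Redis
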
@@ -250,64 +218,15 @@ def job_receiver():
         our_job_id = response_dict['job_id'] if 'job_id' in response_dict \
                         else get_unique_job_id()
 
-        # Determine which worker to queue this request for
-        if response_dict['output_format'] == 'html':
-            job_type, not_job_type = 'HTML', 'PDF'
-            our_adjusted_queue_name = our_adjusted_convertHTML_queue_name
-            our_other_adjusted_queue_name = our_other_adjusted_convertHTML_queue_name
-            our_adjusted_queue_name2 = our_adjusted_convertOBSPDF_queue_name
-            our_other_adjusted_queue_name2 = our_other_adjusted_convertOBSPDF_queue_name
-            our_adjusted_queue_name3 = our_adjusted_convertOtherPDF_queue_name
-            our_other_adjusted_queue_name3 = our_other_adjusted_convertOtherPDF_queue_name
-            our_queue = HTML_queue
-            expected_output_URL = f"{TX_JOB_CDN_BUCKET}{our_job_id}.zip"
-        elif response_dict['output_format'] == 'pdf':
-            not_job_type = 'HTML'
-            our_adjusted_queue_name2 = our_adjusted_convertHTML_queue_name
-            our_other_adjusted_queue_name2 = our_other_adjusted_convertHTML_queue_name
-
-            # Try to guess where the output PDF will end up
-            expected_output_URL = 'UNKNOWN'
-            if 'identifier' in response_dict \
-            and '/' not in response_dict['identifier']:
-                if response_dict['identifier'].count('--') == 2:
-                    # Expected identifier in form '<repo_owner_username>/<repo_name>--<branch_or_tag_name>'
-                    # e.g. 'unfoldingWord/en_obs--v1'
-                    logger.debug("Using1 'identifier' field to determine expected_output_URL…")
-                    repo_owner_username, repo_name, branch_or_tag_name = response_dict['identifier'].split('--')
-                    expected_output_URL = f"{PDF_CDN_BUCKET}{repo_owner_username}/{repo_name}/{branch_or_tag_name}/{response_dict['identifier']}.pdf"
-                elif response_dict['identifier'].count('--') == 3:
-                    # Expected identifier in form '<repo_owner_username>/<repo_name>--<branch_name>--<commit_hash>'
-                    # e.g. 'unfoldingWord/en_obs--master--7dac1e5ba2'
-                    logger.debug("Using2 'identifier' field to determine expected_output_URL…")
-                    repo_owner_username, repo_name, branch_or_tag_name, _commit_hash = response_dict['identifier'].split('--')
-                    expected_output_URL = f"{PDF_CDN_BUCKET}{repo_owner_username}/{repo_name}/{branch_or_tag_name}/{repo_owner_username}--{repo_name}--{branch_or_tag_name}.pdf"
-            elif response_dict['source'].count('/') == 6 \
-            and response_dict['source'].endswith('.zip'):
-                # Expected URL in form 'https://git.door43.org/<repo_owner_username>/<repo_name>/archive/<branch_or_tag_name>.zip'
-                # e.g. 'https://git.door43.org/unfoldingWord/en_obs/archive/master.zip'
-                logger.debug("Using 'source' field to determine expected_output_URL…")
-                parts = response_dict['source'][:-4].split('/') # Remove the .zip first
-                if len(parts) != 7:
-                    logger.critical(f"Source field is in unexpected form: '{response_dict['source']}' -> {parts}.zip")
-                expected_output_URL = f"{PDF_CDN_BUCKET}{parts[3]}/{parts[4]}/{parts[6]}/{parts[3]}--{parts[4]}--{parts[6]}"
-            logger.info(f"Got expected_output_URL = {expected_output_URL}")
-
-            # Determine the correct PDF creation queue
-            if response_dict['resource_type'] == 'Open_Bible_Stories': # subject
-                job_type = 'OBS-PDF'
-                our_adjusted_queue_name = our_adjusted_convertOBSPDF_queue_name
-                our_other_adjusted_queue_name = our_other_adjusted_convertOBSPDF_queue_name
-                our_adjusted_queue_name3 = our_adjusted_convertOtherPDF_queue_name
-                our_other_adjusted_queue_name3 = our_other_adjusted_convertOtherPDF_queue_name
-                our_queue = OBSPDF_queue
-            else: # not OBS
-                job_type = 'other-PDF'
-                our_adjusted_queue_name = our_adjusted_convertOtherPDF_queue_name
-                our_other_adjusted_queue_name = our_other_adjusted_convertOtherPDF_queue_name
-                our_adjusted_queue_name3 = our_adjusted_convertOBSPDF_queue_name
-                our_other_adjusted_queue_name3 = our_other_adjusted_convertOBSPDF_queue_name
-                our_queue = otherPDF_queue
+        job_type = response_dict['output_format'].upper()
+        our_adjusted_queue_name = our_adjusted_convert_queue_name
+        our_other_adjusted_queue_name = our_other_adjusted_convert_queue_name
+        our_queue = queue
+        if job_type == 'PDF':
+            expected_output_URL = f'{PDF_CDN_BUCKET}{our_job_id}_pdf.zip'
+        else:
+            expected_output_URL = f'{TX_JOB_CDN_BUCKET}{our_job_id}.zip'
+        logger.info(f"Got expected_output_URL = {expected_output_URL}")
 
         # Extend the given payload (dict) to add our required fields
         #logger.debug("Building our response dict…")
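
The replacement derives the expected output location purely from the job id and output format, dropping all of the old identifier/source parsing. A worked example of the new branch (bucket URLs are placeholders for the real PDF_CDN_BUCKET and TX_JOB_CDN_BUCKET values):

    PDF_CDN_BUCKET = 'https://example.org/pdf/'        # placeholder
    TX_JOB_CDN_BUCKET = 'https://example.org/tx/job/'  # placeholder
    our_job_id = 'Job_abc123'

    for output_format in ('pdf', 'html'):
        job_type = output_format.upper()
        if job_type == 'PDF':
            expected_output_URL = f'{PDF_CDN_BUCKET}{our_job_id}_pdf.zip'
        else:
            expected_output_URL = f'{TX_JOB_CDN_BUCKET}{our_job_id}.zip'
        print(job_type, '->', expected_output_URL)
    # PDF -> https://example.org/pdf/Job_abc123_pdf.zip
    # HTML -> https://example.org/tx/job/Job_abc123.zip
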
@@ -335,33 +254,25 @@ def job_receiver():
         # NOTE: The above line can return a result from the webhook.job function. (By default, the result remains available for 500s.)
 
         # Find out who our workers are
-        #workers = Worker.all(connection=redis_connection) # Returns the actual worker objects
-        #logger.debug(f"Total rq workers ({len(workers)}): {workers}")
-        #our_queue_workers = Worker.all(queue=our_queue)
-        #logger.debug(f"Our {our_adjusted_queue_name} queue workers ({len(our_queue_workers)}): {our_queue_workers}")
+        workers = Worker.all(connection=redis_connection) # Returns the actual worker objects
+        logger.debug(f"Total rq workers ({len(workers)}): {workers}")
+        our_queue_workers = Worker.all(queue=our_queue)
+        logger.debug(f"Our {our_adjusted_queue_name} queue workers ({len(our_queue_workers)}): {our_queue_workers}")
 
         # Find out how many workers we have
-        #worker_count = Worker.count(connection=redis_connection)
-        #logger.debug(f"Total rq workers = {worker_count}")
-        #our_queue_worker_count = Worker.count(queue=our_queue)
-        #logger.debug(f"Our {our_adjusted_queue_name} queue workers = {our_queue_worker_count}")
+        worker_count = Worker.count(connection=redis_connection)
+        logger.debug(f"Total rq workers = {worker_count}")
+        our_queue_worker_count = Worker.count(queue=our_queue)
+        logger.debug(f"Our {our_adjusted_queue_name} queue workers = {our_queue_worker_count}")
 
         len_our_queue = len(our_queue) # Update
         other_queue = Queue(our_other_adjusted_queue_name, connection=redis_connection)
-        queue2 = Queue(our_adjusted_queue_name2, connection=redis_connection)
-        other_queue2 = Queue(our_other_adjusted_queue_name2, connection=redis_connection)
-        queue3 = Queue(our_adjusted_queue_name3, connection=redis_connection)
-        other_queue3 = Queue(our_other_adjusted_queue_name3, connection=redis_connection)
         logger.info(f"{prefixed_our_name} queued valid {job_type} job to {our_adjusted_queue_name} queue " \
                     f"({len_our_queue} {job_type} jobs now " \
                     f"for {Worker.count(queue=our_queue)} workers, " \
                     f"{len(other_queue)} {job_type} jobs in {our_other_adjusted_queue_name} queue " \
                     f"for {Worker.count(queue=other_queue)} workers, " \
-                    f"{len_HTML_failed_queue} failed {job_type} jobs), " \
-                    f"({len(queue2)} {not_job_type} jobs in {our_adjusted_queue_name2} queue, " \
-                    f"{len(other_queue2)} {not_job_type} jobs in {our_other_adjusted_queue_name2} queue) " \
-                    f"({len(queue3)} {not_job_type} jobs in {our_adjusted_queue_name3} queue, " \
-                    f"{len(other_queue3)} {not_job_type} jobs in {our_other_adjusted_queue_name3} queue) " \
+                    f"{len_failed_queue} failed {job_type} jobs), " \
                     f"at {datetime.utcnow()}\n")
         stats_client.incr('posts.succeeded')
         return jsonify(our_response_dict)
