From 599e70926172920ff9feefb559e2169ba9437462 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Thu, 2 May 2024 15:34:58 -0700 Subject: [PATCH 1/9] feat: Add metrics, gcp logging to tokenserver scripts This adds GCP logging JSON output formatting for logs as well as DataDog style metric reporting. New Environs: * `METRIC_API_KEY` - Optional Datadog API key * `METRIC_APP_KEY` - Optional Datadog APP key * `METRIC_HOST` - Collector Host address * `METRIC_PORT` - Collector Host port This also adds the `--human` arg if you want to use the older, human readable logs instead. --- requirements.txt | 1 + tools/tokenserver/process_account_events.py | 82 ++++++++++++++------- tools/tokenserver/purge_old_records.py | 60 +++++++++++++-- tools/tokenserver/requirements.txt | 1 + tools/tokenserver/util.py | 63 ++++++++++++++++ 5 files changed, 174 insertions(+), 33 deletions(-) diff --git a/requirements.txt b/requirements.txt index 773c4846b5..0513455937 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ cryptography==41.0 pyfxa==0.7.7 tokenlib==2.0.0 +datadog diff --git a/tools/tokenserver/process_account_events.py b/tools/tokenserver/process_account_events.py index 69cf700cf7..f96083c0c6 100644 --- a/tools/tokenserver/process_account_events.py +++ b/tools/tokenserver/process_account_events.py @@ -40,17 +40,22 @@ from database import Database -logger = logging.getLogger("tokenserver.scripts.process_account_deletions") +APP_LABEL = "tokenserver.scripts.process_account_events" -def process_account_events(queue_name, aws_region=None, queue_wait_time=20): +def process_account_events( + queue_name, + aws_region=None, + queue_wait_time=20, + metrics=None): """Process account events from an SQS queue. This function polls the specified SQS queue for account-realted events, processing each as it is found. It polls indefinitely and does not return; to interrupt execution you'll need to e.g. SIGINT the process. """ - logger.info("Processing account events from %s", queue_name) + logger = logging.getLogger(APP_LABEL) + logger.info(f"Processing account events from {queue_name}") database = Database() try: # Connect to the SQS queue. @@ -69,7 +74,7 @@ def process_account_events(queue_name, aws_region=None, queue_wait_time=20): msg = queue.read(wait_time_seconds=queue_wait_time) if msg is None: continue - process_account_event(database, msg.get_body()) + process_account_event(database, msg.get_body(), metrics=metrics) # This intentionally deletes the event even if it was some # unrecognized type. Not point leaving a backlog. queue.delete_message(msg) @@ -78,9 +83,10 @@ def process_account_events(queue_name, aws_region=None, queue_wait_time=20): raise -def process_account_event(database, body): +def process_account_event(database, body, metrics=None): """Parse and process a single account event.""" # Try very hard not to error out if there's junk in the queue. + logger = logging.getLogger(APP_LABEL) email = None event_type = None generation = None @@ -105,23 +111,30 @@ def process_account_event(database, body): logger.exception("Invalid account message: %s", e) else: if email is not None: - if event_type == "delete": - # Mark the user as retired. - # Actual cleanup is done by a separate process. - logger.info("Processing account delete for %r", email) - database.retire_user(email) - elif event_type == "reset": - logger.info("Processing account reset for %r", email) - update_generation_number(database, email, generation) - elif event_type == "passwordChange": - logger.info("Processing password change for %r", email) - update_generation_number(database, email, generation) - else: - logger.warning("Dropping unknown event type %r", - event_type) - - -def update_generation_number(database, email, generation): + record_metric = True + match event_type: + case "delete": + # Mark the user as retired. + # Actual cleanup is done by a separate process. + logger.info("Processing account delete for %r", email) + database.retire_user(email) + case "reset": + logger.info("Processing account reset for %r", email) + update_generation_number( + database, email, generation, metrics=metrics) + case "passwordChange": + logger.info("Processing password change for %r", email) + update_generation_number( + database, email, generation, metrics=metrics) + case _: + record_metric = False + logger.warning("Dropping unknown event type %r", + event_type) + if record_metric: + metrics and metrics.incr(event_type) + + +def update_generation_number(database, email, generation, metrics=None): """Update the maximum recorded generation number for the given user. When the FxA server sends us an update to the user's generation @@ -145,6 +158,7 @@ def update_generation_number(database, email, generation): user = database.get_user(email) if user is not None: database.update_user(user, generation - 1) + metrics and metrics.incr("decr_generation") def main(args=None): @@ -161,17 +175,35 @@ def main(args=None): help="Number of seconds to wait for jobs on the queue") parser.add_option("-v", "--verbose", action="count", dest="verbosity", help="Control verbosity of log messages") + parser.add_option("", "--human", action="store_true", + help="Human readable logs") opts, args = parser.parse_args(args) + # set up logging + if not getattr(opts, "app_label", None): + setattr(opts, "app_label", APP_LABEL) + if opts.human: + util.configure_script_logging(opts) + else: + util.configure_gcp_logging(opts) + + logger = logging.getLogger(APP_LABEL) + logger.info("Starting up..") + + # set up metrics: + metrics = util.Metrics(opts) + if len(args) != 1: parser.print_usage() return 1 - util.configure_script_logging(opts) - queue_name = args[0] - process_account_events(queue_name, opts.aws_region, opts.queue_wait_time) + process_account_events( + queue_name, + opts.aws_region, + opts.queue_wait_time, + metrics=metrics) return 0 diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index 89f90a59f6..f6ce85ae80 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -50,6 +50,7 @@ def purge_old_records( force=False, override_node=None, uid_range=None, + metrics=None, ): """Purge old records from the database. @@ -103,6 +104,9 @@ def purge_old_records( row.node ) if not dryrun: + metrics and metrics.incr( + "delete_user", + tags={"type": "nodeless"}) database.delete_user_record(row.uid) # NOTE: only delete_user+service_data calls count # against the counter @@ -116,13 +120,21 @@ def purge_old_records( row, secret, timeout=request_timeout, - dryrun=dryrun + dryrun=dryrun, + metrics=metrics, + ) + metrics and metrics.incr( + "delete_data" ) database.delete_user_record(row.uid) + metrics and metrics.incr( + "delete_user", + tags={"type": "not_down"} + ) counter += 1 elif force: delete_sd = not points_to_active( - database, row, override_node) + database, row, override_node, metrics=metrics) logger.info( "Forcing tokenserver record delete: " f"{row.uid} on {row.node} " @@ -145,7 +157,12 @@ def purge_old_records( dryrun=dryrun, # if an override was specifed, # use that node ID - override_node=override_node + override_node=override_node, + metrics=metrics, + ) + metrics and metrics( + "delete_data", + tags={"type": "force"} ) except requests.HTTPError: logger.warn( @@ -157,9 +174,14 @@ def purge_old_records( # reachable raise database.delete_user_record(row.uid) + metrics and metrics.incr( + "delete_data", + tags={"type": "force"} + ) counter += 1 if max_records and counter >= max_records: logger.info("Reached max_records, exiting") + metrics and metrics.incr("max_records") return True if len(rows) < max_per_loop: break @@ -172,7 +194,8 @@ def purge_old_records( def delete_service_data( - user, secret, timeout=60, dryrun=False, override_node=None): + user, secret, timeout=60, dryrun=False, override_node=None, + metrics=None): """Send a data-deletion request to the user's service node. This is a little bit of hackery to cause the user's service node to @@ -202,10 +225,11 @@ def delete_service_data( return resp = requests.delete(endpoint, auth=auth, timeout=timeout) if resp.status_code >= 400 and resp.status_code != 404: + metrics and metrics.incr("error.gone") resp.raise_for_status() -def points_to_active(database, replaced_at_row, override_node): +def points_to_active(database, replaced_at_row, override_node, metrics=None): """Determine if a `replaced_at` user record has the same generation/client_state as their active record. @@ -232,7 +256,10 @@ def points_to_active(database, replaced_at_row, override_node): replaced_at_row_keys_changed_at or replaced_at_row.generation, binascii.unhexlify(replaced_at_row.client_state), ) - return user_fxa_kid == replaced_at_row_fxa_kid + override = user_fxa_kid == replaced_at_row_fxa_kid + if override and metrics: + metrics.incr("override") + return override return False @@ -339,9 +366,26 @@ def main(args=None): default=None, help="End of UID range to check" ) + parser.add_option( + "", + "--human", + action="store_true", + help="Human readable logs" + ) opts, args = parser.parse_args(args) + # set up logging + if not getattr(opts, "app_label", None): + setattr(opts, "app_label", LOGGER) + if opts.human: + util.configure_script_logging(opts) + else: + util.configure_gcp_logging(opts) + + # set up metrics: + metrics = util.Metrics(opts) + if len(args) == 0: parser.print_usage() return 1 @@ -350,8 +394,6 @@ def main(args=None): secret = args[-1] logger.debug(f"Secret: {secret}") - util.configure_script_logging(opts) - uid_range = None if opts.range_start or opts.range_end: uid_range = (opts.range_start, opts.range_end) @@ -368,6 +410,7 @@ def main(args=None): force=opts.force, override_node=opts.override_node, uid_range=uid_range, + metrics=metrics, ) if not opts.oneshot: while True: @@ -388,6 +431,7 @@ def main(args=None): force=opts.force, override_node=opts.override_node, uid_range=uid_range, + metrics=metrics, ) return 0 diff --git a/tools/tokenserver/requirements.txt b/tools/tokenserver/requirements.txt index 2fc4fa3752..cd920255de 100644 --- a/tools/tokenserver/requirements.txt +++ b/tools/tokenserver/requirements.txt @@ -6,3 +6,4 @@ sqlalchemy==1.4.46 testfixtures tokenlib==2.0.0 PyBrowserID==0.14.0 +datadog diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index 2810da51e7..16dbc93b46 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -10,6 +10,11 @@ import sys import time import logging +import os +import json +from datetime import datetime + +import datadog from browserid.utils import encode_bytes as encode_bytes_b64 @@ -45,6 +50,42 @@ def configure_script_logging(opts=None): logger.setLevel(loglevel) +# We need to reformat a few things to get the record to display correctly +# This includes "escaping" the message as well as converting the timestamp +# into a parsable format. +class GCP_JSON_Formatter(logging.Formatter): + + def format(self, record): + return json.dumps({ + "severity": record.levelname, + "message": record.getMessage(), + "timestamp": datetime.fromtimestamp( + record.created).strftime( + "%Y-%m-%dT%H:%M:%SZ%z" # RFC3339 + ), + }) + + +def configure_gcp_logging(opts=None): + """Add or override the default handler to write a GCP logging compatible + error message. + """ + verbosity = (opts and getattr(opts, "verbosity", 0)) or 0 + logger = logging.getLogger(getattr(opts, "app_label", "")) + level = os.environ.get("PYTHON_LOG", "").upper() or \ + max(logging.DEBUG, logging.WARNING - (verbosity * 10)) or \ + logger.getEffectiveLevel() + + if logger.hasHandlers(): + handler = logger.handlers[0] + else: + handler = logging.StreamHandler() + handler.setLevel(level) + logger.addHandler(handler) + handler.setFormatter(GCP_JSON_Formatter()) + logger.setLevel(level) + + def format_key_id(keys_changed_at, key_hash): """Format an FxA key ID from a timestamp and key hash.""" return "{:013d}-{}".format( @@ -56,3 +97,25 @@ def format_key_id(keys_changed_at, key_hash): def get_timestamp(): """Get current timestamp in milliseconds.""" return int(time.time() * 1000) + + +class Metrics(): + prefix = "" + + def __init__(cls, opts): + options = dict( + namespace=getattr(opts, "app_label", ""), + api_key=getattr( + opts, "metric_api_key", os.environ.get("METRIC_API_KEY")), + app_key=getattr( + opts, "metric_app_key", os.environ.get("METRIC_APP_KEY")), + statsd_host=getattr( + opts, "metric_host", os.environ.get("METRIC_HOST")), + statsd_port=getattr( + opts, "metric_port", os.environ.get("METRIC_PORT")), + ) + cls.prefix = options.get("namespace") + datadog.initialize(**options) + + def incr(self, label, tags=None): + datadog.statsd.increment(label, tags=tags) From e28fb6f84e0381a090fd13299c0ae548ca0a214c Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 3 May 2024 14:14:50 -0700 Subject: [PATCH 2/9] f add dumb retry logic. --- tools/tokenserver/purge_old_records.py | 117 ++++++++++++++++--------- 1 file changed, 74 insertions(+), 43 deletions(-) diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index f6ce85ae80..4eebd84a54 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -19,7 +19,6 @@ import hawkauthlib import logging import optparse -import os import random import requests import time @@ -32,9 +31,6 @@ LOGGER = "tokenserver.scripts.purge_old_records" logger = logging.getLogger(LOGGER) -log_level = os.environ.get("PYTHON_LOG", "INFO").upper() -logger.setLevel(log_level) -logger.debug(f"Setting level to {log_level}") PATTERN = "{node}/1.5/{uid}" @@ -116,21 +112,49 @@ def purge_old_records( row.uid, row.node) if not dryrun: - delete_service_data( - row, - secret, - timeout=request_timeout, - dryrun=dryrun, - metrics=metrics, - ) - metrics and metrics.incr( - "delete_data" - ) - database.delete_user_record(row.uid) - metrics and metrics.incr( - "delete_user", - tags={"type": "not_down"} - ) + for loop in range(0, 10): + try: + loop -= 1 + delete_service_data( + row, + secret, + timeout=request_timeout, + dryrun=dryrun, + metrics=metrics, + ) + metrics and metrics.incr( + "delete_data" + ) + break + except requests.HTTPError as ex: + if ex.response.status_code not in \ + [502, 503, 504]: + nap = random.randint(5, 20) + logger.info( + f"retry delete_data in " + f"{row.uid} on {row.node}" + f" in {nap}s") + time.sleep(nap) + continue + raise + for loop in range(0, 10): + try: + database.delete_user_record(row.uid) + metrics and metrics.incr( + "delete_user", + tags={"type": "not_down"} + ) + break + except requests.HTTPError as ex: + if ex.response.status_code not in \ + [502, 503, 504]: + nap = random.randint(5, 20) + logger.info( + f"retry delete_user for " + f"{row.uid} in {nap}s") + time.sleep(nap) + continue + raise counter += 1 elif force: delete_sd = not points_to_active( @@ -149,30 +173,37 @@ def purge_old_records( # request refers to a node not contained by # the existing data set. # (The call mimics a user DELETE request.) - try: - delete_service_data( - row, - secret, - timeout=request_timeout, - dryrun=dryrun, - # if an override was specifed, - # use that node ID - override_node=override_node, - metrics=metrics, - ) - metrics and metrics( - "delete_data", - tags={"type": "force"} - ) - except requests.HTTPError: - logger.warn( - "Delete failed for user " - f"{row.uid} [{row.node}]" - ) - if override_node: - # Assume the override_node should be - # reachable - raise + for loop in range(0, 10): + try: + delete_service_data( + row, + secret, + timeout=request_timeout, + dryrun=dryrun, + # if an override was specifed, + # use that node ID + override_node=override_node, + metrics=metrics, + ) + metrics and metrics( + "delete_data", + tags={"type": "force"} + ) + break + except requests.HTTPError as ex: + if ex.response.status_code not in \ + [502, 503, 504]: + time.sleep(random.randint(5, 20)) + continue + logger.warn( + "Delete failed for user " + f"{row.uid} [{row.node}]" + ) + if override_node: + # Assume the override_node should be + # reachable + raise + database.delete_user_record(row.uid) metrics and metrics.incr( "delete_data", From a27e8530536001471b63bfd2392fd39db5509b29 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 3 May 2024 15:43:35 -0700 Subject: [PATCH 3/9] f switch to statsd --- tools/tokenserver/process_account_events.py | 18 ++++++++++++++++++ tools/tokenserver/purge_old_records.py | 18 ++++++++++++++++++ tools/tokenserver/requirements.txt | 2 +- tools/tokenserver/util.py | 14 +++++++------- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/tools/tokenserver/process_account_events.py b/tools/tokenserver/process_account_events.py index f96083c0c6..a6798f5eaa 100644 --- a/tools/tokenserver/process_account_events.py +++ b/tools/tokenserver/process_account_events.py @@ -177,6 +177,24 @@ def main(args=None): help="Control verbosity of log messages") parser.add_option("", "--human", action="store_true", help="Human readable logs") + parser.add_option( + "", + "--metric_host", + default=None, + help="Metric host name" + ) + parser.add_option( + "", + "--metric_port", + default=None, + help="Metric host port" + ) + parser.add_option( + "", + "--metric_path", + default=None, + help="Metric host socket path" + ) opts, args = parser.parse_args(args) # set up logging diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index 4eebd84a54..b2717f627e 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -403,6 +403,24 @@ def main(args=None): action="store_true", help="Human readable logs" ) + parser.add_option( + "", + "--metric_host", + default=None, + help="Metric host name" + ) + parser.add_option( + "", + "--metric_port", + default=None, + help="Metric host port" + ) + parser.add_option( + "", + "--metric_path", + default=None, + help="Metric host socket path" + ) opts, args = parser.parse_args(args) diff --git a/tools/tokenserver/requirements.txt b/tools/tokenserver/requirements.txt index cd920255de..f18358ccba 100644 --- a/tools/tokenserver/requirements.txt +++ b/tools/tokenserver/requirements.txt @@ -6,4 +6,4 @@ sqlalchemy==1.4.46 testfixtures tokenlib==2.0.0 PyBrowserID==0.14.0 -datadog +statsd diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index 16dbc93b46..be78ad4c3a 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -14,7 +14,7 @@ import json from datetime import datetime -import datadog +from datadog import initialize, statsd from browserid.utils import encode_bytes as encode_bytes_b64 @@ -101,21 +101,21 @@ def get_timestamp(): class Metrics(): prefix = "" + client = None def __init__(cls, opts): options = dict( namespace=getattr(opts, "app_label", ""), - api_key=getattr( - opts, "metric_api_key", os.environ.get("METRIC_API_KEY")), - app_key=getattr( - opts, "metric_app_key", os.environ.get("METRIC_APP_KEY")), + statsd_namespace=getattr(opts, "app_label", ""), statsd_host=getattr( opts, "metric_host", os.environ.get("METRIC_HOST")), statsd_port=getattr( opts, "metric_port", os.environ.get("METRIC_PORT")), + statsd_socket_path=getattr( + opts, "metric_path", os.environ.get("METRIC_PATH")), ) cls.prefix = options.get("namespace") - datadog.initialize(**options) + initialize(**options) def incr(self, label, tags=None): - datadog.statsd.increment(label, tags=tags) + statsd.increment(label, tags=tags) From f0f2ed949b72b2af99c33d78b7a2c66d66cc4a06 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 17 May 2024 17:08:26 -0700 Subject: [PATCH 4/9] f r's --- requirements.txt | 1 - tools/tokenserver/process_account_events.py | 17 +-- tools/tokenserver/purge_old_records.py | 143 +++++++++----------- tools/tokenserver/requirements.txt | 4 +- tools/tokenserver/util.py | 65 ++++----- 5 files changed, 100 insertions(+), 130 deletions(-) diff --git a/requirements.txt b/requirements.txt index 0513455937..773c4846b5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,3 @@ cryptography==41.0 pyfxa==0.7.7 tokenlib==2.0.0 -datadog diff --git a/tools/tokenserver/process_account_events.py b/tools/tokenserver/process_account_events.py index a6798f5eaa..21b60cc3ba 100644 --- a/tools/tokenserver/process_account_events.py +++ b/tools/tokenserver/process_account_events.py @@ -130,8 +130,8 @@ def process_account_event(database, body, metrics=None): record_metric = False logger.warning("Dropping unknown event type %r", event_type) - if record_metric: - metrics and metrics.incr(event_type) + if record_metric and metrics: + metrics.incr(event_type) def update_generation_number(database, email, generation, metrics=None): @@ -158,7 +158,8 @@ def update_generation_number(database, email, generation, metrics=None): user = database.get_user(email) if user is not None: database.update_user(user, generation - 1) - metrics and metrics.incr("decr_generation") + if metrics: + metrics.incr("decr_generation") def main(args=None): @@ -175,7 +176,7 @@ def main(args=None): help="Number of seconds to wait for jobs on the queue") parser.add_option("-v", "--verbose", action="count", dest="verbosity", help="Control verbosity of log messages") - parser.add_option("", "--human", action="store_true", + parser.add_option("", "--human_logs", action="store_true", help="Human readable logs") parser.add_option( "", @@ -189,18 +190,12 @@ def main(args=None): default=None, help="Metric host port" ) - parser.add_option( - "", - "--metric_path", - default=None, - help="Metric host socket path" - ) opts, args = parser.parse_args(args) # set up logging if not getattr(opts, "app_label", None): setattr(opts, "app_label", APP_LABEL) - if opts.human: + if opts.human_logs: util.configure_script_logging(opts) else: util.configure_gcp_logging(opts) diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index b2717f627e..8ed867b66d 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -15,6 +15,7 @@ """ +import backoff import binascii import hawkauthlib import logging @@ -35,6 +36,12 @@ PATTERN = "{node}/1.5/{uid}" +class Retry(requests.HTTPError): + + def is_retryable(self): + self.response.status_code in [502, 503, 504] + + def purge_old_records( secret, grace_period=-1, @@ -103,7 +110,7 @@ def purge_old_records( metrics and metrics.incr( "delete_user", tags={"type": "nodeless"}) - database.delete_user_record(row.uid) + retryable(database.delete_user_record, row.uid) # NOTE: only delete_user+service_data calls count # against the counter elif not row.downed: @@ -112,49 +119,24 @@ def purge_old_records( row.uid, row.node) if not dryrun: - for loop in range(0, 10): - try: - loop -= 1 - delete_service_data( - row, - secret, - timeout=request_timeout, - dryrun=dryrun, - metrics=metrics, - ) - metrics and metrics.incr( - "delete_data" - ) - break - except requests.HTTPError as ex: - if ex.response.status_code not in \ - [502, 503, 504]: - nap = random.randint(5, 20) - logger.info( - f"retry delete_data in " - f"{row.uid} on {row.node}" - f" in {nap}s") - time.sleep(nap) - continue - raise - for loop in range(0, 10): - try: - database.delete_user_record(row.uid) - metrics and metrics.incr( - "delete_user", - tags={"type": "not_down"} - ) - break - except requests.HTTPError as ex: - if ex.response.status_code not in \ - [502, 503, 504]: - nap = random.randint(5, 20) - logger.info( - f"retry delete_user for " - f"{row.uid} in {nap}s") - time.sleep(nap) - continue - raise + retryable( + delete_service_data, + row, + secret, + timeout=request_timeout, + dryrun=dryrun, + metrics=metrics, + ) + if metrics: + metrics.incr("delete_data") + retryable( + database.delete_user_record, + row.uid) + if metrics: + metrics.incr( + "delete_user", + tags={"type": "not_down"} + ) counter += 1 elif force: delete_sd = not points_to_active( @@ -173,42 +155,31 @@ def purge_old_records( # request refers to a node not contained by # the existing data set. # (The call mimics a user DELETE request.) - for loop in range(0, 10): - try: - delete_service_data( - row, - secret, - timeout=request_timeout, - dryrun=dryrun, - # if an override was specifed, - # use that node ID - override_node=override_node, - metrics=metrics, - ) - metrics and metrics( + retryable( + delete_service_data, + row, + secret, + timeout=request_timeout, + dryrun=dryrun, + # if an override was specifed, + # use that node ID + override_node=override_node, + metrics=metrics, + ) + if metrics: + metrics( "delete_data", tags={"type": "force"} ) - break - except requests.HTTPError as ex: - if ex.response.status_code not in \ - [502, 503, 504]: - time.sleep(random.randint(5, 20)) - continue - logger.warn( - "Delete failed for user " - f"{row.uid} [{row.node}]" - ) - if override_node: - # Assume the override_node should be - # reachable - raise - database.delete_user_record(row.uid) - metrics and metrics.incr( - "delete_data", - tags={"type": "force"} - ) + retryable( + database.delete_user_record, + row.uid) + if metrics: + metrics.incr( + "delete_data", + tags={"type": "force"} + ) counter += 1 if max_records and counter >= max_records: logger.info("Reached max_records, exiting") @@ -260,6 +231,19 @@ def delete_service_data( resp.raise_for_status() +def retry_giveup(e): + return 500 <= e.response.status_code < 505 + + +@backoff.on_exception( + backoff.expo, + requests.HTTPError, + giveup=retry_giveup + ) +def retryable(fn, *args, **kwargs): + fn(*args, **kwargs) + + def points_to_active(database, replaced_at_row, override_node, metrics=None): """Determine if a `replaced_at` user record has the same generation/client_state as their active record. @@ -399,7 +383,7 @@ def main(args=None): ) parser.add_option( "", - "--human", + "--human_logs", action="store_true", help="Human readable logs" ) @@ -427,10 +411,7 @@ def main(args=None): # set up logging if not getattr(opts, "app_label", None): setattr(opts, "app_label", LOGGER) - if opts.human: - util.configure_script_logging(opts) - else: - util.configure_gcp_logging(opts) + util.configure_script_logging(opts) # set up metrics: metrics = util.Metrics(opts) diff --git a/tools/tokenserver/requirements.txt b/tools/tokenserver/requirements.txt index f18358ccba..809a747315 100644 --- a/tools/tokenserver/requirements.txt +++ b/tools/tokenserver/requirements.txt @@ -6,4 +6,6 @@ sqlalchemy==1.4.46 testfixtures tokenlib==2.0.0 PyBrowserID==0.14.0 -statsd +datadog +backoff + diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index be78ad4c3a..339d4e55c5 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -35,19 +35,36 @@ def configure_script_logging(opts=None): formatting that's more for human readability than machine parsing. It also takes care of the --verbosity command-line option. """ - if not opts or not opts.verbosity: - loglevel = logging.WARNING - elif opts.verbosity == 1: - loglevel = logging.INFO + + verbosity = ( + opts and getattr( + opts, "verbosity", logging.NOTSET)) or logging.NOTSET + logger = logging.getLogger(getattr(opts, "app_label", "")) + level = os.environ.get("PYTHON_LOG", "").upper() or \ + max(logging.DEBUG, logging.WARNING - (verbosity * 10)) or \ + logger.getEffectiveLevel() + + # if we've already written a log message, the handler may already + # be defined. We don't want to duplicate messages if possible, so + # check and potentially adjust the existing logger's handler. + if logger.hasHandlers(): + handler = logger.handlers[0] else: - loglevel = logging.DEBUG + handler = logging.StreamHandler() - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter("%(message)s")) - handler.setLevel(loglevel) + formatter = GCP_JSON_Formatter() + # if we've opted for "human_logs", specify a simpler message. + if opts: + if getattr(opts, "human_logs", None): + formatter = logging.Formatter( + "{levelname:<8s}: {message}", + style="{") + + handler.setFormatter(formatter) + handler.setLevel(level) logger = logging.getLogger("") logger.addHandler(handler) - logger.setLevel(loglevel) + logger.setLevel(level) # We need to reformat a few things to get the record to display correctly @@ -61,31 +78,11 @@ def format(self, record): "message": record.getMessage(), "timestamp": datetime.fromtimestamp( record.created).strftime( - "%Y-%m-%dT%H:%M:%SZ%z" # RFC3339 + "%Y-%m-%dT%H:%M:%SZ" # RFC3339 ), }) -def configure_gcp_logging(opts=None): - """Add or override the default handler to write a GCP logging compatible - error message. - """ - verbosity = (opts and getattr(opts, "verbosity", 0)) or 0 - logger = logging.getLogger(getattr(opts, "app_label", "")) - level = os.environ.get("PYTHON_LOG", "").upper() or \ - max(logging.DEBUG, logging.WARNING - (verbosity * 10)) or \ - logger.getEffectiveLevel() - - if logger.hasHandlers(): - handler = logger.handlers[0] - else: - handler = logging.StreamHandler() - handler.setLevel(level) - logger.addHandler(handler) - handler.setFormatter(GCP_JSON_Formatter()) - logger.setLevel(level) - - def format_key_id(keys_changed_at, key_hash): """Format an FxA key ID from a timestamp and key hash.""" return "{:013d}-{}".format( @@ -100,10 +97,8 @@ def get_timestamp(): class Metrics(): - prefix = "" - client = None - def __init__(cls, opts): + def __init__(self, opts): options = dict( namespace=getattr(opts, "app_label", ""), statsd_namespace=getattr(opts, "app_label", ""), @@ -111,10 +106,8 @@ def __init__(cls, opts): opts, "metric_host", os.environ.get("METRIC_HOST")), statsd_port=getattr( opts, "metric_port", os.environ.get("METRIC_PORT")), - statsd_socket_path=getattr( - opts, "metric_path", os.environ.get("METRIC_PATH")), ) - cls.prefix = options.get("namespace") + self.prefix = options.get("namespace") initialize(**options) def incr(self, label, tags=None): From 58d7d4407ea8a3f72ac1935e7faa3a66752a4c5e Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 17 May 2024 17:14:40 -0700 Subject: [PATCH 5/9] more r's --- tools/tokenserver/process_account_events.py | 5 +---- tools/tokenserver/purge_old_records.py | 8 +------- tools/tokenserver/util.py | 4 ++-- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/tools/tokenserver/process_account_events.py b/tools/tokenserver/process_account_events.py index 21b60cc3ba..d432024332 100644 --- a/tools/tokenserver/process_account_events.py +++ b/tools/tokenserver/process_account_events.py @@ -195,10 +195,7 @@ def main(args=None): # set up logging if not getattr(opts, "app_label", None): setattr(opts, "app_label", APP_LABEL) - if opts.human_logs: - util.configure_script_logging(opts) - else: - util.configure_gcp_logging(opts) + util.configure_script_logging(opts, logger_name=APP_LABEL) logger = logging.getLogger(APP_LABEL) logger.info("Starting up..") diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index 8ed867b66d..aaf72009b0 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -399,19 +399,13 @@ def main(args=None): default=None, help="Metric host port" ) - parser.add_option( - "", - "--metric_path", - default=None, - help="Metric host socket path" - ) opts, args = parser.parse_args(args) # set up logging if not getattr(opts, "app_label", None): setattr(opts, "app_label", LOGGER) - util.configure_script_logging(opts) + util.configure_script_logging(opts, logger_name=LOGGER) # set up metrics: metrics = util.Metrics(opts) diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index 339d4e55c5..6459d8d2d5 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -28,7 +28,7 @@ def run_script(main): sys.exit(exitcode) -def configure_script_logging(opts=None): +def configure_script_logging(opts=None, logger_name=""): """Configure stdlib logging to produce output from the script. This basically configures logging to send messages to stderr, with @@ -39,7 +39,7 @@ def configure_script_logging(opts=None): verbosity = ( opts and getattr( opts, "verbosity", logging.NOTSET)) or logging.NOTSET - logger = logging.getLogger(getattr(opts, "app_label", "")) + logger = logging.getLogger(logger_name) level = os.environ.get("PYTHON_LOG", "").upper() or \ max(logging.DEBUG, logging.WARNING - (verbosity * 10)) or \ logger.getEffectiveLevel() From ad8c0f5f55a973ea9befdf5ae036e4c8808188b3 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 17 May 2024 17:18:37 -0700 Subject: [PATCH 6/9] f more r's --- tools/tokenserver/purge_old_records.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index aaf72009b0..54f0a09d70 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -107,9 +107,10 @@ def purge_old_records( row.node ) if not dryrun: - metrics and metrics.incr( - "delete_user", - tags={"type": "nodeless"}) + if metrics: + metrics.incr( + "delete_user", + tags={"type": "nodeless"}) retryable(database.delete_user_record, row.uid) # NOTE: only delete_user+service_data calls count # against the counter @@ -183,7 +184,8 @@ def purge_old_records( counter += 1 if max_records and counter >= max_records: logger.info("Reached max_records, exiting") - metrics and metrics.incr("max_records") + if metrics: + metrics.incr("max_records") return True if len(rows) < max_per_loop: break @@ -227,7 +229,8 @@ def delete_service_data( return resp = requests.delete(endpoint, auth=auth, timeout=timeout) if resp.status_code >= 400 and resp.status_code != 404: - metrics and metrics.incr("error.gone") + if metrics: + metrics.incr("error.gone") resp.raise_for_status() From 0569da3794acacf169ac1f37d40b3b102f1bd003 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 17 May 2024 17:26:21 -0700 Subject: [PATCH 7/9] f more r's --- tools/tokenserver/process_account_events.py | 4 +--- tools/tokenserver/purge_old_records.py | 6 ++---- tools/tokenserver/util.py | 6 +++--- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/tools/tokenserver/process_account_events.py b/tools/tokenserver/process_account_events.py index d432024332..a3658f8267 100644 --- a/tools/tokenserver/process_account_events.py +++ b/tools/tokenserver/process_account_events.py @@ -193,15 +193,13 @@ def main(args=None): opts, args = parser.parse_args(args) # set up logging - if not getattr(opts, "app_label", None): - setattr(opts, "app_label", APP_LABEL) util.configure_script_logging(opts, logger_name=APP_LABEL) logger = logging.getLogger(APP_LABEL) logger.info("Starting up..") # set up metrics: - metrics = util.Metrics(opts) + metrics = util.Metrics(opts, namespace="tokenserver") if len(args) != 1: parser.print_usage() diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index 54f0a09d70..b8995e2a0c 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -168,7 +168,7 @@ def purge_old_records( metrics=metrics, ) if metrics: - metrics( + metrics.incr( "delete_data", tags={"type": "force"} ) @@ -406,12 +406,10 @@ def main(args=None): opts, args = parser.parse_args(args) # set up logging - if not getattr(opts, "app_label", None): - setattr(opts, "app_label", LOGGER) util.configure_script_logging(opts, logger_name=LOGGER) # set up metrics: - metrics = util.Metrics(opts) + metrics = util.Metrics(opts, namespace="tokenserver") if len(args) == 0: parser.print_usage() diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index 6459d8d2d5..5e6fec3bf9 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -98,10 +98,10 @@ def get_timestamp(): class Metrics(): - def __init__(self, opts): + def __init__(self, opts, namespace=""): options = dict( - namespace=getattr(opts, "app_label", ""), - statsd_namespace=getattr(opts, "app_label", ""), + namespace=namespace, + statsd_namespace=namespace, statsd_host=getattr( opts, "metric_host", os.environ.get("METRIC_HOST")), statsd_port=getattr( From 84fa7ceac1037492ad9baa58c67128fcb62e26a2 Mon Sep 17 00:00:00 2001 From: jrconlin Date: Wed, 22 May 2024 09:55:09 -0700 Subject: [PATCH 8/9] f r's --- tools/tokenserver/process_account_events.py | 6 ++++-- tools/tokenserver/purge_old_records.py | 16 ++++++---------- tools/tokenserver/util.py | 1 + 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/tools/tokenserver/process_account_events.py b/tools/tokenserver/process_account_events.py index a3658f8267..648d8f9cb8 100644 --- a/tools/tokenserver/process_account_events.py +++ b/tools/tokenserver/process_account_events.py @@ -40,6 +40,9 @@ from database import Database +# Logging is initialized in `main` by `util.configure_script_logging()` +# Please do not call `logging.basicConfig()` before then, since this may +# cause duplicate error messages to be generated. APP_LABEL = "tokenserver.scripts.process_account_events" @@ -193,9 +196,8 @@ def main(args=None): opts, args = parser.parse_args(args) # set up logging - util.configure_script_logging(opts, logger_name=APP_LABEL) + logger = util.configure_script_logging(opts, logger_name=APP_LABEL) - logger = logging.getLogger(APP_LABEL) logger.info("Starting up..") # set up metrics: diff --git a/tools/tokenserver/purge_old_records.py b/tools/tokenserver/purge_old_records.py index b8995e2a0c..930b758631 100644 --- a/tools/tokenserver/purge_old_records.py +++ b/tools/tokenserver/purge_old_records.py @@ -29,19 +29,15 @@ from database import Database from util import format_key_id -LOGGER = "tokenserver.scripts.purge_old_records" -logger = logging.getLogger(LOGGER) +# Logging is initialized in `main` by `util.configure_script_logging()` +# Please do not call `logging.basicConfig()` before then, since this may +# cause duplicate error messages to be generated. +LOGGER = "tokenserver.scripts.purge_old_records" PATTERN = "{node}/1.5/{uid}" -class Retry(requests.HTTPError): - - def is_retryable(self): - self.response.status_code in [502, 503, 504] - - def purge_old_records( secret, grace_period=-1, @@ -67,6 +63,7 @@ def purge_old_records( a (likely) different set of records to work on. A cheap, imperfect randomization. """ + logger = logging.getLogger(LOGGER) logger.info("Purging old user records") try: database = Database() @@ -299,7 +296,6 @@ def main(args=None): This function parses command-line arguments and passes them on to the purge_old_records() function. """ - logger = logging.getLogger(LOGGER) usage = "usage: %prog [options] secret" parser = optparse.OptionParser(usage=usage) parser.add_option( @@ -406,7 +402,7 @@ def main(args=None): opts, args = parser.parse_args(args) # set up logging - util.configure_script_logging(opts, logger_name=LOGGER) + logger = util.configure_script_logging(opts, logger_name=LOGGER) # set up metrics: metrics = util.Metrics(opts, namespace="tokenserver") diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index 5e6fec3bf9..304baade91 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -65,6 +65,7 @@ def configure_script_logging(opts=None, logger_name=""): logger = logging.getLogger("") logger.addHandler(handler) logger.setLevel(level) + return logger # We need to reformat a few things to get the record to display correctly From d34fedaf65c8a1a95d2a0a9d2d6800af3a7c3c46 Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Wed, 22 May 2024 16:15:41 -0700 Subject: [PATCH 9/9] f r's Co-authored-by: Philip Jenvey --- tools/tokenserver/util.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tools/tokenserver/util.py b/tools/tokenserver/util.py index 304baade91..c87857bd50 100644 --- a/tools/tokenserver/util.py +++ b/tools/tokenserver/util.py @@ -44,9 +44,7 @@ def configure_script_logging(opts=None, logger_name=""): max(logging.DEBUG, logging.WARNING - (verbosity * 10)) or \ logger.getEffectiveLevel() - # if we've already written a log message, the handler may already - # be defined. We don't want to duplicate messages if possible, so - # check and potentially adjust the existing logger's handler. + # if we've previously setup a handler, adjust it instead if logger.hasHandlers(): handler = logger.handlers[0] else: