feat: Add metrics, gcp logging to tokenserver scripts #1555

Merged
merged 16 commits into from
May 23, 2024
Changes from 12 commits
90 changes: 65 additions & 25 deletions tools/tokenserver/process_account_events.py
@@ -40,17 +40,22 @@
from database import Database


logger = logging.getLogger("tokenserver.scripts.process_account_deletions")
APP_LABEL = "tokenserver.scripts.process_account_events"


def process_account_events(queue_name, aws_region=None, queue_wait_time=20):
def process_account_events(
queue_name,
aws_region=None,
queue_wait_time=20,
metrics=None):
"""Process account events from an SQS queue.

    This function polls the specified SQS queue for account-related events,
processing each as it is found. It polls indefinitely and does not return;
to interrupt execution you'll need to e.g. SIGINT the process.
"""
logger.info("Processing account events from %s", queue_name)
logger = logging.getLogger(APP_LABEL)
logger.info(f"Processing account events from {queue_name}")
database = Database()
try:
# Connect to the SQS queue.
@@ -69,7 +74,7 @@ def process_account_events(queue_name, aws_region=None, queue_wait_time=20):
msg = queue.read(wait_time_seconds=queue_wait_time)
if msg is None:
continue
process_account_event(database, msg.get_body())
process_account_event(database, msg.get_body(), metrics=metrics)
# This intentionally deletes the event even if it was some
            # unrecognized type. No point leaving a backlog.
queue.delete_message(msg)
@@ -78,9 +83,10 @@ def process_account_events(queue_name, aws_region=None, queue_wait_time=20):
raise


def process_account_event(database, body):
def process_account_event(database, body, metrics=None):
"""Parse and process a single account event."""
# Try very hard not to error out if there's junk in the queue.
logger = logging.getLogger(APP_LABEL)
email = None
event_type = None
generation = None
@@ -105,23 +111,30 @@ def process_account_event(database, body):
logger.exception("Invalid account message: %s", e)
else:
if email is not None:
if event_type == "delete":
# Mark the user as retired.
# Actual cleanup is done by a separate process.
logger.info("Processing account delete for %r", email)
database.retire_user(email)
elif event_type == "reset":
logger.info("Processing account reset for %r", email)
update_generation_number(database, email, generation)
elif event_type == "passwordChange":
logger.info("Processing password change for %r", email)
update_generation_number(database, email, generation)
else:
logger.warning("Dropping unknown event type %r",
event_type)


def update_generation_number(database, email, generation):
record_metric = True
match event_type:
case "delete":
# Mark the user as retired.
# Actual cleanup is done by a separate process.
logger.info("Processing account delete for %r", email)
database.retire_user(email)
case "reset":
logger.info("Processing account reset for %r", email)
update_generation_number(
database, email, generation, metrics=metrics)
case "passwordChange":
logger.info("Processing password change for %r", email)
update_generation_number(
database, email, generation, metrics=metrics)
case _:
record_metric = False
logger.warning("Dropping unknown event type %r",
event_type)
if record_metric and metrics:
metrics.incr(event_type)
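
For context, the body parsed above is an FxA account event pulled off SQS. A minimal sketch of the kind of payload process_account_event appears to expect; the field names ("Message", "event", "uid", "generation") are assumptions inferred from how the parser reads them, since the parsing block is collapsed out of this diff:

    import json

    # Hypothetical SNS-wrapped SQS message body; field names are assumptions.
    body = json.dumps({
        "Message": json.dumps({
            "event": "passwordChange",   # or "delete" / "reset"
            "uid": "user@example.com",
            "generation": 1234,
        })
    })
    # process_account_event(database, body, metrics=metrics)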


def update_generation_number(database, email, generation, metrics=None):
"""Update the maximum recorded generation number for the given user.

When the FxA server sends us an update to the user's generation
@@ -145,6 +158,8 @@ def update_generation_number(database, email, generation):
user = database.get_user(email)
if user is not None:
database.update_user(user, generation - 1)
if metrics:
metrics.incr("decr_generation")


def main(args=None):
@@ -161,17 +176,42 @@ def main(args=None):
help="Number of seconds to wait for jobs on the queue")
parser.add_option("-v", "--verbose", action="count", dest="verbosity",
help="Control verbosity of log messages")
parser.add_option("", "--human_logs", action="store_true",
help="Human readable logs")
parser.add_option(
"",
"--metric_host",
default=None,
help="Metric host name"
)
parser.add_option(
"",
"--metric_port",
default=None,
help="Metric host port"
)

opts, args = parser.parse_args(args)
# set up logging
util.configure_script_logging(opts, logger_name=APP_LABEL)

logger = logging.getLogger(APP_LABEL)
logger.info("Starting up..")

# set up metrics:
metrics = util.Metrics(opts, namespace="tokenserver")

if len(args) != 1:
parser.print_usage()
return 1

util.configure_script_logging(opts)

queue_name = args[0]

process_account_events(queue_name, opts.aws_region, opts.queue_wait_time)
process_account_events(
queue_name,
opts.aws_region,
opts.queue_wait_time,
metrics=metrics)
return 0
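
For reference, a hypothetical invocation of the updated script exercising the new flags (the queue name is a placeholder, and 8125 is just statsd's conventional port, not something this diff specifies):

    python process_account_events.py \
        --human_logs \
        --metric_host localhost \
        --metric_port 8125 \
        my-account-events-queue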


123 changes: 96 additions & 27 deletions tools/tokenserver/purge_old_records.py
@@ -15,11 +15,11 @@

"""

import backoff
import binascii
import hawkauthlib
import logging
import optparse
import os
import random
import requests
import time
@@ -32,13 +32,16 @@
LOGGER = "tokenserver.scripts.purge_old_records"

logger = logging.getLogger(LOGGER)
log_level = os.environ.get("PYTHON_LOG", "INFO").upper()
logger.setLevel(log_level)
logger.debug(f"Setting level to {log_level}")

PATTERN = "{node}/1.5/{uid}"


class Retry(requests.HTTPError):

def is_retryable(self):
self.response.status_code in [502, 503, 504]


Member: Not used?

Suggested change (delete):
    class Retry(requests.HTTPError):
        def is_retryable(self):
            self.response.status_code in [502, 503, 504]

Member Author: Yeah, thanks. Forgot to pull that.

def purge_old_records(
secret,
grace_period=-1,
@@ -50,6 +53,7 @@ def purge_old_records(
force=False,
override_node=None,
uid_range=None,
metrics=None,
):
"""Purge old records from the database.

@@ -103,7 +107,11 @@
row.node
)
if not dryrun:
database.delete_user_record(row.uid)
if metrics:
metrics.incr(
"delete_user",
tags={"type": "nodeless"})
retryable(database.delete_user_record, row.uid)
# NOTE: only delete_user+service_data calls count
# against the counter
elif not row.downed:
@@ -112,17 +120,28 @@
row.uid,
row.node)
if not dryrun:
delete_service_data(
retryable(
delete_service_data,
row,
secret,
timeout=request_timeout,
dryrun=dryrun
dryrun=dryrun,
metrics=metrics,
)
database.delete_user_record(row.uid)
if metrics:
metrics.incr("delete_data")
retryable(
database.delete_user_record,
row.uid)
if metrics:
metrics.incr(
"delete_user",
tags={"type": "not_down"}
)
counter += 1
elif force:
delete_sd = not points_to_active(
database, row, override_node)
database, row, override_node, metrics=metrics)
logger.info(
"Forcing tokenserver record delete: "
f"{row.uid} on {row.node} "
@@ -137,29 +156,36 @@
# request refers to a node not contained by
# the existing data set.
# (The call mimics a user DELETE request.)
try:
delete_service_data(
retryable(
delete_service_data,
row,
secret,
timeout=request_timeout,
dryrun=dryrun,
                            # if an override was specified,
# use that node ID
override_node=override_node
override_node=override_node,
metrics=metrics,
)
except requests.HTTPError:
logger.warn(
"Delete failed for user "
f"{row.uid} [{row.node}]"
)
if override_node:
# Assume the override_node should be
# reachable
raise
database.delete_user_record(row.uid)
if metrics:
metrics.incr(
"delete_data",
tags={"type": "force"}
)

retryable(
database.delete_user_record,
row.uid)
if metrics:
metrics.incr(
"delete_data",
tags={"type": "force"}
)
counter += 1
if max_records and counter >= max_records:
logger.info("Reached max_records, exiting")
if metrics:
metrics.incr("max_records")
return True
if len(rows) < max_per_loop:
break
@@ -172,7 +198,8 @@


def delete_service_data(
user, secret, timeout=60, dryrun=False, override_node=None):
user, secret, timeout=60, dryrun=False, override_node=None,
metrics=None):
"""Send a data-deletion request to the user's service node.

This is a little bit of hackery to cause the user's service node to
@@ -202,10 +229,25 @@ def delete_service_data(
return
resp = requests.delete(endpoint, auth=auth, timeout=timeout)
if resp.status_code >= 400 and resp.status_code != 404:
if metrics:
metrics.incr("error.gone")
resp.raise_for_status()


def points_to_active(database, replaced_at_row, override_node):
def retry_giveup(e):
return 500 <= e.response.status_code < 505


@backoff.on_exception(
backoff.expo,
requests.HTTPError,
giveup=retry_giveup
)
def retryable(fn, *args, **kwargs):
fn(*args, **kwargs)
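
The retryable helper above uses the backoff library: backoff.on_exception(backoff.expo, requests.HTTPError, giveup=retry_giveup) re-invokes the wrapped call with exponentially increasing delays whenever it raises requests.HTTPError, and stops retrying as soon as retry_giveup returns True. A standalone sketch of the same pattern (the URL and the max_tries cap are illustrative assumptions):

    import backoff
    import requests

    @backoff.on_exception(
        backoff.expo,          # exponential delay between attempts
        requests.HTTPError,
        giveup=lambda e: 500 <= e.response.status_code < 505,  # same check as retry_giveup
        max_tries=5,           # assumption: bound attempts for the demo
    )
    def delete_with_retries(url):
        # raise_for_status() converts 4xx/5xx responses into HTTPError,
        # which triggers the decorator's retry logic.
        resp = requests.delete(url, timeout=10)
        resp.raise_for_status()
        return resp

Note that retryable as written discards fn's return value, which is fine for the delete calls it wraps here since none of their results are used.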


def points_to_active(database, replaced_at_row, override_node, metrics=None):
"""Determine if a `replaced_at` user record has the same
generation/client_state as their active record.

@@ -232,7 +274,10 @@ def points_to_active(database, replaced_at_row, override_node):
replaced_at_row_keys_changed_at or replaced_at_row.generation,
binascii.unhexlify(replaced_at_row.client_state),
)
return user_fxa_kid == replaced_at_row_fxa_kid
override = user_fxa_kid == replaced_at_row_fxa_kid
if override and metrics:
metrics.incr("override")
return override
return False


@@ -339,9 +384,33 @@ def main(args=None):
default=None,
help="End of UID range to check"
)
parser.add_option(
"",
"--human_logs",
action="store_true",
help="Human readable logs"
)
parser.add_option(
"",
"--metric_host",
default=None,
help="Metric host name"
)
parser.add_option(
"",
"--metric_port",
default=None,
help="Metric host port"
)

opts, args = parser.parse_args(args)

# set up logging
util.configure_script_logging(opts, logger_name=LOGGER)

# set up metrics:
metrics = util.Metrics(opts, namespace="tokenserver")

if len(args) == 0:
parser.print_usage()
return 1
@@ -350,8 +419,6 @@ def main(args=None):
secret = args[-1]
logger.debug(f"Secret: {secret}")

util.configure_script_logging(opts)

uid_range = None
if opts.range_start or opts.range_end:
uid_range = (opts.range_start, opts.range_end)
@@ -368,6 +435,7 @@
force=opts.force,
override_node=opts.override_node,
uid_range=uid_range,
metrics=metrics,
)
if not opts.oneshot:
while True:
@@ -388,6 +456,7 @@
force=opts.force,
override_node=opts.override_node,
uid_range=uid_range,
metrics=metrics,
)
return 0
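
util.Metrics comes from the repo-local tools/tokenserver/util.py, which this diff does not include. Inferring only from the call sites here (util.Metrics(opts, namespace="tokenserver") and metrics.incr(name, tags={...})), a statsd-backed stand-in might look roughly like the sketch below; this is an assumption for illustration, not the project's actual helper:

    import statsd  # assumption: a pystatsd-style client backend


    class Metrics:
        """Hypothetical stand-in for util.Metrics, inferred from call sites."""

        def __init__(self, opts, namespace=""):
            self.client = None
            if getattr(opts, "metric_host", None):
                self.client = statsd.StatsClient(
                    opts.metric_host,
                    int(opts.metric_port or 8125),  # 8125: conventional statsd port
                    prefix=namespace,
                )

        def incr(self, name, tags=None):
            if self.client is None:
                return  # metrics disabled when no host is configured
            if tags:
                # Plain statsd has no native tags; fold them into the metric name.
                name += "." + ".".join(f"{k}.{v}" for k, v in sorted(tags.items()))
            self.client.incr(name)

Either script would then enable metrics the same way, e.g. python purge_old_records.py --metric_host localhost --metric_port 8125 <other args> <secret> (placeholder values).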
