Skip to content

Commit

Permalink
Implement operational database purging
Browse files Browse the repository at this point in the history
Add a cloud function and a cloud scheduler job for purging the
operational database of data that is older than six months, rounded down
to a month boundary.
  • Loading branch information
spbnick committed Jan 17, 2024
1 parent 7da6859 commit 2b390c3
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 1 deletion.
8 changes: 7 additions & 1 deletion cloud
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ function execute_command() {
declare -r load_queue_trigger_topic="${prefix}load_queue_trigger"
declare -r cache_bucket_name="${project}_${prefix}cache"
declare -r pick_notifications_trigger_topic="${prefix}pick_notifications_trigger"
declare -r purge_op_db_trigger_topic="${prefix}purge_op_db_trigger"
declare -r cache_redirect_function_name="cache_redirect"
declare cache_redirector_url="https://${CLOUD_FUNCTION_REGION}"
declare cache_redirector_url+="-${project}.cloudfunctions.net/"
Expand Down Expand Up @@ -255,6 +256,7 @@ function execute_command() {
--new-load-subscription="$new_load_subscription"
--updated-publish="$updated_publish"
--updated-topic="$updated_topic"
--purge-op-db-trigger-topic="$purge_op_db_trigger_topic"
--updated-urls-topic="$updated_urls_topic"
--spool-collection-path="$spool_collection_path"
--extra-cc="$extra_cc"
Expand Down Expand Up @@ -310,6 +312,7 @@ function execute_command() {
--updated-debug-subscription="$updated_debug_subscription" \
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-op-db-trigger-topic "$purge_op_db_trigger_topic" \
--smtp-topic="$smtp_topic" \
--smtp-subscription="$smtp_subscription"
sections_run "$sections" firestore_deploy "$project"
Expand All @@ -322,6 +325,7 @@ function execute_command() {
--load-queue-trigger-topic="$load_queue_trigger_topic" \
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-op-db-trigger-topic "$purge_op_db_trigger_topic" \
--updated-urls-topic="$updated_urls_topic" \
--updated-topic="$updated_topic" \
--cache-redirect-function-name="$cache_redirect_function_name" \
Expand All @@ -331,7 +335,8 @@ function execute_command() {
--project="$project" \
--prefix="$prefix" \
--load-queue-trigger-topic="$load_queue_trigger_topic" \
--pick-notifications-trigger-topic="$pick_notifications_trigger_topic"
--pick-notifications-trigger-topic="$pick_notifications_trigger_topic" \
--purge-op-db-trigger-topic="$purge_op_db_trigger_topic"
sections_run "$sections" submitters_deploy \
"$project" "$new_topic" "${submitters[@]}"
elif [ "$command" == "shell" ]; then
Expand Down Expand Up @@ -361,6 +366,7 @@ function execute_command() {
--load-queue-trigger-topic="$load_queue_trigger_topic" \
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-op-db-trigger-topic "$purge_op_db_trigger_topic" \
--new-topic="$new_topic" \
--new-load-subscription="$new_load_subscription" \
--new-debug-subscription="$new_debug_subscription" \
Expand Down
15 changes: 15 additions & 0 deletions kcidb/cloud/cloud_functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ declare _CLOUD_FUNCTIONS_SH=
# --new-topic=NAME --new-load-subscription=NAME
# --updated-publish=true|false
# --updated-topic=NAME
# --purge-op-db-trigger-topic=NAME
# --updated-urls-topic=NAME
# --cache-bucket-name=NAME
# --cache-redirector-url=URL
Expand All @@ -38,6 +39,7 @@ function cloud_functions_env() {
heavy_asserts \
new_topic new_load_subscription \
updated_publish updated_topic \
purge_op_db_trigger_topic \
updated_urls_topic \
spool_collection_path \
extra_cc \
Expand Down Expand Up @@ -70,6 +72,7 @@ function cloud_functions_env() {
[KCIDB_CLEAN_TEST_DATABASES]="$clean_test_databases"
[KCIDB_EMPTY_TEST_DATABASES]="$empty_test_databases"
[KCIDB_UPDATED_QUEUE_TOPIC]="$updated_topic"
[KCIDB_PURGE_OP_DB_TRIGGER_TOPIC]="$purge_op_db_trigger_topic"
[KCIDB_UPDATED_URLS_TOPIC]="$updated_urls_topic"
[KCIDB_SELECTED_SUBSCRIPTIONS]=""
[KCIDB_SPOOL_COLLECTION_PATH]="$spool_collection_path"
Expand Down Expand Up @@ -123,6 +126,7 @@ function cloud_functions_env() {
# --project=NAME --prefix=PREFIX --source=PATH
# --load-queue-trigger-topic=NAME
# --pick-notifications-trigger-topic=NAME
# --purge-op-db-trigger-topic=NAME
# --updated-urls-topic=NAME
# --updated-topic=NAME
# --spool-collection-path=PATH
Expand All @@ -133,6 +137,7 @@ function cloud_functions_deploy() {
params="$(getopt_vars sections project prefix source \
load_queue_trigger_topic \
pick_notifications_trigger_topic \
purge_op_db_trigger_topic \
updated_urls_topic \
updated_topic \
spool_collection_path \
Expand All @@ -152,6 +157,14 @@ function cloud_functions_deploy() {
trigger_event+="document.create"
declare trigger_resource="projects/$project/databases/(default)/documents/"
trigger_resource+="${spool_collection_path}/{notification_id}"
cloud_function_deploy "$sections" "$source" "$project" "$prefix" \
purge_op_db \
--env-vars-file "$env_yaml_file" \
--trigger-topic "${purge_op_db_trigger_topic}" \
--memory 256MB \
--max-instances=1 \
--timeout 540

cloud_function_deploy "$sections" "$source" "$project" "$prefix" \
pick_notifications \
--env-vars-file "$env_yaml_file" \
Expand Down Expand Up @@ -215,6 +228,8 @@ function cloud_functions_withdraw() {
cache_redirect_function_name \
-- "$@")"
eval "$params"
cloud_function_withdraw "$sections" "$project" "$prefix" \
purge_op_db
cloud_function_withdraw "$sections" "$project" "$prefix" \
pick_notifications
cloud_function_withdraw "$sections" "$project" "$prefix" \
Expand Down
6 changes: 6 additions & 0 deletions kcidb/cloud/pubsub.sh
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ function pubsub_subscription_withdraw() {
# --updated-topic=NAME
# --updated-debug-subscription=NAME
# --pick-notifications-trigger-topic=NAME
# --purge-op-db-trigger-topic=NAME
# --updated-urls-topic=NAME
# --smtp-topic=NAME
# --smtp-subscription=NAME
Expand All @@ -147,6 +148,7 @@ function pubsub_deploy() {
updated_topic \
updated_debug_subscription \
pick_notifications_trigger_topic \
purge_op_db_trigger_topic \
updated_urls_topic \
smtp_topic smtp_subscription \
-- "$@")"
Expand All @@ -167,6 +169,7 @@ function pubsub_deploy() {
pubsub_subscription_deploy "$project" "${updated_topic}" \
"${updated_debug_subscription}"
pubsub_topic_deploy "$project" "${pick_notifications_trigger_topic}"
pubsub_topic_deploy "$project" "${purge_op_db_trigger_topic}"
pubsub_topic_deploy "$project" "${updated_urls_topic}"
if [ -n "$smtp_topic" ]; then
pubsub_topic_deploy "$project" "$smtp_topic"
Expand All @@ -179,6 +182,7 @@ function pubsub_deploy() {
# Args: --project=NAME
# --load-queue-trigger-topic=NAME
# --pick-notifications-trigger-topic=NAME
# --purge-op-db-trigger-topic=NAME
# --updated-urls-topic=NAME
# --new-topic=NAME
# --new-load-subscription=NAME
Expand All @@ -192,6 +196,7 @@ function pubsub_withdraw() {
params="$(getopt_vars project \
load_queue_trigger_topic \
pick_notifications_trigger_topic \
purge_op_db_trigger_topic \
updated_urls_topic \
new_topic \
new_load_subscription \
Expand All @@ -213,6 +218,7 @@ function pubsub_withdraw() {
pubsub_topic_withdraw "$project" "$load_queue_trigger_topic"
pubsub_topic_withdraw "$project" "$pick_notifications_trigger_topic"
pubsub_topic_withdraw "$project" "$updated_urls_topic"
pubsub_topic_withdraw "$project" "$purge_op_db_trigger_topic"
}

fi # _PUBSUB_SH
6 changes: 6 additions & 0 deletions kcidb/cloud/scheduler.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ function scheduler_job_withdraw() {
# --prefix=STRING
# --load-queue-trigger-topic=NAME
# --pick-notifications-trigger-topic=NAME
# --purge-op-db-trigger-topic=NAME
function scheduler_deploy() {
declare params
params="$(getopt_vars project \
prefix \
load_queue_trigger_topic \
pick_notifications_trigger_topic \
purge_op_db_trigger_topic \
-- "$@")"
eval "$params"
# Deploy the jobs
Expand All @@ -80,6 +82,9 @@ function scheduler_deploy() {
scheduler_job_pubsub_deploy "$project" "${prefix}pick_notifications_trigger" \
"$pick_notifications_trigger_topic" \
'*/10 * * * *' '{}'
scheduler_job_pubsub_deploy "$project" "${prefix}purge_op_db_trigger" \
"$purge_op_db_trigger_topic" \
'0 6 * * MON' '{"delta": {"months": 6}}'
}

# Withdraw from the scheduler
Expand All @@ -89,6 +94,7 @@ function scheduler_withdraw() {
declare -r prefix="$1"; shift
scheduler_job_withdraw "$project" "${prefix}load_queue_trigger"
scheduler_job_withdraw "$project" "${prefix}pick_notifications_trigger"
scheduler_job_withdraw "$project" "${prefix}purge_op_db_trigger"
}

fi # _SCHEDULER_SH
1 change: 1 addition & 0 deletions kcidb/cloud/sections.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ declare -A -r SECTIONS=(
["secrets"]="Secrets"
["firestore"]="Firestore database"
["storage"]="Google cloud storage"
["cloud_functions.purge_op_db"]="Cloud Functions: kcidb_purge_op_db()"
["cloud_functions.pick_notifications"]="Cloud Functions: kcidb_pick_notifications()"
["cloud_functions.send_notification"]="Cloud Functions: kcidb_send_notification()"
["cloud_functions.spool_notifications"]="Cloud Functions: kcidb_spool_notifications()"
Expand Down
73 changes: 73 additions & 0 deletions kcidb/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import argparse
import logging
import json
import datetime
from textwrap import indent
from importlib import metadata
import dateutil
import dateutil.relativedelta
import dateutil.parser
from google.cloud import secretmanager
import jsonschema
import jq
import kcidb.io as io

Expand Down Expand Up @@ -482,3 +486,72 @@ def isliced(iterable, size):
if not iterator_slice:
break
yield iterator_slice


def parse_timedelta_json(data, stamp):
    """
    Compute a timestamp from a JSON "time delta" specification.

    The specification is an object with an optional "stamp" (a date-time
    string) and an optional "delta" (an object with non-negative integer
    time components); at least one of the two must be present. When "stamp"
    is absent, the "stamp" argument serves as the base timestamp. The delta
    is always subtracted from the base timestamp. Before subtraction the base
    timestamp is rounded down to the precision of the delta, i.e. every
    component smaller than the smallest one named in the delta is reset to
    its minimum.

    Args:
        data:   The JSON data to parse.
        stamp:  The (aware) timestamp to use, if not present in the data.

    Returns:
        The resulting timestamp.

    Raises:
        jsonschema.exceptions.ValidationError, if the data does not match
        the time delta schema.
    """
    # Recognized delta components and the value each one is reset to when
    # rounding down. The order (smallest to largest) matters: it drives the
    # rounding loop below.
    floors = {
        "microseconds": 0,
        "seconds": 0,
        "minutes": 0,
        "hours": 0,
        "days": 1,
        "months": 1,
        "years": 1,
    }

    # Schema of the "delta" object. Components can only be non-negative:
    # the delta is subtracted, so that a forgotten sign cannot move the
    # result into the future (and e.g. make a purge wipe everything).
    delta_schema = {
        "type": "object",
        "properties": {
            name: {"type": "integer", "minimum": 0} for name in floors
        },
        "anyOf": [{"required": [name]} for name in floors],
        "additionalProperties": False,
    }
    # Schema of the complete specification
    spec_schema = {
        "type": "object",
        "properties": {
            "delta": delta_schema,
            "stamp": {"type": "string", "format": "date-time"},
        },
        "anyOf": [{"required": ["delta"]}, {"required": ["stamp"]}],
        "additionalProperties": False,
    }

    assert isinstance(stamp, datetime.datetime) and stamp.tzinfo
    jsonschema.validate(
        data, spec_schema,
        format_checker=jsonschema.Draft7Validator.FORMAT_CHECKER
    )

    # The timestamp in the data, if any, overrides the supplied one
    if "stamp" in data:
        base = dateutil.parser.isoparse(data["stamp"])
    else:
        base = stamp

    # Without a delta there is nothing to round or subtract
    if "delta" not in data:
        return base

    requested = data["delta"]
    delta_args = dict(requested)
    # Walk the components from the smallest one up to (but excluding) the
    # first one named in the delta, resetting each of them to its minimum
    for name, floor in floors.items():
        if name in requested:
            break
        # A singular argument name (plural minus the trailing "s") asks
        # relativedelta to replace the component with the value
        delta_args[name[:-1]] = floor

    return base - dateutil.relativedelta.relativedelta(**delta_args)
88 changes: 88 additions & 0 deletions kcidb/test_misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""kcidb.misc module tests"""

import datetime
from pytest import raises
from jsonschema.exceptions import ValidationError
import kcidb.misc


def test_parse_timedelta_json():
    """Check kcidb.misc.parse_timedelta_json() validation and results"""
    parse = kcidb.misc.parse_timedelta_json
    utc = datetime.timezone.utc

    def at(*fields):
        """Create a UTC timestamp from positional datetime fields"""
        return datetime.datetime(*fields, tzinfo=utc)

    min_stamp = at(1, 1, 1, 0, 0, 0, 0)
    stamp = at(1, 2, 3, 4, 5, 6, 7)
    stamp_str = stamp.isoformat(timespec="microseconds")

    # Data not matching the schema must be rejected
    invalid_data = (
        [],
        {},
        {"stamp": 10},
        {"stamp": ""},
        {"delta": 10},
        {"delta": ""},
        {"delta": {}},
        {"delta": {"seconds": -1}},
    )
    for data in invalid_data:
        with raises(ValidationError):
            parse(data, stamp)

    # The timestamp from the data overrides the supplied one
    assert parse({"stamp": stamp_str}, min_stamp) == stamp
    assert parse({"delta": {"microseconds": 0}, "stamp": stamp_str},
                 min_stamp) == stamp

    # Zero deltas going down to microseconds leave the timestamp intact
    components = ("microseconds", "seconds", "minutes", "hours",
                  "days", "months", "years")
    for count in range(1, len(components) + 1):
        zero_delta = {name: 0 for name in components[:count]}
        assert parse({"delta": zero_delta}, stamp) == stamp

    # A single zero component rounds the timestamp down to its precision
    rounded = {
        "seconds": at(1, 2, 3, 4, 5, 6),
        "minutes": at(1, 2, 3, 4, 5),
        "hours": at(1, 2, 3, 4),
        "days": at(1, 2, 3),
        "months": at(1, 2, 1),
        "years": min_stamp,
    }
    for name, expected in rounded.items():
        assert parse({"delta": {name: 0}}, stamp) == expected

    # Rounding and subtraction apply to the timestamp from the data too
    assert parse({"delta": {"years": 0}, "stamp": stamp_str},
                 min_stamp) == min_stamp
    assert parse(
        {"delta": {"years": 0, "months": 1, "days": 2}, "stamp": stamp_str},
        min_stamp
    ) == min_stamp

    # Month deltas are subtracted after rounding down to month start
    month_cases = (
        (6, at(2023, 12, 15, 15, 52, 24, 204547), at(2023, 6, 1)),
        (2, at(2023, 3, 31, 21, 11, 1), at(2023, 1, 1)),
        (3, at(2023, 4, 3, 2, 1), at(2023, 1, 1)),
    )
    for months, base, expected in month_cases:
        assert parse({"delta": {"months": months}}, base) == expected
Loading

0 comments on commit 2b390c3

Please sign in to comment.