Skip to content

Commit

Permalink
Merge pull request #702 from rohanpm/deploy-config-flush-subtrees
Browse files Browse the repository at this point in the history
Flush affected subtrees during alias updates [RHELDST-23300]
  • Loading branch information
rohanpm committed Apr 23, 2024
2 parents d3580d9 + fdd2a1d commit c7b5bc6
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 46 deletions.
4 changes: 4 additions & 0 deletions exodus_gw/aws/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,7 @@ def write_batch(self, items: list[models.Item], delete: bool = False):
def write_config(self, config):
    """Persist *config* to DynamoDB and refresh the local cached copy.

    Builds a batch write request for the given config and submits it,
    then stores the config on this object so that any later method
    using the definitions sees exactly what was just written.
    """
    self.batch_write(self.create_config_request(config))
    # Keep the in-memory definitions aligned with the DB write so that
    # config-reading methods stay consistent with the persisted state.
    self._definitions = config
58 changes: 53 additions & 5 deletions exodus_gw/worker/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
from exodus_gw.database import db_engine
from exodus_gw.settings import Settings

from .cache import Flusher

LOG = logging.getLogger("exodus-gw")


@dramatiq.actor(
time_limit=Settings().actor_time_limit,
max_backoff=Settings().actor_max_backoff,
)
def complete_deploy_config_task(task_id: str):
settings = Settings()
def complete_deploy_config_task(
task_id: str,
settings: Settings = Settings(),
flush_paths: list[str] | None = None,
env: str | None = None,
):
db = Session(bind=db_engine(settings))
task = db.query(models.Task).filter(models.Task.id == task_id).first()

Expand All @@ -33,6 +39,18 @@ def complete_deploy_config_task(task_id: str):
)
return

if env and flush_paths:
flusher = Flusher(
paths=flush_paths,
settings=settings,
env=env,
# In this context Flusher does not need aliases, because
# the flush_paths passed into us have already had alias
# resolution applied.
aliases=[],
)
flusher.run()

task.state = schemas.TaskStates.complete
db.commit()

Expand All @@ -47,10 +65,17 @@ def complete_deploy_config_task(task_id: str):
time_limit=Settings().actor_time_limit,
max_backoff=Settings().actor_max_backoff,
)
def deploy_config(config: dict[str, Any], env: str, from_date: str):
settings = Settings()
def deploy_config(
config: dict[str, Any],
env: str,
from_date: str,
settings: Settings = Settings(),
):
db = Session(bind=db_engine(settings))
ddb = DynamoDB(env, settings, from_date)

original_aliases = {src: dest for (src, dest) in ddb.aliases_for_flush}

current_message_id = CurrentMessage.get_current_message().message_id
task = (
db.query(models.Task)
Expand Down Expand Up @@ -92,11 +117,34 @@ def deploy_config(config: dict[str, Any], env: str, from_date: str):
db.commit()
return

# After the write propagates, we may need to flush cache for some
# URLs depending on what changed in the config.
flush_paths: set[str] = set()

for src, updated_dest in ddb.aliases_for_flush:
if original_aliases.get(src) != updated_dest:
for published_path in db.query(models.PublishedPath).filter(
models.PublishedPath.env == env,
models.PublishedPath.web_uri.like(f"{src}/%"),
):
LOG.info(
"Updated alias %s will flush cache for %s",
src,
published_path.web_uri,
extra={"event": "deploy"},
)
flush_paths.add(published_path.web_uri)

# TTL must be sent in milliseconds but the setting is in minutes for
# convenience and consistency with other components.
ttl = settings.config_cache_ttl * 60000
msg = complete_deploy_config_task.send_with_options(
kwargs={"task_id": str(task.id)}, delay=ttl
kwargs={
"task_id": str(task.id),
"env": env,
"flush_paths": sorted(flush_paths),
},
delay=ttl,
)
LOG.debug(
"Sent task %s for completion via message %s",
Expand Down
16 changes: 12 additions & 4 deletions tests/aws/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
{
"PutRequest": {
"Item": {
"web_uri": {"S": "/to/repomd.xml"},
"web_uri": {
"S": "/content/testproduct/1.1.0/repo/repomd.xml"
},
"object_key": {
"S": "3f449eb3b942af58e9aca4c1cffdef89"
"c3f1552c20787ae8c966767a1fedd3a5"
Expand All @@ -64,7 +66,9 @@
{
"PutRequest": {
"Item": {
"web_uri": {"S": "/to/.__exodus_autoindex"},
"web_uri": {
"S": "/content/testproduct/1.1.0/repo/.__exodus_autoindex"
},
"object_key": {
"S": "5891b5b522d5df086d0ff0b110fbd9d2"
"1bb4fc7163af34d08286a2e846f6be03"
Expand Down Expand Up @@ -100,15 +104,19 @@
{
"DeleteRequest": {
"Key": {
"web_uri": {"S": "/to/repomd.xml"},
"web_uri": {
"S": "/content/testproduct/1.1.0/repo/repomd.xml"
},
"from_date": {"S": "2023-10-04 03:52:02"},
}
}
},
{
"DeleteRequest": {
"Key": {
"web_uri": {"S": "/to/.__exodus_autoindex"},
"web_uri": {
"S": "/content/testproduct/1.1.0/repo/.__exodus_autoindex"
},
"from_date": {"S": "2023-10-04 03:52:02"},
}
}
Expand Down
58 changes: 36 additions & 22 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
from datetime import datetime
from typing import Any

import dramatiq
import mock
Expand Down Expand Up @@ -31,29 +32,34 @@ def mock_aws_client():
yield aws_client


@pytest.fixture()
def fake_dynamodb_query(fake_config: dict[str, Any]):
    """Return a callable usable as a mock ``query`` side-effect.

    The callable imitates a successful DynamoDB query against the
    exodus-config table, answering with the current ``fake_config``.
    """

    def side_effect(
        TableName,
        Limit,
        ScanIndexForward,
        KeyConditionExpression,
        ExpressionAttributeValues,
    ):
        # The config lookup is the only query expected in this context;
        # fail fast if anything else arrives.
        assert TableName == "my-config"
        assert Limit == 1
        serialized = json.dumps(fake_config)
        return {"Count": 1, "Items": [{"config": {"S": serialized}}]}

    return side_effect


@pytest.fixture(autouse=True)
def mock_boto3_client(fake_dynamodb_query):
    """Auto-applied fixture replacing boto3 sessions with a MagicMock.

    The mocked client answers ``query`` calls via the
    ``fake_dynamodb_query`` side-effect and supports use as a context
    manager, handing back the same mock on ``__enter__``.
    """
    with mock.patch("boto3.session.Session") as session_cls:
        fake_client = mock.MagicMock()
        # Allow `with client as c:` to yield the client itself.
        fake_client.__enter__.return_value = fake_client
        fake_client.query.side_effect = fake_dynamodb_query
        session_cls().client.return_value = fake_client
        yield fake_client
Expand Down Expand Up @@ -179,13 +185,13 @@ def fake_publish():
updated=datetime(2023, 10, 4, 3, 52, 1),
),
models.Item(
web_uri="/to/repomd.xml",
web_uri="/content/testproduct/1/repo/repomd.xml",
object_key="3f449eb3b942af58e9aca4c1cffdef89c3f1552c20787ae8c966767a1fedd3a5",
publish_id=publish.id,
updated=datetime(2023, 10, 4, 3, 52, 2),
),
models.Item(
web_uri="/to/.__exodus_autoindex",
web_uri="/content/testproduct/1/repo/.__exodus_autoindex",
object_key="5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03",
publish_id=publish.id,
updated=datetime(2023, 10, 4, 3, 52, 2),
Expand Down Expand Up @@ -254,8 +260,16 @@ def fake_config():
"src": "/content/dist/rhel8/8",
"dest": "/content/dist/rhel8/8.5",
},
{
"src": "/content/testproduct/1",
"dest": "/content/testproduct/1.1.0",
},
],
"rhui_alias": [
{"src": "/content/dist/rhel8/rhui", "dest": "/content/dist/rhel8"},
{
"src": "/content/testproduct/rhui",
"dest": "/content/testproduct",
},
],
}
2 changes: 1 addition & 1 deletion tests/worker/test_cdn_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def test_flush_cdn_cache_typical(
db.commit()

# Set up some aliases to exercise alias resolution.
mock_boto3_client.query.return_value = {
mock_boto3_client.query.side_effect = lambda *args, **kwargs: {
"Items": [
{
"config": {
Expand Down
Loading

0 comments on commit c7b5bc6

Please sign in to comment.