Skip to content

Commit

Permalink
Introduce concept of phase1 commit [RHELDST-20490]
Browse files Browse the repository at this point in the history
Internally, items have always been categorized into either phase1 or
phase2 during commit, with phase2 being committed last. This is needed
to ensure, for example, that a repomd.xml is not committed until after
all other files referenced by it have been committed.

This change now documents that concept and exposes it via the API.
By default, clients get the same behavior as always, but they can now
also explicitly request a phase1 commit.

A phase1 commit writes phase1 items to the DB, same as always, but then
stops without proceeding to write phase2 items. It also leaves the
publish open for later modifications and a later phase2 commit.

=== Why? ===

This will be used to solve the following Pub/rhsm-pulp/exodus-gw
integration problem:

Imagine that you need to publish multiple Pulp yum repos. You want this
to be atomic, so you want them all to use the same exodus-gw publish.

You ask the repos to publish, and they succeed, but then something
goes wrong during the later exodus-gw commit which is performed outside
of Pulp.

So, you restart the whole process and republish the repos again using
a new exodus-gw publish, and commit that successfully.

Problem: as far as Pulp is concerned, the publish tasks from the first
attempt were completely successful, as it does not "see" the later
failure to commit. Therefore, Pulp incorrectly thinks that the RPMs
processed by those tasks are fully published to the CDN, and so skips
publishing them during later tasks. This leads to missing content on the
CDN.

Solution: exodus-rsync, as invoked by Pulp during publish, will request
a phase1 commit during publish of each repo. This ensures the processed
RPMs (or other non-entrypoint files) are fully published on the CDN by the
time the Pulp publish task succeeds, matching Pulp's expectations.
The publish of repomd.xml is still held back until a later phase2
commit, retaining the atomic semantics across multiple repos.
  • Loading branch information
rohanpm committed Oct 2, 2023
1 parent cf14687 commit 9af04c5
Show file tree
Hide file tree
Showing 8 changed files with 583 additions and 95 deletions.
98 changes: 98 additions & 0 deletions exodus_gw/migrations/versions/1d51b80e64ba_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Adds columns supporting phase1 commit
Revision ID: 1d51b80e64ba
Revises: 0d88322fe0b3
Create Date: 2023-10-02 11:44:04.604593
"""
import sqlalchemy as sa
from alembic import op

from exodus_gw.migrations.test import tested_by

# revision identifiers, used by Alembic.
revision = "1d51b80e64ba"
down_revision = "0d88322fe0b3"
branch_labels = None
depends_on = None


def upgrade_testdata():
# Make a commit_task exist so we can verify it's transformed
# into phase2 commit
task_id = "41400ff1-9198-4b35-b24e-a71a29957ae1"
publish_id = "f7a38eb1-0d75-4245-a4ef-3dfd02d8129f"
op.bulk_insert(
sa.table(
"tasks",
sa.column("id", sa.Uuid(as_uuid=False)),
sa.column("state", sa.String()),
sa.column("type", sa.String()),
),
[
{
"id": task_id,
"state": "NOT_STARTED",
"type": "commit",
},
],
)
op.bulk_insert(
sa.table(
"commit_tasks",
sa.column("id", sa.Uuid(as_uuid=False)),
sa.column("publish_id", sa.Uuid(as_uuid=False)),
),
[
{
"id": task_id,
"publish_id": publish_id,
},
],
)

# and make some items exist too, which will be marked dirty
op.bulk_insert(
sa.table(
"items",
sa.column("id", sa.Uuid(as_uuid=False)),
sa.column("web_uri", sa.String()),
sa.column("object_key", sa.String()),
sa.column("publish_id", sa.Uuid(as_uuid=False)),
),
[
{
"id": "f021da4d-5c3b-483f-af8d-85117fb64b2c",
"publish_id": publish_id,
"web_uri": "/foo",
"object_key": "a1b2c3",
},
{
"id": "9dafa529-03e6-4412-85db-f681ea98d75d",
"publish_id": publish_id,
"web_uri": "/bar",
"object_key": "a1b2c3",
},
],
)


@tested_by(upgrade_testdata)
def upgrade():
op.add_column(
"commit_tasks",
sa.Column(
"commit_mode", sa.String(), nullable=False, server_default="phase2"
),
)
op.add_column(
"items",
sa.Column(
"dirty", sa.Boolean(), nullable=False, server_default="TRUE"
),
)


def downgrade():
op.drop_column("items", "dirty")
op.drop_column("commit_tasks", "commit_mode")
3 changes: 2 additions & 1 deletion exodus_gw/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .base import Base
from .dramatiq import DramatiqConsumer, DramatiqMessage
from .publish import Item, Publish
from .service import CommitTask, Task
from .service import CommitModes, CommitTask, Task

__all__ = [
"Base",
Expand All @@ -12,4 +12,5 @@
"Publish",
"Task",
"CommitTask",
"CommitModes",
]
5 changes: 5 additions & 0 deletions exodus_gw/models/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from fastapi import HTTPException
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
String,
Expand Down Expand Up @@ -102,6 +103,10 @@ class Item(Base):
object_key: Mapped[Optional[str]] = mapped_column(String)
content_type: Mapped[Optional[str]] = mapped_column(String)
link_to: Mapped[Optional[str]] = mapped_column(String)

dirty: Mapped[bool] = mapped_column(Boolean, default=True)
"""True if item still needs to be written to DynamoDB."""

publish_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("publishes.id")
)
Expand Down
9 changes: 9 additions & 0 deletions exodus_gw/models/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum
from typing import Optional

from sqlalchemy import DateTime, ForeignKey, String, event
Expand All @@ -8,6 +9,11 @@
from .base import Base


class CommitModes(str, Enum):
phase1 = "phase1"
phase2 = "phase2"


class Task(Base):
__tablename__ = "tasks"
__mapper_args__ = {
Expand All @@ -30,6 +36,9 @@ class CommitTask(Task):

id: Mapped[str] = mapped_column(ForeignKey("tasks.id"), primary_key=True)
publish_id: Mapped[str] = mapped_column(Uuid(as_uuid=False))
commit_mode: Mapped[str] = mapped_column(
String, default=CommitModes.phase2
)


@event.listens_for(Task, "before_update")
Expand Down
132 changes: 106 additions & 26 deletions exodus_gw/routers/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
## Atomicity
exodus-gw aims to deliver atomic semantics for publishes; i.e., for a set
exodus-gw aims to enable atomic semantics for publishes; i.e., for a set
of published content, committing the publish will make either *all* of it
available (if commit succeeds) or *none* of it available (if commit fails),
with no partial updates becoming visible from the point of view of a CDN
Expand Down Expand Up @@ -36,17 +36,57 @@
with knowledge of the types of content being published. Files which serve
as an index or entry point to a set of content are committed last, to ensure
minimal impact in the case that a commit is interrupted.
- Example: if a publish includes yum repositories, exodus-gw will ensure that
repomd.xml files are always committed last - ensuring there is no possibility
that an interrupted commit would unveil a repomd.xml file referencing other
files which were not yet committed.
See "two-phase commit" below for a more in-depth explanation of this.
It should be noted that the atomicity discussed here applies only to the interaction
between exodus-gw and its underlying data store. exodus-gw does not contain any CDN
cache purging logic; the impact of CDN caching must also be considered when evaluating
the semantics of a publish from the CDN client's point of view.
## Two-phase commit
All published content is categorized into two phases, phase 1 and phase 2,
and committed in that order. exodus-gw performs this categorization internally
and clients cannot influence this.
Simple clients do not need to worry about this, but in more complicated scenarios
the client may wish to control the commit of each phase independently. In such
cases it is important to understand how the two phases are intended to work.
Phase 1 content:
- includes the majority of content within a publish
- should be immutable
- is usually not discoverable by CDN users without consulting some form of index
- examples: RPM files within a yum repo; any generic file
Phase 2 content:
- includes a small minority of content within a publish
- is usually mutable, perhaps changing at every publish
- contains indexes, repository entry points or other references pointing at
phase 1 content (and thus must be committed last)
- examples: `repodata/repomd.xml` within a yum repo; `PULP_MANIFEST` within a
Pulp file repository
As an example of this phased approach, consider the publish of a yum repository.
A client consuming packages from a yum repository discovers available packages
via a series of fetches involving multiple files which are published together,
e.g.
`repodata/repomd.xml` => `repodata/<checksum>-primary.xml.gz`
=> `Packages/<somepackage>.rpm`
If no ordering were to be applied to the publish of these files it would be
possible for `repomd.xml` to be published prior to `<checksum>-primary.xml.gz`,
or for `<checksum>-primary.xml.gz` to be published prior to
`Packages/<somepackage>.rpm`, either of which could cause a CDN consumer to
attempt to fetch content which has not yet been published, resulting in 404
errors.
This problem is avoided by exodus-gw internally categorizing `repomd.xml` as
phase 2 content and ensuring it is committed only after the rest of the files
in the repo, which are categorized as phase 1 content.
## Expiry of publish objects
Expand All @@ -72,7 +112,7 @@

import logging
from datetime import datetime, timedelta
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union
from uuid import uuid4

from fastapi import APIRouter, Body, HTTPException, Query
Expand Down Expand Up @@ -202,8 +242,11 @@ def update_publish_items(
)

# Convert the list into dict and update each dict with a publish_id.
# Each item is also set 'dirty' to ensure it's written to DynamoDB,
# even if it was already written before.
items_data = [
{**item.model_dump(), "publish_id": db_publish.id} for item in items
{**item.model_dump(), "publish_id": db_publish.id, "dirty": True}
for item in items
]

LOG.debug(
Expand Down Expand Up @@ -242,19 +285,44 @@ def commit_publish(
deadline: Union[str, None] = Query(
default=None, examples=["2022-07-25T15:47:47Z"]
),
commit_mode: Optional[models.CommitModes] = Query(
default=None,
title="commit mode",
description="See: [Two-phase commit](#section/Two-phase-commit)",
examples=[models.CommitModes.phase1, models.CommitModes.phase2],
),
) -> models.CommitTask:
"""Commit an existing publish object.
**Required roles**: `{env}-publisher`
Committing a publish has the following effects:
Committing a publish is required in order to expose published content from the CDN.
There are two available commit modes, "phase1" and "phase2" (default).
### Phase 1
A phase 1 commit:
- is optional.
- can be performed more than once.
- does not prevent further modifications to the publish.
- will commit all phase 1 content (e.g. packages in yum repos), but not phase 2
content (e.g. repodata in yum repos); see
[Two-phase commit](#section/Two-phase-commit).
- is not rolled back if a later phase 2 commit fails (or never occurs).
### Phase 2
- If enabled by server settings, static HTML indexes will be generated and added
onto the publish for certain content types such as yum repositories.
- All URIs contained within the publish become accessible from the CDN,
pointing at their corresponding objects.
- This occurs with all-or-nothing semantics; see [Atomicity](#section/Atomicity).
- The publish object becomes frozen - no further items can be added.
A phase 2 commit:
- is the default when no commit mode is specified.
- can (and should) be performed exactly once.
- freezes the associated publish object - no further items can be added.
- will commit all content with near-atomic behavior; see
[Atomicity](#section/Atomicity).
### Notes
Commit occurs asynchronously. This API returns a Task object which may be used
to monitor the progress of the commit.
Expand All @@ -264,7 +332,7 @@ def commit_publish(
path are being committed concurrently, URIs on the CDN may end up pointing to
objects from any of those publishes.
"""

commit_mode = (commit_mode or models.CommitModes.phase2).value
now = datetime.utcnow()

if isinstance(deadline, str):
Expand Down Expand Up @@ -293,14 +361,20 @@ def commit_publish(
)

if db_publish.state != "PENDING":
# Check if there is already an associated task and, if so, return it rather than raise.
task = (
db.query(models.CommitTask)
.filter(models.CommitTask.publish_id == publish_id)
.first()
)
if task:
return task
if commit_mode == models.CommitModes.phase2:
# Phase 2 commit can only be done once, so asking to commit again is
# an error, but to make the API idempotent we check if there is
# already an associated task and return it if so.
task = (
db.query(models.CommitTask)
.filter(
models.CommitTask.publish_id == publish_id,
models.CommitTask.commit_mode == commit_mode,
)
.first()
)
if task:
return task

raise HTTPException(
status_code=409,
Expand All @@ -314,20 +388,26 @@ def commit_publish(
publish_id=str(db_publish.id),
env=env.name,
from_date=str(now),
commit_mode=commit_mode,
)

LOG.info(
"Enqueued commit for '%s'",
"Enqueued %s commit for '%s'",
commit_mode,
msg.kwargs["publish_id"],
extra={"event": "publish", "success": True},
)
db_publish.state = schemas.PublishStates.committing

# Only phase2 commit moves the publish into committing state.
if commit_mode == models.CommitModes.phase2:
db_publish.state = schemas.PublishStates.committing

task = models.CommitTask(
id=msg.message_id,
publish_id=msg.kwargs["publish_id"],
state="NOT_STARTED",
deadline=deadline_obj,
commit_mode=commit_mode,
)
db.add(task)

Expand Down
Loading

0 comments on commit 9af04c5

Please sign in to comment.