Skip to content

Commit

Permalink
Merge pull request #596 from rohanpm/phase1-commit
Browse files Browse the repository at this point in the history
Introduce concept of phase1 commit [RHELDST-20490]
  • Loading branch information
rohanpm committed Oct 4, 2023
2 parents 5e4abb7 + 017a2e9 commit 93a3255
Show file tree
Hide file tree
Showing 8 changed files with 585 additions and 95 deletions.
98 changes: 98 additions & 0 deletions exodus_gw/migrations/versions/1d51b80e64ba_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Adds columns supporting phase1 commit
Revision ID: 1d51b80e64ba
Revises: 0d88322fe0b3
Create Date: 2023-10-02 11:44:04.604593
"""
import sqlalchemy as sa
from alembic import op

from exodus_gw.migrations.test import tested_by

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration
# it revises; both match the "Revision ID" / "Revises" lines in the module
# docstring above.
revision = "1d51b80e64ba"
down_revision = "0d88322fe0b3"
# This migration declares no branch labels and no extra dependencies.
branch_labels = None
depends_on = None


def upgrade_testdata():
    """Insert rows that should exist before ``upgrade()`` is applied.

    One commit task (a ``tasks`` row plus its ``commit_tasks`` row) and two
    ``items`` rows are created, all sharing the same publish ID. The intent
    is to verify that a pre-existing commit task is transformed into a
    phase2 commit and that pre-existing items are marked dirty.
    """
    commit_task_id = "41400ff1-9198-4b35-b24e-a71a29957ae1"
    publish_id = "f7a38eb1-0d75-4245-a4ef-3dfd02d8129f"

    # The commit task is spread over two tables which share the same ID:
    # first the generic task row...
    tasks_table = sa.table(
        "tasks",
        sa.column("id", sa.Uuid(as_uuid=False)),
        sa.column("state", sa.String()),
        sa.column("type", sa.String()),
    )
    task_rows = [
        {
            "id": commit_task_id,
            "state": "NOT_STARTED",
            "type": "commit",
        },
    ]
    op.bulk_insert(tasks_table, task_rows)

    # ...then the commit-specific row pointing at the publish.
    commit_tasks_table = sa.table(
        "commit_tasks",
        sa.column("id", sa.Uuid(as_uuid=False)),
        sa.column("publish_id", sa.Uuid(as_uuid=False)),
    )
    commit_task_rows = [
        {
            "id": commit_task_id,
            "publish_id": publish_id,
        },
    ]
    op.bulk_insert(commit_tasks_table, commit_task_rows)

    # Items belonging to the same publish; these are the rows expected to be
    # marked dirty by the upgrade.
    items_table = sa.table(
        "items",
        sa.column("id", sa.Uuid(as_uuid=False)),
        sa.column("web_uri", sa.String()),
        sa.column("object_key", sa.String()),
        sa.column("publish_id", sa.Uuid(as_uuid=False)),
    )
    item_rows = [
        {
            "id": item_id,
            "publish_id": publish_id,
            "web_uri": web_uri,
            "object_key": "a1b2c3",
        }
        for item_id, web_uri in (
            ("f021da4d-5c3b-483f-af8d-85117fb64b2c", "/foo"),
            ("9dafa529-03e6-4412-85db-f681ea98d75d", "/bar"),
        )
    ]
    op.bulk_insert(items_table, item_rows)


@tested_by(upgrade_testdata)
def upgrade():
    """Add the columns supporting phase1 commit.

    * ``commit_tasks.commit_mode``: non-nullable string, server default
      ``"phase2"``.
    * ``items.dirty``: non-nullable boolean, server default ``"TRUE"``.
    """
    # Both columns are NOT NULL, so each carries a server-side default;
    # rows which already exist are expected to pick those values up.
    commit_mode_column = sa.Column(
        "commit_mode",
        sa.String(),
        nullable=False,
        server_default="phase2",
    )
    dirty_column = sa.Column(
        "dirty",
        sa.Boolean(),
        nullable=False,
        server_default="TRUE",
    )

    op.add_column("commit_tasks", commit_mode_column)
    op.add_column("items", dirty_column)


def downgrade():
    """Drop the columns added by ``upgrade()``, most recently added first."""
    dropped_columns = (
        ("items", "dirty"),
        ("commit_tasks", "commit_mode"),
    )
    for table_name, column_name in dropped_columns:
        op.drop_column(table_name, column_name)
3 changes: 2 additions & 1 deletion exodus_gw/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .base import Base
from .dramatiq import DramatiqConsumer, DramatiqMessage
from .publish import Item, Publish
from .service import CommitTask, Task
from .service import CommitModes, CommitTask, Task

__all__ = [
"Base",
Expand All @@ -12,4 +12,5 @@
"Publish",
"Task",
"CommitTask",
"CommitModes",
]
5 changes: 5 additions & 0 deletions exodus_gw/models/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from fastapi import HTTPException
from sqlalchemy import (
Boolean,
DateTime,
ForeignKey,
String,
Expand Down Expand Up @@ -102,6 +103,10 @@ class Item(Base):
object_key: Mapped[Optional[str]] = mapped_column(String)
content_type: Mapped[Optional[str]] = mapped_column(String)
link_to: Mapped[Optional[str]] = mapped_column(String)

dirty: Mapped[bool] = mapped_column(Boolean, default=True)
"""True if item still needs to be written to DynamoDB."""

publish_id: Mapped[str] = mapped_column(
Uuid(as_uuid=False), ForeignKey("publishes.id")
)
Expand Down
9 changes: 9 additions & 0 deletions exodus_gw/models/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from enum import Enum
from typing import Optional

from sqlalchemy import DateTime, ForeignKey, String, event
Expand All @@ -8,6 +9,11 @@
from .base import Base


class CommitModes(str, Enum):
    """Modes in which a publish may be committed.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Commits phase 1 content only; does not prevent further modifications
    # to the publish.
    phase1 = "phase1"
    # Commits all content and freezes the publish; this is the mode used
    # when none is specified.
    phase2 = "phase2"


class Task(Base):
__tablename__ = "tasks"
__mapper_args__ = {
Expand All @@ -30,6 +36,9 @@ class CommitTask(Task):

id: Mapped[str] = mapped_column(ForeignKey("tasks.id"), primary_key=True)
publish_id: Mapped[str] = mapped_column(Uuid(as_uuid=False))
commit_mode: Mapped[str] = mapped_column(
String, default=CommitModes.phase2
)


@event.listens_for(Task, "before_update")
Expand Down
132 changes: 106 additions & 26 deletions exodus_gw/routers/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
## Atomicity
exodus-gw aims to deliver atomic semantics for publishes; i.e., for a set
exodus-gw aims to enable atomic semantics for publishes; i.e., for a set
of published content, committing the publish will make either *all* of it
available (if commit succeeds) or *none* of it available (if commit fails),
with no partial updates becoming visible from the point of view of a CDN
Expand Down Expand Up @@ -36,17 +36,57 @@
with knowledge of the types of content being published. Files which serve
as an index or entry point to a set of content are committed last, to ensure
minimal impact in the case that a commit is interrupted.
- Example: if a publish includes yum repositories, exodus-gw will ensure that
repomd.xml files are always committed last - ensuring there is no possibility
that an interrupted commit would unveil a repomd.xml file referencing other
files which were not yet committed.
See "two-phase commit" below for a more in-depth explanation of this.
It should be noted that the atomicity discussed here applies only to the interaction
between exodus-gw and its underlying data store. exodus-gw does not contain any CDN
cache purging logic; the impact of CDN caching must also be considered when evaluating
the semantics of a publish from the CDN client's point of view.
## Two-phase commit
All published content is categorized into two phases, phase 1 and phase 2,
and committed in that order. exodus-gw performs this categorization internally
and clients cannot influence this.
Simple clients do not need to worry about this, but in more complicated scenarios
the client may wish to control the commit of each phase independently. In such
cases it is important to understand how the two phases are intended to work.
Phase 1 content:
- includes the majority of content within a publish
- should be immutable
- is usually not discoverable by CDN users without consulting some form of index
- examples: RPM files within a yum repo; any generic file
Phase 2 content:
- includes a small minority of content within a publish
- is usually mutable, perhaps changing at every publish
- contains indexes, repository entry points or other references pointing at
phase 1 content (and thus must be committed last)
- examples: `repodata/repomd.xml` within a yum repo; `PULP_MANIFEST` within a
Pulp file repository
As an example of this phased approach, consider the publish of a yum repository.
A client consuming packages from a yum repository discovers available packages
via a series of fetches involving multiple files which are published together,
e.g.
`repodata/repomd.xml` => `repodata/<checksum>-primary.xml.gz`
=> `Packages/<somepackage>.rpm`
If no ordering were to be applied to the publish of these files it would be
possible for `repomd.xml` to be published prior to `<checksum>-primary.xml.gz`,
or for `<checksum>-primary.xml.gz` to be published prior to
`Packages/<somepackage>.rpm`, either of which could cause a CDN consumer to
attempt to fetch content which has not yet been published, resulting in 404
errors.
This problem is avoided by exodus-gw internally categorizing `repomd.xml` as
phase 2 content and ensuring it is committed only after the rest of the files
in the repo, which are categorized as phase 1 content.
## Expiry of publish objects
Expand All @@ -72,7 +112,7 @@

import logging
from datetime import datetime, timedelta
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union
from uuid import uuid4

from fastapi import APIRouter, Body, HTTPException, Query
Expand Down Expand Up @@ -202,8 +242,11 @@ def update_publish_items(
)

# Convert the list into dict and update each dict with a publish_id.
# Each item is also set 'dirty' to ensure it's written to DynamoDB,
# even if it was already written before.
items_data = [
{**item.model_dump(), "publish_id": db_publish.id} for item in items
{**item.model_dump(), "publish_id": db_publish.id, "dirty": True}
for item in items
]

LOG.debug(
Expand Down Expand Up @@ -242,19 +285,44 @@ def commit_publish(
deadline: Union[str, None] = Query(
default=None, examples=["2022-07-25T15:47:47Z"]
),
commit_mode: Optional[models.CommitModes] = Query(
default=None,
title="commit mode",
description="See: [Two-phase commit](#section/Two-phase-commit)",
examples=[models.CommitModes.phase1, models.CommitModes.phase2],
),
) -> models.CommitTask:
"""Commit an existing publish object.
**Required roles**: `{env}-publisher`
Committing a publish has the following effects:
Committing a publish is required in order to expose published content from the CDN.
There are two available commit modes, "phase1" and "phase2" (default).
### Phase 1
A phase 1 commit:
- is optional.
- can be performed more than once.
- does not prevent further modifications to the publish.
- will commit all phase 1 content (e.g. packages in yum repos), but not phase 2
content (e.g. repodata in yum repos); see
[Two-phase commit](#section/Two-phase-commit).
- is not rolled back if a later phase 2 commit fails (or never occurs).
### Phase 2
- If enabled by server settings, static HTML indexes will be generated and added
onto the publish for certain content types such as yum repositories.
- All URIs contained within the publish become accessible from the CDN,
pointing at their corresponding objects.
- This occurs with all-or-nothing semantics; see [Atomicity](#section/Atomicity).
- The publish object becomes frozen - no further items can be added.
A phase 2 commit:
- is the default when no commit mode is specified.
- can (and should) be performed exactly once.
- freezes the associated publish object - no further items can be added.
- will commit all content with near-atomic behavior; see
[Atomicity](#section/Atomicity).
### Notes
Commit occurs asynchronously. This API returns a Task object which may be used
to monitor the progress of the commit.
Expand All @@ -264,7 +332,7 @@ def commit_publish(
path are being committed concurrently, URIs on the CDN may end up pointing to
objects from any of those publishes.
"""

commit_mode_str = (commit_mode or models.CommitModes.phase2).value
now = datetime.utcnow()

if isinstance(deadline, str):
Expand Down Expand Up @@ -293,14 +361,20 @@ def commit_publish(
)

if db_publish.state != "PENDING":
# Check if there is already an associated task and, if so, return it rather than raise.
task = (
db.query(models.CommitTask)
.filter(models.CommitTask.publish_id == publish_id)
.first()
)
if task:
return task
if commit_mode_str == models.CommitModes.phase2:
# Phase 2 commit can only be done once, so asking to commit again is
# an error, but to make the API idempotent we check if there is
# already an associated task and return it if so.
task = (
db.query(models.CommitTask)
.filter(
models.CommitTask.publish_id == publish_id,
models.CommitTask.commit_mode == commit_mode_str,
)
.first()
)
if task:
return task

raise HTTPException(
status_code=409,
Expand All @@ -314,20 +388,26 @@ def commit_publish(
publish_id=str(db_publish.id),
env=env.name,
from_date=str(now),
commit_mode=commit_mode_str,
)

LOG.info(
"Enqueued commit for '%s'",
"Enqueued %s commit for '%s'",
commit_mode_str,
msg.kwargs["publish_id"],
extra={"event": "publish", "success": True},
)
db_publish.state = schemas.PublishStates.committing

# Only phase2 commit moves the publish into committing state.
if commit_mode_str == models.CommitModes.phase2:
db_publish.state = schemas.PublishStates.committing

task = models.CommitTask(
id=msg.message_id,
publish_id=msg.kwargs["publish_id"],
state="NOT_STARTED",
deadline=deadline_obj,
commit_mode=commit_mode,
)
db.add(task)

Expand Down
Loading

0 comments on commit 93a3255

Please sign in to comment.