Skip to content

Commit

Permalink
Merge pull request #184 from nathanegillett/task-status
Browse files Browse the repository at this point in the history
Add get_task endpoint [RHELDST-2162]
  • Loading branch information
negillett committed Feb 12, 2021
2 parents 3443d2d + 67efe9f commit 8412c1d
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 95 deletions.
2 changes: 2 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ disable=print-statement,
# we need to import dramatiq actors to the worker after broker
# is set
wrong-import-position,
# pylint catches similar nested closing brackets from multiple files
duplicate-code,
36 changes: 29 additions & 7 deletions examples/exodus-sync
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ from concurrent.futures import ThreadPoolExecutor
from functools import partial
from urllib.parse import urljoin

import backoff
import boto3
import boto3.session
import requests
Expand Down Expand Up @@ -131,6 +132,25 @@ def upload_items(args, items):
)


@backoff.on_predicate(
wait_gen=backoff.expo,
predicate=lambda task: task["state"] != "COMPLETE",
max_time=120,
)
def poll_commit_completion(gw_url, commit):
session = requests.Session()

task_url = urljoin(gw_url, commit["links"]["self"])

r = session.get(task_url)
r.raise_for_status()
task = r.json()

print("Task state: {}".format(task["state"]))

return task


def publish_items(args, items):
# Publish all the items which have previously been uploaded. This
# will make the items downloadable from the CDN via exodus-lambda,
Expand All @@ -141,11 +161,11 @@ def publish_items(args, items):
# support certificate-based auth.
session = requests.Session()

response = session.post(
r = session.post(
os.path.join(args.exodus_gw_url, args.env, "publish")
)
response.raise_for_status()
publish = response.json()
r.raise_for_status()
publish = r.json()

print("Created publish {}".format(publish))

Expand Down Expand Up @@ -177,12 +197,14 @@ def publish_items(args, items):

r = session.post(commit_url)
r.raise_for_status()
commit = r.json()

print("Started commit of publish: {}".format(commit))

# TODO: committing is expected to be moved into a background task, at which
# point we expect to be given some kind of task object here, which we should
# poll to completion.
print("Polling for commit completion. . .")
task = poll_commit_completion(args.exodus_gw_url, commit)

print("Started commit of publish: {}".format(r.json()))
print("Publish complete: {}".format(task))


def main():
Expand Down
34 changes: 34 additions & 0 deletions exodus_gw/migrations/versions/cd561983acb2_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Add tasks table
Revision ID: cd561983acb2
Revises: 0c60e1b25e03
Create Date: 2021-02-08 18:08:31.508678
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "cd561983acb2"
down_revision = "0c60e1b25e03"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"tasks",
sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False),
sa.Column("state", sa.String(), nullable=False),
sa.Column("publish_id", postgresql.UUID(as_uuid=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("tasks")
# ### end Alembic commands ###
9 changes: 9 additions & 0 deletions exodus_gw/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def aws_fmt(self):
}


class Task(Base):

__tablename__ = "tasks"

id = Column(UUID(as_uuid=True), primary_key=True)
publish_id = Column(UUID(as_uuid=True), nullable=False)
state = Column(String)


###############################################################################
# Make some postgres dialect compatible with sqlite, for use within tests.

Expand Down
15 changes: 11 additions & 4 deletions exodus_gw/routers/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ async def update_publish_items(
@router.post(
"/{env}/publish/{publish_id}/commit",
status_code=200,
response_model=schemas.EmptyResponse,
response_model=schemas.Task,
)
def commit_publish(
publish_id: UUID = schemas.PathPublishId,
env: Environment = deps.env,
db: Session = deps.db,
settings: Settings = deps.settings,
) -> Dict[str, str]:
) -> models.Task:
"""Commit an existing publish object.
Committing a publish has the following effects:
Expand All @@ -167,7 +167,7 @@ def commit_publish(
- This occurs with all-or-nothing semantics; see [Atomicity](#section/Atomicity).
- The publish object becomes frozen - no further items can be added.
Commit occurs asynchronously. This API returns a message ID which may be used
Commit occurs asynchronously. This API returns a Task object which may be used
to monitor the progress of the commit.
Note that exodus-gw does not resolve conflicts or ensure that any given path is
Expand All @@ -179,4 +179,11 @@ def commit_publish(
msg = worker.commit.send(publish_id=str(publish_id), env=env.name)
LOG.info("Enqueued commit for '%s'", msg.kwargs["publish_id"])

return {"commit_message_id": msg.message_id}
task = models.Task(
id=msg.message_id,
publish_id=msg.kwargs["publish_id"],
state="NOT_STARTED",
)
db.add(task)

return task
22 changes: 20 additions & 2 deletions exodus_gw/routers/service.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""APIs for inspecting the state of the exodus-gw service."""

import logging
from uuid import UUID

from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from sqlalchemy.orm import Session

from .. import deps, schemas, worker
from .. import deps, models, schemas, worker
from ..auth import CallContext

LOG = logging.getLogger("exodus-gw")
Expand Down Expand Up @@ -80,3 +81,20 @@ def whoami(context: CallContext = deps.call_context):
It is a read-only endpoint intended for diagnosing authentication issues.
"""
return context


@router.get(
"/task/{task_id}",
response_model=schemas.Task,
responses={200: {"description": "Sucessfully retrieved task"}},
)
def get_task(
task_id: UUID = schemas.PathTaskId, db: Session = deps.db
) -> schemas.Task:
"""Return existing task object from database using given task ID."""
task = db.query(models.Task).filter(models.Task.id == task_id).first()

if not task:
raise HTTPException(404, detail="No task found for ID '%s'" % task_id)

return task
31 changes: 31 additions & 0 deletions exodus_gw/schemas.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
from os.path import join
from typing import Dict, List
from uuid import UUID
Expand All @@ -11,6 +12,10 @@
description="UUID of an existing publish object.",
)

PathTaskId = Path(
..., title="task ID", description="UUID of an existing task object."
)


class ItemBase(BaseModel):
web_uri: str = Field(
Expand Down Expand Up @@ -64,6 +69,32 @@ class Config:
orm_mode = True


class TaskStates(str, Enum):
not_started = "NOT_STARTED"
in_progress = "IN_PROGRESS"
complete = "COMPLETE"


class Task(BaseModel):
id: UUID = Field(..., description="Unique ID of task object.")
publish_id: UUID = Field(
..., description="Unique ID of publish object handled by this task."
)
state: TaskStates = Field(..., description="Current state of this task.")
links: Dict[str, str] = Field(
{}, description="""URL links related to this task."""
)

@root_validator
@classmethod
def make_links(cls, values):
values["links"] = {"self": join("/task", str(values["id"]))}
return values

class Config:
orm_mode = True


class MessageResponse(BaseModel):
detail: str = Field(
..., description="A human-readable message with additional info."
Expand Down
5 changes: 4 additions & 1 deletion exodus_gw/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import dramatiq
from dramatiq.middleware import CurrentMessage

from .broker import new_broker

dramatiq.set_broker(new_broker())
broker = new_broker()
broker.add_middleware(CurrentMessage())
dramatiq.set_broker(broker)

from .publish import commit # noqa

Expand Down
26 changes: 26 additions & 0 deletions exodus_gw/worker/publish.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
import logging
from os.path import basename

import dramatiq
from dramatiq.middleware import CurrentMessage
from sqlalchemy.orm import Session

from exodus_gw import models, schemas
from exodus_gw.aws.dynamodb import write_batches
from exodus_gw.crud import get_publish_by_id
from exodus_gw.database import db_engine
from exodus_gw.settings import Settings

LOG = logging.getLogger("exodus-gw")


@dramatiq.actor
def commit(publish_id: str, env: str):
settings = Settings()
db = Session(bind=db_engine(settings))
current_message_id = CurrentMessage.get_current_message().message_id
task = (
db.query(models.Task)
.filter(models.Task.id == current_message_id)
.first()
)

if task.state == "COMPLETE":
LOG.warning(
"Task %s already in completed state\nAborting commit",
task.id,
)
return

items = []
last_items = []
Expand All @@ -26,6 +44,10 @@ def commit(publish_id: str, env: str):
items_written = False
last_items_written = False

# Change task state to IN_PROGRESS.
task.state = schemas.TaskStates.in_progress
db.commit()

if items:
items_written = write_batches(env, items)

Expand All @@ -39,3 +61,7 @@ def commit(publish_id: str, env: str):
if not last_items_written:
# Delete everything if failed to write last_items.
write_batches(env, items + last_items, delete=True)

# Change task state to COMPLETE.
task.state = schemas.TaskStates.complete
db.commit()
5 changes: 3 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import uuid

import dramatiq
import mock
Expand Down Expand Up @@ -105,8 +106,8 @@ def db():

@pytest.fixture()
def mock_publish(mock_item_list):
publish = models.Publish()
publish.id = "123e4567-e89b-12d3-a456-426614174000"
publish = models.Publish(env="test")
publish.id = uuid.UUID("123e4567-e89b-12d3-a456-426614174000")
publish.items = [
models.Item(**item.dict(), publish_id=publish.id)
for item in mock_item_list
Expand Down
45 changes: 45 additions & 0 deletions tests/routers/test_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import mock
from fastapi.testclient import TestClient

from exodus_gw import models
from exodus_gw.main import app
from exodus_gw.routers import service


Expand All @@ -21,3 +24,45 @@ def test_whoami():
# do anything except return the passed object.
context = object()
assert service.whoami(context=context) is context


def test_get_task(db):
"""The endpoint is able to retrieve task objects stored in DB."""

publish_id = "48c67d99-5dd6-4939-ad1c-072639eee35a"
task_id = "8d8a4692-c89b-4b57-840f-b3f0166148d2"

task = models.Task(
id=task_id,
publish_id=publish_id,
state="NOT_STARTED",
)

# Use TestClient to set up the test DB.
with TestClient(app) as client:
# Add a task object to the DB.
db.add(task)
db.commit()

# Update the task's state.
db.refresh(task)
task.state = "COMPLETE"
db.commit()

# Try to look up an invalid ID.
resp = client.get("/task/%s" % publish_id)

assert resp.status_code == 404
assert "No task found" in str(resp.content)

# Try to look up a valid ID.
resp = client.get("/task/%s" % task_id)

# Last request should have succeeded and returned the correct object.
assert resp.ok
assert resp.json() == {
"id": "8d8a4692-c89b-4b57-840f-b3f0166148d2",
"state": "COMPLETE",
"publish_id": "48c67d99-5dd6-4939-ad1c-072639eee35a",
"links": {"self": "/task/8d8a4692-c89b-4b57-840f-b3f0166148d2"},
}
Loading

0 comments on commit 8412c1d

Please sign in to comment.