Merge pull request #216 from rohanpm/scheduled-cleanup
Implement scheduled cleanup task [RHELDST-4843]
rohanpm committed Mar 1, 2021
2 parents 9d04807 + 209dfc4 commit d600800
Showing 12 changed files with 529 additions and 227 deletions.
4 changes: 3 additions & 1 deletion exodus_gw/models/publish.py
@@ -20,7 +20,9 @@ class Publish(Base):
env = Column(String, nullable=False)
state = Column(String, nullable=False)
updated = Column(DateTime(timezone=True))
items = relationship("Item", back_populates="publish")
items = relationship(
"Item", back_populates="publish", cascade="all, delete-orphan"
)


@event.listens_for(Publish, "before_update")
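With "all, delete-orphan", deleting a Publish now also deletes its Items, and an Item removed from the collection is deleted rather than left orphaned. A minimal sketch of the effect (hedged; session and publish_id are illustrative, not from this commit):

    # Deleting the parent now cascades to its Item rows in the same flush.
    publish = session.query(Publish).get(publish_id)
    session.delete(publish)
    session.commit()  # related Item rows are gone as well
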
8 changes: 8 additions & 0 deletions exodus_gw/schemas.py
@@ -47,6 +47,10 @@ class PublishStates(str, Enum):
committed = "COMMITTED"
failed = "FAILED"

@classmethod
def terminal(cls) -> List["PublishStates"]:
return [cls.committed, cls.failed]


class PublishBase(BaseModel):
id: UUID = Field(..., description="Unique ID of publish object.")
@@ -88,6 +92,10 @@ class TaskStates(str, Enum):
complete = "COMPLETE"
failed = "FAILED"

@classmethod
def terminal(cls) -> List["TaskStates"]:
return [cls.failed, cls.complete]


class Task(BaseModel):
id: UUID = Field(..., description="Unique ID of task object.")
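The new terminal() helpers give callers one place to ask whether an object can still change state. A small usage sketch (hedged; it mirrors the definitions above and is not code from this commit):

    # COMMITTED and FAILED are final; anything else may still be updated.
    if publish.state in PublishStates.terminal():
        ...  # safe to erase: no further updates are expected
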
11 changes: 11 additions & 0 deletions exodus_gw/settings.py
@@ -94,6 +94,17 @@ class Settings(BaseSettings):
max_tries: int = 20
"""Maximum attempts to write to DynamoDB table."""

publish_timeout: int = 24
"""Maximum amount of time (in hours) between updates to a pending publish before
it will be considered abandoned. Defaults to one day.
"""

history_timeout: int = 24 * 14
"""Maximum amount of time (in hours) to retain historical data for publishes and
tasks. Publishes and tasks in a terminal state will be erased after this time has
passed. Defaults to two weeks.
"""

actor_time_limit: int = 30 * 60000
"""Maximum amount of time (in milliseconds) actors may run."""

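Since Settings extends pydantic's BaseSettings, these values can be overridden at construction time and, typically, via environment variables (the exact variable names depend on an env prefix not shown in this hunk). A hedged sketch:

    # Keyword overrides are standard pydantic behaviour.
    settings = Settings(publish_timeout=48, history_timeout=24 * 7)
    assert settings.publish_timeout == 48  # abandoned after two days, not one
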
102 changes: 95 additions & 7 deletions exodus_gw/worker/scheduled.py
@@ -1,16 +1,104 @@
import logging
from datetime import datetime, timedelta, timezone

import dramatiq
from sqlalchemy.orm import Session

from exodus_gw.database import db_engine
from exodus_gw.models import Publish, Task
from exodus_gw.schemas import PublishStates, TaskStates
from exodus_gw.settings import Settings

LOG = logging.getLogger("exodus-gw")


class Janitor:
def __init__(self):
self.settings = Settings()
self.db = Session(bind=db_engine(self.settings))
self.now = datetime.now(timezone.utc)

def run(self):
self.fix_timestamps()
self.fix_abandoned()
self.clean_old_data()

self.db.commit()

LOG.info("Scheduled cleanup has completed")

def fix_timestamps(self):
# Fill in missing timestamps on any data.
#
        # Timestamps are nullable. If we aren't sure of the real
        # updated timestamp on a particular object, we'll just
        # pretend it was updated right now.
for klass in [Task, Publish]:
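            # "klass.updated == None" below is intentional: SQLAlchemy
            # translates the comparison into an SQL "IS NULL" filter,
            # which Python's "is None" cannot express inside a query.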
for instance in self.db.query(klass).filter(klass.updated == None):
LOG.warning(
"%s %s: setting updated",
klass.__name__,
instance.id,
)
instance.updated = self.now

def fix_abandoned(self):
# Find any publishes and tasks which appear to be abandoned (i.e.
# they did not complete and have not been updated for a long time)
# and mark them as failed.
#
        # This covers the following scenarios:
#
# - a client created a publish, then crashed before committing it.
#
        # - an internal error in exodus-gw (such as an extended DB outage)
        #   prevented a task from being executed and also prevented marking
        #   the task as failed.
#
hours = self.settings.publish_timeout
threshold = self.now - timedelta(hours=hours)

for klass, states in [(Task, TaskStates), (Publish, PublishStates)]:
for instance in self.db.query(klass).filter(
# Anything old enough...
klass.updated < threshold,
# And also not in a terminal state...
~klass.state.in_(states.terminal()),
):
LOG.warning(
"%s %s: marking as failed (last updated: %s)",
klass.__name__,
instance.id,
instance.updated,
)
instance.state = states.failed

def clean_old_data(self):
# Find any objects of transient types in terminal states which have not
# been updated for the configured period of time and delete them.
#
# This helps enforce the design that exodus-gw contains no persistent
# state.
hours = self.settings.history_timeout
threshold = self.now - timedelta(hours=hours)

for klass, states in [(Task, TaskStates), (Publish, PublishStates)]:
for instance in self.db.query(klass).filter(
# Anything old enough...
klass.updated < threshold,
# And also in a terminal state so there will be no further updates...
klass.state.in_(states.terminal()),
):
LOG.info(
"%s %s: cleaning old data (last updated: %s)",
klass.__name__,
instance.id,
instance.updated,
)
self.db.delete(instance)


@dramatiq.actor(scheduled=True)
def cleanup():
-    # TODO: implement me.
-    #
-    # We must first implement:
-    # - timestamps on task and publish objects
-    # - state on publish objects
-    #
-    LOG.warning("Would do cleanup now")
+    janitor = Janitor()
+    janitor.run()
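
A run of the janitor makes three passes over the database. A hedged sketch of invoking it directly, outside the scheduled actor (not part of the commit):

    janitor = Janitor()
    janitor.run()
    # With the default settings above:
    # - rows with a NULL 'updated' timestamp are stamped with the current time
    # - non-terminal publishes/tasks idle for over 24 hours are marked FAILED
    # - terminal publishes/tasks idle for over two weeks are deleted
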
23 changes: 21 additions & 2 deletions tests/conftest.py
@@ -3,6 +3,7 @@

import mock
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm.session import Session

from exodus_gw import database, main, models, schemas, settings # noqa
@@ -70,8 +71,12 @@ def sqlite_in_tests(monkeypatch):


@pytest.fixture()
-def db():
-    """Yields a real DB session configured using current settings."""
+def unmigrated_db():
+    """Yields a real DB session configured using current settings.
+
+    Note that this DB is likely to be empty. In the more common case that
+    a test wants a DB with all tables in place, use 'db' instead.
+    """

session = Session(bind=database.db_engine(settings.Settings()))
try:
@@ -80,6 +85,20 @@ def db():
session.close()


@pytest.fixture()
def db(unmigrated_db):
"""Yields a real DB session configured using current settings.
This session has the schema deployed prior to yielding, so the
test may assume all tables are already in place.
"""

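    # Entering (and immediately leaving) the TestClient context fires the
    # app's startup events; per the docstring above, that is what deploys
    # the schema into the session's database.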
with TestClient(main.app):
pass

return unmigrated_db


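A sketch of a test leaning on the 'db' fixture (hypothetical; it assumes uuid4 is imported and is not part of the commit):

    def test_publish_roundtrip(db):
        publish = models.Publish(id=uuid4(), env="test", state="PENDING")
        db.add(publish)
        db.commit()
        assert db.query(models.Publish).count() == 1
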
@pytest.fixture(autouse=True, scope="session")
def db_session_block_detector():
"""Wrap DB sessions created by the app with an object to detect