Skip to content

Commit

Permalink
WIP: Prototype data load canvas
Browse files Browse the repository at this point in the history
Signed-off-by: Kairo de Araujo <kdearaujo@vmware.com>
  • Loading branch information
Kairo de Araujo committed Jan 23, 2023
1 parent 729ce3c commit 6e72836
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 66 deletions.
34 changes: 20 additions & 14 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,19 @@ class status(Enum):

@app.task(serializer="json", bind=True)
def repository_service_tuf_worker(
self, action: str, payload: Optional[Dict[str, Any]] = None
self,
action: str,
payload: Optional[Dict[str, Any]] = None,
refresh_settings: Optional[bool] = True,
):
"""
Repository Service for TUF Metadata Worker
"""
repository.refresh_settings(worker_settings)
if refresh_settings is True:
repository.refresh_settings(worker_settings)

repository_action = getattr(repository, action)

if payload is None:
result = repository_action()
else:
Expand Down Expand Up @@ -140,18 +146,18 @@ def task_received_notifier(**kwargs):
"acks_late": True,
},
},
"publish_targets": {
"task": "app.repository_service_tuf_worker",
"schedule": schedules.crontab(minute="*/1"),
"kwargs": {
"action": "publish_targets",
},
"options": {
"task_id": "publish_targets",
"queue": "rstuf_internals",
"acks_late": True,
},
},
# "publish_targets": {
# "task": "app.repository_service_tuf_worker",
# "schedule": schedules.crontab(minute="*/10"),
# "kwargs": {
# "action": "publish_targets",
# },
# "options": {
# "task_id": "publish_targets",
# "queue": "rstuf_internals",
# "acks_late": True,
# },
# },
}

repository = MetadataRepository.create_service()
1 change: 1 addition & 0 deletions repository_service_tuf_worker/models/targets/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def read_unpublished_rolenames(db: Session) -> Tuple[bool, str]:
.filter(
models.RSTUFTargets.published == False, # noqa
)
.order_by(models.RSTUFTargets.rolename)
.distinct()
.all()
)
Expand Down
136 changes: 85 additions & 51 deletions repository_service_tuf_worker/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Any, Dict, List, Optional, Tuple

import redis
from celery import group
from celery.app.task import Task
from securesystemslib.exceptions import StorageError # type: ignore
from securesystemslib.signer import SSlibSigner # type: ignore
Expand Down Expand Up @@ -382,6 +383,7 @@ def _send_publish_targets_task(self, task_id: str): # pragma: no cover
kwargs={
"action": "publish_targets",
"payload": None,
"refresh_settings": False,
},
task_id=f"publish_targets-{task_id}",
queue="rstuf_internals",
Expand Down Expand Up @@ -427,66 +429,98 @@ def bootstrap(

return asdict(result)

def update_bin_role(
self, payload: Dict[str, str], update_state: Task.update_state
):
# load the delegated targets role, clean the targets and add
# a new meta from the SQL DB.
# note: it might include targets from another parent task, it
# will speed up the process of publishing new targets.
rolename = payload["rolename"]
logging.debug(f"starting update bin role {rolename}")
role = self._load(rolename)
role.signed.targets.clear()
role.signed.targets = {
target[0]: TargetFile.from_dict(target[1], target[0])
for target in targets_crud.read_all_add_by_rolename(
self._db, rolename
)
}

# update expiry, bump version and persist to the storage
self._bump_expiry(role, BINS)
self._bump_version(role)
self._sign(role, BINS)
self._persist(role, rolename)
logging.debug(f"{rolename} updated as version {role.signed.version}")

return (rolename, role.signed.version)

def publish_targets(self):
"""
Publish targets from SQL DB as persistent TUF Metadata in the backend
storage.
"""

# lock to avoid race conditions
with self._redis.lock("publish_targets", timeout=5.0):
# get all delegated role names with unpublished targets
unpublished_roles = targets_crud.read_unpublished_rolenames(
self._db
)
# with self._redis.lock("publish_targets", timeout=5.0):
# get all delegated role names with unpublished targets
unpublished_roles = targets_crud.read_unpublished_rolenames(self._db)

if len(unpublished_roles) == 0:
logging.debug("No new targets in delegated roles. Finishing")
return None

# initialize the new snapshot targets meta and published targets
# from DB SQL
new_snapshot_meta: List[Tuple(str, int)] = []
db_published_targets: List[targets_models.RSTUFTargets] = []
# for _, rolename in unpublished_roles:
# # get the unpublished targets for the delegated, it will be use
# # to update the database when Snapshot and Timestamp is
# # published by `_update_timestamp`
# db_targets = targets_crud.read_unpublished_by_rolename(
# db=self._db, rolename=rolename
# )
# logging.debug(f"{rolename}: New targets #: {len(db_targets)}")
# db_published_targets += db_targets

if len(unpublished_roles) == 0:
logging.debug("No new targets in delegated roles. Finishing")
return None

# initialize the new snapshot targets meta and published targets
# from DB SQL
new_snapshot_meta: List[Tuple(str, int)] = []
db_published_targets: List[targets_models.RSTUFTargets] = []
for _, rolename in unpublished_roles:
# get the unpublished targets for the delegated, it will be use
# to update the database when Snapshot and Timestamp is
# published by `_update_timestamp`
db_targets = targets_crud.read_unpublished_by_rolename(
db=self._db, rolename=rolename
# it is imported in the call to avoid a circular import
from app import repository_service_tuf_worker

logging.info("Grouping all update bin roles")
group_jobs = group(
repository_service_tuf_worker.s(
action="update_bin_role",
payload={"rolename": rolename},
refresh_settings=False,
)
logging.debug(f"{rolename}: New targets #: {len(db_targets)}")
db_published_targets += db_targets

# load the delegated targets role, clean the targets and add
# a new meta from the SQL DB.
# note: it might include targets from another parent task, it
# will speed up the process of publishing new targets.
role = self._load(rolename)
role.signed.targets.clear()
role.signed.targets = {
target[0]: TargetFile.from_dict(target[1], target[0])
for target in targets_crud.read_all_add_by_rolename(
self._db, rolename
)
}

# update expiry, bump version and persist to the storage
self._bump_expiry(role, BINS)
self._bump_version(role)
self._sign(role, BINS)
self._persist(role, rolename)
# append to the new snapshot targets meta
new_snapshot_meta.append((rolename, role.signed.version))

# update snapshop and timestamp
# note: the `db_published_targes` contains the targets that
# needs to updated in SQL DB as 'published' and it will be done
# by the `_update_timestamp`
self._update_timestamp(
self._update_snapshot(new_snapshot_meta),
db_published_targets,
)
for _, rolename in unpublished_roles
)
# group_jobs_run = group_jobs.apply_async(
# queue="rstuf_internals",
# acks_late=True,
# task_id="publish"
# )
logging.info("Running update bin roles celery group")
group_jobs_run = group_jobs.apply_async(queue="rstuf_internals")
while len(group_jobs_run) > 0:
for job in group_jobs_run.results:
if job.state == "SUCCESS":
new_snapshot_meta.append(job.result)
group_jobs_run.results.remove(job)
elif job.state == "FAILURE":
raise ValueError(job.result)
logging.info("Update bin roles celery group finished")

# update snapshop and timestamp
# note: the `db_published_targes` contains the targets that
# needs to updated in SQL DB as 'published' and it will be done
# by the `_update_timestamp`
self._update_timestamp(
self._update_snapshot(new_snapshot_meta),
db_published_targets,
)

def add_targets(
self, payload: Dict[str, Any], update_state: Task.update_state
Expand Down
2 changes: 1 addition & 1 deletion supervisor-dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ stdout_logfile = /dev/stdout
stdout_logfile_maxbytes = 0

[program:rstuf_worker_jobs]
command = celery -A app worker -B -l debug -Q rstuf_internals -n rstuf_jobs_%(ENV_RSTUF_WORKER_ID)s@dev
command = celery -A app worker -P threads -B -l debug -Q rstuf_internals -n rstuf_jobs_%(ENV_RSTUF_WORKER_ID)s@dev
directory = %(here)s
startsecs = 5
autostart = true
Expand Down

0 comments on commit 6e72836

Please sign in to comment.