feat(io/default): Try to clean up during idle updates (#182)
This adds an attempt to clean up a DefaultIONode during an
idle update by:
 * looking for `.placeholder` files and deleting them
 * attempting to remove acq directories

Because this routine runs when the node is idle (i.e. only
when there's no other I/O occurring), no placeholders should be
on the node.  Any that are found are spurious, left over from
prior crashes.

I've also implemented a check to reduce how often it runs.
It will always run at start-up (when I suspect most uncleanliness
would be found), and then once every 100 times the node transitions
from not-idle to idle.
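
As a rough sketch, the gating is just a countdown over not-idle -> idle
transitions (condensed from the `DefaultNodeIO.idle_update` hook in the
diff below; `_queue_cleanup_task` is a hypothetical stand-in for the
actual Task submission):

    _IDLE_CLEANUP_PERIOD = 100

    def idle_update(self, newly_idle: bool) -> None:
        # Only consider queuing the clean-up on a not-idle -> idle transition
        if newly_idle:
            if self._skip_idle_cleanup <= 1:
                # Counter expired (it starts at zero, so this also fires on
                # the first idle update after start-up): reset and queue
                self._skip_idle_cleanup = _IDLE_CLEANUP_PERIOD
                self._queue_cleanup_task()
            else:
                self._skip_idle_cleanup -= 1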

While implementing this, I discovered that the code that was
deleting acq dirs wasn't stopping at the StorageNode.root, meaning
there was a potential to delete the node directory itself (plus anything
above that)!

In practice, on DefaultIO nodes, this couldn't happen because
all such nodes have an `ALPENHORN_NODE` file at the top level, but
that's not necessarily true for other I/O classes which still use
DefaultIO's delete function (for example, the LustreHSM I/O class).

I've fixed this bug while moving the directory deletion code from
`delete_async` into its own function in `ioutil`, since the
cleanup task now also uses it.
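
For illustration, here's a minimal sketch of the new boundary check
(simplified from `ioutil.remove_filedir` in the diff below; logging and
the docstring are omitted):

    import errno
    import pathlib

    def remove_filedir(node, dirname: pathlib.Path, tree_lock) -> None:
        # Refuse to operate on paths outside the node root -- this is the fix
        if not dirname.is_relative_to(node.root):
            raise ValueError(f"dirname {dirname} not rooted under {node.root}")

        with tree_lock.down:
            # Walk upwards, but never past node.root itself
            while str(dirname) != node.root:
                try:
                    dirname.rmdir()
                except OSError as e:
                    if e.errno == errno.ENOTEMPTY:
                        break  # directory still in use: stop climbing
                    # ENOENT (already deleted) and anything else: keep going
                dirname = dirname.parent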

Also removed the submission of an unnecessary job that was deleting
zero file copies.
ketiltrout committed Jun 17, 2024
1 parent 5a624f2 commit 742165e
Showing 10 changed files with 290 additions and 34 deletions.
23 changes: 1 addition & 22 deletions alpenhorn/io/_default_asyncs.py
@@ -283,28 +283,7 @@ def delete_async(

# Check if any containing directory is now empty
# and remove if they are.
dirname = fullpath.parent

# try to delete the directories. This must be done while locking down the tree lock
with tree_lock.down:
while dirname != ".":
try:
dirname.rmdir()
log.info(f"Removed directory {dirname} on {name}")
except OSError as e:
if e.errno == errno.ENOTEMPTY:
# This is fine, but stop trying to rmdir.
break
elif e.errno == errno.ENOENT:
# Already deleted, which is fine.
pass
else:
log.warning(
f"Error deleting directory {dirname} on {name}: {e}"
)
# Otherwise, let's try to soldier on

dirname = dirname.parent
ioutil.remove_filedir(copy.node, fullpath.parent, tree_lock)

# Update the DB
ArchiveFileCopy.update(
8 changes: 7 additions & 1 deletion alpenhorn/io/base.py
@@ -190,7 +190,7 @@ def before_update(self, idle: bool) -> bool:
# By default, we do nothing and allow the update to continue
return True

def idle_update(self) -> None:
def idle_update(self, newly_idle: bool) -> None:
"""Idle update hook.
Called after a regular update that wasn't skipped, but only if,
@@ -199,6 +199,12 @@ def idle_update(self) -> None:
This is the place to put low-priority tasks that should only happen
if no other I/O is happening on the node.
Parameters
----------
newly_idle : bool
True if this is the first time idle_update has been called since
some I/O happened.
"""
# By default do nothing.
pass
64 changes: 63 additions & 1 deletion alpenhorn/io/default.py
@@ -17,17 +17,18 @@
import threading
from watchdog.observers import Observer

from . import ioutil
from .base import BaseNodeIO, BaseGroupIO, BaseNodeRemote
from .updownlock import UpDownLock
from .. import util
from ..acquisition import ArchiveAcq, ArchiveFile
from ..task import Task

# The asyncs are over here:
from ._default_asyncs import pull_async, check_async, delete_async

if TYPE_CHECKING:
from collections.abc import Iterator
from ..acquisition import ArchiveFile
from ..archive import ArchiveFileCopy, ArchiveFileCopyRequest
from ..queue import FairMultiFIFOQueue
from ..storage import StorageNode
@@ -40,6 +41,10 @@
_mutex = threading.Lock()
_reserved_bytes = dict()

# This sets how often we run the clean-up idle task. What
# we're counting here is number of not-idle -> idle transitions
_IDLE_CLEANUP_PERIOD = 100 # (i.e. once every 100 opportunities)


class DefaultNodeRemote(BaseNodeRemote):
"""I/O class for a remote DefaultIO StorageNode."""
Expand Down Expand Up @@ -79,10 +84,67 @@ def __init__(
# The directory tree modification lock
self.tree_lock = UpDownLock()

# When <= 1, the idle clean-up is allowed to run; we initialise
# to zero to run it as soon as possible after start-up
self._skip_idle_cleanup = 0

# Set up a reservation for ourself if necessary
with _mutex:
_reserved_bytes.setdefault(node.name, 0)

# HOOKS

def idle_update(self, newly_idle: bool) -> None:
"""Idle update hook.
Called after a regular update that wasn't skipped, but only if,
after the regular update, there were no tasks pending or in
progress for this node (i.e. `self.idle` is True).
Whenever `newly_idle` is True, this will try to do some tidying up:
look for stale placeholders and attempt to delete empty acq dirs.
Parameters
----------
newly_idle : bool
True if this is the first time idle_update has been called since
some I/O happened.
"""

# Task to do some cleanup
def _async(task, node, tree_lock):
# Loop over all acqs
for acq in ArchiveAcq.select():
# Only continue if the directory for this acquisition exists
acqpath = pathlib.Path(node.root, acq.name)
if acqpath.is_dir():
# Look for placeholders
for file_ in ArchiveFile.select().where(ArchiveFile.acq == acq):
placeholder = acqpath.joinpath(f".{file_.name}.placeholder")
if placeholder.exists():
log.warning(f"Removing stale placeholder {placeholder!s}")
placeholder.unlink()
# Attempt to remove acqpath. If it isn't empty, this does nothing
ioutil.remove_filedir(
node,
pathlib.Path(node.root, acq.name),
tree_lock,
)

# Submit the task only after doing some I/O
if newly_idle:
if self._skip_idle_cleanup <= 1:
self._skip_idle_cleanup = _IDLE_CLEANUP_PERIOD
Task(
func=_async,
queue=self._queue,
key=self.node.name,
args=(self.node, self.tree_lock),
name=f"Tidy up {self.node.name}",
)
else:
self._skip_idle_cleanup = self._skip_idle_cleanup - 1

# I/O METHODS

def bytes_avail(self, fast: bool = False) -> int | None:
61 changes: 61 additions & 0 deletions alpenhorn/io/ioutil.py
@@ -5,6 +5,7 @@

import re
import time
import errno
import pathlib
import peewee as pw
from datetime import datetime
@@ -18,6 +19,7 @@
if TYPE_CHECKING:
import os
from .base import BaseNodeIO
from .updownlock import UpDownLock
from ..acquisition import ArchiveFile

import logging
@@ -489,3 +491,62 @@ def copy_request_done(
post_add(io.node, req.file)

return True


def remove_filedir(
node: StorageNode, dirname: pathlib.Path, tree_lock: UpDownLock
) -> None:
Try to delete a file's parent directory(s) from a node
Will attempt to remove the directory given as `dirname`, and its
newly-empty parents, while holding the `tree_lock` down, stopping
at `node.root`. Blocks until the lock can be acquired.
The attempt to delete starts at `dirname` and walks upwards until
it reaches `node.root`.
As soon as a non-empty directory is encountered, the attempt stops
without raising an error.
If `dirname` is missing, or partially missing, that is not an error
either, but deletion of whatever part remains will still be attempted.
Parameters
----------
node: StorageNode
The node to delete the acq directory from.
dirname: pathlib.Path
The path to delete. Must be absolute and rooted at `node.root`.
tree_lock: UpDownLock
This function will block until it can acquire the down lock and
all I/O will happen while holding the lock down.
Raises
------
ValueError
`dirname` was not a subdirectory of `node.root`
"""
# Sanity check
if not dirname.is_relative_to(node.root):
raise ValueError(f"dirname {dirname} not rooted under {node.root}")

# try to delete the directories. This must be done while locking down the tree lock
with tree_lock.down:
while str(dirname) != node.root:
try:
dirname.rmdir()
log.info(f"Removed directory {dirname} on {node.name}")
except OSError as e:
log.debug(f"Failed to remove directory {dirname} on {node.name}: {e}")
if e.errno == errno.ENOTEMPTY:
# This is fine, but stop trying to rmdir.
break
elif e.errno == errno.ENOENT:
# Already deleted, which is fine.
pass
else:
log.warning(f"Error deleting directory {dirname} on {node.name}: {e}")
# Otherwise, let's try to soldier on

dirname = dirname.parent
5 changes: 4 additions & 1 deletion alpenhorn/io/lustrehsm.py
@@ -187,7 +187,7 @@ def before_update(self, idle: bool) -> bool:
# Continue with the update
return True

def idle_update(self) -> None:
def idle_update(self, newly_idle) -> None:
"""Update HSM state of copies when idle.
If the node is idle after an update, double check the HSM state
@@ -198,6 +198,9 @@ def idle_update(self) -> None:
the alpenhornd daemon can properly manage free space.
"""

# Run DefaultIO idle checks
super().idle_update(newly_idle)

# Check the query walker. Initialised if necessary.
if self._release_qw is None:
try:
19 changes: 17 additions & 2 deletions alpenhorn/update.py
@@ -224,6 +224,13 @@ def __init__(self, queue: FairMultiFIFOQueue, node: StorageNode) -> None:
self._queue = queue
self._updated = False

# Set to True whenever I/O tasks are started.
# Set to False whenever an idle update happens.
#
# Used to let the idle_update hooks know whether this is the
# first idle update to happen after some I/O or not
self._io_happened = True

# Set in reinit()
self.db = None
self.reinit(node)
Expand Down Expand Up @@ -338,6 +345,7 @@ def run_auto_verify(self) -> None:
f" {self.name}."
)

self._io_happened = True
self.io.auto_verify(copy)

def update_idle(self) -> None:
@@ -348,7 +356,9 @@ def update_idle(self) -> None:
"""
if self._updated and self.idle:
# Do any I/O class idle updates
self.io.idle_update()
self.io.idle_update(self._io_happened)

self._io_happened = False

# Run auto-verify, if requested
if self.db.auto_verify > 0:
Expand Down Expand Up @@ -404,13 +414,16 @@ def update_delete(self) -> None:
# Group a bunch of these together to reduce the number of I/O Tasks
# created. TODO: figure out if this actually helps
if len(del_copies) >= 10:
self._io_happened = True
self.io.delete(del_copies)
del_copies = [copy]
else:
del_copies.append(copy)

# Handle the partial group at the end (which may be empty)
self.io.delete(del_copies)
if len(del_copies) > 0:
self._io_happened = True
self.io.delete(del_copies)

def update(self) -> None:
"""Perform I/O updates on this node.
@@ -447,6 +460,7 @@ def update(self) -> None:
)

# Dispatch integrity check to I/O layer
self._io_happened = True
self.io.check(copy)

# Delete any unwanted files to cleanup space
@@ -459,6 +473,7 @@
ArchiveFileCopyRequest.node_from == self.db,
):
if self.db.filecopy_present(req.file):
self._io_happened = True
self.io.ready_pull(req)
else:
log.info(
65 changes: 65 additions & 0 deletions tests/io/test_defaultnode.py
@@ -182,3 +182,68 @@ def test_reserve_bytes(unode, xfs):

# Release the rest
unode.io.release_bytes(2000)


def test_idle_cleanup(unode, archiveacq, archivefile, queue, xfs):
"""test clean-up code in DefaultNodeIO.idle_update"""

# Node root
root = unode.db.root

# Create some acqs
empty_acq = archiveacq(name="empty")
dirty_acq = archiveacq(name="dirty")
full_acq = archiveacq(name="full")

# Create some files
empty_file = archivefile(name="empty", acq=empty_acq)
dirty_file = archivefile(name="dirty", acq=dirty_acq)
full_file = archivefile(name="full", acq=full_acq)

# Populate the node filesystem
xfs.create_dir(f"{root}/empty")
xfs.create_file(f"{root}/dirty/.dirty.placeholder")
xfs.create_file(f"{root}/full/.full.placeholder")
xfs.create_file(f"{root}/full/full")

# Run the idle update
unode.io.idle_update(True)

# Now there's something in the queue
assert queue.qsize == 1

# Run the task
task, fifo = queue.get()
task()
queue.task_done(fifo)

# empty acq has been deleted
assert not pathlib.Path(f"{root}/empty").exists()

# dirty acq has been cleaned and deleted
assert not pathlib.Path(f"{root}/dirty").exists()

# full acq has been cleaned
assert not pathlib.Path(f"{root}/full/.full.placeholder").exists()
assert pathlib.Path(f"{root}/full").exists()


def test_idle_cleanup_rate(unode, queue):
"""Test rate limiting of the cleanup task."""

# This is how often the task should be queued
from alpenhorn.io.default import _IDLE_CLEANUP_PERIOD

# Run the idle update a few times with newly_idle true
# The task is queued the first time, and then once
# every _IDLE_CLEANUP_PERIOD times after that.
for _ in range(1 + _IDLE_CLEANUP_PERIOD * 5):
unode.io.idle_update(True)

assert queue.qsize == 6

# This shouldn't add any more jobs
for _ in range(1 + _IDLE_CLEANUP_PERIOD * 5):
unode.io.idle_update(False)

assert queue.qsize == 6