s3fs: delete files from cache when deleted from bucket
dead10ck committed Dec 7, 2023
1 parent d436773 commit 96794a5
Showing 2 changed files with 112 additions and 24 deletions.
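This commit makes the s3fs fileserver prune its local cache on update: after the bucket metadata is read or refreshed in _init(), a new helper, _prune_deleted_files(), walks the cache directory, deletes any cached file whose key is no longer present in the bucket, and removes directories left empty. Directory creation moves out of the path helpers and into _write_buckets_cache_file() and _get_file_from_s3(), so the path helpers become pure path builders.

A minimal standalone sketch of the pruning idea (not the Salt code itself; cache_root and bucket_keys are illustrative names):

import os


def prune_stale_files(cache_root, bucket_keys):
    """Delete cached files whose keys no longer exist in the bucket."""
    for base, _dirs, files in os.walk(cache_root):
        for file_name in files:
            path = os.path.join(base, file_name)
            # keys are compared as paths relative to the cache root
            if os.path.relpath(path, cache_root) not in bucket_keys:
                os.unlink(path)
                # remove now-empty parent directories, stopping at the root
                parent = os.path.dirname(path)
                while parent != cache_root and not os.listdir(parent):
                    os.rmdir(parent)
                    parent = os.path.dirname(parent)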
93 changes: 79 additions & 14 deletions salt/fileserver/s3fs.py
@@ -127,6 +127,7 @@ def update():
                        cached_file_path = _get_cached_file_name(
                            bucket, saltenv, file_path
                        )

                        log.debug("%s - %s : %s", bucket, saltenv, file_path)

                        # load the file from S3 if it's not in the cache or it's old
@@ -348,6 +349,7 @@ def _init():

    # check mtime of the buckets files cache
    metadata = None

    try:
        if os.path.getmtime(cache_file) > exp:
            metadata = _read_buckets_cache_file(cache_file)
@@ -358,6 +360,8 @@
        # bucket files cache expired or does not exist
        metadata = _refresh_buckets_cache_file(cache_file)

    _prune_deleted_files(metadata)

    return metadata


@@ -366,7 +370,6 @@ def _get_cache_dir():
    Return the path to the s3cache dir
    """

    return os.path.join(__opts__["cachedir"], "s3cache")


@@ -375,26 +378,15 @@ def _get_cached_file_name(bucket_name, saltenv, path):
    Return the cached file name for a bucket path file
    """

    return os.path.join(_get_cache_dir(), saltenv, bucket_name, path)


def _get_buckets_cache_filename():
"""
Return the filename of the cache for bucket contents.
Create the path if it does not exist.
"""

cache_dir = _get_cache_dir()
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

return os.path.join(cache_dir, "buckets_files.cache")
return os.path.join(_get_cache_dir(), "buckets_files.cache")


def _refresh_buckets_cache_file(cache_file):
@@ -415,6 +407,7 @@ def _refresh_buckets_cache_file(cache_file):
        path_style,
        https_enable,
    ) = _get_s3_key()

    metadata = {}

    # helper s3 query function
@@ -563,10 +556,71 @@ def __get_s3_meta(bucket, key=key, keyid=keyid):
    return metadata


def _prune_deleted_files(metadata):
    """
    Delete files from the local cache that no longer exist in the bucket.
    """
    cache_dir = _get_cache_dir()
    cached_files = set()
    roots = set()

    if _is_env_per_bucket():
        for env, env_data in metadata.items():
            for bucket_meta in env_data:
                for bucket, bucket_data in bucket_meta.items():
                    root = os.path.join(cache_dir, env, bucket)

                    if os.path.exists(root):
                        roots.add(root)

                    for meta in bucket_data:
                        path = meta["Key"]
                        cached_files.add(path)

    else:
        for env, env_data in metadata.items():
            for bucket in _get_buckets():
                root = os.path.join(cache_dir, bucket)

                if os.path.exists(root):
                    roots.add(root)

                for meta in env_data:
                    cached_files.add(meta["Key"])

    if log.isEnabledFor(logging.DEBUG):
        import pprint

        log.debug("cached file list: %s", pprint.pformat(cached_files))

    for root in roots:
        for base, dirs, files in os.walk(root):
            for file_name in files:
                path = os.path.join(base, file_name)
                relpath = os.path.relpath(path, root)

                if relpath not in cached_files:
                    log.debug("file '%s' not found in cached file list", path)
                    log.info(
                        "file '%s' was deleted from bucket, deleting local copy",
                        relpath,
                    )

                    os.unlink(path)
                    dir_path = os.path.dirname(path)

                    # delete empty dirs all the way up to the cache dir
                    while dir_path != cache_dir and not os.listdir(dir_path):
                        log.debug("directory '%s' is now empty, removing", dir_path)
                        os.rmdir(dir_path)
                        dir_path = os.path.dirname(dir_path)


def _write_buckets_cache_file(metadata, cache_file):
"""
Write the contents of the buckets cache file
"""
cache_dir = _get_cache_dir()

if not os.path.exists(cache_dir):
os.makedirs(cache_dir)

if os.path.isfile(cache_file):
os.remove(cache_file)

@@ -583,6 +637,10 @@ def _read_buckets_cache_file(cache_file):

log.debug("Reading buckets cache file")

if not os.path.exists(cache_file):
log.debug("Cache file does not exist")
return None

with salt.utils.files.fopen(cache_file, "rb") as fp_:
try:
data = pickle.load(fp_)
@@ -690,6 +748,13 @@ def _get_file_from_s3(metadata, saltenv, bucket_name, path, cached_file_path):
    Check the local cache for the file; if it's old or missing, grab the
    file from S3 and update the cache.
    """

    # make sure bucket and saltenv directories exist
    target_dir = os.path.dirname(cached_file_path)

    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    (
        key,
        keyid,
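For reference, the nested metadata shape that _prune_deleted_files() traverses in env-per-bucket mode can be inferred from its loops; this illustrative snippet is an assumption based on that traversal, with made-up bucket and key names:

metadata = {
    "base": [  # saltenv mapped to a list of per-bucket metadata dicts
        {
            "my-bucket": [  # bucket name mapped to S3 object metadata
                {"Key": "top.sls"},
                {"Key": "files/nginx.conf"},
            ]
        }
    ]
}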
43 changes: 33 additions & 10 deletions tests/pytests/unit/fileserver/test_s3fs.py
@@ -89,6 +89,7 @@ def test_update(bucket, s3):
"top.sls": {"content": yaml.dump({"base": {"*": ["foo"]}})},
"foo.sls": {"content": yaml.dump({"nginx": {"pkg.installed": []}})},
"files/nginx.conf": {"content": "server {}"},
"files/conf.d/foo.conf": {"content": "server {}"},
}

conn = s3()
@@ -102,17 +103,40 @@
    s3fs.update()
    verify_cache(bucket, keys)

    # verify that when files get deleted from s3, they also get deleted in
    # the local cache
    delete_file = "files/nginx.conf"
    del keys[delete_file]
    conn.delete_object(Bucket=bucket, Key=delete_file)

    s3fs.update()
    verify_cache(bucket, keys)

    cache_file = s3fs._get_cached_file_name(bucket, "base", delete_file)
    assert not os.path.exists(cache_file)

    # we want empty directories to get deleted from the local cache

    # after this one, `files` should still exist
    files_dir = os.path.dirname(cache_file)
    assert os.path.exists(files_dir)

    # but after the last file is deleted, the directory and any parents
    # should be deleted too
    delete_file = "files/conf.d/foo.conf"
    del keys[delete_file]
    conn.delete_object(Bucket=bucket, Key=delete_file)

    s3fs.update()
    verify_cache(bucket, keys)

    cache_file = s3fs._get_cached_file_name(bucket, "base", delete_file)
    assert not os.path.exists(cache_file)

    # after this, `files/conf.d` and `files` should be deleted
    conf_d_dir = os.path.dirname(cache_file)
    assert not os.path.exists(conf_d_dir)
    assert not os.path.exists(files_dir)


@mock_s3
@@ -148,8 +172,7 @@ def test_s3_hash(bucket, s3):

def test_cache_round_trip(bucket):
metadata = {"foo": "bar"}
cache_file = s3fs._get_cached_file_name(bucket, "base", "somefile")

cache_file = s3fs._get_buckets_cache_filename()
s3fs._write_buckets_cache_file(metadata, cache_file)
assert s3fs._read_buckets_cache_file(cache_file) == metadata


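The tests rely on moto's mock_s3 decorator, which intercepts boto3 calls so objects can be created and deleted without touching real S3. A minimal, self-contained sketch of that pattern (bucket and key names are made up; this is not the project's test code):

import boto3
from moto import mock_s3


@mock_s3
def exercise_delete():
    # all boto3 S3 calls inside this function hit moto's in-memory backend
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="my-bucket")
    conn.put_object(Bucket="my-bucket", Key="files/nginx.conf", Body=b"server {}")

    # mirrors what the updated test does before calling s3fs.update()
    # and asserting that the cached copy is gone
    conn.delete_object(Bucket="my-bucket", Key="files/nginx.conf")


exercise_delete()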