Skip to content

Commit

Permalink
MDB-21949 Use tmp folder for restore/backup access control entities (#…
Browse files Browse the repository at this point in the history
…155)

* MDB-21949 Use tmp folder for restore/backup access control entities

* Restart Ci

* Review fixes

* fix

* Fix style

* Remove unused
  • Loading branch information
MikhailBurdukov committed May 30, 2024
1 parent 4f20a8d commit 63e5fbb
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 84 deletions.
14 changes: 7 additions & 7 deletions ch_backup/backup/layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,18 @@ def upload_access_control_file(self, backup_name: str, file_name: str) -> None:
raise StorageError(msg) from e

def upload_access_control_files(
self, backup_name: str, file_names: List[str]
self, local_path: str, backup_name: str, file_names: List[str]
) -> None:
"""
Upload access control list.
"""
local_path = self._access_control_path
remote_path = _access_control_data_path(
self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME
)
try:
logging.debug('Uploading access control data "{}"', local_path)
self._storage_loader.upload_files_tarball(
self._access_control_path,
local_path,
remote_path,
files=file_names,
encryption=True,
Expand Down Expand Up @@ -368,14 +367,16 @@ def get_table_create_statement(
remote_path = _table_metadata_path(backup_meta.path, db_name, table_name)
return self._storage_loader.download_data(remote_path, encryption=True)

def download_access_control_file(self, backup_name: str, file_name: str) -> None:
def download_access_control_file(
self, local_path: str, backup_name: str, file_name: str
) -> None:
"""
Download access control object metadata and save on disk.
"""
remote_path = _access_control_data_path(
self.get_backup_path(backup_name), file_name
)
local_path = os.path.join(self._access_control_path, file_name)
local_path = os.path.join(local_path, file_name)
logging.debug(
'Downloading access control metadata "{}" to "{}', remote_path, local_path
)
Expand All @@ -385,14 +386,13 @@ def download_access_control_file(self, backup_name: str, file_name: str) -> None
msg = f"Failed to download access control metadata file {remote_path}"
raise StorageError(msg) from e

def download_access_control(self, backup_name: str) -> None:
def download_access_control(self, local_path: str, backup_name: str) -> None:
"""
Download access control object metadata and save on disk.
"""
remote_path = _access_control_data_path(
self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME
)
local_path = self._access_control_path
logging.debug(
'Downloading access control metadata "{}" to "{}', remote_path, local_path
)
Expand Down
1 change: 1 addition & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def _as_seconds(t: str) -> int:
"data_path": "/var/lib/clickhouse",
"metadata_path": "/var/lib/clickhouse/metadata",
"access_control_path": "/var/lib/clickhouse/access",
"tmp_path": "/var/lib/clickhouse/_mdb_tmp",
"zk_access_control_path": "/clickhouse/access",
"config_dir": "/etc/clickhouse-server/config.d/",
"preprocessed_config_path": "/var/lib/clickhouse/preprocessed_configs/config.xml",
Expand Down
196 changes: 127 additions & 69 deletions ch_backup/logic/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import os
import re
import shutil
from contextlib import contextmanager
from tempfile import TemporaryDirectory, mkdtemp
from typing import Any, Dict, List, Sequence, Union

from kazoo.client import KazooClient
Expand All @@ -14,11 +16,30 @@
from ch_backup.backup.metadata import BackupStorageFormat
from ch_backup.backup_context import BackupContext
from ch_backup.logic.backup_manager import BackupManager
from ch_backup.util import chown_dir_contents
from ch_backup.util import chown_dir_contents, copy_directory_content

CH_MARK_FILE = "need_rebuild_lists.mark"


@contextmanager
def access_control_temp_directory(
    directory_path: str, backup_name: str, keep_on_exception: bool = False
) -> Any:
    """
    Context manager that yields the path of a fresh temporary directory.

    The directory is created inside `directory_path` and its name starts
    with `<backup_name>_tmp_`.

    With `keep_on_exception=False` the directory is always removed when the
    `with` block is left. With `keep_on_exception=True` it is removed only
    when the `with` block finishes without raising; if the block raises,
    the directory is left on disk.
    """
    name_prefix = backup_name + "_tmp_"

    if not keep_on_exception:
        # TemporaryDirectory removes the folder on both normal and
        # exceptional exit.
        with TemporaryDirectory(prefix=name_prefix, dir=directory_path) as scratch_dir:
            yield scratch_dir
        return

    scratch_dir = mkdtemp(prefix=name_prefix, dir=directory_path)
    yield scratch_dir
    # An exception from the with-body is re-raised at the yield above, so
    # this line runs only on a clean exit and the folder is kept on failure.
    shutil.rmtree(scratch_dir)


class AccessBackup(BackupManager):
"""
Access backup class
Expand All @@ -28,13 +49,41 @@ def backup(self, context: BackupContext) -> None:
"""
Backup access control entities.
"""
objects = context.ch_ctl.get_access_control_objects()
context.backup_meta.set_access_control(objects)
access_control = context.backup_meta.access_control

if self._has_replicated_access(context):
self._backup_replicated(access_control.acl_ids, context)
self._backup_local(access_control.acl_ids, context)
clickhouse_access_path = context.ch_ctl_conf["access_control_path"]
user = context.ch_ctl_conf["user"]
group = context.ch_ctl_conf["group"]

self._prepare_folder(clickhouse_access_path, user, group)
self._prepare_folder(context.ch_ctl_conf["tmp_path"], user, group, False)

with access_control_temp_directory(
context.ch_ctl_conf["tmp_path"],
context.backup_meta.name,
keep_on_exception=True,
) as backup_tmp_path:
objects = context.ch_ctl.get_access_control_objects()
context.backup_meta.set_access_control(objects)
access_control = context.backup_meta.access_control

if self._has_replicated_access(context):
self._backup_replicated(
backup_tmp_path, access_control.acl_ids, context
)
self._backup_local(clickhouse_access_path, backup_tmp_path)

assert (
context.backup_meta.access_control.backup_format
== BackupStorageFormat.TAR
)
chown_dir_contents(user, group, backup_tmp_path)

acl_file_names = _get_access_control_files(access_control.acl_ids)
context.backup_layout.upload_access_control_files(
backup_tmp_path,
context.backup_meta.name,
acl_file_names,
)

def restore(self, context: BackupContext) -> None:
"""
Expand All @@ -48,11 +97,28 @@ def restore(self, context: BackupContext) -> None:
return

has_replicated_access = self._has_replicated_access(context)
mark_to_rebuild = not has_replicated_access

self._restore_local(acl_ids, context, mark_to_rebuild=mark_to_rebuild)
if has_replicated_access:
self._restore_replicated(acl_ids, acl_meta, context)
clickhouse_access_path = context.ch_ctl_conf["access_control_path"]
user = context.ch_ctl_conf["user"]
group = context.ch_ctl_conf["group"]

self._prepare_folder(clickhouse_access_path, user, group, True)
self._prepare_folder(context.ch_ctl_conf["tmp_path"], user, group, False)

with access_control_temp_directory(
context.ch_ctl_conf["tmp_path"],
context.backup_meta.name,
keep_on_exception=True,
) as restore_tmp_path:

self._download_access_control_list(context, restore_tmp_path, acl_ids)

if has_replicated_access:
self._restore_replicated(restore_tmp_path, acl_ids, acl_meta, context)
else:
self._restore_local(
restore_tmp_path, clickhouse_access_path, user, group
)

def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None:
"""
Expand Down Expand Up @@ -109,7 +175,9 @@ def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None:
logging.debug(f"Node {zk_path} not found.")

# cleanup SQL file
file_path = _get_access_file_path(context, f"{uuid}.sql")
file_path = os.path.join(
context.ch_ctl_conf["access_control_path"], f"{uuid}.sql"
)
logging.debug(f"Removing file {file_path}")
try:
if dry_run:
Expand All @@ -121,65 +189,72 @@ def fix_admin_user(self, context: BackupContext, dry_run: bool = True) -> None:
except FileNotFoundError:
logging.debug(f"File {file_path} not found.")

def _prepare_folder(
self, path: str, user: str, group: str, cleanup: bool = False
) -> None:
if cleanup and os.path.exists(path):
shutil.rmtree(path)

os.makedirs(path, exist_ok=True)
shutil.chown(path, user, group)

def _download_access_control_list(
    self, context: BackupContext, restore_tmp_path: str, acl_ids: List[str]
) -> None:
    """
    Download the backup's access control data into `restore_tmp_path`.

    For backups stored in TAR format a single download call is made.
    For any other format, every file name derived from `acl_ids` is
    downloaded individually.
    """
    stored_as_tar = (
        context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR
    )
    if stored_as_tar:
        context.backup_layout.download_access_control(
            restore_tmp_path, context.backup_meta.name
        )
        return

    for acl_file_name in _get_access_control_files(acl_ids):
        context.backup_layout.download_access_control_file(
            restore_tmp_path, context.backup_meta.name, acl_file_name
        )

def _clean_user_uuid(self, raw_str: str) -> str:
return re.sub(r"EXCEPT ID\('(.+)'\)", "", raw_str)

def _backup_local(self, acl_ids: Sequence[str], context: BackupContext) -> None:
def _backup_local(self, clickhouse_access_path: str, backup_tmp_path: str) -> None:
"""
Backup access entities from local storage.
Backup access entities from local storage to temporary folder.
"""
assert (
context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR
)

logging.debug(f"Backupping {len(acl_ids)} access entities from local storage")
acl_file_names = _get_access_control_files(acl_ids)
context.backup_layout.upload_access_control_files(
context.backup_meta.name, acl_file_names
logging.debug(
"Backupping access entities from local storage to {}", backup_tmp_path
)
copy_directory_content(clickhouse_access_path, backup_tmp_path)

def _backup_replicated(
self, acl_list: Sequence[str], context: BackupContext
self, backup_tmp_path: str, acl_list: Sequence[str], context: BackupContext
) -> None:
"""
Backup access entities from replicated storage (ZK/CK).
"""
_ensure_access_control_path(context)
logging.debug(
f"Backupping {len(acl_list)} access entities from replicated storage"
)
with context.zk_ctl.zk_client as zk_client:
for uuid in acl_list:
uuid_zk_path = _get_access_zk_path(context, f"/uuid/{uuid}")
data, _ = zk_client.get(uuid_zk_path)
_file_create(context, f"{uuid}.sql", data.decode())
_chown_access_control_dir(context)
_create_access_file(backup_tmp_path, f"{uuid}.sql", data.decode())

def _restore_local(
self,
acl_list: Sequence[str],
context: BackupContext,
mark_to_rebuild: bool = True,
restore_tmp_path: str,
clickhouse_access_path: str,
user: str,
group: str,
) -> None:
"""
Restore access entities to local storage.
"""
_ensure_access_control_path(context)
logging.debug(f"Restoring {len(acl_list)} access entities to local storage")

if context.backup_meta.access_control.backup_format == BackupStorageFormat.TAR:
context.backup_layout.download_access_control(context.backup_meta.name)
else:
for name in _get_access_control_files(acl_list):
context.backup_layout.download_access_control_file(
context.backup_meta.name, name
)

if mark_to_rebuild:
self._mark_to_rebuild(context)
copy_directory_content(restore_tmp_path, clickhouse_access_path)
self._mark_to_rebuild(clickhouse_access_path, user, group)

def _restore_replicated(
self,
restore_tmp_path: str,
acl_list: Sequence[str],
acl_meta: Dict[str, Dict[str, Any]],
context: BackupContext,
Expand All @@ -202,7 +277,7 @@ def _restore_replicated(
name, obj_char = meta_data["name"], meta_data["char"]

# restore object data
file_path = _get_access_file_path(context, f"{uuid}.sql")
file_path = os.path.join(restore_tmp_path, f"{uuid}.sql")
with open(file_path, "r", encoding="utf-8") as file:
data = file.read()
uuid_zk_path = _get_access_zk_path(context, f"/uuid/{uuid}")
Expand All @@ -212,14 +287,16 @@ def _restore_replicated(
uuid_zk_path = _get_access_zk_path(context, f"/{obj_char}/{name}")
_zk_upsert_data(zk_client, uuid_zk_path, uuid)

def _mark_to_rebuild(self, context: BackupContext) -> None:
def _mark_to_rebuild(
self, clickhouse_access_path: str, user: str, group: str
) -> None:
"""
Creates special mark file to rebuild the lists.
"""
mark_file = _get_access_file_path(context, CH_MARK_FILE)
mark_file = os.path.join(clickhouse_access_path, CH_MARK_FILE)
with open(mark_file, "a", encoding="utf-8"):
pass
_chown_path(context, mark_file)
shutil.chown(mark_file, user, group)

def _has_replicated_access(self, context: BackupContext) -> bool:
return (
Expand All @@ -232,11 +309,8 @@ def _get_access_control_files(objects: Sequence[str]) -> List[str]:
"""
Return list of file to be backuped/restored.
"""
return [f"{obj}.sql" for obj in objects]


def _get_access_file_path(context: BackupContext, file_name: str) -> str:
return os.path.join(context.ch_ctl_conf["access_control_path"], file_name)
return [f"{obj}.sql" for obj in objects]


def _get_access_zk_path(context: BackupContext, zk_path: str) -> str:
Expand All @@ -259,28 +333,12 @@ def _zk_upsert_data(zk: KazooClient, path: str, value: Union[str, bytes]) -> Non
zk.create(path, value, makepath=True)


def _file_create(context: BackupContext, file_name: str, file_content: str = "") -> str:
file_path = _get_access_file_path(context, file_name)
def _create_access_file(
backup_tmp_path: str, file_name: str, file_content: str = ""
) -> str:
file_path = os.path.join(backup_tmp_path, file_name)
logging.debug(f'Creating "{file_path}" access entity file')
with open(file_path, "w", encoding="utf-8") as file:
file.write(file_content)

return file_path


def _ensure_access_control_path(context: BackupContext) -> None:
acl_path = context.ch_ctl_conf["access_control_path"]
os.makedirs(acl_path, exist_ok=True)
_chown_path(context, acl_path)


def _chown_access_control_dir(context: BackupContext) -> None:
ch_config = context.ch_ctl_conf
chown_dir_contents(
ch_config["user"], ch_config["group"], ch_config["access_control_path"]
)


def _chown_path(context: BackupContext, path: str) -> None:
ch_config = context.ch_ctl_conf
shutil.chown(path, ch_config["user"], ch_config["group"])
13 changes: 13 additions & 0 deletions ch_backup/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,3 +466,16 @@ def __eq__(self, other):
if not getattr(self, slot) == getattr(other, slot):
return False
return True


def copy_directory_content(from_path_dir: str, to_path_dir: str) -> None:
    """
    Copy the entries of `from_path_dir` into `to_path_dir`.

    An entry whose name already exists in the destination is skipped, so
    existing destination files are never overwritten.
    """
    # A trailing slash makes shutil.copy treat the target as a directory.
    target_dir = to_path_dir if to_path_dir[-1] == "/" else to_path_dir + "/"

    for entry_name in os.listdir(from_path_dir):
        if os.path.exists(os.path.join(target_dir, entry_name)):
            continue
        shutil.copy(os.path.join(from_path_dir, entry_name), target_dir)
Loading

0 comments on commit 63e5fbb

Please sign in to comment.