Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel FREEZE TABLE #164

Merged
merged 24 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4c3acbf
Freeze and backup thread pools
kirillgarbar Jun 19, 2024
faecbed
Remove freezed data for one table
kirillgarbar Jun 20, 2024
8807c86
Freeze workers option in config
kirillgarbar Jun 20, 2024
1e86b09
More flexibility of data generation for tests
kirillgarbar Jun 22, 2024
6b7182b
Parallel freeze test
kirillgarbar Jun 22, 2024
c45b386
Move executors to subclass
kirillgarbar Jun 22, 2024
ec20451
Partition by prefix to fix dedup tests
kirillgarbar Jun 22, 2024
8b795ae
Fix codespell
kirillgarbar Jun 24, 2024
4f8bfec
Fix for old python
kirillgarbar Jun 24, 2024
7554870
Remove backup executor and backup in the main thread
kirillgarbar Jun 25, 2024
06cb097
Fix black
kirillgarbar Jun 25, 2024
95bc77b
Fix race condition with shadow/increment.txt
kirillgarbar Jun 25, 2024
0214f1e
Parallel backup
kirillgarbar Jul 1, 2024
6933163
Fix unit test - can't pickle lock
kirillgarbar Jul 1, 2024
d0a25c9
Revert "Fix unit test - can't pickle lock"
kirillgarbar Jul 8, 2024
827a66d
Revert "Parallel backup"
kirillgarbar Jul 8, 2024
93da739
Backup tables after all tables are frozen
kirillgarbar Jul 8, 2024
b7a4358
Move thread pool related logic to separate class
kirillgarbar Jul 8, 2024
6d1076a
Move all freeze related logic to TableFreezer
kirillgarbar Jul 8, 2024
c70ad1f
Small fixes
kirillgarbar Jul 8, 2024
190539c
Fix exception suppression in TableFreezer's __exit__
kirillgarbar Jul 8, 2024
59fdf82
Fix mypy
kirillgarbar Jul 9, 2024
2f12cb7
Fix black
kirillgarbar Jul 9, 2024
87f9d30
Move freeze logic to the method
kirillgarbar Jul 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def __init__(self, config: Config) -> None:
self._config = config
self._access_backup_manager = AccessBackup()
self._database_backup_manager = DatabaseBackup()
self._table_backup_manager = TableBackup()
self._table_backup_manager = TableBackup(
self._config["multiprocessing"]["freeze_threads"]
)
self._udf_backup_manager = UDFBackup()
self._nc_backup_manager = NamedCollectionsBackup()

Expand Down
45 changes: 39 additions & 6 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.util import (
chown_dir_contents,
chown_file,
escape,
list_dir_files,
retry,
Expand Down Expand Up @@ -474,15 +475,27 @@ def system_unfreeze(self, backup_name: str) -> None:
query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)

def remove_freezed_data(
    self, backup_name: Optional[str] = None, table: Optional[Table] = None
) -> None:
    """
    Remove freezed partitions from local disks.

    If both ``backup_name`` and ``table`` are given, only that table's
    shadow data inside the named backup is removed. If neither is given,
    all shadow data on every local disk is removed.

    Raises:
        ValueError: if only one of ``backup_name`` / ``table`` is provided,
            to avoid silently falling back to removing ALL shadow data.
    """
    # Guard against the ambiguous half-specified call: previously a single
    # argument fell through to the "remove everything" branch.
    if (backup_name is None) != (table is None):
        raise ValueError(
            "Both backup_name and table must be provided or neither of them."
        )
    if backup_name and table:
        # Targeted removal: only this table's shadow directory per local disk.
        for table_data_path, disk in table.paths_with_disks:
            if disk.type == "local":
                table_relative_path = os.path.relpath(table_data_path, disk.path)
                shadow_path = os.path.join(
                    disk.path, "shadow", backup_name, table_relative_path
                )
                logging.debug("Removing shadow data: {}", shadow_path)
                self._remove_shadow_data(shadow_path)
    else:
        # Full cleanup: wipe the whole shadow directory of every local disk.
        for disk in self._disks.values():
            if disk.type == "local":
                shadow_path = os.path.join(disk.path, "shadow")
                logging.debug("Removing shadow data: {}", shadow_path)
                self._remove_shadow_data(shadow_path)

def remove_freezed_part(self, part: FrozenPart) -> None:
"""
Expand Down Expand Up @@ -776,6 +789,26 @@ def chown_dir(self, dir_path: str) -> None:
need_recursion,
)

def create_shadow_increment(self) -> None:
    """
    Create shadow/increment.txt to fix race condition with parallel freeze.

    Must be used before freezing more than one table at once.
    """
    default_shadow_path = Path(self._root_data_path) / "shadow"
    increment_path = default_shadow_path / "increment.txt"
    if increment_path.exists():
        # Already initialized by a previous run (or by ClickHouse itself).
        return
    if not default_shadow_path.exists():
        # exist_ok=True avoids FileExistsError if another process creates
        # the directory between the check above and this call — the same
        # kind of race this helper exists to prevent.
        default_shadow_path.mkdir(exist_ok=True)
        self.chown_dir(str(default_shadow_path))
    with open(increment_path, "w", encoding="utf-8") as file:
        file.write("0")
    # Make the file owned by the ClickHouse server user/group so the
    # server can update the increment counter itself.
    chown_file(
        self._ch_ctl_config["user"],
        self._ch_ctl_config["group"],
        str(increment_path),
    )

@retry(OSError)
def _remove_shadow_data(self, path: str) -> None:
if path.find("/shadow") == -1:
Expand Down
2 changes: 2 additions & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
"workers": 4,
# The number of processes for parts restoring from S3 disks.
"cloud_storage_restore_workers": 4,
# The number of threads for parallel freeze of tables
"freeze_threads": 4,
},
"pipeline": {
# Is asynchronous pipelines used (based on Pypeln library)
Expand Down
Loading
Loading