Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel FREEZE TABLE #164

Merged
merged 24 commits into from
Jul 10, 2024
Merged

Parallel FREEZE TABLE #164

merged 24 commits into from
Jul 10, 2024

Conversation

kirillgarbar
Copy link
Contributor

No description provided.

@kirillgarbar kirillgarbar changed the title Parallel FREEZE TABLE Parallel backup Jul 1, 2024
@aalexfvk
Copy link
Contributor

aalexfvk commented Jul 8, 2024

It looks like we encountered a bug(or bugs) here described here Delgan/loguru#231 for Python 3.6.
Worker process is hanging in logging lock.
It was fixed in loguru for Python >=3.7

I have added printing of stack trace of threads upon getting SIGTERM

def signal_handler(signum, _frame):
    """
    Logs received signal. Useful for troubleshooting.
    """
    logging.info(f"Received signal {signum}")

    for th in threading.enumerate():
        logging.debug(th)
        logging.debug(traceback.format_stack(sys._current_frames()[th.ident]))

Traceback:

2024-07-08 17:02:03,782 Process-3    1279 [DEBUG   ] ch-backup: ['  File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
', '  File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
', '  File "/usr/local/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
', '  File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 69, in _worker
    work_item.run()
', '  File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
', '  File "/var/tmp/ch-backup/ch_backup/logic/table.py", line 200, in _backup_table
    context, db, table, backup_name, schema_only, mtimes, create_statement
', '  File "/var/tmp/ch-backup/ch_backup/logic/table.py", line 423, in _backup_table_after_freeze
    context.backup_meta.name, db, table, create_statement
', '  File "/var/tmp/ch-backup/ch_backup/backup/layout.py", line 99, in upload_table_create_statement
    create_statement, remote_path, is_async=True, encryption=True
', '  File "/var/tmp/ch-backup/ch_backup/storage/loader.py", line 45, in upload_data
    data, remote_path, is_async=is_async, encryption=encryption
', '  File "/var/tmp/ch-backup/ch_backup/storage/async_pipeline/pipeline_executor.py", line 53, in upload_data
    self._exec_pipeline(job_id, pipeline, is_async)
', '  File "/var/tmp/ch-backup/ch_backup/storage/async_pipeline/pipeline_executor.py", line 224, in _exec_pipeline
    return self._exec_pool.submit(job_id, profile(10, 60)(pipeline), callback)
', '  File "/var/tmp/ch-backup/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py", line 58, in submit
    future = self._pool.submit(func, *args, **kwargs)
', '  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 466, in submit
    self._start_queue_management_thread()
', '  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 427, in _start_queue_management_thread
    self._adjust_process_count()
', '  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 446, in _adjust_process_count
    p.start()
', '  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
', '  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
', '  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
', '  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
', '  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
', '  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
', '  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
', '  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
', '  File "/var/tmp/ch-backup/ch_backup/profile.py", line 45, in __call__
    result = self.func(*args, **kwargs)
', '  File "/var/tmp/ch-backup/ch_backup/storage/async_pipeline/pipelines.py", line 135, in upload_files_tarball_pipeline
    builder.build_uploading_stage(remote_path, estimated_size)
', '  File "/var/tmp/ch-backup/ch_backup/storage/async_pipeline/pipeline_builder.py", line 211, in build_uploading_stage
    storage = get_storage_engine(stage_config)
', '  File "/var/tmp/ch-backup/ch_backup/storage/engine/__init__.py", line 27, in get_storage_engine
    return SUPPORTED_STORAGES[engine_id](config)
', '  File "/var/tmp/ch-backup/ch_backup/storage/engine/s3/s3_engine.py", line 33, in __init__
    self._s3_client_factory = S3ClientCachedFactory(S3ClientFactory(config))
', '  File "/var/tmp/ch-backup/ch_backup/storage/engine/s3/s3_client_factory.py", line 44, in __init__
    region_name=self._config["boto_config"]["region_name"],
', '  File "/usr/local/lib/python3.6/site-packages/boto3/session.py", line 55, in __init__
    self._session = botocore.session.get_session()
', '  File "/usr/local/lib/python3.6/site-packages/botocore/session.py", line 1045, in get_session
    return Session(env_vars)
', '  File "/usr/local/lib/python3.6/site-packages/botocore/session.py", line 106, in __init__
    self._register_builtin_handlers(self._events)
', '  File "/usr/local/lib/python3.6/site-packages/botocore/session.py", line 178, in _register_builtin_handlers
    self.register(event_name, handler)
', '  File "/usr/local/lib/python3.6/site-packages/botocore/session.py", line 647, in register
    unique_id_uses_count=unique_id_uses_count)
', '  File "/usr/local/lib/python3.6/site-packages/botocore/hooks.py", line 365, in register
    aliased_event_name = self._alias_event_name(event_name)
', '  File "/usr/local/lib/python3.6/site-packages/botocore/hooks.py", line 421, in _alias_event_name
    event_name, new_name
', '  File "/usr/local/lib/python3.6/logging/__init__.py", line 1296, in debug
    self._log(DEBUG, msg, args, **kwargs)
', '  File "/usr/local/lib/python3.6/logging/__init__.py", line 1444, in _log
    self.handle(record)
', '  File "/usr/local/lib/python3.6/logging/__init__.py", line 1454, in handle
    self.callHandlers(record)
', '  File "/usr/local/lib/python3.6/logging/__init__.py", line 1516, in callHandlers
    hdlr.handle(record)
', '  File "/usr/local/lib/python3.6/logging/__init__.py", line 863, in handle
    self.acquire()
', '  File "/usr/local/lib/python3.6/logging/__init__.py", line 814, in acquire
    self.lock.acquire()
', '  File "/var/tmp/ch-backup/ch_backup/cli.py", line 55, in signal_handler
    logging.debug(traceback.format_stack(sys._current_frames()[th.ident]))
']

@@ -48,7 +48,9 @@ def __init__(self, config: Config) -> None:
self._config = config
self._access_backup_manager = AccessBackup()
self._database_backup_manager = DatabaseBackup()
self._table_backup_manager = TableBackup()
self._table_backup_manager = TableBackup(
self._config.get("multiprocessing").get("freeze_workers", 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense use get() only when a default is expected. The error of absent key for [] is more readable.

@kirillgarbar
Copy link
Contributor Author

Now tables are freezed before backup is started, maybe we should raise default number of freeze threads

@@ -474,15 +475,27 @@ def system_unfreeze(self, backup_name: str) -> None:
query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)

def remove_freezed_data(self) -> None:
def remove_freezed_data(
self, backup_name: Optional[str] = None, table: Optional[Table] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's introduce a new object with backup_name and table as members. Or add checking and throwing an exception when only one from the pair is provided.

@@ -776,6 +789,26 @@ def chown_dir(self, dir_path: str) -> None:
need_recursion,
)

def create_shadow_increment(self) -> None:
"""
Create shadow/increment.txt to fix race condition with parallel freeze.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some issue or comment in the code of CH about this to mention here ?

table.name,
)

table_freezer.freeze_table(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do that simpler.
Just pass list of tables to freezer to the synchronous method freeze_tables(or make it as function), returning after all is frozen.
Without accumulating, waiting and context manager.

@aalexfvk
Copy link
Contributor

aalexfvk commented Jul 9, 2024

Now tables are freezed before backup is started, maybe we should raise default number of freeze threads

4 is OK, for starter

@kirillgarbar kirillgarbar changed the title Parallel backup Parallel FREEZE TABLE Jul 9, 2024
@aalexfvk aalexfvk merged commit 248e0ce into yandex:main Jul 10, 2024
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants