From b2ffdc66423406bf0aa9630d9cac8245ed8bae92 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Thu, 4 Jun 2026 11:36:07 +0300 Subject: [PATCH] PostgresNode is updated (self._os_ops is used always) --- src/node.py | 76 ++++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/node.py b/src/node.py index 032b6306..e78a70b5 100644 --- a/src/node.py +++ b/src/node.py @@ -479,18 +479,18 @@ def master(self): @property def base_dir(self): if not self._base_dir: - self._base_dir = self.os_ops.mkdtemp(prefix=self._prefix or TMP_NODE) + self._base_dir = self._os_ops.mkdtemp(prefix=self._prefix or TMP_NODE) # NOTE: it's safe to create a new dir - if not self.os_ops.path_exists(self._base_dir): - self.os_ops.makedirs(self._base_dir) + if not self._os_ops.path_exists(self._base_dir): + self._os_ops.makedirs(self._base_dir) return self._base_dir @property def bin_dir(self): if not self._bin_dir: - self._bin_dir = os.path.dirname(get_bin_path2(self.os_ops, "pg_config")) + self._bin_dir = os.path.dirname(get_bin_path2(self._os_ops, "pg_config")) return self._bin_dir @property @@ -502,8 +502,8 @@ def logs_dir(self): assert type(path) is str # NOTE: it's safe to create a new dir - if not self.os_ops.path_exists(path): - self.os_ops.makedirs(path) + if not self._os_ops.path_exists(path): + self._os_ops.makedirs(path) return path @@ -580,7 +580,7 @@ def _try_shutdown(self, max_attempts, with_force=False): ps_command = ['ps', '-o', 'pid=', '-p', str(node_pid)] - ps_output = self.os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8') + ps_output = self._os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8') assert type(ps_output) is str if ps_output == "": @@ -593,13 +593,13 @@ def _try_shutdown(self, max_attempts, with_force=False): try: eprint('Force stopping node {0} with PID {1}'.format(self.name, node_pid)) - self.os_ops.kill(node_pid, signal.SIGKILL) + self._os_ops.kill(node_pid, signal.SIGKILL) except Exception: # The node has already stopped pass # Check that node stopped - print only column pid without headers - ps_output = self.os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8') + ps_output = self._os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8') assert type(ps_output) is str if ps_output == "": @@ -662,7 +662,7 @@ def _create_recovery_conf(self, username, slot=None): signal_name = self._os_ops.build_path(self.data_dir, "standby.signal") assert type(signal_name) is str - self.os_ops.touch(signal_name) + self._os_ops.touch(signal_name) else: line += "standby_mode=on\n" @@ -723,10 +723,10 @@ def _collect_special_files(self) -> typing.List[typing.Tuple[str, bytes]]: for f, num_lines in files: # skip missing files - if not self.os_ops.path_exists(f): + if not self._os_ops.path_exists(f): continue - file_lines = self.os_ops.readlines(f, num_lines, binary=True, encoding=None) + file_lines = self._os_ops.readlines(f, num_lines, binary=True, encoding=None) lines = b''.join(file_lines) # fill list @@ -793,14 +793,14 @@ def default_conf(self, # filter lines in hba file # get rid of comments and blank lines - hba_conf_file = self.os_ops.readlines(hba_conf) + hba_conf_file = self._os_ops.readlines(hba_conf) lines = [ s for s in hba_conf_file if len(s.strip()) > 0 and not s.startswith('#') ] # write filtered lines - self.os_ops.write(hba_conf, lines, truncate=True) + self._os_ops.write(hba_conf, lines, truncate=True) # replication-related settings if allow_streaming: @@ -812,7 +812,7 @@ def get_auth_method(t): # get auth methods auth_local = get_auth_method('local') auth_host = get_auth_method('host') - subnet_base = ".".join(self.os_ops.host.split('.')[:-1] + ['0']) + subnet_base = ".".join(self._os_ops.host.split('.')[:-1] + ['0']) new_lines = [ u"local\treplication\tall\t\t\t{}\n".format(auth_local), @@ -825,10 +825,10 @@ def get_auth_method(t): ] # yapf: disable # write missing lines - self.os_ops.write(hba_conf, new_lines) + self._os_ops.write(hba_conf, new_lines) # overwrite config file - self.os_ops.write(postgres_conf, '', truncate=True) + self._os_ops.write(postgres_conf, '', truncate=True) self.append_conf(fsync=fsync, max_worker_processes=MAX_WORKER_PROCESSES, @@ -908,7 +908,7 @@ def append_conf(self, line='', filename=PG_CONF_FILE, **kwargs): conf_text = '' for line in lines: conf_text += text_type(line) + '\n' - self.os_ops.write(config_name, conf_text) + self._os_ops.write(config_name, conf_text) return self @@ -941,7 +941,7 @@ def get_control_data(self): _params += ["-D"] if self._pg_version >= PgVer('9.5') else [] _params += [self.data_dir] - data = execute_utility2(self.os_ops, _params, self.utils_log_file) + data = execute_utility2(self._os_ops, _params, self.utils_log_file) out_dict = {} @@ -1096,7 +1096,7 @@ def _start( def LOCAL__start_node(): # 'error' will be None on Windows - _, _, error = execute_utility2(self.os_ops, _params, self.utils_log_file, verbose=True, exec_env=exec_env) + _, _, error = execute_utility2(self._os_ops, _params, self.utils_log_file, verbose=True, exec_env=exec_env) assert error is None or type(error) is str if error and 'does not exist' in error: raise Exception(error) @@ -1185,7 +1185,7 @@ def stop(self, params=[], wait=True): "stop" ] + params # yapf: disable - execute_utility2(self.os_ops, _params, self.utils_log_file) + execute_utility2(self._os_ops, _params, self.utils_log_file) self._manually_started_pm_pid = None @@ -1242,7 +1242,7 @@ def restart(self, params=[]): ] + params # yapf: disable try: - error_code, out, error = execute_utility2(self.os_ops, _params, self.utils_log_file, verbose=True) + error_code, out, error = execute_utility2(self._os_ops, _params, self.utils_log_file, verbose=True) if error and 'could not start server' in error: raise ExecUtilException except ExecUtilException as e: @@ -1271,7 +1271,7 @@ def reload(self, params=[]): "reload" ] + params # yapf: disable - execute_utility2(self.os_ops, _params, self.utils_log_file) + execute_utility2(self._os_ops, _params, self.utils_log_file) return self @@ -1293,7 +1293,7 @@ def promote(self, dbname=None, username=None): "promote" ] # yapf: disable - execute_utility2(self.os_ops, _params, self.utils_log_file) + execute_utility2(self._os_ops, _params, self.utils_log_file) # for versions below 10 `promote` is asynchronous so we need to wait # until it actually becomes writable @@ -1328,7 +1328,7 @@ def pg_ctl(self, params): "-w" # wait ] + params # yapf: disable - return execute_utility2(self.os_ops, _params, self.utils_log_file) + return execute_utility2(self._os_ops, _params, self.utils_log_file) def release_resources(self): """ @@ -1364,7 +1364,7 @@ def cleanup(self, max_attempts=3, full=False, release_resources=False): else: rm_dir = self.data_dir # just data, save logs - self.os_ops.rmdirs(rm_dir, ignore_errors=False) + self._os_ops.rmdirs(rm_dir, ignore_errors=False) if release_resources: self._release_resources() @@ -1479,7 +1479,7 @@ def _psql( else: raise QueryException('Query or filename must be provided') - return self.os_ops.exec_command( + return self._os_ops.exec_command( psql_params, verbose=True, input=input, @@ -1562,9 +1562,9 @@ def dump(self, # Generate tmpfile or tmpdir def tmpfile(): if format == DumpFormat.Directory: - fname = self.os_ops.mkdtemp(prefix=TMP_DUMP) + fname = self._os_ops.mkdtemp(prefix=TMP_DUMP) else: - fname = self.os_ops.mkstemp(prefix=TMP_DUMP) + fname = self._os_ops.mkstemp(prefix=TMP_DUMP) return fname filename = filename or tmpfile() @@ -1583,7 +1583,7 @@ def tmpfile(): if options: _params.extend(options) - execute_utility2(self.os_ops, _params, self.utils_log_file) + execute_utility2(self._os_ops, _params, self.utils_log_file) return filename @@ -1612,7 +1612,7 @@ def restore(self, filename, dbname=None, username=None): # try pg_restore if dump is binary format, and psql if not try: - execute_utility2(self.os_ops, _params, self.utils_log_name) + execute_utility2(self._os_ops, _params, self.utils_log_name) except ExecUtilException: self.psql(filename=filename, dbname=dbname, username=username) @@ -1887,7 +1887,7 @@ def pgbench(self, # should be the last one _params.append(dbname) - proc = self.os_ops.exec_command(_params, stdout=stdout, stderr=stderr, wait_exit=True, get_process=True) + proc = self._os_ops.exec_command(_params, stdout=stdout, stderr=stderr, wait_exit=True, get_process=True) return proc @@ -1971,7 +1971,7 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs): # should be the last one _params.append(dbname) - return execute_utility2(self.os_ops, _params, self.utils_log_file) + return execute_utility2(self._os_ops, _params, self.utils_log_file) def connect(self, dbname=None, @@ -2060,9 +2060,9 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}): assert isinstance(self._os_ops, OsOperations) # parse postgresql.auto.conf - path = self.os_ops.build_path(self.data_dir, config) + path = self._os_ops.build_path(self.data_dir, config) - lines = self.os_ops.readlines(path) + lines = self._os_ops.readlines(path) current_options = {} current_directives = [] for line in lines: @@ -2109,7 +2109,7 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}): for directive in current_directives: auto_conf += directive + "\n" - self.os_ops.write(path, auto_conf, truncate=True) + self._os_ops.write(path, auto_conf, truncate=True) def upgrade_from(self, old_node, options=None, expect_error=False): """ @@ -2144,7 +2144,7 @@ def upgrade_from(self, old_node, options=None, expect_error=False): ] upgrade_command += options - return self.os_ops.exec_command(upgrade_command, expect_error=expect_error) + return self._os_ops.exec_command(upgrade_command, expect_error=expect_error) def _release_resources(self): self._free_port() @@ -2172,7 +2172,7 @@ def _get_bin_path(self, filename): if self.bin_dir: bin_path = self._os_ops.build_path(self.bin_dir, filename) else: - bin_path = get_bin_path2(self.os_ops, filename) + bin_path = get_bin_path2(self._os_ops, filename) return bin_path @staticmethod