Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 38 additions & 38 deletions src/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,18 @@ def master(self):
@property
def base_dir(self):
if not self._base_dir:
self._base_dir = self.os_ops.mkdtemp(prefix=self._prefix or TMP_NODE)
self._base_dir = self._os_ops.mkdtemp(prefix=self._prefix or TMP_NODE)

# NOTE: it's safe to create a new dir
if not self.os_ops.path_exists(self._base_dir):
self.os_ops.makedirs(self._base_dir)
if not self._os_ops.path_exists(self._base_dir):
self._os_ops.makedirs(self._base_dir)

return self._base_dir

@property
def bin_dir(self):
if not self._bin_dir:
self._bin_dir = os.path.dirname(get_bin_path2(self.os_ops, "pg_config"))
self._bin_dir = os.path.dirname(get_bin_path2(self._os_ops, "pg_config"))
return self._bin_dir

@property
Expand All @@ -502,8 +502,8 @@ def logs_dir(self):
assert type(path) is str

# NOTE: it's safe to create a new dir
if not self.os_ops.path_exists(path):
self.os_ops.makedirs(path)
if not self._os_ops.path_exists(path):
self._os_ops.makedirs(path)

return path

Expand Down Expand Up @@ -580,7 +580,7 @@ def _try_shutdown(self, max_attempts, with_force=False):

ps_command = ['ps', '-o', 'pid=', '-p', str(node_pid)]

ps_output = self.os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8')
ps_output = self._os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8')
assert type(ps_output) is str

if ps_output == "":
Expand All @@ -593,13 +593,13 @@ def _try_shutdown(self, max_attempts, with_force=False):

try:
eprint('Force stopping node {0} with PID {1}'.format(self.name, node_pid))
self.os_ops.kill(node_pid, signal.SIGKILL)
self._os_ops.kill(node_pid, signal.SIGKILL)
except Exception:
# The node has already stopped
pass

# Check that node stopped - print only column pid without headers
ps_output = self.os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8')
ps_output = self._os_ops.exec_command(cmd=ps_command, shell=True, ignore_errors=True).decode('utf-8')
assert type(ps_output) is str

if ps_output == "":
Expand Down Expand Up @@ -662,7 +662,7 @@ def _create_recovery_conf(self, username, slot=None):

signal_name = self._os_ops.build_path(self.data_dir, "standby.signal")
assert type(signal_name) is str
self.os_ops.touch(signal_name)
self._os_ops.touch(signal_name)
else:
line += "standby_mode=on\n"

Expand Down Expand Up @@ -723,10 +723,10 @@ def _collect_special_files(self) -> typing.List[typing.Tuple[str, bytes]]:

for f, num_lines in files:
# skip missing files
if not self.os_ops.path_exists(f):
if not self._os_ops.path_exists(f):
continue

file_lines = self.os_ops.readlines(f, num_lines, binary=True, encoding=None)
file_lines = self._os_ops.readlines(f, num_lines, binary=True, encoding=None)
lines = b''.join(file_lines)

# fill list
Expand Down Expand Up @@ -793,14 +793,14 @@ def default_conf(self,

# filter lines in hba file
# get rid of comments and blank lines
hba_conf_file = self.os_ops.readlines(hba_conf)
hba_conf_file = self._os_ops.readlines(hba_conf)
lines = [
s for s in hba_conf_file
if len(s.strip()) > 0 and not s.startswith('#')
]

# write filtered lines
self.os_ops.write(hba_conf, lines, truncate=True)
self._os_ops.write(hba_conf, lines, truncate=True)

# replication-related settings
if allow_streaming:
Expand All @@ -812,7 +812,7 @@ def get_auth_method(t):
# get auth methods
auth_local = get_auth_method('local')
auth_host = get_auth_method('host')
subnet_base = ".".join(self.os_ops.host.split('.')[:-1] + ['0'])
subnet_base = ".".join(self._os_ops.host.split('.')[:-1] + ['0'])

new_lines = [
u"local\treplication\tall\t\t\t{}\n".format(auth_local),
Expand All @@ -825,10 +825,10 @@ def get_auth_method(t):
] # yapf: disable

# write missing lines
self.os_ops.write(hba_conf, new_lines)
self._os_ops.write(hba_conf, new_lines)

# overwrite config file
self.os_ops.write(postgres_conf, '', truncate=True)
self._os_ops.write(postgres_conf, '', truncate=True)

self.append_conf(fsync=fsync,
max_worker_processes=MAX_WORKER_PROCESSES,
Expand Down Expand Up @@ -908,7 +908,7 @@ def append_conf(self, line='', filename=PG_CONF_FILE, **kwargs):
conf_text = ''
for line in lines:
conf_text += text_type(line) + '\n'
self.os_ops.write(config_name, conf_text)
self._os_ops.write(config_name, conf_text)

return self

Expand Down Expand Up @@ -941,7 +941,7 @@ def get_control_data(self):
_params += ["-D"] if self._pg_version >= PgVer('9.5') else []
_params += [self.data_dir]

data = execute_utility2(self.os_ops, _params, self.utils_log_file)
data = execute_utility2(self._os_ops, _params, self.utils_log_file)

out_dict = {}

Expand Down Expand Up @@ -1096,7 +1096,7 @@ def _start(

def LOCAL__start_node():
# 'error' will be None on Windows
_, _, error = execute_utility2(self.os_ops, _params, self.utils_log_file, verbose=True, exec_env=exec_env)
_, _, error = execute_utility2(self._os_ops, _params, self.utils_log_file, verbose=True, exec_env=exec_env)
assert error is None or type(error) is str
if error and 'does not exist' in error:
raise Exception(error)
Expand Down Expand Up @@ -1185,7 +1185,7 @@ def stop(self, params=[], wait=True):
"stop"
] + params # yapf: disable

execute_utility2(self.os_ops, _params, self.utils_log_file)
execute_utility2(self._os_ops, _params, self.utils_log_file)

self._manually_started_pm_pid = None

Expand Down Expand Up @@ -1242,7 +1242,7 @@ def restart(self, params=[]):
] + params # yapf: disable

try:
error_code, out, error = execute_utility2(self.os_ops, _params, self.utils_log_file, verbose=True)
error_code, out, error = execute_utility2(self._os_ops, _params, self.utils_log_file, verbose=True)
if error and 'could not start server' in error:
raise ExecUtilException
except ExecUtilException as e:
Expand Down Expand Up @@ -1271,7 +1271,7 @@ def reload(self, params=[]):
"reload"
] + params # yapf: disable

execute_utility2(self.os_ops, _params, self.utils_log_file)
execute_utility2(self._os_ops, _params, self.utils_log_file)

return self

Expand All @@ -1293,7 +1293,7 @@ def promote(self, dbname=None, username=None):
"promote"
] # yapf: disable

execute_utility2(self.os_ops, _params, self.utils_log_file)
execute_utility2(self._os_ops, _params, self.utils_log_file)

# for versions below 10 `promote` is asynchronous so we need to wait
# until it actually becomes writable
Expand Down Expand Up @@ -1328,7 +1328,7 @@ def pg_ctl(self, params):
"-w" # wait
] + params # yapf: disable

return execute_utility2(self.os_ops, _params, self.utils_log_file)
return execute_utility2(self._os_ops, _params, self.utils_log_file)

def release_resources(self):
"""
Expand Down Expand Up @@ -1364,7 +1364,7 @@ def cleanup(self, max_attempts=3, full=False, release_resources=False):
else:
rm_dir = self.data_dir # just data, save logs

self.os_ops.rmdirs(rm_dir, ignore_errors=False)
self._os_ops.rmdirs(rm_dir, ignore_errors=False)

if release_resources:
self._release_resources()
Expand Down Expand Up @@ -1479,7 +1479,7 @@ def _psql(
else:
raise QueryException('Query or filename must be provided')

return self.os_ops.exec_command(
return self._os_ops.exec_command(
psql_params,
verbose=True,
input=input,
Expand Down Expand Up @@ -1562,9 +1562,9 @@ def dump(self,
# Generate tmpfile or tmpdir
def tmpfile():
if format == DumpFormat.Directory:
fname = self.os_ops.mkdtemp(prefix=TMP_DUMP)
fname = self._os_ops.mkdtemp(prefix=TMP_DUMP)
else:
fname = self.os_ops.mkstemp(prefix=TMP_DUMP)
fname = self._os_ops.mkstemp(prefix=TMP_DUMP)
return fname

filename = filename or tmpfile()
Expand All @@ -1583,7 +1583,7 @@ def tmpfile():
if options:
_params.extend(options)

execute_utility2(self.os_ops, _params, self.utils_log_file)
execute_utility2(self._os_ops, _params, self.utils_log_file)

return filename

Expand Down Expand Up @@ -1612,7 +1612,7 @@ def restore(self, filename, dbname=None, username=None):

# try pg_restore if dump is binary format, and psql if not
try:
execute_utility2(self.os_ops, _params, self.utils_log_name)
execute_utility2(self._os_ops, _params, self.utils_log_name)
except ExecUtilException:
self.psql(filename=filename, dbname=dbname, username=username)

Expand Down Expand Up @@ -1887,7 +1887,7 @@ def pgbench(self,
# should be the last one
_params.append(dbname)

proc = self.os_ops.exec_command(_params, stdout=stdout, stderr=stderr, wait_exit=True, get_process=True)
proc = self._os_ops.exec_command(_params, stdout=stdout, stderr=stderr, wait_exit=True, get_process=True)

return proc

Expand Down Expand Up @@ -1971,7 +1971,7 @@ def pgbench_run(self, dbname=None, username=None, options=[], **kwargs):
# should be the last one
_params.append(dbname)

return execute_utility2(self.os_ops, _params, self.utils_log_file)
return execute_utility2(self._os_ops, _params, self.utils_log_file)

def connect(self,
dbname=None,
Expand Down Expand Up @@ -2060,9 +2060,9 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):
assert isinstance(self._os_ops, OsOperations)

# parse postgresql.auto.conf
path = self.os_ops.build_path(self.data_dir, config)
path = self._os_ops.build_path(self.data_dir, config)

lines = self.os_ops.readlines(path)
lines = self._os_ops.readlines(path)
current_options = {}
current_directives = []
for line in lines:
Expand Down Expand Up @@ -2109,7 +2109,7 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):
for directive in current_directives:
auto_conf += directive + "\n"

self.os_ops.write(path, auto_conf, truncate=True)
self._os_ops.write(path, auto_conf, truncate=True)

def upgrade_from(self, old_node, options=None, expect_error=False):
"""
Expand Down Expand Up @@ -2144,7 +2144,7 @@ def upgrade_from(self, old_node, options=None, expect_error=False):
]
upgrade_command += options

return self.os_ops.exec_command(upgrade_command, expect_error=expect_error)
return self._os_ops.exec_command(upgrade_command, expect_error=expect_error)

def _release_resources(self):
self._free_port()
Expand Down Expand Up @@ -2172,7 +2172,7 @@ def _get_bin_path(self, filename):
if self.bin_dir:
bin_path = self._os_ops.build_path(self.bin_dir, filename)
else:
bin_path = get_bin_path2(self.os_ops, filename)
bin_path = get_bin_path2(self._os_ops, filename)
return bin_path

@staticmethod
Expand Down