From e1f140f436ba3657489bd681dc552e6d0726d35c Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Fri, 25 Nov 2022 20:59:31 -0700 Subject: [PATCH 1/4] Improve logging output with append mode --- docker/pgosm_flex.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index 1d27f61..dd7c975 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -107,7 +107,7 @@ def run_pgosm_flex(ram, region, subregion, append, data_only, debug, success = run_replication_update(skip_nested=skip_nested, flex_path=paths['flex_path']) else: - logger.info('Running normal osm2pgsql mode') + logger.info('Running osm2pgsql without replication') success = run_osm2pgsql_standard(input_file=input_file, out_path=paths['out_path'], flex_path=paths['flex_path'], @@ -490,11 +490,11 @@ def check_replication_exists(): logger.debug(f'osm2pgsql-replication output:\n{output.stdout}') if output.returncode != 0: - err_msg = f'Failure. Return code: {output.returncode}' - logger.warning(err_msg) + logger.info('Replication not previously set up, fresh import.') + logger.debug(f'Return code: {output.returncode}') return False - logger.debug('osm2pgsql-replication status checked.') + logger.debug('Replication set up, candidate for update.') return True From 9a3494ea97f07eca7964b6eebaf5d20d34d8cb75 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Fri, 25 Nov 2022 21:57:41 -0700 Subject: [PATCH 2/4] Replace subprocess.run with subprocess.Popen to provide more logging closer to when it happens instead of all at the end --- docker/pgosm_flex.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index dd7c975..316ac60 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -13,6 +13,7 @@ from pathlib import Path import sys import subprocess +from time import sleep import click @@ -364,21 +365,29 @@ def run_osm2pgsql(osm2pgsql_command, flex_path): logger = logging.getLogger('pgosm-flex') logger.info('Running osm2pgsql') - output = subprocess.run(osm2pgsql_command.split(), - text=True, - cwd=flex_path, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + with subprocess.Popen(osm2pgsql_command.split(), + cwd=flex_path, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) as process: - logger.info(f'osm2pgsql output: \n {output.stdout}\nEND PgOSM Flex output') + while True: + output = process.stdout.readline() + if process.poll() is not None and output == b'': + break - if output.returncode != 0: - err_msg = f'Failed to run osm2pgsql. Return code: {output.returncode}' + if output: + logger.info(output.strip()) + else: + sleep(1) + + returncode = process.poll() + + if returncode != 0: + err_msg = f'Failed to run osm2pgsql. Return code: {returncode}' logger.error(err_msg) - sys.exit(f'{err_msg} - Check the log output for details.') + sys.exit(f'{err_msg} - Check the log output for details') - logger.info('osm2pgsql completed.') + logger.info('osm2pgsql completed') def check_layerset_places(flex_path): From 85b153a4d2bd0ee62bf84047eb7664e04d55695f Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Mon, 28 Nov 2022 19:47:58 -0700 Subject: [PATCH 3/4] Continue moving from subprocess.run to subprocess.Popen --- docker/helpers.py | 49 ++++++++++++++++++++++++++++++------- docker/pgosm_flex.py | 58 +++++++++++--------------------------------- 2 files changed, 54 insertions(+), 53 deletions(-) diff --git a/docker/helpers.py b/docker/helpers.py index 25a0191..8ed9d9c 100644 --- a/docker/helpers.py +++ b/docker/helpers.py @@ -5,6 +5,7 @@ import subprocess import os import sys +from time import sleep import db @@ -23,6 +24,40 @@ def get_today(): return today +def run_command_via_subprocess(cmd, cwd): + """ + Parameters + ----------------------- + cmd : list + cwd : str + + Returns + ----------------------- + status : int + Return code from command + """ + logger = logging.getLogger('pgosm-flex') + with subprocess.Popen(cmd, + cwd=cwd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) as process: + + while True: + output = process.stdout.readline() + if process.poll() is not None and output == b'': + break + + if output: + logger.info(output.strip().decode('utf-8')) + else: + # Only sleep when there wasn't output + sleep(1) + + status = process.poll() + + return status + + def verify_checksum(md5_file, path): """If verfication fails calls `sys.exit()` @@ -35,16 +70,12 @@ def verify_checksum(md5_file, path): logger = logging.getLogger('pgosm-flex') logger.debug(f'Validating {md5_file} in {path}') - output = subprocess.run(['md5sum', '-c', md5_file], - text=True, - check=False, - cwd=path, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + returncode = run_command_via_subprocess(cmd=['md5sum', '-c', md5_file], + cwd=path) + - if output.returncode != 0: - err_msg = 'Failed to validate md5sum. Return code: ' - err_msg += f'{output.returncode} {output.stdout}' + if returncode != 0: + err_msg = f'Failed to validate md5sum. Return code: {output.returncode}' logger.error(err_msg) sys.exit(err_msg) diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index 316ac60..67e7b08 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -12,8 +12,6 @@ import os from pathlib import Path import sys -import subprocess -from time import sleep import click @@ -208,17 +206,11 @@ def run_replication_update(skip_nested, flex_path): """ update_cmd = update_cmd.replace('-d $PGOSM_CONN', f'-d {conn_string}') - output = subprocess.run(update_cmd.split(), - text=True, - check=False, - cwd=flex_path, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + returncode = helpers.run_command_via_subprocess(cmd=update_cmd.split(), + cwd=flex_path) - logger.info(f'osm2pgsql-replication output:\n{output.stdout}') - - if output.returncode != 0: - err_msg = f'Failure. Return code: {output.returncode}' + if returncode != 0: + err_msg = f'Failure. Return code: {returncode}' logger.warning(err_msg) return False @@ -365,22 +357,8 @@ def run_osm2pgsql(osm2pgsql_command, flex_path): logger = logging.getLogger('pgosm-flex') logger.info('Running osm2pgsql') - with subprocess.Popen(osm2pgsql_command.split(), - cwd=flex_path, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) as process: - - while True: - output = process.stdout.readline() - if process.poll() is not None and output == b'': - break - - if output: - logger.info(output.strip()) - else: - sleep(1) - - returncode = process.poll() + returncode = helpers.run_command_via_subprocess(cmd=osm2pgsql_command.split(), + cwd=flex_path) if returncode != 0: err_msg = f'Failed to run osm2pgsql. Return code: {returncode}' @@ -490,17 +468,13 @@ def check_replication_exists(): logger.debug(f'Command to check DB for replication status:\n{check_cmd}') conn_string = db.connection_string() check_cmd = check_cmd.replace('-d $PGOSM_CONN', f'-d {conn_string}') - output = subprocess.run(check_cmd.split(), - text=True, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - logger.debug(f'osm2pgsql-replication output:\n{output.stdout}') + returncode = helpers.run_command_via_subprocess(cmd=check_cmd.split(), + cwd=None) - if output.returncode != 0: + if returncode != 0: logger.info('Replication not previously set up, fresh import.') - logger.debug(f'Return code: {output.returncode}') + logger.debug(f'Return code: {returncode}') return False logger.debug('Replication set up, candidate for update.') @@ -522,16 +496,12 @@ def run_osm2pgsql_replication_init(pbf_path, pbf_filename): logger.debug(f'Initializing DB for replication with command:\n{init_cmd}') conn_string = db.connection_string() init_cmd = init_cmd.replace('-d $PGOSM_CONN', f'-d {conn_string}') - output = subprocess.run(init_cmd.split(), - text=True, - check=False, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - logger.info(f'osm2pgsql-replication output:\n{output.stdout}') + returncode = helpers.run_command_via_subprocess(cmd=init_cmd.split(), + cwd=None) - if output.returncode != 0: - err_msg = f'Failed to run osm2pgsql-replication. Return code: {output.returncode}' + if returncode != 0: + err_msg = f'Failed to run osm2pgsql-replication. Return code: {returncode}' logger.error(err_msg) sys.exit(f'{err_msg} - Check the log output for details.') From fd20908b12e95d83676272cb08edaccec3fd4c39 Mon Sep 17 00:00:00 2001 From: Ryan Lambert Date: Mon, 28 Nov 2022 19:56:59 -0700 Subject: [PATCH 4/4] Cleanup --- docker/helpers.py | 19 +++++++++---------- docker/pgosm_flex.py | 6 ------ 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/docker/helpers.py b/docker/helpers.py index 8ed9d9c..e421fce 100644 --- a/docker/helpers.py +++ b/docker/helpers.py @@ -25,11 +25,15 @@ def get_today(): def run_command_via_subprocess(cmd, cwd): - """ + """Wraps around subprocess.Popen to run commands outside of Python. Prints + output as it goes, returns the status code from the command. + Parameters ----------------------- cmd : list - cwd : str + Parts of the command to run. + cwd : str or None + Set the working directory, or to None. Returns ----------------------- @@ -37,11 +41,9 @@ def run_command_via_subprocess(cmd, cwd): Return code from command """ logger = logging.getLogger('pgosm-flex') - with subprocess.Popen(cmd, - cwd=cwd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) as process: - + with subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) as process: while True: output = process.stdout.readline() if process.poll() is not None and output == b'': @@ -52,9 +54,7 @@ def run_command_via_subprocess(cmd, cwd): else: # Only sleep when there wasn't output sleep(1) - status = process.poll() - return status @@ -73,7 +73,6 @@ def verify_checksum(md5_file, path): returncode = run_command_via_subprocess(cmd=['md5sum', '-c', md5_file], cwd=path) - if returncode != 0: err_msg = f'Failed to validate md5sum. Return code: {output.returncode}' logger.error(err_msg) diff --git a/docker/pgosm_flex.py b/docker/pgosm_flex.py index 67e7b08..02008a8 100644 --- a/docker/pgosm_flex.py +++ b/docker/pgosm_flex.py @@ -205,7 +205,6 @@ def run_replication_update(skip_nested, flex_path): -d $PGOSM_CONN """ update_cmd = update_cmd.replace('-d $PGOSM_CONN', f'-d {conn_string}') - returncode = helpers.run_command_via_subprocess(cmd=update_cmd.split(), cwd=flex_path) @@ -292,7 +291,6 @@ def get_paths(): return paths - def get_export_filename(input_file): """Returns the .sql filename to use for pg_dump. @@ -337,7 +335,6 @@ def get_export_full_path(out_path, export_filename): ----------------- export_path : str """ - if os.path.isabs(export_filename): export_path = export_filename else: @@ -380,7 +377,6 @@ def check_layerset_places(flex_path): skip_nested : boolean """ logger = logging.getLogger('pgosm-flex') - layerset = os.environ.get('PGOSM_LAYERSET') layerset_path = os.environ.get('PGOSM_LAYERSET_PATH') @@ -429,7 +425,6 @@ def run_post_processing(flex_path, skip_nested): if not post_processing_sql: return False - return True @@ -448,7 +443,6 @@ def dump_database(input_file, out_path, skip_dump, data_only, schema_name): logging.getLogger('pgosm-flex').info('Skipping pg_dump') else: export_filename = get_export_filename(input_file) - export_path = get_export_full_path(out_path, export_filename) db.run_pg_dump(export_path=export_path,