diff --git a/scripts/osm2pgsql-replication b/scripts/osm2pgsql-replication index e721da350..966b1fb98 100755 --- a/scripts/osm2pgsql-replication +++ b/scripts/osm2pgsql-replication @@ -140,9 +140,10 @@ def setup_replication_state(conn, table, base_url, seq, date): the given state. """ with conn.cursor() as cur: - cur.execute(sql.SQL('DROP TABLE IF EXISTS {}').format(table)) - cur.execute(sql.SQL("""CREATE TABLE {} - (url TEXT, + # XXX TODO : make it possible to drop a replication source ? + # cur.execute(sql.SQL('DROP TABLE IF EXISTS {}').format(table)) + cur.execute(sql.SQL("""CREATE TABLE IF NOT EXISTS {} + (url TEXT PRIMARY KEY, sequence INTEGER, importdate TIMESTAMP WITH TIME ZONE) """).format(table)) @@ -151,17 +152,17 @@ def setup_replication_state(conn, table, base_url, seq, date): conn.commit() -def update_replication_state(conn, table, seq, date): +def update_replication_state(conn, table, base_url, seq, date): """ Update sequence and date in the replication state table. The table is assumed to exist. """ with conn.cursor() as cur: if date is not None: - cur.execute(sql.SQL('UPDATE {} SET sequence=%s, importdate=%s').format(table), - (seq, date)) + cur.execute(sql.SQL('UPDATE {} SET sequence=%s, importdate=%s WHERE url=%s').format(table), + (seq, date, base_url)) else: - cur.execute(sql.SQL('UPDATE {} SET sequence=%s').format(table), - (seq,)) + cur.execute(sql.SQL('UPDATE {} SET sequence=%s WHERE url=%s').format(table), + (seq, base_url)) conn.commit() @@ -211,42 +212,44 @@ def status(conn, args): if not table_exists(conn, args.table_name): results['status'] = 1 - results['error'] = "Cannot find replication status table. Run 'osm2pgsql-replication init' first." + results['errors'] = ["Cannot find replication status table. Run 'osm2pgsql-replication init' first."] else: with conn.cursor() as cur: cur.execute(sql.SQL('SELECT * FROM {}').format(args.table)) - if cur.rowcount != 1: + if cur.rowcount < 1: results['status'] = 2 - results['error'] = "Updates not set up correctly. Run 'osm2pgsql-updates init' first." + results['errors'] = ["Updates not set up correctly. Run 'osm2pgsql-updates init' first."] else: - - base_url, db_seq, db_ts = cur.fetchone() - db_ts = db_ts.astimezone(dt.timezone.utc) - results['server'] = {} - results['local'] = {} - results['server']['base_url'] = base_url - results['local']['sequence'] = db_seq - results['local']['timestamp'] = db_ts.strftime("%Y-%m-%dT%H:%M:%SZ") - - - repl = ReplicationServer(base_url) - state_info = repl.get_state_info() - if state_info is None: - # PyOsmium was unable to download the state information - results['status'] = 3 - results['error'] = "Unable to download the state information from {}".format(base_url) - else: - results['status'] = 0 - now = dt.datetime.now(dt.timezone.utc) - - server_seq, server_ts = state_info - server_ts = server_ts.astimezone(dt.timezone.utc) - - results['server']['sequence'] = server_seq - results['server']['timestamp'] = server_ts.strftime("%Y-%m-%dT%H:%M:%SZ") - results['server']['age_sec'] = int((now-server_ts).total_seconds()) - - results['local']['age_sec'] = int((now - db_ts).total_seconds()) + results['replications'] = [] + results['status'] = 0 + results['errors'] = [] + for (base_url, db_seq, db_ts) in cur.fetchall(): + db_ts = db_ts.astimezone(dt.timezone.utc) + replication = {"server": {}, "local": {}} + replication['server']['base_url'] = base_url + replication['local']['sequence'] = db_seq + replication['local']['timestamp'] = db_ts.strftime("%Y-%m-%dT%H:%M:%SZ") + + + repl = ReplicationServer(base_url) + state_info = repl.get_state_info() + if state_info is None: + # PyOsmium was unable to download the state information + results['status'] = 3 + results['errors'].append("Unable to download the state information from {}".format(base_url)) + else: + results['status'] = max(results['status'], 0) + now = dt.datetime.now(dt.timezone.utc) + + server_seq, server_ts = state_info + server_ts = server_ts.astimezone(dt.timezone.utc) + + replication['server']['sequence'] = server_seq + replication['server']['timestamp'] = server_ts.strftime("%Y-%m-%dT%H:%M:%SZ") + replication['server']['age_sec'] = int((now-server_ts).total_seconds()) + + replication['local']['age_sec'] = int((now - db_ts).total_seconds()) + results['replications'].append(replication) if args.json: print(json.dumps(results)) @@ -254,19 +257,20 @@ def status(conn, args): if results['status'] != 0: LOG.fatal(results['error']) else: - print("Using replication service '{}', which is at sequence {} ( {} )".format( - results['server']['base_url'], results['server']['sequence'], results['server']['timestamp'])) - print("Replication server's most recent data is {} old".format(pretty_format_timedelta(results['server']['age_sec']))) - - if results['local']['sequence'] == results['server']['sequence']: - print("Local database is up to date with server") - else: - print("Local database is {} sequences behind the server, i.e. {}".format( - results['server']['sequence'] - results['local']['sequence'], - pretty_format_timedelta(results['local']['age_sec'] - results['server']['age_sec']) - )) - - print("Local database's most recent data is {} old".format(pretty_format_timedelta(results['local']['age_sec']))) + for replication in results['replications']: + print("Using replication service '{}', which is at sequence {} ( {} )".format( + replication['server']['base_url'], replication['server']['sequence'], replication['server']['timestamp'])) + print("Replication server's most recent data is {} old".format(pretty_format_timedelta(replication['server']['age_sec']))) + + if replication['local']['sequence'] == replication['server']['sequence']: + print("Local database is up to date with server") + else: + print("Local database is {} sequences behind the server, i.e. {}".format( + replication['server']['sequence'] - replication['local']['sequence'], + pretty_format_timedelta(replication['local']['age_sec'] - replication['server']['age_sec']) + )) + + print("Local database's most recent data is {} old".format(pretty_format_timedelta(replication['local']['age_sec']))) return results['status'] @@ -358,76 +362,79 @@ def update(conn, args): "Run 'osm2pgsql-replication init' first.") return 1 + replication_streams = [] with conn.cursor() as cur: cur.execute(sql.SQL('SELECT * FROM {}').format(args.table)) - if cur.rowcount != 1: + if cur.rowcount < 1: LOG.fatal("Updates not set up correctly. Run 'osm2pgsql-updates init' first.") return 1 - base_url, seq, ts = cur.fetchone() + replication_streams = cur.fetchall() + + for (base_url, seq, ts) in replication_streams: LOG.info("Using replication service '%s'. Current sequence %d (%s).", base_url, seq, ts) - repl = ReplicationServer(base_url) - current = repl.get_state_info() - - if seq >= current.sequence: - LOG.info("Database already up-to-date.") - return 0 - - if args.diff_file is not None: - outfile = Path(args.diff_file) - else: - tmpdir = tempfile.TemporaryDirectory() - outfile = Path(tmpdir.name) / 'osm2pgsql_diff.osc.gz' - - osm2pgsql = [args.osm2pgsql_cmd, '--append', '--slim', '--prefix', args.prefix] - osm2pgsql.extend(args.extra_params) - if args.database: - osm2pgsql.extend(('-d', args.database)) - if args.username: - osm2pgsql.extend(('-U', args.username)) - if args.host: - osm2pgsql.extend(('-H', args.host)) - if args.port: - osm2pgsql.extend(('-P', args.port)) - osm2pgsql.append(str(outfile)) - LOG.debug("Calling osm2pgsql with: %s", ' '.join(osm2pgsql)) - - while seq < current.sequence: - LOG.debug("Importing from sequence %d", seq) - if outfile.exists(): - outfile.unlink() - outhandler = WriteHandler(str(outfile)) - endseq = repl.apply_diffs(outhandler, seq + 1, - max_size=args.max_diff_size * 1024) - outhandler.close() - - if endseq is None: - LOG.debug("No new diffs found.") - break - - subprocess.run(osm2pgsql, check=True) - seq = endseq - - nextstate = repl.get_state_info(seq) - timestamp = nextstate.timestamp if nextstate else None - - if args.post_processing: - cmd = [args.post_processing, str(endseq), str(timestamp or '')] - LOG.debug('Calling post-processing script: %s', ' '.join(cmd)) - subprocess.run(cmd, check=True) - - update_replication_state(conn, args.table, seq, - nextstate.timestamp if nextstate else None) - - if nextstate is not None: - LOG.info("Data imported until %s. Backlog remaining: %s", - nextstate.timestamp, - dt.datetime.now(dt.timezone.utc) - nextstate.timestamp) - - if args.once: - break + repl = ReplicationServer(base_url) + current = repl.get_state_info() + + if seq >= current.sequence: + LOG.info("Database already up-to-date.") + continue + + if args.diff_file is not None: + outfile = Path(args.diff_file) + else: + tmpdir = tempfile.TemporaryDirectory() + outfile = Path(tmpdir.name) / 'osm2pgsql_diff.osc.gz' + + osm2pgsql = [args.osm2pgsql_cmd, '--append', '--slim', '--prefix', args.prefix] + osm2pgsql.extend(args.extra_params) + if args.database: + osm2pgsql.extend(('-d', args.database)) + if args.username: + osm2pgsql.extend(('-U', args.username)) + if args.host: + osm2pgsql.extend(('-H', args.host)) + if args.port: + osm2pgsql.extend(('-P', args.port)) + osm2pgsql.append(str(outfile)) + LOG.debug("Calling osm2pgsql with: %s", ' '.join(osm2pgsql)) + + while seq < current.sequence: + LOG.debug("Importing from sequence %d", seq) + if outfile.exists(): + outfile.unlink() + outhandler = WriteHandler(str(outfile)) + endseq = repl.apply_diffs(outhandler, seq + 1, + max_size=args.max_diff_size * 1024) + outhandler.close() + + if endseq is None: + LOG.debug("No new diffs found.") + break + + subprocess.run(osm2pgsql, check=True) + seq = endseq + + nextstate = repl.get_state_info(seq) + timestamp = nextstate.timestamp if nextstate else None + + if args.post_processing: + cmd = [args.post_processing, str(endseq), str(timestamp or '')] + LOG.debug('Calling post-processing script: %s', ' '.join(cmd)) + subprocess.run(cmd, check=True) + + update_replication_state(conn, args.table, base_url, seq, + nextstate.timestamp if nextstate else None) + + if nextstate is not None: + LOG.info("Data imported until %s. Backlog remaining: %s", + nextstate.timestamp, + dt.datetime.now(dt.timezone.utc) - nextstate.timestamp) + + if args.once: + break return 0