Make synapse_port_db correctly create indexes (#6102)
Make `synapse_port_db` correctly create indexes in the PostgreSQL database by having it run the background updates on that database before migrating the data.

To ensure we're migrating the right data, also block the port if the SQLite3 database still has pending or ongoing background updates.

Fixes #4877
babolivier committed Oct 23, 2019
1 parent 409c62b commit c97ed64
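
In outline, the change makes the port proceed as in the sketch below, a condensed and illustrative rendering of the logic this commit adds rather than the script's exact code. The `Porter` attributes and store helpers it uses (`build_db_store`, `has_completed_background_updates`, `do_next_background_update`) are the ones introduced in the diff; the wrapper function itself is hypothetical.

    # Condensed, illustrative sketch of the new porting flow (not the exact script code).
    import sys

    from twisted.internet import defer


    @defer.inlineCallbacks
    def port_with_background_updates(porter):
        # Refuse to port while the SQLite3 database still has background updates
        # pending or in flight: the data they would produce has not been written yet.
        sqlite_store = yield porter.build_db_store(porter.sqlite_config)
        if not (yield sqlite_store.has_completed_background_updates()):
            sys.stderr.write(
                "Pending background updates exist in the SQLite3 database; "
                "start Synapse and let them finish before running this script.\n"
            )
            defer.returnValue(None)

        # Build the PostgreSQL store and drive its background updates to
        # completion, so the expected indexes exist before any rows are copied.
        postgres_store = yield porter.build_db_store(porter.hs_config.database_config)
        while not (yield postgres_store.has_completed_background_updates()):
            yield postgres_store.do_next_background_update(100)

        # ... then create the port tables and copy the data as before ...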
Showing 2 changed files with 131 additions and 52 deletions.
1 change: 1 addition & 0 deletions changelog.d/6102.bugfix
@@ -0,0 +1 @@
Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.
182 changes: 130 additions & 52 deletions scripts/synapse_port_db
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,9 +30,23 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor

from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext
from synapse.storage._base import LoggingTransaction
from synapse.storage.client_ips import ClientIpBackgroundUpdateStore
from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore
from synapse.storage.devices import DeviceBackgroundUpdateStore
from synapse.storage.engines import create_engine
from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore
from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore
from synapse.storage.prepare_database import prepare_database
from synapse.storage.registration import RegistrationBackgroundUpdateStore
from synapse.storage.roommember import RoomMemberBackgroundUpdateStore
from synapse.storage.search import SearchBackgroundUpdateStore
from synapse.storage.state import StateBackgroundUpdateStore
from synapse.storage.stats import StatsStore
from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore
from synapse.util import Clock

logger = logging.getLogger("synapse_port_db")

@@ -98,33 +113,24 @@ APPEND_ONLY_TABLES = [
end_error_exec_info = None


class Store(object):
"""This object is used to pull out some of the convenience API from the
Storage layer.
*All* database interactions should go through this object.
"""

def __init__(self, db_pool, engine):
self.db_pool = db_pool
self.database_engine = engine

_simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
_simple_insert = SQLBaseStore.__dict__["_simple_insert"]

_simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
_simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
_simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
_simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
_simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
_simple_select_one_onecol_txn = SQLBaseStore.__dict__[
"_simple_select_one_onecol_txn"
]

_simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
_simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
_simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
class Store(
ClientIpBackgroundUpdateStore,
DeviceInboxBackgroundUpdateStore,
DeviceBackgroundUpdateStore,
EventsBackgroundUpdatesStore,
MediaRepositoryBackgroundUpdateStore,
RegistrationBackgroundUpdateStore,
RoomMemberBackgroundUpdateStore,
SearchBackgroundUpdateStore,
StateBackgroundUpdateStore,
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
def __init__(self, db_conn, hs):
super().__init__(db_conn, hs)
self.db_pool = hs.get_db_pool()

@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
@@ -150,7 +156,8 @@ class Store(object):
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise

return self.db_pool.runWithConnection(r)
with PreserveLoggingContext():
return (yield self.db_pool.runWithConnection(r))

def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs)
@@ -176,6 +183,25 @@ class Store(object):
raise


class MockHomeserver:
def __init__(self, config, database_engine, db_conn, db_pool):
self.database_engine = database_engine
self.db_conn = db_conn
self.db_pool = db_pool
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server_name

def get_db_conn(self):
return self.db_conn

def get_db_pool(self):
return self.db_pool

def get_clock(self):
return self.clock


class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
@@ -447,31 +473,75 @@ class Porter(object):

db_conn.commit()

return db_conn

@defer.inlineCallbacks
def run(self):
try:
sqlite_db_pool = adbapi.ConnectionPool(
self.sqlite_config["name"], **self.sqlite_config["args"]
)
def build_db_store(self, config):
"""Builds and returns a database store using the provided configuration.
postgres_db_pool = adbapi.ConnectionPool(
self.postgres_config["name"], **self.postgres_config["args"]
)
Args:
config: The database configuration, i.e. a dict following the structure of
the "database" section of Synapse's configuration file.
Returns:
The built Store object.
"""
engine = create_engine(config)

self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)

db_pool = adbapi.ConnectionPool(
config["name"], **config["args"]
)

hs = MockHomeserver(self.hs_config, engine, conn, db_pool)

store = Store(conn, hs)

yield store.runInteraction(
"%s_engine.check_database" % config["name"],
engine.check_database,
)

sqlite_engine = create_engine(sqlite_config)
postgres_engine = create_engine(postgres_config)
return store

self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
self.postgres_store = Store(postgres_db_pool, postgres_engine)
@defer.inlineCallbacks
def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = yield self.postgres_store.has_completed_background_updates()

if not postgres_ready:
# Only say that we're running background updates when there are background
# updates to run.
self.progress.set_state("Running background updates on PostgreSQL")

while not postgres_ready:
yield self.postgres_store.do_next_background_update(100)
postgres_ready = yield (
self.postgres_store.has_completed_background_updates()
)

yield self.postgres_store.execute(postgres_engine.check_database)
@defer.inlineCallbacks
def run(self):
try:
self.sqlite_store = yield self.build_db_store(self.sqlite_config)

# Check if all background updates are done, abort if not.
updates_complete = yield self.sqlite_store.has_completed_background_updates()
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
" Please start Synapse again and wait until every update has finished"
" before running this script.\n"
)
defer.returnValue(None)

# Step 1. Set up databases.
self.progress.set_state("Preparing SQLite3")
self.setup_db(sqlite_config, sqlite_engine)
self.postgres_store = yield self.build_db_store(
self.hs_config.database_config
)

self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
yield self.run_background_updates_on_postgres()

self.progress.set_state("Creating port tables")

@@ -563,6 +633,8 @@ class Porter(object):
def conv(j, col):
if j in bool_cols:
return bool(col)
if isinstance(col, bytes):
return bytearray(col)
elif isinstance(col, string_types) and "\0" in col:
logger.warn(
"DROPPING ROW: NUL value in table %s col %s: %r",
@@ -926,18 +998,24 @@ if __name__ == "__main__":
},
}

postgres_config = yaml.safe_load(args.postgres_config)
hs_config = yaml.safe_load(args.postgres_config)

if "database" in postgres_config:
postgres_config = postgres_config["database"]
if "database" not in hs_config:
sys.stderr.write("The configuration file must have a 'database' section.\n")
sys.exit(4)

postgres_config = hs_config["database"]

if "name" not in postgres_config:
sys.stderr.write("Malformed database config: no 'name'")
sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
sys.stderr.write("Database must use 'psycopg2' connector.")
sys.stderr.write("Database must use the 'psycopg2' connector.\n")
sys.exit(3)

config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")

def start(stdscr=None):
if stdscr:
progress = CursesProgress(stdscr)
@@ -946,9 +1024,9 @@ if __name__ == "__main__":

porter = Porter(
sqlite_config=sqlite_config,
postgres_config=postgres_config,
progress=progress,
batch_size=args.batch_size,
hs_config=config,
)

reactor.callWhenRunning(porter.run)
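
For reference, a minimal sketch of the homeserver configuration dict the updated script expects to load. Only the checks visible in this diff are asserted here (a `database` section must exist and its connector must be `psycopg2`); the connection arguments are placeholders that are passed straight through to the connection pool.

    # Hypothetical config dict; values are placeholders, not from the commit.
    hs_config = {
        "server_name": "example.com",   # surfaced as config.server_name via HomeServerConfig
        "database": {
            "name": "psycopg2",         # any other value makes the script exit(3)
            "args": {                   # placeholder connection arguments
                "user": "synapse_user",
                "password": "changeme",
                "database": "synapse",
                "host": "localhost",
            },
        },
    }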
