Skip to content

Commit 467a663

Browse files
authored
[core][feat] Use database locks to perform migration (#2184)
1 parent c3a1bdd commit 467a663

File tree

8 files changed

+225
-41
lines changed

8 files changed

+225
-41
lines changed

fixcore/fixcore/__main__.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
parse_config,
3838
CoreConfig,
3939
)
40-
from fixcore.db import SystemData
41-
from fixcore.db.db_access import DbAccess, CurrentDatabaseVersion
40+
from fixcore.db.db_access import DbAccess
4241
from fixcore.db.system_data_db import EphemeralJwtSigningKey
4342
from fixcore.dependencies import Dependencies, ServiceNames, TenantDependencies
4443
from fixcore.dependencies import (
@@ -129,12 +128,7 @@ def run_process(args: Namespace) -> None:
129128
cert_handler_no_ca = deps.add(ServiceNames.cert_handler, CertificateHandlerNoCA.lookup(config, temp))
130129
verify: Union[bool, str] = False if args.graphdb_no_ssl_verify else str(cert_handler_no_ca.ca_bundle)
131130
deps.add(ServiceNames.config, evolve(config, run=RunConfig(temp, verify)))
132-
deps.add(ServiceNames.system_data, SystemData("multi-tenant", utc(), CurrentDatabaseVersion))
133-
deps.add(
134-
ServiceNames.event_sender,
135-
PostHogEventSender(deps.system_data) if config.runtime.usage_metrics else NoEventSender(),
136-
)
137-
131+
deps.add(ServiceNames.event_sender, NoEventSender())
138132
provider: TenantDependencyProvider = deps.add(
139133
ServiceNames.tenant_dependency_provider, FromRequestTenantDependencyProvider(deps)
140134
)
@@ -172,6 +166,9 @@ async def direct_tenant(deps: TenantDependencies) -> None:
172166
config = deps.config
173167
event_sender = deps.event_sender
174168
db = deps.service(ServiceNames.db_access, DbAccess)
169+
# migrate the database to latest schema
170+
db_change = await db.migrate()
171+
deps.add(ServiceNames.system_data, db_change.current)
175172
message_bus = deps.add(ServiceNames.message_bus, MessageBus())
176173
scheduler = deps.add(ServiceNames.scheduler, APScheduler() if not config.args.no_scheduling else NoScheduler())
177174
model = deps.add(ServiceNames.model_handler, ModelHandlerDB(db, config.runtime.plantuml_server))
@@ -202,7 +199,9 @@ async def direct_tenant(deps: TenantDependencies) -> None:
202199
subscriptions = deps.add(ServiceNames.subscription_handler, SubscriptionHandlerService(message_bus))
203200
core_config_handler = deps.add(
204201
ServiceNames.core_config_handler,
205-
CoreConfigHandler(config, message_bus, worker_task_queue, config_handler, event_sender, inspector),
202+
CoreConfigHandler(
203+
config, message_bus, worker_task_queue, config_handler, event_sender, inspector, db_change, db.lock_db
204+
),
206205
)
207206
deps.add(ServiceNames.infra_apps_runtime, LocalfixcoreAppRuntime(cli))
208207
deps.add(

fixcore/fixcore/config/core_config_handler.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
from contextlib import suppress
55
from functools import partial
66
from typing import Optional, List, Callable, Awaitable
7-
from apscheduler.triggers.cron import CronTrigger
87

98
import yaml
9+
from apscheduler.triggers.cron import CronTrigger
1010

1111
from fixcore.analytics import AnalyticsEventSender, CoreEvent
1212
from fixcore.config import ConfigHandler, ConfigEntity, ConfigValidation
@@ -25,14 +25,16 @@
2525
config_model as core_config_model,
2626
migrate_command_config,
2727
)
28-
from fixcore.system_start import empty_config
28+
from fixcore.db import DatabaseChange
29+
from fixcore.db.lockdb import LockDB
2930
from fixcore.ids import SubscriberId, WorkerId, ConfigId
3031
from fixcore.message_bus import MessageBus, CoreMessage
3132
from fixcore.model.model import Kind
3233
from fixcore.model.typed_model import from_js
3334
from fixcore.report import FixReportBenchmark, FixReportCheck, Inspector, BenchmarkConfigRoot, CheckConfigRoot
3435
from fixcore.report.report_config import config_model as report_config_model
3536
from fixcore.service import Service
37+
from fixcore.system_start import empty_config
3638
from fixcore.types import Json
3739
from fixcore.user import config_model as user_config_model, UsersConfigId, FixInventoryUsersConfig
3840
from fixcore.util import deep_merge, restart_service, value_in_path, value_in_path_get
@@ -50,6 +52,8 @@ def __init__(
5052
config_handler: ConfigHandler,
5153
event_sender: AnalyticsEventSender,
5254
inspector: Inspector,
55+
db_change: DatabaseChange,
56+
lock_db: LockDB,
5357
exit_fn: Callable[[], None] = partial(restart_service, "fixcore config changed."),
5458
):
5559
super().__init__()
@@ -61,6 +65,8 @@ def __init__(
6165
self.config_handler = config_handler
6266
self.event_sender = event_sender
6367
self.inspector = inspector
68+
self.db_change = db_change
69+
self.lock_db = lock_db
6470
self.exit_fn = exit_fn
6571
self.config_updated_callbacks: List[Callable[[ConfigId], Awaitable[None]]] = []
6672

@@ -250,9 +256,10 @@ async def __update_model(self) -> None:
250256
log.error(f"Could not update fix core config model: {ex}", exc_info=ex)
251257

252258
async def start(self) -> None:
253-
if not self.config.multi_tenant_setup:
254-
await self.__update_model()
255-
await self.__update_config()
259+
if self.db_change.has_changed() and not self.config.multi_tenant_setup:
260+
async with self.lock_db.lock("fixcore.config.update"):
261+
await self.__update_model()
262+
await self.__update_config()
256263
self.config_updated_listener = asyncio.create_task(self.__handle_events())
257264
self.config_validator = asyncio.create_task(self.__validate_config())
258265

fixcore/fixcore/db/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
from datetime import datetime
55
from enum import Enum
66

7+
from fixcore.core_config import current_git_hash
78
from fixcore.types import Json
89

10+
MigrateAlways = False
11+
CurrentDatabaseVersion = 2
12+
913

1014
@define
1115
class SystemData:
@@ -14,6 +18,29 @@ class SystemData:
1418
db_version: int
1519
version: Optional[str] = None
1620

21+
def detect_change(self) -> bool:
    """Tell whether this stored system data is out of date for the running code.

    A change is reported when migrations are forced via `MigrateAlways`, when
    the stored `db_version` differs from `CurrentDatabaseVersion`, when either
    the stored `version` or the current git hash is unknown (`None`), or when
    the two differ.
    """
    # The git hash is looked up first, before any of the checks below.
    running_hash = current_git_hash()
    if MigrateAlways:
        return True
    if self.db_version != CurrentDatabaseVersion:
        return True
    # An unknown hash on either side cannot prove "unchanged": report a change.
    if self.version is None or running_hash is None:
        return True
    return running_hash != self.version
30+
31+
32+
@define
class DatabaseChange:
    """System data as found before a migration, paired with the data in effect after it."""

    # System data read from the database beforehand; None if none was found.
    previous: Optional[SystemData]
    # System data that is in effect now.
    current: SystemData

    def has_changed(self) -> bool:
        """Return True if there was no previous data or version/db_version differ."""
        before = self.previous
        if before is None:
            return True
        after = self.current
        return (before.version, before.db_version) != (after.version, after.db_version)
43+
1744

1845
class EstimatedQueryCostRating(Enum):
1946
simple = 1

fixcore/fixcore/db/db_access.py

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,22 @@
1010
from arango import ArangoServerError
1111
from arango.client import ArangoClient
1212
from arango.database import StandardDatabase
13-
from attr import frozen
13+
from attr import frozen, evolve
1414
from dateutil.parser import parse
1515
from requests.exceptions import RequestException
1616

1717
from fixcore.analytics import AnalyticsEventSender
1818
from fixcore.async_extensions import run_async
1919
from fixcore.core_config import CoreConfig, current_git_hash
20-
from fixcore.db import SystemData
20+
from fixcore.db import SystemData, DatabaseChange
2121
from fixcore.db.arangodb_extensions import ArangoHTTPClient
2222
from fixcore.db.async_arangodb import AsyncArangoDB, AsyncCursor
2323
from fixcore.db.configdb import config_entity_db, config_validation_entity_db
2424
from fixcore.db.deferredouteredgedb import deferred_outer_edge_db
2525
from fixcore.db.entitydb import EventEntityDb
2626
from fixcore.db.graphdb import ArangoGraphDB, GraphDB, EventGraphDB
2727
from fixcore.db.jobdb import job_db
28+
from fixcore.db.lockdb import LockDB
2829
from fixcore.db.modeldb import ModelDb, model_db
2930
from fixcore.db.packagedb import app_package_entity_db
3031
from fixcore.db.reportdb import report_check_db, benchmark_db
@@ -76,6 +77,7 @@ def __init__(
7677
time_series: str = "ts",
7778
report_checks: str = "report_checks",
7879
benchmarks: str = "report_benchmarks",
80+
locks: str = "locks",
7981
):
8082
super().__init__()
8183
self.event_sender = event_sender
@@ -95,34 +97,28 @@ def __init__(
9597
self.report_check_db = report_check_db(self.db, report_checks)
9698
self.benchmark_db = benchmark_db(self.db, benchmarks)
9799
self.time_series_db = TimeSeriesDB(self.db, time_series, config)
100+
self.lock_db = LockDB(self.db, locks)
98101
self.graph_dbs: Dict[str, GraphDB] = {}
99102
self.config = config
100103
self.cleaner = Periodic("outdated_updates_cleaner", self.check_outdated_updates, timedelta(seconds=60))
101104

102105
async def start(self) -> None:
103-
await self.__migrate()
104106
await self.cleaner.start()
105107

106108
async def stop(self) -> None:
107109
await self.cleaner.stop()
108110

109-
async def __migrate(self) -> None:
110-
try:
111-
system_data = await self.system_data_db.system_data()
112-
except Exception:
113-
system_data = None
114-
if not await self.db.has_collection("system_data"): # make sure the system data collection exists
115-
await self.db.create_collection("system_data")
116-
if system_data is None: # in case no version is available, create a genesis version
117-
system_data = SystemData(uuid_str(), utc(), CurrentDatabaseVersion)
118-
git_hash = current_git_hash()
119-
if (
120-
MigrateAlways
121-
or system_data.db_version != CurrentDatabaseVersion
122-
or system_data.version is None
123-
or git_hash is None
124-
or git_hash != system_data.version
125-
):
111+
async def migrate(self) -> DatabaseChange:
112+
async def sys_data() -> Optional[SystemData]:
113+
try:
114+
return await self.system_data_db.system_data()
115+
except Exception:
116+
if not await self.db.has_collection("system_data"): # make sure the system data collection exists
117+
await self.db.create_collection("system_data")
118+
return None
119+
120+
async def do_migrate(system_data: SystemData) -> SystemData:
121+
git_hash = current_git_hash()
126122
# check if we need to run a migration
127123
if system_data.db_version < CurrentDatabaseVersion:
128124
log.info(f"Database migration required: db={system_data.db_version} -> latest={CurrentDatabaseVersion}")
@@ -131,7 +127,7 @@ async def __migrate(self) -> None:
131127
for version in range(system_data.db_version, CurrentDatabaseVersion):
132128
log.info(f"Running migration {version} -> {version + 1}")
133129
await migrations[version]()
134-
system_data.db_version = CurrentDatabaseVersion
130+
system_data = evolve(system_data, db_version=CurrentDatabaseVersion)
135131
await self.system_data_db.update_system_data(system_data)
136132

137133
# will be executed on every git change
@@ -159,10 +155,22 @@ async def __migrate(self) -> None:
159155
await em.create_update_schema()
160156

161157
# update the system data version to the current git hash
162-
system_data.version = git_hash
163-
158+
system_data = evolve(system_data, version=git_hash)
164159
# update the system data version to not migrate the next time
165160
await self.system_data_db.update_system_data(system_data)
161+
return system_data
162+
163+
existing = await sys_data()
164+
if existing is None or existing.detect_change():
165+
# create `lock` collection to be able to create database locks
166+
await self.lock_db.create_update_schema()
167+
async with self.lock_db.lock("db_access.migrate"):
168+
# lookup again with an exclusive database lock
169+
existing = await sys_data()
170+
if existing is None or existing.detect_change():
171+
updated = await do_migrate(existing or SystemData(uuid_str(), utc(), CurrentDatabaseVersion))
172+
return DatabaseChange(existing, updated)
173+
return DatabaseChange(existing, existing)
166174

167175
async def __migrate_v1_to_v2(self) -> None:
168176
def migrate_config(old_id: str, old_root: str, config: Json) -> Json:

fixcore/fixcore/db/lockdb.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import asyncio
2+
import logging
3+
from contextlib import asynccontextmanager
4+
from datetime import timedelta
5+
from typing import cast, List, AsyncIterator
6+
7+
from arango import CollectionCreateError
8+
9+
from fixcore.db.async_arangodb import AsyncArangoDB
10+
from fixcore.types import Json
11+
from fixcore.util import utc
12+
13+
log = logging.getLogger(__name__)
14+
15+
16+
class LockAcquisitionError(Exception):
    """Raised by `LockDB.lock` when the lock was not obtained before the deadline."""


class LockDB:
    """Named locks backed by documents in a database collection.

    A lock is taken by inserting a document whose `_key` is the lock name and
    released by deleting that document. This relies on the database rejecting
    an insert while a document with the same key exists.
    """

    def __init__(self, db: "AsyncArangoDB", collection_name: str) -> None:
        self.db = db
        self.collection_name = collection_name

    @asynccontextmanager
    async def lock(
        self,
        name: str,
        *,
        get_lock: timedelta = timedelta(seconds=60),  # how long to try to get the lock
        lock_for: timedelta = timedelta(seconds=60),  # acquired: how long to hold the lock max
        retry_interval: timedelta = timedelta(seconds=0.1),  # how long to wait between retries
    ) -> AsyncIterator[None]:
        """Hold the lock `name` for the duration of the `async with` body.

        :param name: lock name, used as the key of the lock document.
        :param get_lock: how long to keep retrying before giving up.
        :param lock_for: lifetime written into the `expires` field of the lock document.
        :param retry_interval: pause between two acquisition attempts.
        :raises LockAcquisitionError: if no insert succeeded before the deadline.
            The error of the last failed attempt (if any) is attached as its cause.
        """
        deadline = utc() + get_lock
        last_error = None
        acquired = False
        while utc() < deadline:
            # Compute the expiry per attempt: a value computed before waiting would
            # already be (partly) used up by the time the lock is finally obtained.
            expires = int((utc() + lock_for).timestamp())
            try:
                # try to insert a document with the lock name as key
                await self.db.insert(self.collection_name, dict(_key=name, expires=expires), sync=True)
            except Exception as ex:
                # could not insert the document. Remember why, wait and try again
                last_error = ex
                await asyncio.sleep(retry_interval.total_seconds())
            else:
                acquired = True
                break
        if not acquired:
            raise LockAcquisitionError(f"Could not acquire lock {name}") from last_error
        try:
            yield
        finally:
            # always release, also when the guarded body raised
            await self.db.delete(self.collection_name, name, ignore_missing=True)

    async def create_update_schema(self) -> None:
        """Create the lock collection and its `ttl` index if they do not exist yet."""
        if not await self.db.has_collection(self.collection_name):
            try:
                await self.db.create_collection(self.collection_name)
            except CollectionCreateError as ex:
                if ex.error_code != 1207:  # already exists
                    raise
        collection = self.db.collection(self.collection_name)
        indexes = {idx["name"]: idx for idx in cast(List[Json], collection.indexes())}
        if "ttl" not in indexes:
            try:
                # The ttl expiry is only a safeguard - if the process is not able to clean up requested locks
                collection.add_ttl_index(["expires"], expiry_time=0, name="ttl")
            except Exception as ex:
                # best effort: locks are still released explicitly without the index
                log.info(f"Could not create TTL index for lock collection: {ex}")

fixcore/fixcore/dependencies.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,8 @@ def standard_database() -> StandardDatabase:
446446
# direct db access
447447
sdb = deps.add(ServiceNames.system_database, await run_async(standard_database))
448448
db = deps.add(ServiceNames.db_access, DbAccess(sdb, dp.event_sender, NoAdjust(), config))
449+
db_change = await db.migrate()
450+
deps.add(ServiceNames.system_data, db_change.current)
449451
# no scheduler required in multi-tenant mode
450452
scheduler = deps.add(ServiceNames.scheduler, NoScheduler())
451453
# all tenants use the same model (derived from code)
@@ -473,7 +475,9 @@ def standard_database() -> StandardDatabase:
473475
subscriptions = deps.add(ServiceNames.subscription_handler, NoSubscriptionHandler())
474476
core_config_handler = deps.add(
475477
ServiceNames.core_config_handler,
476-
CoreConfigHandler(config, message_bus, worker_task_queue, config_handler, event_sender, inspector),
478+
CoreConfigHandler(
479+
config, message_bus, worker_task_queue, config_handler, event_sender, inspector, db_change, db.lock_db
480+
),
477481
)
478482
# Enable package manager and runtime for infra apps when required
479483
# deps.add(ServiceNames.infra_apps_runtime, LocalfixcoreAppRuntime(cli))

0 commit comments

Comments
 (0)