Skip to content

Commit

Permalink
[resotocore][feat] Maintain version and run migrations on demand (#1887)
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Jan 22, 2024
1 parent 1f0ca94 commit c51e3b7
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 66 deletions.
4 changes: 2 additions & 2 deletions resotocore/resotocore/__main__.py
Expand Up @@ -7,7 +7,7 @@
from argparse import Namespace
from asyncio import Queue
from contextlib import suppress
from datetime import timedelta, datetime, timezone
from datetime import timedelta
from functools import partial
from pathlib import Path
from tempfile import TemporaryDirectory
Expand Down Expand Up @@ -127,7 +127,7 @@ def run_process(args: Namespace) -> None:
cert_handler_no_ca = deps.add(ServiceNames.cert_handler, CertificateHandlerNoCA.lookup(config, temp))
verify: Union[bool, str] = False if args.graphdb_no_ssl_verify else str(cert_handler_no_ca.ca_bundle)
deps.add(ServiceNames.config, evolve(config, run=RunConfig(temp, verify)))
deps.add(ServiceNames.system_data, SystemData("multi-tenant", datetime(2023, 9, 1, tzinfo=timezone.utc), 1))
deps.add(ServiceNames.system_data, SystemData("multi-tenant", utc(), 1))
deps.add(
ServiceNames.event_sender,
PostHogEventSender(deps.system_data) if config.runtime.usage_metrics else NoEventSender(),
Expand Down
13 changes: 9 additions & 4 deletions resotocore/resotocore/core_config.py
@@ -1,10 +1,12 @@
import logging
import os
import re
import subprocess
from argparse import Namespace
from contextlib import suppress
from copy import deepcopy
from datetime import timedelta
from functools import lru_cache
from pathlib import Path
from typing import Optional, List, ClassVar, Dict, Union, cast, Callable

Expand Down Expand Up @@ -39,15 +41,18 @@
GitHashFile = "/usr/local/etc/git-commit.HEAD"


def git_hash_from_file() -> Optional[str]:
@lru_cache(maxsize=1)
def current_git_hash() -> Optional[str]:
"""
Returns the git hash from the file created by the docker build.
In case we do not run inside a docker container, this method returns None.
Returns the git hash either from the file created by the docker build,
or it tries to get it from git directly.
If both fails, it returns None.
"""
with suppress(Exception):
path = Path(GitHashFile)
if path.exists():
return path.read_text("utf-8").strip()
return subprocess.check_output(["git", "rev-parse", "HEAD"]).strip().decode("utf-8")
return None


Expand All @@ -60,7 +65,7 @@ def inside_docker() -> bool:
os.environ.get("INSIDE_DOCKER", "false").lower() in ("true", "yes", "1")
or os.environ.get("INSIDE_KUBERNETES", "false").lower() in ("true", "yes", "1")
# this file is available in the created docker container
or git_hash_from_file() is not None
or Path(GitHashFile).exists()
)


Expand Down
3 changes: 3 additions & 0 deletions resotocore/resotocore/db/__init__.py
@@ -1,3 +1,5 @@
from typing import Optional

from attrs import define
from datetime import datetime
from enum import Enum
Expand All @@ -10,6 +12,7 @@ class SystemData:
system_id: str
created_at: datetime
db_version: int
version: Optional[str] = None


class EstimatedQueryCostRating(Enum):
Expand Down
140 changes: 89 additions & 51 deletions resotocore/resotocore/db/db_access.py
Expand Up @@ -12,7 +12,7 @@

from resotocore.analytics import AnalyticsEventSender
from resotocore.async_extensions import run_async
from resotocore.core_config import CoreConfig
from resotocore.core_config import CoreConfig, current_git_hash
from resotocore.db import SystemData
from resotocore.db.arangodb_extensions import ArangoHTTPClient
from resotocore.db.async_arangodb import AsyncArangoDB
Expand Down Expand Up @@ -82,7 +82,24 @@ def __init__(
self.cleaner = Periodic("outdated_updates_cleaner", self.check_outdated_updates, timedelta(seconds=60))

async def start(self) -> None:
if not self.config.multi_tenant_setup:
await self.__migrate()
await self.cleaner.start()

async def stop(self) -> None:
    """Stop the periodic `outdated_updates_cleaner` task held in `self.cleaner`."""
    await self.cleaner.stop()

async def __migrate(self) -> None:
try:
system_data = await self.system_data_db.system_data()
except Exception:
system_data = None
if not await self.db.has_collection("system_data"): # make sure the system data collection exists
await self.db.create_collection("system_data")
if system_data is None: # in case no version is available, create a genesis version
system_data = SystemData(uuid_str(), utc(), 1)
git_hash = current_git_hash()
if system_data.version is None or git_hash is None or git_hash != system_data.version:
log.info(f"Version change detected. Running migrations. {system_data.version} -> {git_hash}")
await self.running_task_db.create_update_schema()
await self.job_db.create_update_schema()
await self.config_entity_db.create_update_schema()
Expand All @@ -105,11 +122,14 @@ async def start(self) -> None:
log.info(f"Found graph: {graph_name}")
db = self.get_graph_db(graph_name)
await db.create_update_schema()
await self.get_graph_model_db(graph_name)
await self.cleaner.start()

async def stop(self) -> None:
await self.cleaner.stop()
em = await self.get_graph_model_db(graph_name)
await em.create_update_schema()
if git_hash is not None:
# update the system data version to not migrate the next time
system_data.version = git_hash
await self.system_data_db.update_system_data(system_data)
else:
log.warning("No git_hash found - will always update the database schema on startup.")

def graph_model_name(self, graph_name: GraphName) -> str:
    """Return the name of the model that belongs to the given graph: `<graph_name>_model`."""
    return f"{graph_name}_model"
Expand All @@ -118,8 +138,12 @@ async def create_graph(self, name: GraphName, validate_name: bool = True) -> Gra
if validate_name:
check_graph_name(name)

# create the graph in the database
db = self.get_graph_db(name, no_check=True)
await db.create_update_schema()
# also create the related model database
model = await self.get_graph_model_db(name)
await model.create_update_schema()
return db

async def delete_graph(self, name: GraphName) -> None:
Expand Down Expand Up @@ -168,7 +192,6 @@ async def get_graph_model_db(self, graph_name: GraphName) -> ModelDb:
else:
model_name = self.graph_model_name(graph_name)
db = EventEntityDb(model_db(self.db, model_name), self.event_sender, model_name)
await db.create_update_schema()
self.graph_model_dbs[graph_name] = db
return db

Expand All @@ -182,6 +205,55 @@ async def check_outdated_updates(self) -> None:
log.warning(f"Given update is too old: {batch_id}. Will abort the update.")
await db.abort_update(batch_id)

@classmethod
def create_database(
    cls,
    *,
    server: str,
    username: str,
    password: str,
    database: str,
    root_password: str,
    request_timeout: int,
    secure_root: bool,
) -> None:
    """
    Best-effort bootstrap of the configured user and database via the root account.

    Connects to the system database as root (using `root_password`) and
    - creates the user `username` with `password`, if the user does not exist,
    - creates `database` with that user attached, if the database does not exist,
    - replaces the root password with `password`, if something has been created,
      `secure_root` is set, the root password is empty and `password` is neither
      empty nor the literal "test".

    Any error raised while doing so is logged and swallowed: this method does not
    raise for connection or permission problems.

    :param server: the host(s) of the database server.
    :param username: the user to create if missing.
    :param password: the password of the user to create.
    :param database: the database to create if missing.
    :param root_password: the password of the root account used to access the system database.
    :param request_timeout: request timeout passed to the http client.
    :param secure_root: if true, an empty root password may be replaced (see above).
    """
    log.info(f"Create new database {database} for user {username} on server {server}.")
    # Assigned inside the try block, so a failing constructor is reported like any other error.
    client = None
    try:
        # try to access the system database with given credentials.
        http_client = ArangoHTTPClient(request_timeout, False)
        client = ArangoClient(hosts=server, http_client=http_client)
        root_db = client.db(password=root_password)
        root_db.echo()  # this call will fail if we are not allowed to access the system db
        user = username
        change = False
        if not root_db.has_user(user):
            log.info("Configured graph db user does not exist. Create it.")
            root_db.create_user(user, password, active=True)
            change = True
        if not root_db.has_database(database):
            log.info("Configured graph db database does not exist. Create it.")
            root_db.create_database(
                database,
                [{"username": user, "password": password, "active": True, "extra": {"generated": "resoto"}}],
            )
            change = True
        # Only touch the root account if we just set something up and root is still unprotected.
        if change and secure_root and root_password == "" and password != "" and password not in {"test"}:
            root_db.replace_user("root", password, True)
            log.info(
                "Database is using an empty password. "
                "Secure the root account with the provided user password. "
                "Login to the Resoto database via provided username and password. "
                "Login to the System database via `root` and provided password!"
            )
        if not change:
            log.info("Not allowed to access database, while user and database exist. Wrong password?")
    except Exception as ex:
        log.error(
            "Database or user does not exist or does not have enough permissions. "
            f"Attempt to create user/database via default system account is not possible. Reason: {ex}. "
            "You can provide the password of the root user via --graphdb-root-password to setup "
            "a Resoto user and database automatically."
        )
    finally:
        # The root client is only needed for this bootstrap: release its HTTP sessions,
        # otherwise every call leaves an open connection pool behind.
        if client is not None:
            client.close()

# Only used during startup.
# Note: this call uses sleep and will block the current executing thread!
@classmethod
Expand All @@ -191,48 +263,6 @@ def connect(
deadline = utc() + timeout
db = cls.client(args, verify)

def create_database() -> None:
try:
# try to access the system database with default credentials.
# this only works if arango has been started with default settings.
http_client = ArangoHTTPClient(args.graphdb_request_timeout, False)
root_pw = args.graphdb_root_password
secure_root = not args.graphdb_bootstrap_do_not_secure
root_db = ArangoClient(hosts=args.graphdb_server, http_client=http_client).db(password=root_pw)
root_db.echo() # this call will fail, if we are not allowed to access the system db
user = args.graphdb_username
passwd = args.graphdb_password
database = args.graphdb_database
change = False
if not root_db.has_user(user):
log.info("Configured graph db user does not exist. Create it.")
root_db.create_user(user, passwd, active=True)
change = True
if not root_db.has_database(database):
log.info("Configured graph db database does not exist. Create it.")
root_db.create_database(
database,
[{"username": user, "password": passwd, "active": True, "extra": {"generated": "resoto"}}],
)
change = True
if change and secure_root and root_pw == "" and passwd != "" and passwd not in {"test"}:
root_db.replace_user("root", passwd, True)
log.info(
"Database is using an empty password. "
"Secure the root account with the provided user password. "
"Login to the Resoto database via provided username and password. "
"Login to the System database via `root` and provided password!"
)
if not change:
log.info("Not allowed to access database, while user and database exist. Wrong password?")
except Exception as ex:
log.error(
"Database or user does not exist or does not have enough permissions. "
f"Attempt to create user/database via default system account is not possible. Reason: {ex}. "
"You can provide the password of the root user via --graphdb-root-password to setup "
"a Resoto user and database automatically."
)

def system_data() -> Tuple[bool, SystemData]:
def insert_system_data() -> SystemData:
system = SystemData(uuid_str(), utc(), 1)
Expand Down Expand Up @@ -268,7 +298,15 @@ def insert_system_data() -> SystemData:
# This means we can reach the database, but are either not allowed to access it
# or the related user and or database could not be found.
# We assume the database does not exist and try to create it.
create_database()
cls.create_database(
server=args.graphdb_server,
username=args.graphdb_username,
password=args.graphdb_password,
database=args.graphdb_database,
root_password=args.graphdb_root_password,
request_timeout=args.graphdb_request_timeout,
secure_root=not args.graphdb_bootstrap_do_not_secure,
)
else:
log.warning(f"Problem accessing the graph database: {ex}. Trying again in 5 seconds.")
# Retry directly after the first attempt
Expand Down
12 changes: 11 additions & 1 deletion resotocore/resotocore/db/system_data_db.py
Expand Up @@ -3,7 +3,7 @@

from resotocore.db import SystemData, drop_arango_props
from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.model.typed_model import from_js
from resotocore.model.typed_model import from_js, to_js
from resotocore.util import if_set
from resotolib.x509 import bootstrap_ca, key_to_bytes, cert_to_bytes

Expand Down Expand Up @@ -64,3 +64,13 @@ async def update_info(self, **kwargs: str) -> Dict[str, str]:
self.collection_name, kwargs, return_new=True, overwrite=True, overwrite_mode="update", merge=True
)
return drop_arango_props(doc["new"]) # type: ignore

async def update_system_data(self, data: SystemData) -> SystemData:
    """
    Persist the given system data and return what has been stored.

    The data is written as the document with key `system`. An already existing
    document with that key is replaced completely (no merge).

    :param data: the system data to store.
    :return: the system data parsed from the document returned by the database.
    """
    document = dict(_key="system", **to_js(data))
    result = await self.db.insert(
        self.collection_name,
        document,
        return_new=True,
        overwrite=True,
        overwrite_mode="replace",
    )
    # NOTE(review): unlike update_info, the returned document is not passed through
    # drop_arango_props - confirm that from_js tolerates the additional database properties.
    return from_js(result["new"], SystemData)  # type: ignore
24 changes: 23 additions & 1 deletion resotocore/resotocore/dependencies.py
Expand Up @@ -13,6 +13,7 @@

from aiohttp import ClientSession, TCPConnector
from aiohttp.web import Request
from arango import ArangoServerError
from arango.client import ArangoClient
from arango.database import StandardDatabase
from attr import define
Expand Down Expand Up @@ -350,6 +351,7 @@ class GraphDbAccess:
database: str
username: str
password: str
create_database: bool = False

def is_valid(self) -> bool:
return bool(self.server and self.database and self.username)
Expand Down Expand Up @@ -381,6 +383,7 @@ async def dependencies(self, request: Request) -> TenantDependencies:
request.headers.get("FixGraphDbDatabase", ""),
request.headers.get("FixGraphDbUsername", ""),
request.headers.get("FixGraphDbPassword", ""),
request.headers.get("FixGraphDbCreateDatabase", "false").lower() == "true",
)
if not db_access.is_valid():
raise ValueError("Invalid graph db access data provided for multi tenant requests!")
Expand All @@ -399,7 +402,26 @@ def standard_database() -> StandardDatabase:
http_client = ArangoHTTPClient(args.graphdb_request_timeout, verify=dp.config.run.verify)
client = ArangoClient(hosts=access.server, http_client=http_client)
deps.register_on_stop_callback(client.close)
return client.db(name=access.database, username=access.username, password=access.password)
tdb = client.db(name=access.database, username=access.username, password=access.password)
# create database if requested
if access.create_database:
try:
tdb.echo()
log.warning(f"Tenant: {tenant_hash}: Create database requested but it already exists!")
except ArangoServerError as ex:
if ex.error_code in (11, 1228, 1703):
DbAccess.create_database(
server=access.server,
database=access.database,
username=access.username,
password=access.password,
root_password=args.graphdb_root_password,
request_timeout=args.graphdb_request_timeout,
secure_root=False,
)
else:
raise
return tdb

# direct db access
sdb = deps.add(ServiceNames.system_database, await run_async(standard_database))
Expand Down
2 changes: 2 additions & 0 deletions resotocore/resotocore/graph_manager/graph_manager.py
Expand Up @@ -174,6 +174,7 @@ async def _copy_graph(

source_model_db = await self.db_access.get_graph_model_db(source)
destination_model_db = await self.db_access.get_graph_model_db(destination)
await destination_model_db.create_update_schema()

model_kinds = [kind async for kind in source_model_db.all()]
await destination_model_db.update_many(model_kinds)
Expand Down Expand Up @@ -288,6 +289,7 @@ async def import_graph_model(data: AsyncIterator[str]) -> None:
position += 1

graph_model_db = await self.db_access.get_graph_model_db(graph_name)
await graph_model_db.create_update_schema()
await graph_model_db.update_many(from_js(kinds, List[Kind]))

# import the data into the temp graph
Expand Down
4 changes: 2 additions & 2 deletions resotocore/resotocore/system_start.py
Expand Up @@ -14,7 +14,7 @@

from resotocore import async_extensions, version
from resotocore.analytics import AnalyticsEventSender
from resotocore.core_config import CoreConfig, parse_config, git_hash_from_file, inside_docker
from resotocore.core_config import CoreConfig, parse_config, current_git_hash, inside_docker
from resotocore.db.db_access import DbAccess
from resotocore.model.adjust_node import DirectAdjuster
from resotocore.types import JsonElement
Expand Down Expand Up @@ -46,7 +46,7 @@ def system_info() -> SystemInfo:
mem = psutil.virtual_memory()
return SystemInfo(
version=version(),
git_hash=git_hash_from_file() or "n/a",
git_hash=current_git_hash() or None,
cpus=mp.cpu_count(),
mem_available=iec_size_format(mem.available),
mem_total=iec_size_format(mem.total),
Expand Down

0 comments on commit c51e3b7

Please sign in to comment.