Skip to content

Commit

Permalink
Improve deploy performance for very large models (Issue #7262, PR #7278)
Browse files Browse the repository at this point in the history
Pull request opened by the merge tool on behalf of #7278
  • Loading branch information
wouterdb authored and inmantaci committed Mar 4, 2024
1 parent a5d9a04 commit 183d3de
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 154 deletions.
7 changes: 7 additions & 0 deletions changelogs/unreleased/7262-performance.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
description: Improve deploy performance for very large models
issue-nr: 7262
change-type: minor
destination-branches: [master, iso7]
sections:
minor-improvement: "{{description}}"

2 changes: 1 addition & 1 deletion src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
is_time,
)

agent_deploy_interval = Option(
agent_deploy_interval: Option[int | str] = Option(
"config",
"agent-deploy-interval",
0,
Expand Down
28 changes: 15 additions & 13 deletions src/inmanta/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ def get(cls, section: Optional[str] = None, name: Optional[str] = None, default_

@classmethod
def get_for_option(cls, option: "Option[T]") -> T:
raw_value: Optional[str | T] = cls._get_value(option.section, option.name, option.get_default_value())
raw_value: str | T = cls._get_value(option.section, option.name, option.get_default_value())
return option.validate(raw_value)

@classmethod
def _get_value(cls, section: str, name: str, default_value: Optional[T] = None) -> Optional[str | T]:
def _get_value(cls, section: str, name: str, default_value: T) -> str | T:
cfg: ConfigParser = cls.get_instance()
val: Optional[str] = _get_from_env(section, name)
if val is not None:
LOGGER.debug(f"Setting {section}:{name} was set using an environment variable")
else:
val = cfg.get(section, name, fallback=default_value)

return val
return val
# Typing of this method in the sdk is not entirely accurate
# It just returns the fallback, whatever its type
return cfg.get(section, name, fallback=default_value)

@classmethod
def is_set(cls, section: str, name: str) -> bool:
Expand Down Expand Up @@ -205,12 +205,12 @@ def is_float(value: str) -> float:
return float(value)


def is_time(value: str) -> int:
def is_time(value: str | int) -> int:
"""Time, the number of seconds represented as an integer value"""
return int(value)


def is_time_or_cron(value: str) -> Union[int, str]:
def is_time_or_cron(value: str | int) -> Union[int, str]:
"""Time, the number of seconds represented as an integer value or a cron-like expression"""
try:
return is_time(value)
Expand All @@ -232,8 +232,10 @@ def is_bool(value: Union[bool, str]) -> bool:
return boolean_states[value.lower()]


def is_list(value: str) -> list[str]:
def is_list(value: str | list[str]) -> list[str]:
"""List of comma-separated values"""
if isinstance(value, list):
return value
return [] if value == "" else [x.strip() for x in value.split(",")]


Expand Down Expand Up @@ -304,9 +306,9 @@ def __init__(
self,
section: str,
name: str,
default: Union[T, None, Callable[[], T]],
default: Union[T, Callable[[], T]],
documentation: str,
validator: Callable[[Optional[str | T]], T] = is_str,
validator: Callable[[str | T], T] = is_str,
predecessor_option: Optional["Option"] = None,
) -> None:
self.section = section
Expand Down Expand Up @@ -342,10 +344,10 @@ def get_default_desc(self) -> str:
else:
return f"``{defa}``"

def validate(self, value: Optional[str | T]) -> T:
def validate(self, value: str | T) -> T:
return self.validator(value)

def get_default_value(self) -> Optional[T]:
def get_default_value(self) -> T:
defa = self.default
if callable(defa):
return defa()
Expand Down
102 changes: 74 additions & 28 deletions src/inmanta/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4621,6 +4621,40 @@ def convert_or_ignore(rvid):
)
return out

@classmethod
async def set_deployed_multi(
    cls,
    environment: uuid.UUID,
    resource_ids: Sequence[m.ResourceIdStr],
    version: int,
    connection: Optional[asyncpg.connection.Connection] = None,
) -> None:
    """
    Set the status of a batch of resources to 'deployed' using a single UPDATE statement.

    Every row of the resource table with the given environment and model version whose
    resource_id occurs in resource_ids is updated.

    :param environment: Value matched against the environment column.
    :param resource_ids: Values matched against the resource_id column.
    :param version: Value matched against the model column.
    :param connection: Handed to cls.get_connection(); presumably an existing connection to
        reuse instead of acquiring a new one (confirm against get_connection).
    """
    update_query = (
        "UPDATE resource SET status='deployed' "
        "WHERE environment=$1 AND model=$2 AND resource_id =ANY($3) "
    )
    async with cls.get_connection(connection) as con:
        await con.execute(update_query, environment, version, resource_ids)

@classmethod
async def get_resource_ids_with_status(
    cls,
    environment: uuid.UUID,
    resource_version_ids: list[m.ResourceIdStr],
    version: int,
    statuses: Sequence[const.ResourceState],
    lock: Optional[RowLockMode] = None,
    connection: Optional[asyncpg.connection.Connection] = None,
) -> list[m.ResourceIdStr]:
    """
    Return the resource ids, out of the given candidates, whose status is one of the given statuses.

    Only rows of the resource table with the given environment and model version are considered.

    :param environment: Value matched against the environment column.
    :param resource_version_ids: Candidate ids. Despite the parameter name, these are matched
        against the resource_id column.
    :param version: Value matched against the model column.
    :param statuses: A row is selected when its status equals any of these.
    :param lock: When set, its value is appended verbatim to the query (presumably a row-locking
        clause; confirm against RowLockMode).
    :param connection: Handed to cls.get_connection(); presumably an existing connection to
        reuse instead of acquiring a new one (confirm against get_connection).
    :return: The resource_id of every selected row.
    """
    select_query = (
        "SELECT resource_id as resource_id FROM resource "
        "WHERE environment=$1 AND model=$2 AND status = ANY($3) and resource_id =ANY($4) "
    )
    lock_clause = lock.value if lock else ""
    full_query = select_query + lock_clause
    async with cls.get_connection(connection) as con:
        records = await con.fetch(full_query, environment, version, statuses, resource_version_ids)
        return [m.ResourceIdStr(cast(str, record["resource_id"])) for record in records]

@classmethod
async def get_undeployable(cls, environment: uuid.UUID, version: int) -> list["Resource"]:
"""
Expand Down Expand Up @@ -4794,29 +4828,42 @@ async def get_resources_for_version_raw_with_persistent_state(
cls,
environment: uuid.UUID,
version: int,
projection: Optional[list[str]],
projection_presistent: Optional[list[str]],
projection: Optional[list[typing.LiteralString]],
projection_presistent: Optional[list[typing.LiteralString]],
project_attributes: Optional[list[typing.LiteralString]] = None,
*,
connection: Optional[Connection] = None,
) -> list[dict[str, object]]:
"""This method performs none of the mangling required to produce valid resources!"""
"""This method performs none of the mangling required to produce valid resources!
project_attributes performs a projection on the json attributes of the resources table
all projections must be disjoint, as they become named fields in the output record
"""

def collect_projection(projection: Optional[list[str]], prefix: str) -> str:
if not projection:
return f"{prefix}.*"
else:
return ",".join(f"{prefix}.{field}" for field in projection)

if project_attributes:
json_projection = "," + ",".join(f"r.attributes->'{v}' as {v}" for v in project_attributes)
else:
json_projection = ""

query = f"""
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')}
SELECT {collect_projection(projection, 'r')}, {collect_projection(projection_presistent, 'ps')} {json_projection}
FROM {cls.table_name()} r JOIN resource_persistent_state ps ON r.resource_id = ps.resource_id
WHERE r.environment=$1 AND ps.environment = $1 and r.model = $2;"""

resource_records = await cls._fetch_query(query, environment, version, connection=connection)
resources = [dict(record) for record in resource_records]
for res in resources:
if "attributes" in res:
res["attributes"] = json.loads(res["attributes"])
if project_attributes:
for k in project_attributes:
if res[k]:
res[k] = json.loads(res[k])
return resources

@classmethod
Expand Down Expand Up @@ -5403,6 +5450,7 @@ async def get_list(
no_obj: Optional[bool] = None,
lock: Optional[RowLockMode] = None,
connection: Optional[asyncpg.connection.Connection] = None,
no_status: bool = False, # don't load the status field
**query: object,
) -> list["ConfigurationModel"]:
# sanitize and validate order parameters
Expand Down Expand Up @@ -5446,14 +5494,21 @@ async def get_list(
{lock_statement}"""
query_result = await cls._fetch_query(query_string, *values, connection=connection)
result = []
for record in query_result:
record = dict(record)
for in_record in query_result:
record = dict(in_record)
if no_obj:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
if no_status:
record["status"] = {}
else:
record["status"] = await cls._get_status_field(record["environment"], record["status"])
result.append(record)
else:
done = record.pop("done")
status = await cls._get_status_field(record["environment"], record.pop("status"))
if no_status:
status = {}
record.pop("status")
else:
status = await cls._get_status_field(record["environment"], record.pop("status"))
obj = cls(from_postgres=True, **record)
obj._done = done
obj._status = status
Expand Down Expand Up @@ -5703,23 +5758,23 @@ async def get_increment(
deployed and different hash -> increment
"""
# Depends on deploying
projection_a_resource = [
projection_a_resource: list[typing.LiteralString] = [
"resource_id",
"attribute_hash",
"attributes",
"status",
]
projection_a_state = [
projection_a_state: list[typing.LiteralString] = [
"last_success",
"last_produced_events",
"last_deployed_attribute_hash",
"last_non_deploying_status",
]
projection = ["resource_id", "status", "attribute_hash"]
projection_a_attributes: list[typing.LiteralString] = ["requires", "send_event"]
projection: list[typing.LiteralString] = ["resource_id", "status", "attribute_hash"]

# get resources for agent
resources = await Resource.get_resources_for_version_raw_with_persistent_state(
environment, version, projection_a_resource, projection_a_state, connection=connection
environment, version, projection_a_resource, projection_a_state, projection_a_attributes, connection=connection
)

# to increment
Expand All @@ -5740,20 +5795,11 @@ async def get_increment(
continue
# Now outstanding events
last_success = resource["last_success"] or DATETIME_MIN_UTC
attributes = resource["attributes"]
assert isinstance(attributes, dict) # mypy
for req in attributes["requires"]:
for req in resource["requires"]:
req_res = id_to_resource[req]
assert req_res is not None # todo
req_res_attributes = req_res["attributes"]
assert isinstance(req_res_attributes, dict) # mypy
last_produced_events = req_res["last_produced_events"]
if (
last_produced_events is not None
and last_produced_events > last_success
and "send_event" in req_res_attributes
and req_res_attributes["send_event"]
):
if last_produced_events is not None and last_produced_events > last_success and req_res["send_event"]:
in_increment = True
break

Expand Down Expand Up @@ -5839,9 +5885,9 @@ async def get_increment(

# build lookup tables
for res in resources:
for req in res["attributes"]["requires"]:
for req in res["requires"]:
original_provides[req].append(res["resource_id"])
if "send_event" in res["attributes"] and res["attributes"]["send_event"]:
if res["send_event"]:
send_events.append(res["resource_id"])

# recursively include stuff potentially receiving events from nodes in the increment
Expand Down
2 changes: 1 addition & 1 deletion src/inmanta/server/agentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ async def _terminate_agents(self) -> None:
async def _ensure_agents(
self,
env: data.Environment,
agents: list[str],
agents: Sequence[str],
restart: bool = False,
*,
connection: Optional[asyncpg.connection.Connection] = None,
Expand Down
10 changes: 5 additions & 5 deletions src/inmanta/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,18 @@ def validate_fact_renew(value: object) -> int:
"server", "purge-resource-action-logs-interval", 3600, "The number of seconds between resource-action log purging", is_time
)

server_resource_action_log_prefix = Option(
server_resource_action_log_prefix: Option[str] = Option(
"server",
"resource_action_log_prefix",
"resource-actions-",
"File prefix in log-dir, containing the resource-action logs. The after the prefix the environment uuid and .log is added",
is_str,
)

server_enabled_extensions = Option(
server_enabled_extensions: Option[list[str]] = Option(
"server",
"enabled_extensions",
"",
list,
"A list of extensions the server must load. Core is always loaded."
"If an extension listed in this list is not available, the server will refuse to start.",
is_list,
Expand All @@ -271,9 +271,9 @@ def validate_fact_renew(value: object) -> int:
)


def default_hangtime() -> str:
def default_hangtime() -> int:
""":inmanta.config:option:`server.agent-timeout` *3/4"""
return str(int(agent_timeout.get() * 3 / 4))
return int(agent_timeout.get() * 3 / 4)


agent_hangtime = Option(
Expand Down

0 comments on commit 183d3de

Please sign in to comment.