[td] Dynamic blocks use the new memory enhancements #5106

Merged · 31 commits · May 26, 2024
9 changes: 6 additions & 3 deletions docker-compose.yml
@@ -10,6 +10,8 @@ x-server_settings: &server_settings
     - AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY
     - AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN
     - DEBUG=$DEBUG
+    - DEBUG_FILE_IO=$DEBUG_FILE_IO
+    - DEBUG_MEMORY=$DEBUG_MEMORY
     - DEUS_EX_MACHINA=$DEUS_EX_MACHINA
     - DISABLE_API_TERMINAL_OUTPUT=$DISABLE_API_TERMINAL_OUTPUT
     - DISABLE_DATABASE_TERMINAL_OUTPUT=$DISABLE_DATABASE_TERMINAL_OUTPUT
@@ -37,9 +39,9 @@ x-server_settings: &server_settings
     - MAGE_DATA_DIR=$MAGE_DATA_DIR
     - MAGE_PRESENTERS_DIRECTORY=$MAGE_PRESENTERS_DIRECTORY
     - MAX_NUMBER_OF_FILE_VERSIONS=$MAX_NUMBER_OF_FILE_VERSIONS
-    - MEMORY_MANAGER_PANDAS_VERSION=1
-    - MEMORY_MANAGER_POLARS_VERSION=1
-    - MEMORY_MANAGER_VERSION=1
+    - MEMORY_MANAGER_PANDAS_VERSION=$MEMORY_MANAGER_PANDAS_VERSION
+    - MEMORY_MANAGER_POLARS_VERSION=$MEMORY_MANAGER_POLARS_VERSION
+    - MEMORY_MANAGER_VERSION=$MEMORY_MANAGER_VERSION
     - NEW_RELIC_CONFIG_PATH=$NEW_RELIC_CONFIG_PATH
     - OPENAI_API_KEY=$OPENAI_API_KEY
     - OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}
@@ -52,6 +54,7 @@ x-server_settings: &server_settings
     - SERVER_VERBOSITY=$SERVER_VERBOSITY
     - SMTP_EMAIL=$SMTP_EMAIL
     - SMTP_PASSWORD=$SMTP_PASSWORD
+    - VARIABLE_DATA_OUTPUT_META_CACHE=$VARIABLE_DATA_OUTPUT_META_CACHE

     - path_to_keyfile=$GCP_PATH_TO_CREDENTIALS
   ports:
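The memory manager flags were previously pinned to 1 in docker-compose.yml; they now pass through whatever is set on the host, alongside the new DEBUG_FILE_IO, DEBUG_MEMORY, and VARIABLE_DATA_OUTPUT_META_CACHE variables. Note that with $VAR passthrough, an unset host variable reaches the container as an empty string. A minimal sketch of defensive parsing on the consuming side — the helper below is hypothetical, not code from this PR:

import os

def memory_manager_version(default: int = 1) -> int:
    # Hypothetical reader for MEMORY_MANAGER_VERSION: treat unset or blank
    # values as the previous hard-coded default of 1.
    raw = os.getenv('MEMORY_MANAGER_VERSION', '')
    try:
        return int(raw) if raw.strip() else default
    except ValueError:
        return default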
57 changes: 40 additions & 17 deletions mage_ai/api/policies/BlockOutputPolicy.py
@@ -8,22 +8,45 @@ class BlockOutputPolicy(BasePolicy):
     pass


-BlockOutputPolicy.allow_actions([
-    constants.DETAIL,
-], scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], condition=lambda policy: policy.has_at_least_viewer_role())
+BlockOutputPolicy.allow_actions(
+    [
+        constants.DETAIL,
+    ],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)

-BlockOutputPolicy.allow_read(BlockOutputPresenter.default_attributes, scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], on_action=[
-    constants.DETAIL,
-], condition=lambda policy: policy.has_at_least_viewer_role())
+BlockOutputPolicy.allow_read(
+    BlockOutputPresenter.default_attributes,
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    on_action=[
+        constants.DETAIL,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)

-BlockOutputPolicy.allow_query([
-    'pipeline_uuid',
-], scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], on_action=[
-    constants.DETAIL,
-], condition=lambda policy: policy.has_at_least_viewer_role())
+BlockOutputPolicy.allow_query(
+    [
+        'block_uuid',
+        'csv_lines_only',
+        'dynamic_block_index',
+        'exclude_blank_variable_uuids',
+        'execution_partition',
+        'include_print_outputs',
+        'pipeline_uuid',
+        'sample',
+        'sample_count',
+        'variable_type',
+    ],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    on_action=[
+        constants.DETAIL,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)
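allow_query acts as an allow-list: the DETAIL action on block outputs now accepts the full set of parameters that BlockOutputResource.member reads (shown further down), instead of only pipeline_uuid. A simplified, hypothetical sketch of the allow-list pattern — not mage-ai's actual BasePolicy internals:

# Hypothetical: how an allow-list query policy might filter a request.
ALLOWED_DETAIL_QUERY_KEYS = {
    'block_uuid',
    'csv_lines_only',
    'dynamic_block_index',
    'exclude_blank_variable_uuids',
    'execution_partition',
    'include_print_outputs',
    'pipeline_uuid',
    'sample',
    'sample_count',
    'variable_type',
}

def filter_query(query: dict) -> dict:
    # Drop any parameter the policy has not declared for this action.
    return {k: v for k, v in query.items() if k in ALLOWED_DETAIL_QUERY_KEYS}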
114 changes: 73 additions & 41 deletions mage_ai/api/policies/OutputPolicy.py
@@ -18,44 +18,76 @@ def entity(self):
         return Entity.PROJECT, get_project_uuid()


-OutputPolicy.allow_actions([
-    constants.CREATE,
-    constants.DETAIL,
-    constants.LIST,
-], scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], condition=lambda policy: policy.has_at_least_viewer_role())
-
-OutputPolicy.allow_read(OutputPresenter.default_attributes + [], scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], on_action=[
-    constants.CREATE,
-    constants.DETAIL,
-    constants.LIST,
-], condition=lambda policy: policy.has_at_least_viewer_role())
-
-OutputPolicy.allow_write([
-    'block_uuid',
-    'partition',
-    'persist',
-    'pipeline_uuid',
-    'refresh',
-    'sample_count',
-    'streams',
-], scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], on_action=[
-    constants.CREATE,
-], condition=lambda policy: policy.has_at_least_viewer_role())
-
-OutputPolicy.allow_query([
-    'block_uuid',
-    'parent_stream',
-    'partition',
-    'sample_count',
-    'stream',
-], scopes=[
-    OauthScope.CLIENT_PRIVATE,
-], on_action=[
-    constants.DETAIL,
-], condition=lambda policy: policy.has_at_least_viewer_role())
+OutputPolicy.allow_actions(
+    [
+        constants.CREATE,
+        constants.DETAIL,
+        constants.LIST,
+    ],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)
+
+OutputPolicy.allow_read(
+    OutputPresenter.default_attributes + [],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    on_action=[
+        constants.CREATE,
+        constants.DETAIL,
+        constants.LIST,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)
+
+OutputPolicy.allow_write(
+    [
+        'block_uuid',
+        'partition',
+        'persist',
+        'pipeline_uuid',
+        'refresh',
+        'sample_count',
+        'streams',
+    ],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    on_action=[
+        constants.CREATE,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)
+
+OutputPolicy.allow_query(
+    [
+        'block_uuid',
+        'parent_stream',
+        'partition',
+        'sample_count',
+        'stream',
+    ],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    on_action=[
+        constants.DETAIL,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)
+
+OutputPolicy.allow_query(
+    [
+        'sample_count',
+    ],
+    scopes=[
+        OauthScope.CLIENT_PRIVATE,
+    ],
+    on_action=[
+        constants.LIST,
+    ],
+    condition=lambda policy: policy.has_at_least_viewer_role(),
+)
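The final allow_query block is new behavior, not just reformatting: sample_count becomes an accepted parameter on the LIST action as well as DETAIL. A hypothetical client call this enables — the port is mage-ai's default, but the exact path and auth header here are illustrative assumptions:

import requests

# Hypothetical request: cap each listed output at 10 sampled rows.
response = requests.get(
    'http://localhost:6789/api/outputs',
    params={'sample_count': 10},
    headers={'Authorization': 'Bearer <api-key>'},
)
response.raise_for_status()
print(response.json())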
46 changes: 37 additions & 9 deletions mage_ai/api/resources/BlockOutputResource.py
@@ -1,5 +1,6 @@
 from mage_ai.api.errors import ApiError
 from mage_ai.api.resources.GenericResource import GenericResource
+from mage_ai.data_preparation.models.constants import DATAFRAME_SAMPLE_COUNT
 from mage_ai.data_preparation.models.pipeline import Pipeline
 from mage_ai.data_preparation.models.variables.constants import VariableType
 from mage_ai.orchestration.db import safe_db_query
@@ -14,30 +15,57 @@ class BlockOutputResource(GenericResource):

     @classmethod
     @safe_db_query
-    def member(self, pk, user, **kwargs):
+    def member(cls, pk, user, **kwargs):
+        query = kwargs.get('query', {})
         block_uuid = pk

-        query = kwargs.get("query", {})
-        pipeline_uuid = query.get("pipeline_uuid", [None])
+        pipeline_uuid = query.get('pipeline_uuid', [None])
         if pipeline_uuid:
             pipeline_uuid = pipeline_uuid[0]

+        outputs_query = {}
+        for key in [
+            'block_uuid',
+            'csv_lines_only',
+            'dynamic_block_index',
+            'exclude_blank_variable_uuids',
+            'execution_partition',
+            'include_print_outputs',
+            'sample',
+            'sample_count',
+            'variable_type',
+        ]:
+            value = query.get(key, [None])
+            if value is not None:
+                value = value[0]
+            if value is not None:
+                outputs_query[key] = value
+
+        for key, value in [
+            ('exclude_blank_variable_uuids', True),
+            ('include_print_outputs', False),
+            ('sample_count', DATAFRAME_SAMPLE_COUNT),
+            ('variable_type', VariableType.DATAFRAME),
+        ]:
+            if key not in outputs_query:
+                outputs_query[key] = value
+
         outputs = []
         if pipeline_uuid is not None:
             repo_path = get_repo_path(user=user)
             pipeline = Pipeline.get(pipeline_uuid, repo_path=repo_path)
             block = pipeline.get_block(block_uuid)

             error = ApiError.RESOURCE_ERROR.copy()
             if block is None:
                 error.update(
-                    message=f"Block {block_uuid} does not exist in pipeline {pipeline_uuid}"
+                    message=f'Block {block_uuid} does not exist in pipeline {pipeline_uuid}'
                 )
                 raise ApiError(error)

-            # Only fetch dataframe variables by default
             outputs = block.get_outputs(
-                exclude_blank_variable_uuids=True,
-                include_print_outputs=False,
-                sample_count=None,
-                variable_type=VariableType.DATAFRAME,
+                **outputs_query,
             )

-        return self(dict(outputs=outputs), user, **kwargs)
+        return cls(dict(outputs=outputs), user, **kwargs)
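member() now forwards whitelisted query parameters to block.get_outputs() instead of hard-coding them, backfilling the old behavior as defaults (with sample_count switching from None to DATAFRAME_SAMPLE_COUNT). Each query value arrives as a single-element list, which is why it is unwrapped before use. The same pattern as a standalone, hypothetical helper — the function name is mine, not from the codebase:

from typing import Any, Dict, List, Optional

def extract_query_options(
    query: Dict[str, List[Optional[Any]]],
    allowed_keys: List[str],
    defaults: Dict[str, Any],
) -> Dict[str, Any]:
    options = {}
    for key in allowed_keys:
        # Query-string parsing yields a list per key, e.g. ?sample=true -> ['true'].
        values = query.get(key, [None])
        value = values[0] if values else None
        if value is not None:
            options[key] = value
    for key, value in defaults.items():
        # Backfill the previously hard-coded arguments for anything omitted.
        options.setdefault(key, value)
    return options

# Only sample_count supplied; include_print_outputs falls back to its default.
opts = extract_query_options(
    {'sample_count': ['20']},
    ['sample_count', 'include_print_outputs'],
    {'include_print_outputs': False},
)
assert opts == {'sample_count': '20', 'include_print_outputs': False}

Values come off the URL as strings; this sketch, like the resource code above, leaves any type coercion to downstream consumers.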
49 changes: 27 additions & 22 deletions mage_ai/api/resources/BlockRunResource.py
@@ -27,12 +27,22 @@ def build_result_set(self, arr, user, **kwargs):
             started_at = tup.started_at if hasattr(tup, 'started_at') else None
             status = tup.status if hasattr(tup, 'status') else None
             updated_at = tup.updated_at if hasattr(tup, 'updated_at') else None
-            pipeline_schedule_id = tup.pipeline_schedule_id if hasattr(
-                tup, 'pipeline_schedule_id',
-            ) else None
-            pipeline_schedule_name = tup.pipeline_schedule_name if hasattr(
-                tup, 'pipeline_schedule_name',
-            ) else None
+            pipeline_schedule_id = (
+                tup.pipeline_schedule_id
+                if hasattr(
+                    tup,
+                    'pipeline_schedule_id',
+                )
+                else None
+            )
+            pipeline_schedule_name = (
+                tup.pipeline_schedule_name
+                if hasattr(
+                    tup,
+                    'pipeline_schedule_name',
+                )
+                else None
+            )
         else:
             (
                 block_uuid,
@@ -98,29 +108,22 @@ def collection(self, query_arg, meta, user, **kwargs):
         ]

         query = (
-            BlockRun.
-            select(*columns).
-            join(b, a.pipeline_run_id == b.id).
-            join(c, b.pipeline_schedule_id == c.id)
+            BlockRun.select(*columns)
+            .join(b, a.pipeline_run_id == b.id)
+            .join(c, b.pipeline_schedule_id == c.id)
         )

         pipeline_run_id = query_arg.get('pipeline_run_id', [None])
         if pipeline_run_id:
             pipeline_run_id = pipeline_run_id[0]
         if pipeline_run_id and is_number(pipeline_run_id):
-            query = (
-                query.
-                filter(a.pipeline_run_id == int(pipeline_run_id))
-            )
+            query = query.filter(a.pipeline_run_id == int(pipeline_run_id))

         pipeline_uuid = query_arg.get('pipeline_uuid', [None])
         if pipeline_uuid:
             pipeline_uuid = pipeline_uuid[0]
         if pipeline_uuid:
-            query = (
-                query.
-                filter(c.pipeline_uuid == pipeline_uuid)
-            )
+            query = query.filter(c.pipeline_uuid == pipeline_uuid)

         # The order_by value should be an attribute on the BlockRun model.
         order_by_arg = query_arg.get('order_by', [None])
@@ -138,10 +141,12 @@ def collection(self, query_arg, meta, user, **kwargs):
             try:
                 br_col = getattr(a, col)
                 initial_results = query.order_by(getattr(br_col, asc_desc)())
-            except (AttributeError):
-                raise Exception('Block run sort column/query is invalid. The sort column ' +
-                                'must be an attribute of the BlockRun model. The sort direction ' +
-                                'is either "asc" (ascending order) or "desc" (descending order).')
+            except AttributeError:
+                raise Exception(
+                    'Block run sort column/query is invalid. The sort column '
+                    + 'must be an attribute of the BlockRun model. The sort direction '
+                    + 'is either "asc" (ascending order) or "desc" (descending order).'
+                )
         else:
             initial_results = query.order_by(
                 a.started_at.desc(),
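The sort handling resolves the caller-supplied column with getattr on the model and converts AttributeError into a descriptive error; the direction string is used the same way to pick .asc() or .desc(). A self-contained sketch of that pattern using SQLAlchemy — the model, helper, and session names are illustrative stand-ins, not the real mage-ai code:

from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class BlockRun(Base):  # illustrative stand-in for the real model
    __tablename__ = 'block_run'
    id = Column(Integer, primary_key=True)
    block_uuid = Column(String)
    started_at = Column(DateTime)

def apply_order_by(query, col: str, direction: str = 'asc'):
    # Validate the direction, then resolve the column dynamically; anything
    # that is not a model attribute raises AttributeError, which we rewrap.
    if direction not in ('asc', 'desc'):
        raise ValueError('Sort direction must be "asc" or "desc".')
    try:
        column = getattr(BlockRun, col)
    except AttributeError:
        raise ValueError(f'{col!r} is not a sortable BlockRun attribute.')
    return query.order_by(getattr(column, direction)())

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
rows = apply_order_by(session.query(BlockRun), 'started_at', 'desc').all()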