From 5dbe851e1cc63eacbe93a6686d9fe15ba6d1965c Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Fri, 23 May 2025 11:43:34 -0500 Subject: [PATCH 1/9] Add incremental export commands --- singlestoredb/fusion/handlers/export.py | 249 ++++++++++++++++++------ singlestoredb/management/export.py | 73 ++++++- 2 files changed, 259 insertions(+), 63 deletions(-) diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py index ff0fe1e3a..c2458f2e4 100644 --- a/singlestoredb/fusion/handlers/export.py +++ b/singlestoredb/fusion/handlers/export.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import datetime import json from typing import Any from typing import Dict @@ -104,7 +105,99 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: CreateClusterIdentity.register(overwrite=True) -class CreateExport(SQLHandler): +def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]: + # From table + if isinstance(params['from_table'], str): + from_database = None + from_table = params['from_table'] + else: + from_database, from_table = params['from_table'] + + # Catalog + catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}') + catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}') + + # Storage + storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}') + storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}') + + storage_config['provider'] = 'S3' + + wsg = get_workspace_group({}) + + if from_database is None: + raise ValueError('database name must be specified for source table') + + if wsg._manager is None: + raise TypeError('no workspace manager is associated with workspace group') + + partition_by = [] + if params['partition_by']: + for key in params['partition_by']: + transform = key['partition_key']['transform']['col_transform'] + part = {} + part['transform'] = transform[0].lower() + part['name'] = transform[-1]['transform_col'] + partition_by.append(part) + + order_by = [] + if params['order_by'] and params['order_by']['by']: + for key in params['order_by']['by']: + transform = key['transform']['col_transform'] + order = {} + order['transform'] = transform[0].lower() + order['name'] = transform[-1]['transform_col'] + order['direction'] = 'ascending' + order['null_order'] = 'nulls_first' + if key.get('direction'): + if 'desc' in key['direction'].lower(): + order['direction'] = 'descending' + if key.get('null_order'): + if 'last' in key['null_order'].lower(): + order['null_order'] = 'nulls_last' + order_by.append(order) + + # Refresh interval + refresh_interval_delta = None + refresh_interval = params.get('refresh_interval', None) + if refresh_interval is not None: + value = int(refresh_interval['refresh_interval_value']) + time_unit = refresh_interval['refresh_interval_time_unit'] + if value < 0: + raise ValueError('refresh interval must be greater than 0') + if time_unit == 'SECONDS': + refresh_interval_delta = datetime.timedelta(seconds=int(value)) + elif time_unit == 'MINUTES': + refresh_interval_delta = datetime.timedelta(minutes=int(value)) + elif time_unit == 'HOURS': + refresh_interval_delta = datetime.timedelta(hours=int(value)) + elif time_unit == 'DAYS': + refresh_interval_delta = datetime.timedelta(days=int(value)) + else: + raise ValueError('invalid refresh interval time unit') + + out = ExportService( + wsg, + from_database, + from_table, + dict(**catalog_config, **catalog_creds), + dict(**storage_config, **storage_creds), + 
columns=None, + partition_by=partition_by or None, + order_by=order_by or None, + properties=json.loads(params['properties']) if params['properties'] else None, + incremental=params.get('incremental', False), + refresh_interval=refresh_interval_delta, + ).start() + + res = FusionSQLResult() + res.add_field('ExportID', result.STRING) + res.set_rows([(out.export_id,)]) + + return res + + +class StartExport(SQLHandler): """ START EXPORT from_table @@ -150,7 +243,7 @@ class CreateExport(SQLHandler): Description ----------- - Create an export configuration. + Start an export. Arguments --------- @@ -180,7 +273,6 @@ class CreateExport(SQLHandler): LINK S3 CONFIG '{ "region": "us-east-1", "endpoint_url": "s3://bucket-name" - }' ; @@ -189,77 +281,110 @@ class CreateExport(SQLHandler): _enabled = False def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: - # From table - if isinstance(params['from_table'], str): - from_database = None - from_table = params['from_table'] - else: - from_database, from_table = params['from_table'] + return _start_export(params) - # Catalog - catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}') - catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}') - # Storage - storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}') - storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}') +StartExport.register(overwrite=True) - storage_config['provider'] = 'S3' - wsg = get_workspace_group({}) +class StartIncrementalExport(SQLHandler): + """ + START INCREMENTAL EXPORT + from_table + catalog + storage + [ partition_by ] + [ order_by ] + [ properties ] + [ refresh_interval ] + ; - if from_database is None: - raise ValueError('database name must be specified for source table') + # From table + from_table = FROM - if wsg._manager is None: - raise TypeError('no workspace manager is associated with workspace group') + # Transforms + _col_transform = { VOID | IDENTITY | YEAR | MONTH | DAY | HOUR } ( _transform_col ) + _transform_col = + _arg_transform = { BUCKET | TRUNCATE } ( _transform_col _transform_arg ) + _transform_arg = + transform = { _col_transform | _arg_transform } - partition_by = [] - if params['partition_by']: - for key in params['partition_by']: - transform = key['partition_key']['transform']['col_transform'] - part = {} - part['transform'] = transform[0].lower() - part['name'] = transform[-1]['transform_col'] - partition_by.append(part) - - order_by = [] - if params['order_by'] and params['order_by']['by']: - for key in params['order_by']['by']: - transform = key['transform']['col_transform'] - order = {} - order['transform'] = transform[0].lower() - order['name'] = transform[-1]['transform_col'] - order['direction'] = 'ascending' - order['null_order'] = 'nulls_first' - if key.get('direction'): - if 'desc' in key['direction'].lower(): - order['direction'] = 'descending' - if key.get('null_order'): - if 'last' in key['null_order'].lower(): - order['null_order'] = 'nulls_last' - order_by.append(order) + # Partitions + partition_by = PARTITION BY partition_key,... + partition_key = transform - out = ExportService( - wsg, - from_database, - from_table, - dict(**catalog_config, **catalog_creds), - dict(**storage_config, **storage_creds), - columns=None, - partition_by=partition_by or None, - order_by=order_by or None, - properties=json.loads(params['properties']) if params['properties'] else None, - ).start() + # Sort order + order_by = ORDER BY sort_key,... 
+    sort_key = transform [ direction ] [ null_order ]
+    direction = { ASC | DESC | ASCENDING | DESCENDING }
+    null_order = { NULLS_FIRST | NULLS_LAST }

-        res = FusionSQLResult()
-        res.add_field('ExportID', result.STRING)
-        res.set_rows([(out.export_id,)])
+    # Properties
+    properties = PROPERTIES '<json>'

-        return res
+    # Catalog
+    catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
+    _catalog_config = CONFIG '<catalog-config>'
+    _catalog_creds = CREDENTIALS '<catalog-creds>'
+
+    # Storage
+    storage = LINK [ _link_config ] [ _link_creds ]
+    _link_config = S3 CONFIG '<link-config>'
+    _link_creds = CREDENTIALS '<link-creds>'
+
+    # Refresh interval
+    refresh_interval = REFRESH INTERVAL _refresh_interval_value _refresh_interval_time_unit
+    _refresh_interval_value = <integer>
+    _refresh_interval_time_unit = { SECONDS | MINUTES | HOURS | DAYS }
+
+    Description
+    -----------
+    Start an incremental export.
+
+    Arguments
+    ---------
+    * ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
+    * ``<link-config>`` and ``<link-creds>``: The storage link configuration.
+
+    Remarks
+    -------
+    * ``FROM <table>
`` specifies the SingleStore table to export. The same name will + be used for the exported table. + * ``CATALOG`` specifies the details of the catalog to connect to. + * ``LINK`` specifies the details of the data storage to connect to. + * ``REFRESH INTERVAL`` specifies the interval for refreshing the + incremental export. The default is 1 day. + + Examples + -------- + The following statement starts an export operation with the given + catalog and link configurations. The source table to export is + named "customer_data":: + + START INCREMENTAL EXPORT FROM my_db.customer_data + CATALOG CONFIG '{ + "catalog_type": "GLUE", + "table_format": "ICEBERG", + "catalog_id": "13983498723498", + "catalog_region": "us-east-1" + }' + LINK S3 CONFIG '{ + "region": "us-east-1", + "endpoint_url": "s3://bucket-name" + }' + REFRESH INTERVAL 24 HOURS + ; + + """ # noqa + + _enabled = False + + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + params['incremental'] = True + return _start_export(params) -CreateExport.register(overwrite=True) +StartIncrementalExport.register(overwrite=True) class ShowExport(SQLHandler): diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py index 58516709b..089ebd1d3 100644 --- a/singlestoredb/management/export.py +++ b/singlestoredb/management/export.py @@ -3,6 +3,7 @@ from __future__ import annotations import copy +import datetime import json from typing import Any from typing import Dict @@ -27,6 +28,9 @@ class ExportService(object): partition_by: Optional[List[Dict[str, str]]] order_by: Optional[List[Dict[str, Dict[str, str]]]] properties: Optional[Dict[str, Any]] + incremental: bool + refresh_interval: Optional[datetime.timedelta] + export_id: Optional[str] def __init__( self, @@ -38,6 +42,8 @@ def __init__( columns: Optional[List[str]] = None, partition_by: Optional[List[Dict[str, str]]] = None, order_by: Optional[List[Dict[str, Dict[str, str]]]] = None, + incremental: bool = False, + refresh_interval: Optional[datetime.timedelta] = None, properties: Optional[Dict[str, Any]] = None, ): #: Workspace group @@ -68,8 +74,30 @@ def __init__( self.order_by = order_by or None self.properties = properties or None + self.incremental = incremental + self.refresh_interval = refresh_interval + + self.export_id = None + self._manager: Optional[WorkspaceManager] = workspace_group._manager + @classmethod + def from_export_id( + self, + workspace_group: WorkspaceGroup, + export_id: str, + ) -> ExportService: + """Create export service from export ID.""" + out = ExportService( + workspace_group=workspace_group, + database='', + table='', + catalog_info={}, + storage_info={}, + ) + out.export_id = export_id + return out + def __str__(self) -> str: """Return string representation.""" return vars_to_str(self) @@ -98,6 +126,11 @@ def create_cluster_identity(self) -> Dict[str, Any]: def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': """Start the export process.""" + if not self.table or not self.database: + raise ManagementError( + msg='Database and table must be set before starting the export.', + ) + if self._manager is None: raise ManagementError( msg='No workspace manager is associated with this object.', @@ -122,11 +155,49 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': partitionSpec=partition_spec, sortOrderSpec=sort_order_spec, properties=self.properties, + incremental=self.incremental, + refreshInterval=self.refresh_interval.total_seconds() + if self.refresh_interval is not None else None, 
).items() if v is not None }, ) - return ExportStatus(out.json()['egressID'], self.workspace_group) + self.export_id = str(out.json()['egressID']) + + return ExportStatus(self.export_id, self.workspace_group) + + def stop(self) -> 'ExportStatus': + """Stop the export process.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + if self.export_id is None: + raise ManagementError( + msg='Export ID is not set. You must start the export first.', + ) + + self._manager._post( + f'workspaceGroups/{self.workspace_group.id}/egress/stopTableEgress', + json=dict(egressID=self.export_id), + ) + + return ExportStatus(self.export_id, self.workspace_group) + + def status(self) -> ExportStatus: + """Get the status of the export process.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + if self.export_id is None: + raise ManagementError( + msg='Export ID is not set. You must start the export first.', + ) + + return ExportStatus(self.export_id, self.workspace_group) class ExportStatus(object): From dc9b2bf22ba8146918d399385dc072628e3b3856 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Fri, 23 May 2025 12:35:55 -0500 Subject: [PATCH 2/9] Minor fixes --- singlestoredb/fusion/handlers/export.py | 2 +- singlestoredb/management/export.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py index c2458f2e4..e153a9cbf 100644 --- a/singlestoredb/fusion/handlers/export.py +++ b/singlestoredb/fusion/handlers/export.py @@ -162,7 +162,7 @@ def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]: refresh_interval = params.get('refresh_interval', None) if refresh_interval is not None: value = int(refresh_interval['refresh_interval_value']) - time_unit = refresh_interval['refresh_interval_time_unit'] + time_unit = refresh_interval['refresh_interval_time_unit'].upper() if value < 0: raise ValueError('refresh interval must be greater than 0') if time_unit == 'SECONDS': diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py index 089ebd1d3..75e8d8bdd 100644 --- a/singlestoredb/management/export.py +++ b/singlestoredb/management/export.py @@ -155,7 +155,7 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': partitionSpec=partition_spec, sortOrderSpec=sort_order_spec, properties=self.properties, - incremental=self.incremental, + incremental=self.incremental or None, refreshInterval=self.refresh_interval.total_seconds() if self.refresh_interval is not None else None, ).items() if v is not None From 399ff2b42a2cb61f9b6569ee4e4066ac29d0e96a Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Fri, 23 May 2025 12:41:35 -0500 Subject: [PATCH 3/9] Add stop command --- singlestoredb/fusion/handlers/export.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py index e153a9cbf..d180cff7f 100644 --- a/singlestoredb/fusion/handlers/export.py +++ b/singlestoredb/fusion/handlers/export.py @@ -420,3 +420,24 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: ShowExport.register(overwrite=True) + + +class StopExport(SQLHandler): + """ + STOP EXPORT export_id; + + # ID of export + export_id = '' + + """ + + _enabled = False + + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + wsg = 
get_workspace_group({}) + service = ExportService.from_export_id(wsg, params['export_id']) + service.stop() + return None + + +StopExport.register(overwrite=True) From 9a06b851a530d38e51c0b87fd9d571312ced357b Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 5 Jun 2025 13:55:16 -0500 Subject: [PATCH 4/9] Add unsigned types --- singlestoredb/types.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/singlestoredb/types.py b/singlestoredb/types.py index 5558c0d57..f5ccced60 100644 --- a/singlestoredb/types.py +++ b/singlestoredb/types.py @@ -173,17 +173,25 @@ class ColumnType(object): 'DECIMAL', 'DEC', 'FIXED', 'NUMERIC', 0, decimal.Decimal, ) DEC = FIXED = NUMERIC = DECIMAL - TINY = TINYINT = BOOL = BOOLEAN = NumberDBAPIType( - 'TINY', 'TINYINT', 'BOOL', 'BOOLEAN', 1, + TINY = TINYINT = BOOL = BOOLEAN = UNSIGNED_TINY = UNSIGNED_TINYINT = NumberDBAPIType( + 'TINY', 'TINYINT', 'BOOL', 'BOOLEAN', 'UNSIGNED TINY', 'UNSIGNED TINYINT', 1, + ) + SHORT = SMALLINT = UNSIGNED_SHORT = UNSIGNED_SMALLINT = NumberDBAPIType( + 'SMALLINT', 'SHORT', 'UNSIGNED SHORT', 'UNSIGNED SMALLINT', 2, + ) + LONG = INT = UNSIGNED_LONG = UNSIGNED_INT = NumberDBAPIType( + 'LONG', 'INT', 'UNSIGNED LONG', 'UNSIGNED INT', 3, ) - SHORT = SMALLINT = NumberDBAPIType('SMALLINT', 'SHORT', 2) - LONG = INT = NumberDBAPIType('LONG', 'INT', 3) FLOAT = NumberDBAPIType('FLOAT', 4) DOUBLE = REAL = NumberDBAPIType('DOUBLE', 5, float) NULL = DBAPIType('NULL', 6) TIMESTAMP = DatetimeDBAPIType('TIMESTAMP', 7) - LONGLONG = BIGINT = NumberDBAPIType('BIGINT', 'LONGLONG', 8, int) - MEDIUMINT = INT24 = NumberDBAPIType('MEDIUMINT', 'INT24', 9) + LONGLONG = BIGINT = UNSIGNED_LONGLONG = UNSIGNED_BIGINT = NumberDBAPIType( + 'BIGINT', 'LONGLONG', 'UNSIGNED LONGLONG', 'UNSIGNED_BIGINT', 8, int, + ) + MEDIUMINT = INT24 = UNSIGNED_MEDIUMINT = UNSIGNED_INT24 = NumberDBAPIType( + 'MEDIUMINT', 'INT24', 'UNSIGNED MEDIUMINT', 'UNSIGNED INT24', 9, + ) DATE = DBAPIType('DATE', 10, datetime.date) TIME = DBAPIType('TIME', 11, datetime.time) DATETIME = DatetimeDBAPIType('DATETIME', 12, datetime.datetime) From f699d11ba0f5a0a63d2b1b025d14e472fc400794 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 5 Jun 2025 16:04:21 -0500 Subject: [PATCH 5/9] Fix typeo --- singlestoredb/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singlestoredb/types.py b/singlestoredb/types.py index f5ccced60..051d465b4 100644 --- a/singlestoredb/types.py +++ b/singlestoredb/types.py @@ -187,7 +187,7 @@ class ColumnType(object): NULL = DBAPIType('NULL', 6) TIMESTAMP = DatetimeDBAPIType('TIMESTAMP', 7) LONGLONG = BIGINT = UNSIGNED_LONGLONG = UNSIGNED_BIGINT = NumberDBAPIType( - 'BIGINT', 'LONGLONG', 'UNSIGNED LONGLONG', 'UNSIGNED_BIGINT', 8, int, + 'BIGINT', 'LONGLONG', 'UNSIGNED LONGLONG', 'UNSIGNED BIGINT', 8, int, ) MEDIUMINT = INT24 = UNSIGNED_MEDIUMINT = UNSIGNED_INT24 = NumberDBAPIType( 'MEDIUMINT', 'INT24', 'UNSIGNED MEDIUMINT', 'UNSIGNED INT24', 9, From 0270fc133a7bedf6f847fb3aef9b8b0eb8043563 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Fri, 6 Jun 2025 08:59:09 -0500 Subject: [PATCH 6/9] Fix error parser --- singlestoredb/http/connection.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/singlestoredb/http/connection.py b/singlestoredb/http/connection.py index 874f8173d..1d1dce5d8 100644 --- a/singlestoredb/http/connection.py +++ b/singlestoredb/http/connection.py @@ -569,8 +569,10 @@ def _execute( if res.status_code >= 400: if res.text: - if re.match(r'^Error\s+\d+:', res.text): - 
code, msg = res.text.split(':', 1) + m = re.match(r'^Error\s+(\d+).*?:', res.text) + if m: + code = m.group(1) + msg = res.text.split(':', 1)[-1] icode = int(code.split()[-1]) else: icode = res.status_code From 6ec3064effb2814315bcbcf59d9a920982898b1d Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Mon, 9 Jun 2025 13:28:13 -0500 Subject: [PATCH 7/9] Cast refresh interval to int --- singlestoredb/management/export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py index 75e8d8bdd..48a3e4d37 100644 --- a/singlestoredb/management/export.py +++ b/singlestoredb/management/export.py @@ -156,7 +156,7 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': sortOrderSpec=sort_order_spec, properties=self.properties, incremental=self.incremental or None, - refreshInterval=self.refresh_interval.total_seconds() + refreshInterval=int(self.refresh_interval.total_seconds()) if self.refresh_interval is not None else None, ).items() if v is not None }, From e81f39ec4fd02f4c7d1f8850cc08f8474301a9bd Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 10 Jun 2025 10:06:26 -0500 Subject: [PATCH 8/9] Add Iceberg export commands; change refresh interval to int --- singlestoredb/fusion/handlers/export.py | 106 +++++++++++++++++++++--- singlestoredb/management/export.py | 69 +++++++++++++-- 2 files changed, 156 insertions(+), 19 deletions(-) diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py index d180cff7f..7a879b4da 100644 --- a/singlestoredb/fusion/handlers/export.py +++ b/singlestoredb/fusion/handlers/export.py @@ -6,6 +6,7 @@ from typing import Optional from .. import result +from ...management.export import _get_exports from ...management.export import ExportService from ...management.export import ExportStatus from ..handler import SQLHandler @@ -187,7 +188,8 @@ def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]: order_by=order_by or None, properties=json.loads(params['properties']) if params['properties'] else None, incremental=params.get('incremental', False), - refresh_interval=refresh_interval_delta, + refresh_interval=int(refresh_interval_delta.total_seconds()) + if refresh_interval_delta is not None else None, ).start() res = FusionSQLResult() @@ -387,6 +389,25 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: StartIncrementalExport.register(overwrite=True) +def _format_status(export_id: str, status: ExportStatus) -> Optional[FusionSQLResult]: + """Return the status of an export operation.""" + info = status._info() + + res = FusionSQLResult() + res.add_field('ExportID', result.STRING) + res.add_field('Status', result.STRING) + res.add_field('Message', result.STRING) + res.set_rows([ + ( + export_id, + info.get('status', 'Unknown'), + info.get('statusMsg', ''), + ), + ]) + + return res + + class ShowExport(SQLHandler): """ SHOW EXPORT export_id; @@ -400,9 +421,29 @@ class ShowExport(SQLHandler): def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: wsg = get_workspace_group({}) - out = ExportStatus(params['export_id'], wsg) + return _format_status( + params['export_id'], ExportStatus(params['export_id'], wsg), + ) - status = out._info() + +ShowExport.register(overwrite=True) + + +class ShowExports(SQLHandler): + """ + SHOW EXPORTS [ scope ]; + + # Location of the export + scope = FOR '' + + """ + + _enabled = False + + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + wsg = 
get_workspace_group({}) + + exports = _get_exports(wsg, params.get('scope', 'all')) res = FusionSQLResult() res.add_field('ExportID', result.STRING) @@ -410,21 +451,62 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: res.add_field('Message', result.STRING) res.set_rows([ ( - params['export_id'], - status.get('status', 'Unknown'), - status.get('statusMsg', ''), - ), + info['egressID'], + info.get('status', 'Unknown'), + info.get('statusMsg', ''), + ) + for info in [x._info() for x in exports] ]) return res -ShowExport.register(overwrite=True) +ShowExports.register(overwrite=True) + + +class SuspendExport(SQLHandler): + """ + SUSPEND EXPORT export_id; + + # ID of export + export_id = '' + + """ + + _enabled = False + + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + wsg = get_workspace_group({}) + service = ExportService.from_export_id(wsg, params['export_id']) + return _format_status(params['export_id'], service.suspend()) + + +SuspendExport.register(overwrite=True) + + +class ResumeExport(SQLHandler): + """ + RESUME EXPORT export_id; + + # ID of export + export_id = '' + + """ + + _enabled = False + + def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: + wsg = get_workspace_group({}) + service = ExportService.from_export_id(wsg, params['export_id']) + return _format_status(params['export_id'], service.resume()) + + +ResumeExport.register(overwrite=True) -class StopExport(SQLHandler): +class DropExport(SQLHandler): """ - STOP EXPORT export_id; + DROP EXPORT export_id; # ID of export export_id = '' @@ -436,8 +518,8 @@ class StopExport(SQLHandler): def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]: wsg = get_workspace_group({}) service = ExportService.from_export_id(wsg, params['export_id']) - service.stop() + service.drop() return None -StopExport.register(overwrite=True) +DropExport.register(overwrite=True) diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py index 48a3e4d37..83a89718a 100644 --- a/singlestoredb/management/export.py +++ b/singlestoredb/management/export.py @@ -3,7 +3,6 @@ from __future__ import annotations import copy -import datetime import json from typing import Any from typing import Dict @@ -29,7 +28,7 @@ class ExportService(object): order_by: Optional[List[Dict[str, Dict[str, str]]]] properties: Optional[Dict[str, Any]] incremental: bool - refresh_interval: Optional[datetime.timedelta] + refresh_interval: Optional[int] export_id: Optional[str] def __init__( @@ -43,7 +42,7 @@ def __init__( partition_by: Optional[List[Dict[str, str]]] = None, order_by: Optional[List[Dict[str, Dict[str, str]]]] = None, incremental: bool = False, - refresh_interval: Optional[datetime.timedelta] = None, + refresh_interval: Optional[int] = None, properties: Optional[Dict[str, Any]] = None, ): #: Workspace group @@ -156,7 +155,7 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': sortOrderSpec=sort_order_spec, properties=self.properties, incremental=self.incremental or None, - refreshInterval=int(self.refresh_interval.total_seconds()) + refreshInterval=self.refresh_interval if self.refresh_interval is not None else None, ).items() if v is not None }, @@ -166,8 +165,8 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus': return ExportStatus(self.export_id, self.workspace_group) - def stop(self) -> 'ExportStatus': - """Stop the export process.""" + def suspend(self) -> 'ExportStatus': + """Suspend the export process.""" if self._manager is None: 
raise ManagementError( msg='No workspace manager is associated with this object.', @@ -179,12 +178,50 @@ def stop(self) -> 'ExportStatus': ) self._manager._post( - f'workspaceGroups/{self.workspace_group.id}/egress/stopTableEgress', + f'workspaceGroups/{self.workspace_group.id}/egress/suspendTableEgress', json=dict(egressID=self.export_id), ) return ExportStatus(self.export_id, self.workspace_group) + def resume(self) -> 'ExportStatus': + """Resume the export process.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + if self.export_id is None: + raise ManagementError( + msg='Export ID is not set. You must start the export first.', + ) + + self._manager._post( + f'workspaceGroups/{self.workspace_group.id}/egress/resumeTableEgress', + json=dict(egressID=self.export_id), + ) + + return ExportStatus(self.export_id, self.workspace_group) + + def drop(self) -> None: + """Drop the export process.""" + if self._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + if self.export_id is None: + raise ManagementError( + msg='Export ID is not set. You must start the export first.', + ) + + self._manager._post( + f'workspaceGroups/{self.workspace_group.id}/egress/dropTableEgress', + json=dict(egressID=self.export_id), + ) + + return None + def status(self) -> ExportStatus: """Get the status of the export process.""" if self._manager is None: @@ -238,3 +275,21 @@ def __str__(self) -> str: def __repr__(self) -> str: return self.status + + +def _get_exports( + workspace_group: WorkspaceGroup, + scope: str = 'all', +) -> List[ExportStatus]: + """Get all exports in the workspace group.""" + if workspace_group._manager is None: + raise ManagementError( + msg='No workspace manager is associated with this object.', + ) + + out = workspace_group._manager._get( + f'workspaceGroups/{workspace_group.id}/egress/tableEgressStatus', + json=dict(scope=scope), + ) + + return out.json() From ad867ed0ddd7cac9d7b9b45470f4d1169c0d3325 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 10 Jun 2025 14:09:21 -0500 Subject: [PATCH 9/9] Make test filenames unique --- singlestoredb/tests/test_management.py | 76 +++++++++++++++----------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/singlestoredb/tests/test_management.py b/singlestoredb/tests/test_management.py index f67de5b15..86043c1ee 100755 --- a/singlestoredb/tests/test_management.py +++ b/singlestoredb/tests/test_management.py @@ -450,55 +450,59 @@ def test_upload_file(self): def test_open(self): st = self.wg.stage + open_test_sql = f'open_test_{id(self)}.sql' + # See if error is raised for non-existent file with self.assertRaises(s2.ManagementError): - st.open('open_test.sql', 'r') + st.open(open_test_sql, 'r') # Load test file - st.upload_file(TEST_DIR / 'test.sql', 'open_test.sql') + st.upload_file(TEST_DIR / 'test.sql', open_test_sql) # Read file using `open` - with st.open('open_test.sql', 'r') as rfile: + with st.open(open_test_sql, 'r') as rfile: assert rfile.read() == open(TEST_DIR / 'test.sql').read() # Read file using `open` with 'rt' mode - with st.open('open_test.sql', 'rt') as rfile: + with st.open(open_test_sql, 'rt') as rfile: assert rfile.read() == open(TEST_DIR / 'test.sql').read() # Read file using `open` with 'rb' mode - with st.open('open_test.sql', 'rb') as rfile: + with st.open(open_test_sql, 'rb') as rfile: assert rfile.read() == open(TEST_DIR / 'test.sql', 'rb').read() # Read file using `open` with 
'rb' mode with self.assertRaises(ValueError): - with st.open('open_test.sql', 'b') as rfile: + with st.open(open_test_sql, 'b') as rfile: pass # Attempt overwrite file using `open` with mode 'x' with self.assertRaises(OSError): - with st.open('open_test.sql', 'x') as wfile: + with st.open(open_test_sql, 'x') as wfile: pass # Attempt overwrite file using `open` with mode 'w' - with st.open('open_test.sql', 'w') as wfile: + with st.open(open_test_sql, 'w') as wfile: wfile.write(open(TEST_DIR / 'test2.sql').read()) - txt = st.download_file('open_test.sql', encoding='utf-8') + txt = st.download_file(open_test_sql, encoding='utf-8') assert txt == open(TEST_DIR / 'test2.sql').read() + open_raw_test_sql = f'open_raw_test_{id(self)}.sql' + # Test writer without context manager - wfile = st.open('open_raw_test.sql', 'w') + wfile = st.open(open_raw_test_sql, 'w') for line in open(TEST_DIR / 'test.sql'): wfile.write(line) wfile.close() - txt = st.download_file('open_raw_test.sql', encoding='utf-8') + txt = st.download_file(open_raw_test_sql, encoding='utf-8') assert txt == open(TEST_DIR / 'test.sql').read() # Test reader without context manager - rfile = st.open('open_raw_test.sql', 'r') + rfile = st.open(open_raw_test_sql, 'r') txt = '' for line in rfile: txt += line @@ -509,15 +513,18 @@ def test_open(self): def test_obj_open(self): st = self.wg.stage + obj_open_test_sql = f'obj_open_test_{id(self)}.sql' + obj_open_dir = f'obj_open_dir_{id(self)}' + # Load test file - f = st.upload_file(TEST_DIR / 'test.sql', 'obj_open_test.sql') + f = st.upload_file(TEST_DIR / 'test.sql', obj_open_test_sql) # Read file using `open` with f.open() as rfile: assert rfile.read() == open(TEST_DIR / 'test.sql').read() # Make sure directories error out - d = st.mkdir('obj_open_dir') + d = st.mkdir(obj_open_dir) with self.assertRaises(IsADirectoryError): d.open() @@ -1143,58 +1150,62 @@ def test_upload_file_io(self): def test_open(self): for space in [self.personal_space, self.shared_space]: + open_test_ipynb = f'open_test_ipynb_{id(self)}.ipynb' + # See if error is raised for non-existent file with self.assertRaises(s2.ManagementError): - space.open('open_test.ipynb', 'r') + space.open(open_test_ipynb, 'r') # Load test file - space.upload_file(TEST_DIR / 'test.ipynb', 'open_test.ipynb') + space.upload_file(TEST_DIR / 'test.ipynb', open_test_ipynb) # Read file using `open` - with space.open('open_test.ipynb', 'r') as rfile: + with space.open(open_test_ipynb, 'r') as rfile: assert rfile.read() == open(TEST_DIR / 'test.ipynb').read() # Read file using `open` with 'rt' mode - with space.open('open_test.ipynb', 'rt') as rfile: + with space.open(open_test_ipynb, 'rt') as rfile: assert rfile.read() == open(TEST_DIR / 'test.ipynb').read() # Read file using `open` with 'rb' mode - with space.open('open_test.ipynb', 'rb') as rfile: + with space.open(open_test_ipynb, 'rb') as rfile: assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read() # Read file using `open` with 'rb' mode with self.assertRaises(ValueError): - with space.open('open_test.ipynb', 'b') as rfile: + with space.open(open_test_ipynb, 'b') as rfile: pass # Attempt overwrite file using `open` with mode 'x' with self.assertRaises(OSError): - with space.open('open_test.ipynb', 'x') as wfile: + with space.open(open_test_ipynb, 'x') as wfile: pass # Attempt overwrite file using `open` with mode 'w' - with space.open('open_test.ipynb', 'w') as wfile: + with space.open(open_test_ipynb, 'w') as wfile: wfile.write(open(TEST_DIR / 'test2.ipynb').read()) - txt = 
space.download_file('open_test.ipynb', encoding='utf-8') + txt = space.download_file(open_test_ipynb, encoding='utf-8') assert txt == open(TEST_DIR / 'test2.ipynb').read() + open_raw_test_ipynb = f'open_raw_test_{id(self)}.ipynb' + # Test writer without context manager - wfile = space.open('open_raw_test.ipynb', 'w') + wfile = space.open(open_raw_test_ipynb, 'w') for line in open(TEST_DIR / 'test.ipynb'): wfile.write(line) wfile.close() txt = space.download_file( - 'open_raw_test.ipynb', + open_raw_test_ipynb, encoding='utf-8', ) assert txt == open(TEST_DIR / 'test.ipynb').read() # Test reader without context manager - rfile = space.open('open_raw_test.ipynb', 'r') + rfile = space.open(open_raw_test_ipynb, 'r') txt = '' for line in rfile: txt += line @@ -1203,15 +1214,18 @@ def test_open(self): assert txt == open(TEST_DIR / 'test.ipynb').read() # Cleanup - space.remove('open_test.ipynb') - space.remove('open_raw_test.ipynb') + space.remove(open_test_ipynb) + space.remove(open_raw_test_ipynb) def test_obj_open(self): for space in [self.personal_space, self.shared_space]: + obj_open_test_ipynb = f'obj_open_test_{id(self)}.ipynb' + obj_open_dir = f'obj_open_dir_{id(self)}' + # Load test file f = space.upload_file( TEST_DIR / 'test.ipynb', - 'obj_open_test.ipynb', + obj_open_test_ipynb, ) # Read file using `open` @@ -1220,7 +1234,7 @@ def test_obj_open(self): # Make sure directories error out with self.assertRaises(s2.ManagementError): - space.mkdir('obj_open_dir') + space.mkdir(obj_open_dir) # Write file using `open` with f.open('w', encoding='utf-8') as wfile: @@ -1248,7 +1262,7 @@ def test_obj_open(self): assert txt == open(TEST_DIR / 'test.ipynb').read() # Cleanup - space.remove('obj_open_test.ipynb') + space.remove(obj_open_test_ipynb) def test_os_directories(self): for space in [self.personal_space, self.shared_space]:
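
The Fusion surface added by this series covers the full lifecycle: START EXPORT,
START INCREMENTAL EXPORT, SHOW EXPORT, SHOW EXPORTS, SUSPEND EXPORT,
RESUME EXPORT, and DROP EXPORT. A minimal sketch of driving those commands
through the connector follows. It is illustrative only: the handlers above are
registered with ``_enabled = False``, so it assumes Fusion SQL handlers are
enabled for the session, and the DSN and table names are placeholders::

    import singlestoredb as s2

    # Placeholder connection; any session with Fusion SQL enabled should do.
    conn = s2.connect('user:password@myhost:3306/my_db')

    with conn.cursor() as cur:
        # Start an incremental export refreshed once a day.
        cur.execute('''
            START INCREMENTAL EXPORT FROM my_db.customer_data
                CATALOG CONFIG '{
                    "catalog_type": "GLUE",
                    "table_format": "ICEBERG",
                    "catalog_id": "13983498723498",
                    "catalog_region": "us-east-1"
                }'
                LINK S3 CONFIG '{
                    "region": "us-east-1",
                    "endpoint_url": "s3://bucket-name"
                }'
                REFRESH INTERVAL 24 HOURS
            ;
        ''')
        export_id = cur.fetchone()[0]  # single ExportID column

        # Inspect one export, or all exports in the workspace group.
        cur.execute(f"SHOW EXPORT '{export_id}';")
        cur.execute('SHOW EXPORTS;')

        # Pause, resume, and finally remove the export.
        cur.execute(f"SUSPEND EXPORT '{export_id}';")
        cur.execute(f"RESUME EXPORT '{export_id}';")
        cur.execute(f"DROP EXPORT '{export_id}';")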
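
The same lifecycle is reachable programmatically through ``ExportService``. A
sketch under stated assumptions: the manager comes from the usual
``s2.manage_workspaces()`` entry point, the API key and workspace group name
are placeholders, ``storage_info`` carries the ``provider: S3`` key that the
Fusion handler injects, and ``refresh_interval`` is an integer number of
seconds (per patch 8)::

    import singlestoredb as s2
    from singlestoredb.management.export import ExportService

    manager = s2.manage_workspaces('<api-key>')
    wsg = manager.workspace_groups['<workspace-group>']

    svc = ExportService(
        wsg,
        'my_db',
        'customer_data',
        catalog_info={
            'catalog_type': 'GLUE',
            'table_format': 'ICEBERG',
            'catalog_id': '13983498723498',
            'catalog_region': 'us-east-1',
        },
        storage_info={
            'provider': 'S3',
            'region': 'us-east-1',
            'endpoint_url': 's3://bucket-name',
        },
        incremental=True,
        refresh_interval=24 * 60 * 60,  # one day, in seconds
    )

    status = svc.start()  # sets svc.export_id and returns an ExportStatus

    # Reattach to a running export by ID; suspend(), resume(), and drop()
    # call the corresponding *TableEgress endpoints added in patch 8.
    svc = ExportService.from_export_id(wsg, svc.export_id)
    svc.suspend()
    svc.resume()
    svc.drop()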
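
For reference, the parser change in patch 6 captures the error code even when
extra text sits between the code and the colon, where the old pattern required
the colon to follow the digits immediately. The sample messages below are
hypothetical::

    import re

    samples = [
        'Error 1064: You have an error in your SQL syntax',
        'Error 1064 (42000): You have an error in your SQL syntax',
    ]

    for text in samples:
        old = re.match(r'^Error\s+\d+:', text)       # matches only the first
        new = re.match(r'^Error\s+(\d+).*?:', text)  # matches both
        if new:
            icode = int(new.group(1))        # numeric error code
            msg = text.split(':', 1)[-1]     # message after the first colon
            print(icode, msg.strip())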