diff --git a/singlestoredb/fusion/handlers/export.py b/singlestoredb/fusion/handlers/export.py
index ff0fe1e3a..7a879b4da 100644
--- a/singlestoredb/fusion/handlers/export.py
+++ b/singlestoredb/fusion/handlers/export.py
@@ -1,10 +1,12 @@
#!/usr/bin/env python3
+import datetime
import json
from typing import Any
from typing import Dict
from typing import Optional
from .. import result
+from ...management.export import _get_exports
from ...management.export import ExportService
from ...management.export import ExportStatus
from ..handler import SQLHandler
@@ -104,7 +106,100 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
CreateClusterIdentity.register(overwrite=True)
-class CreateExport(SQLHandler):
+def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]:
+ # From table
+ if isinstance(params['from_table'], str):
+ from_database = None
+ from_table = params['from_table']
+ else:
+ from_database, from_table = params['from_table']
+
+ # Catalog
+ catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}')
+ catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}')
+
+ # Storage
+ storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}')
+ storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}')
+
+ storage_config['provider'] = 'S3'
+
+ wsg = get_workspace_group({})
+
+ if from_database is None:
+ raise ValueError('database name must be specified for source table')
+
+ if wsg._manager is None:
+ raise TypeError('no workspace manager is associated with workspace group')
+
+ partition_by = []
+ if params['partition_by']:
+ for key in params['partition_by']:
+ transform = key['partition_key']['transform']['col_transform']
+ part = {}
+ part['transform'] = transform[0].lower()
+ part['name'] = transform[-1]['transform_col']
+ partition_by.append(part)
+
+ order_by = []
+ if params['order_by'] and params['order_by']['by']:
+ for key in params['order_by']['by']:
+ transform = key['transform']['col_transform']
+ order = {}
+ order['transform'] = transform[0].lower()
+ order['name'] = transform[-1]['transform_col']
+ order['direction'] = 'ascending'
+ order['null_order'] = 'nulls_first'
+ if key.get('direction'):
+ if 'desc' in key['direction'].lower():
+ order['direction'] = 'descending'
+ if key.get('null_order'):
+ if 'last' in key['null_order'].lower():
+ order['null_order'] = 'nulls_last'
+ order_by.append(order)
+
+ # Refresh interval
+ refresh_interval_delta = None
+ refresh_interval = params.get('refresh_interval', None)
+ if refresh_interval is not None:
+ value = int(refresh_interval['refresh_interval_value'])
+ time_unit = refresh_interval['refresh_interval_time_unit'].upper()
+        if value <= 0:
+ raise ValueError('refresh interval must be greater than 0')
+ if time_unit == 'SECONDS':
+ refresh_interval_delta = datetime.timedelta(seconds=int(value))
+ elif time_unit == 'MINUTES':
+ refresh_interval_delta = datetime.timedelta(minutes=int(value))
+ elif time_unit == 'HOURS':
+ refresh_interval_delta = datetime.timedelta(hours=int(value))
+ elif time_unit == 'DAYS':
+ refresh_interval_delta = datetime.timedelta(days=int(value))
+ else:
+ raise ValueError('invalid refresh interval time unit')
+
+ out = ExportService(
+ wsg,
+ from_database,
+ from_table,
+ dict(**catalog_config, **catalog_creds),
+ dict(**storage_config, **storage_creds),
+ columns=None,
+ partition_by=partition_by or None,
+ order_by=order_by or None,
+ properties=json.loads(params['properties']) if params['properties'] else None,
+ incremental=params.get('incremental', False),
+ refresh_interval=int(refresh_interval_delta.total_seconds())
+ if refresh_interval_delta is not None else None,
+ ).start()
+
+ res = FusionSQLResult()
+ res.add_field('ExportID', result.STRING)
+ res.set_rows([(out.export_id,)])
+
+ return res
+
+
+class StartExport(SQLHandler):
"""
START EXPORT
from_table
@@ -150,7 +245,7 @@ class CreateExport(SQLHandler):
Description
-----------
- Create an export configuration.
+ Start an export.
Arguments
---------
@@ -180,7 +275,6 @@ class CreateExport(SQLHandler):
LINK S3 CONFIG '{
"region": "us-east-1",
"endpoint_url": "s3://bucket-name"
-
}'
;
@@ -189,77 +283,129 @@ class CreateExport(SQLHandler):
_enabled = False
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
- # From table
- if isinstance(params['from_table'], str):
- from_database = None
- from_table = params['from_table']
- else:
- from_database, from_table = params['from_table']
+ return _start_export(params)
- # Catalog
- catalog_config = json.loads(params['catalog'].get('catalog_config', '{}') or '{}')
- catalog_creds = json.loads(params['catalog'].get('catalog_creds', '{}') or '{}')
- # Storage
- storage_config = json.loads(params['storage'].get('link_config', '{}') or '{}')
- storage_creds = json.loads(params['storage'].get('link_creds', '{}') or '{}')
+StartExport.register(overwrite=True)
- storage_config['provider'] = 'S3'
- wsg = get_workspace_group({})
+class StartIncrementalExport(SQLHandler):
+ """
+ START INCREMENTAL EXPORT
+ from_table
+ catalog
+ storage
+ [ partition_by ]
+ [ order_by ]
+ [ properties ]
+ [ refresh_interval ]
+ ;
- if from_database is None:
- raise ValueError('database name must be specified for source table')
+ # From table
+    from_table = FROM <table>
- if wsg._manager is None:
- raise TypeError('no workspace manager is associated with workspace group')
+ # Transforms
+ _col_transform = { VOID | IDENTITY | YEAR | MONTH | DAY | HOUR } ( _transform_col )
+    _transform_col = <column>
+ _arg_transform = { BUCKET | TRUNCATE } ( _transform_col _transform_arg )
+    _transform_arg = <integer>
+ transform = { _col_transform | _arg_transform }
- partition_by = []
- if params['partition_by']:
- for key in params['partition_by']:
- transform = key['partition_key']['transform']['col_transform']
- part = {}
- part['transform'] = transform[0].lower()
- part['name'] = transform[-1]['transform_col']
- partition_by.append(part)
-
- order_by = []
- if params['order_by'] and params['order_by']['by']:
- for key in params['order_by']['by']:
- transform = key['transform']['col_transform']
- order = {}
- order['transform'] = transform[0].lower()
- order['name'] = transform[-1]['transform_col']
- order['direction'] = 'ascending'
- order['null_order'] = 'nulls_first'
- if key.get('direction'):
- if 'desc' in key['direction'].lower():
- order['direction'] = 'descending'
- if key.get('null_order'):
- if 'last' in key['null_order'].lower():
- order['null_order'] = 'nulls_last'
- order_by.append(order)
+ # Partitions
+ partition_by = PARTITION BY partition_key,...
+ partition_key = transform
- out = ExportService(
- wsg,
- from_database,
- from_table,
- dict(**catalog_config, **catalog_creds),
- dict(**storage_config, **storage_creds),
- columns=None,
- partition_by=partition_by or None,
- order_by=order_by or None,
- properties=json.loads(params['properties']) if params['properties'] else None,
- ).start()
+ # Sort order
+ order_by = ORDER BY sort_key,...
+ sort_key = transform [ direction ] [ null_order ]
+ direction = { ASC | DESC | ASCENDING | DESCENDING }
+ null_order = { NULLS_FIRST | NULLS_LAST }
- res = FusionSQLResult()
- res.add_field('ExportID', result.STRING)
- res.set_rows([(out.export_id,)])
+ # Properties
+    properties = PROPERTIES '<json>'
- return res
+    # Catalog
+ catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
+    _catalog_config = CONFIG '<catalog-config>'
+    _catalog_creds = CREDENTIALS '<catalog-creds>'
+
+ # Storage
+ storage = LINK [ _link_config ] [ _link_creds ]
+    _link_config = S3 CONFIG '<link-config>'
+    _link_creds = CREDENTIALS '<link-creds>'
+
+ # Refresh interval
+ refresh_interval = REFRESH INTERVAL _refresh_interval_value _refresh_interval_time_unit
+    _refresh_interval_value = <integer>
+ _refresh_interval_time_unit = { SECONDS | MINUTES | HOURS | DAYS }
+
+ Description
+ -----------
+ Start an incremental export.
+
+ Arguments
+ ---------
+    * ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
+    * ``<link-config>`` and ``<link-creds>``: The storage link configuration.
+
+ Remarks
+ -------
+    * ``FROM <table>`` specifies the SingleStore table to export. The same name will
+ be used for the exported table.
+ * ``CATALOG`` specifies the details of the catalog to connect to.
+ * ``LINK`` specifies the details of the data storage to connect to.
+ * ``REFRESH INTERVAL`` specifies the interval for refreshing the
+ incremental export. The default is 1 day.
+
+ Examples
+ --------
+    The following statement starts an incremental export operation with the given
+ catalog and link configurations. The source table to export is
+ named "customer_data"::
+ START INCREMENTAL EXPORT FROM my_db.customer_data
+ CATALOG CONFIG '{
+ "catalog_type": "GLUE",
+ "table_format": "ICEBERG",
+ "catalog_id": "13983498723498",
+ "catalog_region": "us-east-1"
+ }'
+ LINK S3 CONFIG '{
+ "region": "us-east-1",
+ "endpoint_url": "s3://bucket-name"
+ }'
+ REFRESH INTERVAL 24 HOURS
+ ;
+
+ """ # noqa
+
+ _enabled = False
+
+ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
+ params['incremental'] = True
+ return _start_export(params)
+
+
+StartIncrementalExport.register(overwrite=True)
+
+
+def _format_status(export_id: str, status: ExportStatus) -> Optional[FusionSQLResult]:
+ """Return the status of an export operation."""
+ info = status._info()
-CreateExport.register(overwrite=True)
+ res = FusionSQLResult()
+ res.add_field('ExportID', result.STRING)
+ res.add_field('Status', result.STRING)
+ res.add_field('Message', result.STRING)
+ res.set_rows([
+ (
+ export_id,
+ info.get('status', 'Unknown'),
+ info.get('statusMsg', ''),
+ ),
+ ])
+
+ return res
class ShowExport(SQLHandler):
@@ -275,9 +421,29 @@ class ShowExport(SQLHandler):
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
wsg = get_workspace_group({})
- out = ExportStatus(params['export_id'], wsg)
+ return _format_status(
+ params['export_id'], ExportStatus(params['export_id'], wsg),
+ )
+
+
+ShowExport.register(overwrite=True)
+
+
+class ShowExports(SQLHandler):
+ """
+ SHOW EXPORTS [ scope ];
+
+ # Location of the export
+    scope = FOR '<scope>'
+
+ """
+
+ _enabled = False
+
+ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
+ wsg = get_workspace_group({})
- status = out._info()
+ exports = _get_exports(wsg, params.get('scope', 'all'))
res = FusionSQLResult()
res.add_field('ExportID', result.STRING)
@@ -285,13 +451,75 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
res.add_field('Message', result.STRING)
res.set_rows([
(
- params['export_id'],
- status.get('status', 'Unknown'),
- status.get('statusMsg', ''),
- ),
+ info['egressID'],
+ info.get('status', 'Unknown'),
+ info.get('statusMsg', ''),
+ )
+ for info in [x._info() for x in exports]
])
return res
-ShowExport.register(overwrite=True)
+ShowExports.register(overwrite=True)
+
+
+class SuspendExport(SQLHandler):
+ """
+ SUSPEND EXPORT export_id;
+
+ # ID of export
+    export_id = '<export-id>'
+
+ """
+
+ _enabled = False
+
+ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
+ wsg = get_workspace_group({})
+ service = ExportService.from_export_id(wsg, params['export_id'])
+ return _format_status(params['export_id'], service.suspend())
+
+
+SuspendExport.register(overwrite=True)
+
+
+class ResumeExport(SQLHandler):
+ """
+ RESUME EXPORT export_id;
+
+ # ID of export
+    export_id = '<export-id>'
+
+ """
+
+ _enabled = False
+
+ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
+ wsg = get_workspace_group({})
+ service = ExportService.from_export_id(wsg, params['export_id'])
+ return _format_status(params['export_id'], service.resume())
+
+
+ResumeExport.register(overwrite=True)
+
+
+class DropExport(SQLHandler):
+ """
+ DROP EXPORT export_id;
+
+ # ID of export
+    export_id = '<export-id>'
+
+ """
+
+ _enabled = False
+
+ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
+ wsg = get_workspace_group({})
+ service = ExportService.from_export_id(wsg, params['export_id'])
+ service.drop()
+ return None
+
+
+DropExport.register(overwrite=True)
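The refresh-interval handling in ``_start_export`` above reduces to a unit-to-timedelta mapping followed by a conversion to whole seconds. A minimal sketch of that conversion, assuming the same four units the grammar accepts (the helper name is illustrative, not part of this change)::

    import datetime

    _UNITS = {
        'SECONDS': 'seconds', 'MINUTES': 'minutes',
        'HOURS': 'hours', 'DAYS': 'days',
    }

    def refresh_interval_seconds(value: int, unit: str) -> int:
        # Mirrors the validation in _start_export: positive value, known unit.
        if value <= 0:
            raise ValueError('refresh interval must be greater than 0')
        kwarg = _UNITS.get(unit.upper())
        if kwarg is None:
            raise ValueError('invalid refresh interval time unit')
        return int(datetime.timedelta(**{kwarg: value}).total_seconds())

    # refresh_interval_seconds(24, 'HOURS') -> 86400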
diff --git a/singlestoredb/http/connection.py b/singlestoredb/http/connection.py
index 874f8173d..1d1dce5d8 100644
--- a/singlestoredb/http/connection.py
+++ b/singlestoredb/http/connection.py
@@ -569,8 +569,10 @@ def _execute(
if res.status_code >= 400:
if res.text:
- if re.match(r'^Error\s+\d+:', res.text):
- code, msg = res.text.split(':', 1)
+ m = re.match(r'^Error\s+(\d+).*?:', res.text)
+ if m:
+ code = m.group(1)
+ msg = res.text.split(':', 1)[-1]
icode = int(code.split()[-1])
else:
icode = res.status_code
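The loosened pattern above also matches server errors that carry extra text (such as a SQLSTATE) between the error code and the colon. A small sketch of the difference; the sample error string is an assumption, not output captured from the HTTP API::

    import re

    old = re.compile(r'^Error\s+\d+:')
    new = re.compile(r'^Error\s+(\d+).*?:')

    text = 'Error 1064 (42000): You have an error in your SQL syntax'

    assert old.match(text) is None                 # old pattern needed 'Error 1064:' exactly
    m = new.match(text)
    assert m is not None and m.group(1) == '1064'  # code is now captured directly
    msg = text.split(':', 1)[-1].strip()           # same message split as the patched code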
diff --git a/singlestoredb/management/export.py b/singlestoredb/management/export.py
index 58516709b..83a89718a 100644
--- a/singlestoredb/management/export.py
+++ b/singlestoredb/management/export.py
@@ -27,6 +27,9 @@ class ExportService(object):
partition_by: Optional[List[Dict[str, str]]]
order_by: Optional[List[Dict[str, Dict[str, str]]]]
properties: Optional[Dict[str, Any]]
+ incremental: bool
+ refresh_interval: Optional[int]
+ export_id: Optional[str]
def __init__(
self,
@@ -38,6 +41,8 @@ def __init__(
columns: Optional[List[str]] = None,
partition_by: Optional[List[Dict[str, str]]] = None,
order_by: Optional[List[Dict[str, Dict[str, str]]]] = None,
+ incremental: bool = False,
+ refresh_interval: Optional[int] = None,
properties: Optional[Dict[str, Any]] = None,
):
#: Workspace group
@@ -68,8 +73,30 @@ def __init__(
self.order_by = order_by or None
self.properties = properties or None
+ self.incremental = incremental
+ self.refresh_interval = refresh_interval
+
+ self.export_id = None
+
self._manager: Optional[WorkspaceManager] = workspace_group._manager
+ @classmethod
+ def from_export_id(
+        cls,
+ workspace_group: WorkspaceGroup,
+ export_id: str,
+    ) -> 'ExportService':
+ """Create export service from export ID."""
+        out = cls(
+ workspace_group=workspace_group,
+ database='',
+ table='',
+ catalog_info={},
+ storage_info={},
+ )
+ out.export_id = export_id
+ return out
+
def __str__(self) -> str:
"""Return string representation."""
return vars_to_str(self)
@@ -98,6 +125,11 @@ def create_cluster_identity(self) -> Dict[str, Any]:
def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':
"""Start the export process."""
+ if not self.table or not self.database:
+ raise ManagementError(
+ msg='Database and table must be set before starting the export.',
+ )
+
if self._manager is None:
raise ManagementError(
msg='No workspace manager is associated with this object.',
@@ -122,11 +154,87 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':
partitionSpec=partition_spec,
sortOrderSpec=sort_order_spec,
properties=self.properties,
+ incremental=self.incremental or None,
+                refreshInterval=self.refresh_interval,
).items() if v is not None
},
)
- return ExportStatus(out.json()['egressID'], self.workspace_group)
+ self.export_id = str(out.json()['egressID'])
+
+ return ExportStatus(self.export_id, self.workspace_group)
+
+ def suspend(self) -> 'ExportStatus':
+ """Suspend the export process."""
+ if self._manager is None:
+ raise ManagementError(
+ msg='No workspace manager is associated with this object.',
+ )
+
+ if self.export_id is None:
+ raise ManagementError(
+ msg='Export ID is not set. You must start the export first.',
+ )
+
+ self._manager._post(
+ f'workspaceGroups/{self.workspace_group.id}/egress/suspendTableEgress',
+ json=dict(egressID=self.export_id),
+ )
+
+ return ExportStatus(self.export_id, self.workspace_group)
+
+ def resume(self) -> 'ExportStatus':
+ """Resume the export process."""
+ if self._manager is None:
+ raise ManagementError(
+ msg='No workspace manager is associated with this object.',
+ )
+
+ if self.export_id is None:
+ raise ManagementError(
+ msg='Export ID is not set. You must start the export first.',
+ )
+
+ self._manager._post(
+ f'workspaceGroups/{self.workspace_group.id}/egress/resumeTableEgress',
+ json=dict(egressID=self.export_id),
+ )
+
+ return ExportStatus(self.export_id, self.workspace_group)
+
+ def drop(self) -> None:
+ """Drop the export process."""
+ if self._manager is None:
+ raise ManagementError(
+ msg='No workspace manager is associated with this object.',
+ )
+
+ if self.export_id is None:
+ raise ManagementError(
+ msg='Export ID is not set. You must start the export first.',
+ )
+
+ self._manager._post(
+ f'workspaceGroups/{self.workspace_group.id}/egress/dropTableEgress',
+ json=dict(egressID=self.export_id),
+ )
+
+ return None
+
+ def status(self) -> ExportStatus:
+ """Get the status of the export process."""
+ if self._manager is None:
+ raise ManagementError(
+ msg='No workspace manager is associated with this object.',
+ )
+
+ if self.export_id is None:
+ raise ManagementError(
+ msg='Export ID is not set. You must start the export first.',
+ )
+
+ return ExportStatus(self.export_id, self.workspace_group)
class ExportStatus(object):
@@ -167,3 +275,21 @@ def __str__(self) -> str:
def __repr__(self) -> str:
return self.status
+
+
+def _get_exports(
+ workspace_group: WorkspaceGroup,
+ scope: str = 'all',
+) -> List[ExportStatus]:
+ """Get all exports in the workspace group."""
+ if workspace_group._manager is None:
+ raise ManagementError(
+ msg='No workspace manager is associated with this object.',
+ )
+
+ out = workspace_group._manager._get(
+ f'workspaceGroups/{workspace_group.id}/egress/tableEgressStatus',
+ json=dict(scope=scope),
+ )
+
+    return [
+        ExportStatus(x['egressID'], workspace_group)
+        for x in out.json()
+    ]
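A rough usage sketch of the new export lifecycle methods; ``wsg`` is assumed to be an existing WorkspaceGroup, and the export ID is a placeholder::

    from singlestoredb.management.export import ExportService

    def pause_resume_drop(wsg, export_id: str) -> None:
        # Rehydrate a service object for an export that was started elsewhere.
        svc = ExportService.from_export_id(wsg, export_id)
        print(svc.status())   # current ExportStatus
        svc.suspend()         # pause the incremental export
        svc.resume()          # pick it back up again
        svc.drop()            # remove the export entirely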
diff --git a/singlestoredb/tests/test_management.py b/singlestoredb/tests/test_management.py
index f67de5b15..86043c1ee 100755
--- a/singlestoredb/tests/test_management.py
+++ b/singlestoredb/tests/test_management.py
@@ -450,55 +450,59 @@ def test_upload_file(self):
def test_open(self):
st = self.wg.stage
+ open_test_sql = f'open_test_{id(self)}.sql'
+
# See if error is raised for non-existent file
with self.assertRaises(s2.ManagementError):
- st.open('open_test.sql', 'r')
+ st.open(open_test_sql, 'r')
# Load test file
- st.upload_file(TEST_DIR / 'test.sql', 'open_test.sql')
+ st.upload_file(TEST_DIR / 'test.sql', open_test_sql)
# Read file using `open`
- with st.open('open_test.sql', 'r') as rfile:
+ with st.open(open_test_sql, 'r') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
# Read file using `open` with 'rt' mode
- with st.open('open_test.sql', 'rt') as rfile:
+ with st.open(open_test_sql, 'rt') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
# Read file using `open` with 'rb' mode
- with st.open('open_test.sql', 'rb') as rfile:
+ with st.open(open_test_sql, 'rb') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.sql', 'rb').read()
# Read file using `open` with 'rb' mode
with self.assertRaises(ValueError):
- with st.open('open_test.sql', 'b') as rfile:
+ with st.open(open_test_sql, 'b') as rfile:
pass
# Attempt overwrite file using `open` with mode 'x'
with self.assertRaises(OSError):
- with st.open('open_test.sql', 'x') as wfile:
+ with st.open(open_test_sql, 'x') as wfile:
pass
# Attempt overwrite file using `open` with mode 'w'
- with st.open('open_test.sql', 'w') as wfile:
+ with st.open(open_test_sql, 'w') as wfile:
wfile.write(open(TEST_DIR / 'test2.sql').read())
- txt = st.download_file('open_test.sql', encoding='utf-8')
+ txt = st.download_file(open_test_sql, encoding='utf-8')
assert txt == open(TEST_DIR / 'test2.sql').read()
+ open_raw_test_sql = f'open_raw_test_{id(self)}.sql'
+
# Test writer without context manager
- wfile = st.open('open_raw_test.sql', 'w')
+ wfile = st.open(open_raw_test_sql, 'w')
for line in open(TEST_DIR / 'test.sql'):
wfile.write(line)
wfile.close()
- txt = st.download_file('open_raw_test.sql', encoding='utf-8')
+ txt = st.download_file(open_raw_test_sql, encoding='utf-8')
assert txt == open(TEST_DIR / 'test.sql').read()
# Test reader without context manager
- rfile = st.open('open_raw_test.sql', 'r')
+ rfile = st.open(open_raw_test_sql, 'r')
txt = ''
for line in rfile:
txt += line
@@ -509,15 +513,18 @@ def test_open(self):
def test_obj_open(self):
st = self.wg.stage
+ obj_open_test_sql = f'obj_open_test_{id(self)}.sql'
+ obj_open_dir = f'obj_open_dir_{id(self)}'
+
# Load test file
- f = st.upload_file(TEST_DIR / 'test.sql', 'obj_open_test.sql')
+ f = st.upload_file(TEST_DIR / 'test.sql', obj_open_test_sql)
# Read file using `open`
with f.open() as rfile:
assert rfile.read() == open(TEST_DIR / 'test.sql').read()
# Make sure directories error out
- d = st.mkdir('obj_open_dir')
+ d = st.mkdir(obj_open_dir)
with self.assertRaises(IsADirectoryError):
d.open()
@@ -1143,58 +1150,62 @@ def test_upload_file_io(self):
def test_open(self):
for space in [self.personal_space, self.shared_space]:
+ open_test_ipynb = f'open_test_ipynb_{id(self)}.ipynb'
+
# See if error is raised for non-existent file
with self.assertRaises(s2.ManagementError):
- space.open('open_test.ipynb', 'r')
+ space.open(open_test_ipynb, 'r')
# Load test file
- space.upload_file(TEST_DIR / 'test.ipynb', 'open_test.ipynb')
+ space.upload_file(TEST_DIR / 'test.ipynb', open_test_ipynb)
# Read file using `open`
- with space.open('open_test.ipynb', 'r') as rfile:
+ with space.open(open_test_ipynb, 'r') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
# Read file using `open` with 'rt' mode
- with space.open('open_test.ipynb', 'rt') as rfile:
+ with space.open(open_test_ipynb, 'rt') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb').read()
# Read file using `open` with 'rb' mode
- with space.open('open_test.ipynb', 'rb') as rfile:
+ with space.open(open_test_ipynb, 'rb') as rfile:
assert rfile.read() == open(TEST_DIR / 'test.ipynb', 'rb').read()
# Read file using `open` with 'rb' mode
with self.assertRaises(ValueError):
- with space.open('open_test.ipynb', 'b') as rfile:
+ with space.open(open_test_ipynb, 'b') as rfile:
pass
# Attempt overwrite file using `open` with mode 'x'
with self.assertRaises(OSError):
- with space.open('open_test.ipynb', 'x') as wfile:
+ with space.open(open_test_ipynb, 'x') as wfile:
pass
# Attempt overwrite file using `open` with mode 'w'
- with space.open('open_test.ipynb', 'w') as wfile:
+ with space.open(open_test_ipynb, 'w') as wfile:
wfile.write(open(TEST_DIR / 'test2.ipynb').read())
- txt = space.download_file('open_test.ipynb', encoding='utf-8')
+ txt = space.download_file(open_test_ipynb, encoding='utf-8')
assert txt == open(TEST_DIR / 'test2.ipynb').read()
+ open_raw_test_ipynb = f'open_raw_test_{id(self)}.ipynb'
+
# Test writer without context manager
- wfile = space.open('open_raw_test.ipynb', 'w')
+ wfile = space.open(open_raw_test_ipynb, 'w')
for line in open(TEST_DIR / 'test.ipynb'):
wfile.write(line)
wfile.close()
txt = space.download_file(
- 'open_raw_test.ipynb',
+ open_raw_test_ipynb,
encoding='utf-8',
)
assert txt == open(TEST_DIR / 'test.ipynb').read()
# Test reader without context manager
- rfile = space.open('open_raw_test.ipynb', 'r')
+ rfile = space.open(open_raw_test_ipynb, 'r')
txt = ''
for line in rfile:
txt += line
@@ -1203,15 +1214,18 @@ def test_open(self):
assert txt == open(TEST_DIR / 'test.ipynb').read()
# Cleanup
- space.remove('open_test.ipynb')
- space.remove('open_raw_test.ipynb')
+ space.remove(open_test_ipynb)
+ space.remove(open_raw_test_ipynb)
def test_obj_open(self):
for space in [self.personal_space, self.shared_space]:
+ obj_open_test_ipynb = f'obj_open_test_{id(self)}.ipynb'
+ obj_open_dir = f'obj_open_dir_{id(self)}'
+
# Load test file
f = space.upload_file(
TEST_DIR / 'test.ipynb',
- 'obj_open_test.ipynb',
+ obj_open_test_ipynb,
)
# Read file using `open`
@@ -1220,7 +1234,7 @@ def test_obj_open(self):
# Make sure directories error out
with self.assertRaises(s2.ManagementError):
- space.mkdir('obj_open_dir')
+ space.mkdir(obj_open_dir)
# Write file using `open`
with f.open('w', encoding='utf-8') as wfile:
@@ -1248,7 +1262,7 @@ def test_obj_open(self):
assert txt == open(TEST_DIR / 'test.ipynb').read()
# Cleanup
- space.remove('obj_open_test.ipynb')
+ space.remove(obj_open_test_ipynb)
def test_os_directories(self):
for space in [self.personal_space, self.shared_space]:
diff --git a/singlestoredb/types.py b/singlestoredb/types.py
index 5558c0d57..051d465b4 100644
--- a/singlestoredb/types.py
+++ b/singlestoredb/types.py
@@ -173,17 +173,25 @@ class ColumnType(object):
'DECIMAL', 'DEC', 'FIXED', 'NUMERIC', 0, decimal.Decimal,
)
DEC = FIXED = NUMERIC = DECIMAL
- TINY = TINYINT = BOOL = BOOLEAN = NumberDBAPIType(
- 'TINY', 'TINYINT', 'BOOL', 'BOOLEAN', 1,
+ TINY = TINYINT = BOOL = BOOLEAN = UNSIGNED_TINY = UNSIGNED_TINYINT = NumberDBAPIType(
+ 'TINY', 'TINYINT', 'BOOL', 'BOOLEAN', 'UNSIGNED TINY', 'UNSIGNED TINYINT', 1,
+ )
+ SHORT = SMALLINT = UNSIGNED_SHORT = UNSIGNED_SMALLINT = NumberDBAPIType(
+ 'SMALLINT', 'SHORT', 'UNSIGNED SHORT', 'UNSIGNED SMALLINT', 2,
+ )
+ LONG = INT = UNSIGNED_LONG = UNSIGNED_INT = NumberDBAPIType(
+ 'LONG', 'INT', 'UNSIGNED LONG', 'UNSIGNED INT', 3,
)
- SHORT = SMALLINT = NumberDBAPIType('SMALLINT', 'SHORT', 2)
- LONG = INT = NumberDBAPIType('LONG', 'INT', 3)
FLOAT = NumberDBAPIType('FLOAT', 4)
DOUBLE = REAL = NumberDBAPIType('DOUBLE', 5, float)
NULL = DBAPIType('NULL', 6)
TIMESTAMP = DatetimeDBAPIType('TIMESTAMP', 7)
- LONGLONG = BIGINT = NumberDBAPIType('BIGINT', 'LONGLONG', 8, int)
- MEDIUMINT = INT24 = NumberDBAPIType('MEDIUMINT', 'INT24', 9)
+ LONGLONG = BIGINT = UNSIGNED_LONGLONG = UNSIGNED_BIGINT = NumberDBAPIType(
+ 'BIGINT', 'LONGLONG', 'UNSIGNED LONGLONG', 'UNSIGNED BIGINT', 8, int,
+ )
+ MEDIUMINT = INT24 = UNSIGNED_MEDIUMINT = UNSIGNED_INT24 = NumberDBAPIType(
+ 'MEDIUMINT', 'INT24', 'UNSIGNED MEDIUMINT', 'UNSIGNED INT24', 9,
+ )
DATE = DBAPIType('DATE', 10, datetime.date)
TIME = DBAPIType('TIME', 11, datetime.time)
DATETIME = DatetimeDBAPIType('DATETIME', 12, datetime.datetime)
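A quick sanity check that the unsigned names added to ``ColumnType`` are aliases bound to the same DB-API type objects as their signed counterparts, not new type codes::

    from singlestoredb.types import ColumnType

    assert ColumnType.UNSIGNED_BIGINT is ColumnType.BIGINT        # code 8
    assert ColumnType.UNSIGNED_SMALLINT is ColumnType.SHORT       # code 2
    assert ColumnType.UNSIGNED_INT24 is ColumnType.MEDIUMINT      # code 9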