-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdataset.py
342 lines (259 loc) · 12.1 KB
/
dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# ******************************************************************************************************
# dataset.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 08/25/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
from __future__ import annotations
from gsf import Convert, Empty
from .datatable import DataTable
from .datatype import DataType, parse_xsddatatype
from typing import Dict, Iterator, List, Tuple, Union, Optional
from decimal import Decimal
from datetime import datetime
from uuid import UUID
from io import BytesIO, StringIO
from xml.etree import ElementTree
from xml.etree.ElementTree import Element
import numpy as np
XMLSCHEMA_NAMESPACE = "http://www.w3.org/2001/XMLSchema"
"""
Defines schema namespace for the W3C XML Schema Definition Language (XSD) used by STTP metadata tables.
"""
EXT_XMLSCHEMADATA_NAMESPACE = "urn:schemas-microsoft-com:xml-msdata"
"""
Defines extended types for XSD elements, e.g., Guid and expression data types.
"""
def xsdformat(value: datetime) -> str:
    """
    Converts date/time value to a string in XSD XML schema format.
    """

    # Format with three fractional digits, then drop the last character so
    # the fractional second is two digits wide, per the expected XSD layout
    formatted = value.isoformat(timespec="milliseconds")
    return formatted[:-1]
class DataSet:
    """
    Represents an in-memory cache of records that is structured similarly to information
    defined in a database. The data set object consists of a collection of data table objects.
    See https://sttp.github.io/documentation/data-sets/ for more information.
    Note that this implementation uses a case-insensitive map for `DataTable` name lookups.
    Internally, case-insensitive lookups are accomplished using `str.upper()`.
    """

    DEFAULT_NAME = "DataSet"

    def __init__(self,
                 name: str = ...
                 ):
        """
        Creates a new `DataSet`. When `name` is omitted, `DataSet.DEFAULT_NAME` is used.
        """

        # Table map is keyed on upper-cased table names so that all lookups
        # through this class are case-insensitive
        self._tables: Dict[str, DataTable] = {}

        self.name = DataSet.DEFAULT_NAME if name is ... else name
        """
        Defines the name of the `DataSet`.
        """

    # Case-insensitive get table by name; None returned when value does not exist
    def __getitem__(self, key: str) -> DataTable:
        return self.table(key)

    def __delitem__(self, key: str):
        # BUG FIX: key is now upper-cased before deletion so `del` is
        # case-insensitive, consistent with `add_table`/`table` which key
        # the internal map on upper-cased names
        del self._tables[key.upper()]

    def __len__(self) -> int:
        return len(self._tables)

    # Case-insensitive table name search
    def __contains__(self, item: str) -> bool:
        return self[item] is not None

    def __iter__(self) -> Iterator[DataTable]:
        return iter(self._tables.values())

    def clear_tables(self):
        """
        Clears the internal table collection.
        Any existing tables will be deleted.
        """

        self._tables = {}

    def add_table(self, table: DataTable):
        """
        Adds the specified table to the `DataSet`. An existing table with
        the same name (case-insensitive) is replaced.
        """

        self._tables[table.name.upper()] = table

    def table(self, tablename: str) -> Optional[DataTable]:
        """
        Gets the `DataTable` for the specified table name if it exists;
        otherwise, None is returned. Lookup is case-insensitive.
        """

        return self._tables.get(tablename.upper())

    def tablenames(self) -> List[str]:
        """
        Gets the table names defined in the `DataSet`.
        """

        return [table.name for table in self._tables.values()]

    def tables(self) -> List[DataTable]:
        """
        Gets the `DataTable` instances defined in the `DataSet`.
        """

        return list(self._tables.values())

    def create_table(self, name: str) -> DataTable:
        """
        Creates a new `DataTable` associated with the `DataSet`.
        Use `add_table` to add the new table to the `DataSet`.
        """

        return DataTable(self, name)

    @property
    def tablecount(self) -> int:
        """
        Gets the total number of tables defined in the `DataSet`.
        """

        return len(self._tables)

    def remove_table(self, tablename: str) -> bool:
        """
        Removes the specified table name from the `DataSet`. Returns
        True if table was removed; otherwise, False if it did not exist.
        Lookup is case-insensitive.
        """

        # BUG FIX: pop with a None default so a missing table returns False
        # as documented, instead of raising KeyError
        return self._tables.pop(tablename.upper(), None) is not None

    def __repr__(self):
        # BUG FIX: iterate the table instances (dict values), not the map
        # keys; iterating the dict directly yielded `str` keys that have no
        # `name` attribute, raising AttributeError
        return f"{self.name} [{', '.join(table.name for table in self._tables.values())}]"

    @staticmethod
    def from_xml(buffer: Union[str, bytes]) -> Tuple[DataSet, Optional[Exception]]:
        """
        Creates a new `DataSet` as read from the XML in the specified buffer.
        Returns the data set along with any parse error encountered (or None).
        """

        dataset = DataSet()
        err = dataset.parse_xml(buffer)
        return dataset, err

    def parse_xml(self, buffer: Union[str, bytes]) -> Optional[Exception]:
        """
        Loads the `DataSet` from the XML in the specified buffer.
        Returns None on success; otherwise, the exception that occurred.
        """

        try:
            doc = ElementTree.fromstring(buffer)
        except Exception as ex:
            return ex

        # Second pass over the buffer collects the declared XML namespace
        # prefixes, which `fromstring` alone does not expose
        bufferio = StringIO(buffer) if isinstance(buffer, str) else BytesIO(buffer)

        namespaces: Dict[str, str] = dict(
            [node for _, node in ElementTree.iterparse(bufferio, events=["start-ns"])])

        # Drop the default (empty-prefix) namespace entry, if any
        if namespaces.get(Empty.STRING) is not None:
            del namespaces[Empty.STRING]

        return self.parse_xmldoc(doc, namespaces)

    def parse_xmldoc(self, root: Element, namespaces: Dict[str, str]) -> Optional[Exception]:
        """
        Loads the `DataSet` from an existing root XML document element.
        Returns None on success; otherwise, an error describing the failure.
        """

        def get_schemaprefix(target_namespace: str):
            # Resolve the prefix declared for the target namespace, returning
            # it with a trailing ":" ready for element path construction
            prefix = ""

            for key in namespaces:
                if namespaces[key] == target_namespace:
                    prefix = key
                    break

            if len(prefix) > 0:
                prefix += ":"

            return prefix

        xs = get_schemaprefix(XMLSCHEMA_NAMESPACE)

        # Find schema node
        schema = root.find(f"{xs}schema", namespaces)

        if schema is None:
            return RuntimeError("failed to parse DataSet XML: Cannot find schema node")

        # Renamed from `id` to avoid shadowing the builtin
        if (schema_id := schema.attrib.get("id")) is None or schema_id != root.tag:
            return RuntimeError(f"failed to parse DataSet XML: Cannot find schema node matching \"{root.tag}\"")

        # Populate DataSet schema
        self._load_schema(schema, namespaces, xs)

        # Populate DataSet records
        self._load_records(root)

        return None

    def _load_schema(self, schema: Element, namespaces: Dict[str, str], xs: str):
        # Builds table and column definitions from the XSD schema element
        EXT_PREFIX = f"{{{EXT_XMLSCHEMADATA_NAMESPACE}}}"

        # Find choice elements representing schema table definitions
        tablenodes = schema.findall(f"{xs}element/{xs}complexType/{xs}choice/{xs}element", namespaces)

        for tablenode in tablenodes:
            if (tablename := tablenode.attrib.get("name")) is None:
                continue

            datatable = self.create_table(tablename)

            # Find sequence elements representing schema table field definitions
            fieldnodes = tablenode.findall(f"{xs}complexType/{xs}sequence/{xs}element", namespaces)

            for fieldnode in fieldnodes:
                if (fieldname := fieldnode.attrib.get("name")) is None:
                    continue

                if (typename := fieldnode.attrib.get("type")) is None:
                    continue

                # Strip the schema namespace prefix from the XSD type name
                if typename.startswith(xs):
                    typename = typename[len(xs):]

                # Check for extended data type (allows XSD Guid field definitions)
                extdatatype = fieldnode.attrib.get(f"{EXT_PREFIX}DataType")

                datatype, found = parse_xsddatatype(typename, extdatatype)

                # Columns with unsupported XSD data types are skipped
                if not found:
                    continue

                # Check for computed expression
                expression = fieldnode.attrib.get(f"{EXT_PREFIX}Expression")

                datacolumn = datatable.create_column(fieldname, datatype, expression)
                datatable.add_column(datacolumn)

            self.add_table(datatable)

    def _load_records(self, root: Element):
        # Populates table rows from the root XML document element; each child
        # of the root whose tag matches a known table name becomes one record

        # Numeric types all share one conversion pattern: map each to its
        # empty-value default and numpy target type (hoisted out of the loops)
        numeric_types = {
            DataType.SINGLE: (Empty.SINGLE, np.float32),
            DataType.DOUBLE: (Empty.DOUBLE, np.float64),
            DataType.INT8: (Empty.INT8, np.int8),
            DataType.INT16: (Empty.INT16, np.int16),
            DataType.INT32: (Empty.INT32, np.int32),
            DataType.INT64: (Empty.INT64, np.int64),
            DataType.UINT8: (Empty.UINT8, np.uint8),
            DataType.UINT16: (Empty.UINT16, np.uint16),
            DataType.UINT32: (Empty.UINT32, np.uint32),
            DataType.UINT64: (Empty.UINT64, np.uint64),
        }

        # Each root node child that matches a table name represents a record
        for record in root:
            table = self.table(record.tag)

            if table is None:
                continue

            datarow = table.create_row()

            # Each child node of a record represents a field value
            for field in record:
                column = table.column_byname(field.tag)

                if column is None:
                    continue

                index = column.index
                datatype = column.datatype
                value = field.text

                if datatype == DataType.STRING:
                    datarow[index] = Empty.STRING if value is None else value
                elif datatype == DataType.GUID:
                    datarow[index] = Empty.GUID if value is None else UUID(value)
                elif datatype == DataType.DATETIME:
                    datarow[index] = Empty.DATETIME if value is None else Convert.from_str(value, datetime)
                elif datatype == DataType.BOOLEAN:
                    # BUG FIX: `bool(value)` was True for any non-empty string,
                    # including "false" and "0"; parse the XSD boolean lexical
                    # space instead ("true"/"1" => True, anything else => False)
                    datarow[index] = False if value is None else value.strip().lower() in ("true", "1")
                elif datatype == DataType.DECIMAL:
                    datarow[index] = Empty.DECIMAL if value is None else Decimal(value)
                elif datatype in numeric_types:
                    empty, target_type = numeric_types[datatype]
                    datarow[index] = empty if value is None else Convert.from_str(value, target_type)
                else:
                    datarow[index] = None

            table.add_row(datarow)