Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add thrift wrapper for meta and storage #31

Merged
merged 17 commits into from
Aug 31, 2020
7 changes: 7 additions & 0 deletions examples/StorageClientExample.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,27 @@ def get_return_cols(space):


if __name__ == '__main__':
# initialize a MetaClient to establish connections with meta servers
jievince marked this conversation as resolved.
Show resolved Hide resolved
meta_client = MetaClient([(sys.argv[1], sys.argv[2])])
code = meta_client.connect()
if code == ErrorCode.E_FAIL_TO_CONNECT:
raise Exception('connect to %s:%d failed' % (sys.argv[1], sys.argv[2]))
# initialize a StorageClient
storage_client = StorageClient(meta_client)
# initialize a ScanEdgeProcessor to process scanned edge data
scan_edge_processor = ScanEdgeProcessor(meta_client)
# initialize a ScanVertexProcessor to process scanned vertex data
scan_vertex_processor = ScanVertexProcessor(meta_client)

space_to_read = sys.argv[3]
# get argument return_cols, which is used in function scan_edge, scan_vertex, scan_part_edge, scan_part_vertex
vertex_return_cols, edge_return_cols = get_return_cols(space_to_read)
all_cols = True

if space_to_read not in meta_client.get_parts_alloc_from_cache().keys():
raise Exception('spaceToRead %s is not found in nebula' % space_to_read)
else:
# scan vertex data
scan_vertex(space_to_read, vertex_return_cols, all_cols)
# scan edge data
scan_edge(space_to_read, edge_return_cols, all_cols)
62 changes: 34 additions & 28 deletions nebula/ngData/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
# This source code is licensed under Apache 2.0 License,
# attached with Common Clause Condition 1.0, found in the LICENSES directory.

from enum import Enum
from common.ttypes import SupportedType
import struct
import six

class PropertyDef:
PropertyType = Enum('PropertyType', ('UNKNOWN', 'BOOL', 'INT', 'VID', 'FLOAT', 'DOUBLE', \
'STRING', 'TIMESTAMP', 'VERTEX_ID', 'TAG_ID', 'SRC_ID', 'EDGE_TYPE', 'EDGE_RANK', 'DST_ID'))

def __init__(self, property_type, name):
self._property_type = property_type
self._name = name
Expand Down Expand Up @@ -40,26 +37,35 @@ def __init__(self, default_properties, properties):

class RowReader:
def __init__(self, schema, schema_version=0):
"""Initalizer
Arguments:
- schema: schema of tag or edge
- schema_version: version of schema
Returns: emtpy
"""
self._schema_version = schema_version
self._defs = []
self._field_num = 0
self._property_name_index = {}
self._property_types = []
self._offset = 0

prop_type_map = {0: PropertyDef.PropertyType.UNKNOWN, 1: PropertyDef.PropertyType.BOOL, 2: PropertyDef.PropertyType.INT,
3: PropertyDef.PropertyType.VID, 4: PropertyDef.PropertyType.FLOAT, 5: PropertyDef.PropertyType.DOUBLE,
6: PropertyDef.PropertyType.STRING, 21: PropertyDef.PropertyType.TIMESTAMP}
idx = 0
for column_def in schema.columns:
property_type = prop_type_map[column_def.type.type] # ColumnDef is in common/ttypes.py
property_type = column_def.type.type
column_name = column_def.name
self._defs.append((column_name, property_type))
self._property_name_index[column_name] = idx
idx += 1
self._field_num = len(self._defs)

def decode_value(self, value, schema_version=None):
""" decode data
Arguments:
- value: data value scanned from the storage server
- schema_version: version of schema
Returns: decoded property values
"""
if schema_version is None:
schema_version = self._schema_version
self._offset = 0
Expand All @@ -86,34 +92,34 @@ def decode_value(self, value, schema_version=None):
for i in range(len(self._defs)):
field = self._defs[i][0]
property_type = self._defs[i][1]
if property_type == PropertyDef.PropertyType.BOOL:
if property_type == SupportedType.BOOL:
properties.append(self.get_bool_property(field, value))
elif property_type == PropertyDef.PropertyType.INT:
elif property_type == SupportedType.INT:
properties.append(self.get_int_property(field, value))
elif property_type == PropertyDef.PropertyType.FLOAT: #unused now
elif property_type == SupportedType.FLOAT: #unused now
properties.append(self.get_float_property(field, value))
elif property_type == PropertyDef.PropertyType.DOUBLE:
elif property_type == SupportedType.DOUBLE:
properties.append(self.get_double_property(field, value))
elif property_type == PropertyDef.PropertyType.STRING:
elif property_type == SupportedType.STRING:
properties.append(self.get_string_property(field, value))
elif property_type == PropertyDef.PropertyType.TIMESTAMP:
elif property_type == SupportedType.TIMESTAMP:
properties.append(self.get_timestamp_property(field, value))
else:
raise Exception('Invalid propertyType in schema: ', property_type)
raise Exception('Unsupported propertyType in schema: ', property_type)

return properties

def edge_key(self, srcId, edgeType, dstId):
def edge_key(self, srcId, edgeName, dstId):
properties = []
properties.append(Property(PropertyDef.PropertyType.SRC_ID, "_srcId", srcId))
properties.append(Property(PropertyDef.PropertyType.EDGE_TYPE, "_edgeType", edgeType))
properties.append(Property(PropertyDef.PropertyType.DST_ID, "_dstId", dstId))
properties.append(Property(SupportedType.VID, "_src", srcId))
properties.append(Property(SupportedType.STRING, "_edge", edgeName))
properties.append(Property(SupportedType.VID, "_dst", dstId))
return properties

def vertex_key(self, vertexId, tagId):
def vertex_key(self, vertexId, tagName):
properties = []
properties.append(Property(PropertyDef.PropertyType.VERTEX_ID, "_vertexId", vertexId))
properties.append(Property(PropertyDef.PropertyType.TAG_ID, "_tagId", tagId))
properties.append(Property(SupportedType.VID, "_vid", vertexId))
properties.append(Property(SupportedType.STRING, "_tag", tagName))
return properties

def get_property(self, row, name):
Expand All @@ -132,26 +138,26 @@ def get_bool_property(self, name, value):
else:
val = value[self._offset] != 0x00
self._offset += 1
return Property(PropertyDef.PropertyType.BOOL, name, val)
return Property(SupportedType.BOOL, name, val)

def get_int_property(self, name, value):
val = self.read_compressed_int(value)
return Property(PropertyDef.PropertyType.INT, name, val) #### 字节流解析出data
return Property(SupportedType.INT, name, val) #### 字节流解析出data

def get_timestamp_property(self, name, value):
val = self.read_compressed_int(value)
# val = datetime.fromtimestamp(val)
return Property(PropertyDef.PropertyType.TIMESTAMP, name, val)
return Property(SupportedType.TIMESTAMP, name, val)

def get_float_property(self, name, value):
val = struct.unpack_from('<f', value, self._offset)[0]
self._offset += 4
return Property(PropertyDef.PropertyType.FLOAT, name, val)
return Property(SupportedType.FLOAT, name, val)

def get_double_property(self, name, value):
val = struct.unpack_from('<d', value, self._offset)[0]
self._offset += 8
return Property(PropertyDef.PropertyType.DOUBLE, name, val)
return Property(SupportedType.DOUBLE, name, val)

def get_string_property(self, name, value):
strLen = self.read_compressed_int(value)
Expand All @@ -160,7 +166,7 @@ def get_string_property(self, name, value):
else:
val = str(value[self._offset:self._offset+strLen], encoding='utf-8')
self._offset += strLen
return Property(PropertyDef.PropertyType.STRING, name, val)
return Property(SupportedType.STRING, name, val)

def read_compressed_int(self, value):
shift = 0
Expand Down
Loading