Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/nebulagraph_python/decoder/decode_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,22 @@ def is_null_bit_map_all_set(vector: NestedVector) -> bool:
def bytes_to_sized_string(data: bytes, start_pos: int, byte_order: ByteOrder) -> str:
    """Match Java's DecodeUtils.bytesToSizedString.

    Reads a length-prefixed string from `data` starting at `start_pos`:
    first a 16-bit length (ELEMENT_NUMBER_SIZE_FOR_ANY_VALUE bytes, decoded
    with `byte_order`), then that many bytes of string payload.

    Args:
        data: Raw buffer containing the sized string.
        start_pos: Offset of the 16-bit length prefix within `data`.
        byte_order: Endianness used to decode the length prefix.

    Returns:
        The decoded string (decoded with the module-level `charset`).
    """
    # NOTE(review): this span was diff residue mixing deleted and added
    # lines; reconstructed here as the single post-merge implementation.
    length = bytes_to_int16(
        data[start_pos : start_pos + ELEMENT_NUMBER_SIZE_FOR_ANY_VALUE],
        byte_order,
    )
    start_pos += ELEMENT_NUMBER_SIZE_FOR_ANY_VALUE

    # Decode the whole payload in one charset-based call instead of
    # character by character.
    str_bytes = data[start_pos : start_pos + length]
    return str_bytes.decode(charset)

def mod_math(a, b):
    """Return the remainder of ``a / b`` using truncated division.

    This matches C/Java ``%`` semantics: the result carries the sign of
    the dividend ``a``, unlike Python's floor-based ``%`` operator whose
    result carries the sign of the divisor.

    Raises:
        RuntimeError: If ``b`` is zero.
    """
    if b == 0:
        raise RuntimeError("cannot be zero")

    # Truncated quotient: magnitude of the floored quotient of the
    # absolute values, negated when the operands' signs differ.
    quotient = abs(a) // abs(b)
    if (a >= 0) != (b >= 0):
        quotient = -quotient
    return a - quotient * b
114 changes: 59 additions & 55 deletions src/nebulagraph_python/decoder/value_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
bytes_to_uint16,
bytes_to_uint32,
charset,
mod_math,
)
from nebulagraph_python.decoder.size_constant import (
ANY_HEADER_SIZE,
Expand Down Expand Up @@ -135,20 +136,20 @@ class ValueParser:
byte_order: ByteOrder

def __init__(
self,
graph_schemas: ResultGraphSchemas,
timezone_offset: int,
byte_order: ByteOrder,
self,
graph_schemas: ResultGraphSchemas,
timezone_offset: int,
byte_order: ByteOrder,
):
self.graph_schemas = graph_schemas
self.timezone_offset = timezone_offset
self.byte_order = byte_order

def decode_value_wrapper(
self,
vector: VectorWrapper,
data_type: DataType,
row_idx: int,
self,
vector: VectorWrapper,
data_type: DataType,
row_idx: int,
) -> ValueWrapper:
"""Decode value and wrap in ValueWrapper"""
value = self._decode_value(vector, data_type, row_idx)
Expand All @@ -159,10 +160,10 @@ def decode_value_wrapper(
return ValueWrapper(value, data_type.get_type())

def _decode_value(
self,
vector: VectorWrapper,
data_type: DataType,
row_idx: int,
self,
vector: VectorWrapper,
data_type: DataType,
row_idx: int,
) -> Any:
"""Main decode method matching Java's decodeValue"""
# Check if value at index is null
Expand Down Expand Up @@ -199,10 +200,10 @@ def _decode_value(
return value

def _decode_flat_value(
self,
vector: VectorWrapper,
data_type: DataType,
row_idx: int,
self,
vector: VectorWrapper,
data_type: DataType,
row_idx: int,
) -> Any:
"""Decode flat vector value at given row index"""
vector_data = vector.get_vector_data()
Expand All @@ -217,15 +218,18 @@ def _decode_flat_value(

if column_type in [ColumnType.INT16, ColumnType.UINT16]:
value_data = self._get_sub_bytes(vector_data, INT16_SIZE, row_idx)
return bytes_to_int16(value_data, self.byte_order) if column_type == ColumnType.INT16 else bytes_to_uint16(value_data, self.byte_order)
return bytes_to_int16(value_data, self.byte_order) if column_type == ColumnType.INT16 else bytes_to_uint16(
value_data, self.byte_order)

if column_type in [ColumnType.INT32, ColumnType.UINT32]:
value_data = self._get_sub_bytes(vector_data, INT32_SIZE, row_idx)
return bytes_to_int32(value_data, self.byte_order) if column_type == ColumnType.INT32 else bytes_to_uint32(value_data, self.byte_order)
return bytes_to_int32(value_data, self.byte_order) if column_type == ColumnType.INT32 else bytes_to_uint32(
value_data, self.byte_order)

if column_type in [ColumnType.INT64, ColumnType.UINT64]:
value_data = self._get_sub_bytes(vector_data, INT64_SIZE, row_idx)
return bytes_to_int64(value_data, self.byte_order) if column_type == ColumnType.INT64 else bytes_to_int64(value_data, self.byte_order) & 0xFFFFFFFFFFFFFFFF
return bytes_to_int64(value_data, self.byte_order) if column_type == ColumnType.INT64 else bytes_to_int64(
value_data, self.byte_order) & 0xFFFFFFFFFFFFFFFF

if column_type == ColumnType.FLOAT32:
value_data = self._get_sub_bytes(vector_data, FLOAT_SIZE, row_idx)
Expand Down Expand Up @@ -376,8 +380,8 @@ def _decode_flat_value(
)
node_header = NodeHeader(node_header_binary, self.byte_order)
if (
node_header.graph_id not in node_prop_types
or node_header.node_type_id not in node_prop_types[node_header.graph_id]
node_header.graph_id not in node_prop_types
or node_header.node_type_id not in node_prop_types[node_header.graph_id]
):
raise RuntimeError(
f"Value type for NODE does not contain graphId {node_header.graph_id} "
Expand Down Expand Up @@ -429,8 +433,8 @@ def _decode_flat_value(
no_directed_type_id = edge_header.edge_type_id & 0x3FFFFFFF

if (
edge_header.graph_id not in edge_prop_types
or no_directed_type_id not in edge_prop_types[edge_header.graph_id]
edge_header.graph_id not in edge_prop_types
or no_directed_type_id not in edge_prop_types[edge_header.graph_id]
):
raise RuntimeError(
f"Value type for EDGE does not contain graphId {edge_header.graph_id} "
Expand Down Expand Up @@ -574,7 +578,7 @@ def _decode_flat_value(
for i in range(dimension):
start = offset + i * FLOAT32_SIZE
values[i] = bytes_to_float(
vector_view[start : start + FLOAT32_SIZE].tobytes(),
vector_view[start: start + FLOAT32_SIZE].tobytes(),
self.byte_order,
)

Expand Down Expand Up @@ -629,40 +633,40 @@ def bytes_to_string(self, string_header: bytes, vector: NestedVector) -> str:
# If string is small enough, read directly from header
if string_value_length <= STRING_MAX_VALUE_LENGTH_IN_HEADER:
return string_header[
STRING_VALUE_LENGTH_SIZE : STRING_VALUE_LENGTH_SIZE
+ string_value_length
STRING_VALUE_LENGTH_SIZE: STRING_VALUE_LENGTH_SIZE
+ string_value_length
].decode(charset)

# Get chunk index and offset for longer strings
chunk_index = bytes_to_int32(
string_header[
CHUNK_INDEX_START_POSITION_IN_STRING_HEADER : CHUNK_INDEX_START_POSITION_IN_STRING_HEADER
+ CHUNK_INDEX_LENGTH_IN_STRING_HEADER
CHUNK_INDEX_START_POSITION_IN_STRING_HEADER: CHUNK_INDEX_START_POSITION_IN_STRING_HEADER
+ CHUNK_INDEX_LENGTH_IN_STRING_HEADER
],
self.byte_order,
)

chunk_offset = bytes_to_int32(
string_header[
CHUNK_OFFSET_START_POSITION_IN_STRING_HEADER : CHUNK_OFFSET_START_POSITION_IN_STRING_HEADER
+ CHUNK_OFFSET_LENGTH_IN_STRING_HEADER
CHUNK_OFFSET_START_POSITION_IN_STRING_HEADER: CHUNK_OFFSET_START_POSITION_IN_STRING_HEADER
+ CHUNK_OFFSET_LENGTH_IN_STRING_HEADER
],
self.byte_order,
)

# Get string data from chunk
string_chunk_vector = vector.nested_vectors[chunk_index]
value_data = string_chunk_vector.vector_data[
chunk_offset : chunk_offset + string_value_length
chunk_offset: chunk_offset + string_value_length
]
return value_data.decode(charset)

def bytes_to_date(self, data: bytes) -> datetime.date:
"""Convert bytes to date"""
year = bytes_to_uint16(data[0:YEAR_SIZE], self.byte_order)
month = bytes_to_uint8(data[YEAR_SIZE : YEAR_SIZE + MONTH_SIZE])
month = bytes_to_uint8(data[YEAR_SIZE: YEAR_SIZE + MONTH_SIZE])
day = bytes_to_uint8(
data[YEAR_SIZE + MONTH_SIZE : YEAR_SIZE + MONTH_SIZE + DAY_SIZE],
data[YEAR_SIZE + MONTH_SIZE: YEAR_SIZE + MONTH_SIZE + DAY_SIZE],
)
return datetime.date(year, month, day)

Expand Down Expand Up @@ -693,8 +697,8 @@ def bytes_to_zoned_time(self, data: bytes) -> datetime.time:
# Create base time and add timezone offset minutes
base_time = datetime.time(hour % 24, minute, second, microsecond)
adjusted_time = (
datetime.datetime.combine(datetime.date.today(), base_time)
+ datetime.timedelta(minutes=current_offset)
datetime.datetime.combine(datetime.date.today(), base_time)
+ datetime.timedelta(minutes=current_offset)
).time()

# Create timezone with offset
Expand Down Expand Up @@ -745,14 +749,14 @@ def bytes_to_duration(self, data: bytes) -> "NDuration":
if is_month_based:
# For month-based duration
year = int(duration_value / 12)
month = int(duration_value % 12)
month = int(mod_math(duration_value, 12))
else:
# For time-based duration
day = int (duration_value / MICRO_SECONDS_OF_DAY)
hour = int (duration_value % MICRO_SECONDS_OF_DAY / MICRO_SECONDS_OF_HOUR)
minute = int (duration_value % MICRO_SECONDS_OF_HOUR / MICRO_SECONDS_OF_MINUTE)
second = int ((duration_value % MICRO_SECONDS_OF_MINUTE) / MICRO_SECONDS_OF_SECOND)
micro_sec = int (duration_value % MICRO_SECONDS_OF_SECOND)
day = int(duration_value / MICRO_SECONDS_OF_DAY)
hour = int(mod_math(duration_value, MICRO_SECONDS_OF_DAY) / MICRO_SECONDS_OF_HOUR)
minute = int(mod_math(duration_value, MICRO_SECONDS_OF_HOUR) / MICRO_SECONDS_OF_MINUTE)
second = int(mod_math(duration_value, MICRO_SECONDS_OF_MINUTE) / MICRO_SECONDS_OF_SECOND)
micro_sec = int(mod_math(duration_value, MICRO_SECONDS_OF_SECOND))

return NDuration(
is_month_based=is_month_based,
Expand All @@ -766,10 +770,10 @@ def bytes_to_duration(self, data: bytes) -> "NDuration":
)

def bytes_to_any(
self,
value: bytes,
vector: VectorWrapper,
row_idx: int,
self,
value: bytes,
vector: VectorWrapper,
row_idx: int,
) -> "AnyValue":
"""Convert bytes to AnyValue for flat vector"""
# Get data type from first vector wrapper
Expand Down Expand Up @@ -805,7 +809,7 @@ def bytes_to_any(
if value_type.is_composite():
# Handle composite types
sub_vector = vector.get_vector_wrapper(any_header.chunk_index)
reader = BytesReader(sub_vector.get_vector_data()[any_header.offset :])
reader = BytesReader(sub_vector.get_vector_data()[any_header.offset:])
obj = self._decode_composite_value(reader, value_type)

return AnyValue(obj, value_type)
Expand All @@ -828,9 +832,9 @@ def bytes_to_const_any(self, reader: BytesReader) -> "AnyValue":
return AnyValue(obj, column_type)

def bytes_basic_to_object(
self,
reader: BytesReader,
column_type: ColumnType,
self,
reader: BytesReader,
column_type: ColumnType,
) -> Any:
"""Convert bytes to basic type object"""
obj = None
Expand Down Expand Up @@ -897,9 +901,9 @@ def string_to_decimal(self, decimal_str: str) -> decimal.Decimal:
return decimal.Decimal(decimal_str)

def _decode_composite_value(
self,
reader: BytesReader,
column_type: ColumnType,
self,
reader: BytesReader,
column_type: ColumnType,
) -> Any:
"""Decode composite types from binary reader"""
if column_type == ColumnType.NULL:
Expand Down Expand Up @@ -1166,9 +1170,9 @@ def decode_value_type(self, reader: BytesReader) -> DataType:
raise RuntimeError(f"unsupported type: {column_type}")

def _get_property_name_and_type_from_value_type(
self,
reader: BytesReader,
type_id_size: int,
self,
reader: BytesReader,
type_id_size: int,
) -> Dict[int, Dict[int, Dict[str, DataType]]]:
"""Get property name and type mapping for nodes/edges
Returns mapping: graph_id -> (type_id -> (prop_name -> prop_type))
Expand Down