Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions examples/arrow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# YDB Python SDK Example: arrow

Example code demonstrating how to get query execution results in Arrow format.

## Requirements

- `pyarrow` version 5.0.0 or higher
- `EnableArrowResultSetFormat` feature flag enabled on the YDB server

See the top-level [README.md](../README.md) file for instructions on running this example.
42 changes: 42 additions & 0 deletions examples/arrow/basic_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import ydb
import pyarrow as pa


def main():
    """Query a local YDB instance and read every result set as an Arrow record batch.

    Connects to ``grpc://localhost:2136`` (database ``/local``), runs a SELECT
    with the Arrow result-set format and ZSTD compression requested, then
    decodes each result set with ``pyarrow`` and prints its dimensions.

    Raises:
        RuntimeError: if the driver is not ready within the 5 second wait.
    """
    driver_config = ydb.DriverConfig(
        endpoint="grpc://localhost:2136",
        database="/local",
        # credentials=ydb.credentials_from_env_variables(),
        # root_certificates=ydb.load_ydb_root_certificate(),
    )

    # Use the driver and the pool as context managers so both are stopped on
    # every exit path, including the failure paths below.
    with ydb.Driver(driver_config) as driver:
        try:
            driver.wait(timeout=5)
        except TimeoutError as err:
            # Chain the original exception so the root cause stays in the traceback.
            raise RuntimeError("Connect failed to YDB") from err

        with ydb.QuerySessionPool(driver) as pool:
            query = """
            SELECT * FROM example ORDER BY key LIMIT 100;
            """

            format_settings = ydb.ArrowFormatSettings(
                compression_codec=ydb.ArrowCompressionCodec(ydb.ArrowCompressionCodecType.ZSTD, 10)
            )

            result = pool.execute_with_retries(
                query,
                result_set_format=ydb.QueryResultSetFormat.ARROW,
                arrow_format_settings=format_settings,
            )

            for result_set in result:
                # The schema travels separately from the data, so it is decoded
                # first and then used to interpret the record batch bytes.
                schema: pa.Schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
                batch: pa.RecordBatch = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
                print(f"Record batch with {batch.num_rows} rows and {batch.num_columns} columns")


if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion ydb-api-protos
10 changes: 9 additions & 1 deletion ydb/_grpc/grpcwrapper/ydb_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import typing
from typing import Optional


# Workaround for good IDE and universal for runtime
if typing.TYPE_CHECKING:
from ..v4.protos import ydb_query_pb2
Expand Down Expand Up @@ -168,15 +167,24 @@ class ExecuteQueryRequest(IToProto):
exec_mode: int
parameters: dict
stats_mode: int
schema_inclusion_mode: int
result_set_format: int
arrow_format_settings: Optional[public_types.ArrowFormatSettings]

def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest:
    """Build the protobuf ``ExecuteQueryRequest`` from this object's fields.

    ``tx_control`` and ``arrow_format_settings`` are optional: when either is
    ``None`` it is forwarded as ``None`` rather than converted.
    """
    tx_control_pb = None
    if self.tx_control is not None:
        tx_control_pb = self.tx_control.to_proto()

    arrow_settings_pb = None
    if self.arrow_format_settings is not None:
        arrow_settings_pb = self.arrow_format_settings.to_proto()

    query_content_pb = self.query_content.to_proto()
    parameters_pb = convert.query_parameters_to_pb(self.parameters)

    return ydb_query_pb2.ExecuteQueryRequest(
        session_id=self.session_id,
        tx_control=tx_control_pb,
        query_content=query_content_pb,
        exec_mode=self.exec_mode,
        stats_mode=self.stats_mode,
        schema_inclusion_mode=self.schema_inclusion_mode,
        result_set_format=self.result_set_format,
        arrow_format_settings=arrow_settings_pb,
        concurrent_result_sets=self.concurrent_result_sets,
        parameters=parameters_pb,
    )
55 changes: 52 additions & 3 deletions ydb/_grpc/grpcwrapper/ydb_query_public_types.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import abc
import enum
import typing

from .common_utils import IToProto
from .common_utils import IFromProto, IToProto

# Workaround for good IDE and universal for runtime
if typing.TYPE_CHECKING:
from ..v4.protos import ydb_query_pb2
from ..v4.protos import ydb_query_pb2, ydb_formats_pb2
else:
from ..common.protos import ydb_query_pb2
from ..common.protos import ydb_query_pb2, ydb_formats_pb2


class BaseQueryTxMode(IToProto):
Expand Down Expand Up @@ -93,3 +94,51 @@ def name(self):

def to_proto(self) -> ydb_query_pb2.StaleModeSettings:
    """Return a ``StaleModeSettings`` protobuf message with no fields set."""
    settings_pb = ydb_query_pb2.StaleModeSettings()
    return settings_pb


class ArrowCompressionCodecType(enum.IntEnum):
    """Integer identifiers for the compression codec of Arrow format result sets.

    NOTE(review): the numeric values presumably mirror the codec type enum of
    the ``ArrowFormatSettings.CompressionCodec`` protobuf message; confirm
    against the proto definition before changing any of them.
    """

    UNSPECIFIED = 0  # fallback used by ArrowCompressionCodec when no codec type is given
    NONE = 1
    ZSTD = 2
    LZ4_FRAME = 3


class ArrowCompressionCodec(IToProto):
    """Compression codec (type plus optional level) for Arrow format result sets."""

    def __init__(
        self, codec_type: typing.Optional[ArrowCompressionCodecType] = None, level: typing.Optional[int] = None
    ):
        # A missing codec type falls back to UNSPECIFIED.
        if codec_type is None:
            codec_type = ArrowCompressionCodecType.UNSPECIFIED
        self.type = codec_type
        self.level = level

    def to_proto(self):
        """Convert to an ``ArrowFormatSettings.CompressionCodec`` protobuf message."""
        codec_message_cls = ydb_formats_pb2.ArrowFormatSettings.CompressionCodec
        return codec_message_cls(type=self.type, level=self.level)


class ArrowFormatSettings(IToProto):
    """Options controlling how Arrow format result sets are produced."""

    def __init__(self, compression_codec: typing.Optional[ArrowCompressionCodec] = None):
        self.compression_codec = compression_codec

    def to_proto(self):
        """Convert to an ``ArrowFormatSettings`` protobuf message.

        The ``compression_codec`` sub-message is filled in only when a codec
        was supplied; otherwise the message is returned with it unset.
        """
        proto = ydb_formats_pb2.ArrowFormatSettings()
        codec = self.compression_codec
        if codec is None:
            return proto

        proto.compression_codec.CopyFrom(codec.to_proto())
        return proto


class ArrowFormatMeta(IFromProto):
    """Schema metadata attached to an Arrow format result set."""

    def __init__(self, schema: bytes):
        self.schema = schema

    @classmethod
    def from_proto(cls, proto_message):
        """Create an instance from a protobuf message by copying its ``schema`` field."""
        schema_bytes = proto_message.schema
        return cls(schema=schema_bytes)

    def __repr__(self):
        # Report only the size; the raw schema bytes are not printed.
        schema_size = len(self.schema)
        return f"ArrowFormatMeta(schema_size={schema_size} bytes)"
176 changes: 175 additions & 1 deletion ydb/_grpc/v3/protos/ydb_formats_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading