From a0828eba6143c450ca77f60156ab19c95b1e66de Mon Sep 17 00:00:00 2001 From: Konstantin Prokopenko Date: Tue, 30 Sep 2025 17:45:41 +0300 Subject: [PATCH] Added recipe for YDB Go SDK BulkUpsert Apache Arrow --- .../en/core/recipes/ydb-sdk/bulk-upsert.md | 83 ++++++++++++++++++ .../ru/core/recipes/ydb-sdk/bulk-upsert.md | 84 +++++++++++++++++++ 2 files changed, 167 insertions(+) diff --git a/ydb/docs/en/core/recipes/ydb-sdk/bulk-upsert.md b/ydb/docs/en/core/recipes/ydb-sdk/bulk-upsert.md index d6ed1eb1c5ed..4ea322ffb484 100644 --- a/ydb/docs/en/core/recipes/ydb-sdk/bulk-upsert.md +++ b/ydb/docs/en/core/recipes/ydb-sdk/bulk-upsert.md @@ -138,6 +138,89 @@ Below are code examples showing the {{ ydb-short-name }} SDK built-in tools for {% endcut %} + {% cut "Bulk upsert `Apache Arrow` data" %} + + In the following example, the [arrow package](https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow) is used to prepare the data. + + ```go + package main + + import ( + "bytes" + "context" + "fmt" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + ) + + func main() { + ctx := context.Background() + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + ydb.WithAccessTokenCredentials(os.Getenv("YDB_TOKEN")), + ) + if err != nil { + panic(err) + } + defer db.Close(ctx) // cleanup resources + + mem := memory.NewGoAllocator() + + schema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64}, + {Name: "val", Type: arrow.BinaryTypes.String}, + }, nil) + + b := array.NewRecordBuilder(mem, schema) + defer b.Release() + + b.Field(0).(*array.Int64Builder).AppendValues( + []int64{123, 234}, nil) + + b.Field(1).(*array.StringBuilder).AppendValues( + []string{"data1", "data2"}, nil) + + rec := b.NewRecordBatch() + defer rec.Release() + + schemaPayload := ipc.GetSchemaPayload(rec.Schema(), mem) + defer schemaPayload.Release() + + dataPayload, err := ipc.GetRecordBatchPayload(rec) + if err != nil { + panic(err) + } + defer dataPayload.Release() + + var schemaBuf bytes.Buffer + _, err = schemaPayload.WritePayload(&schemaBuf) + if err != nil { + panic(err) + } + + var dataBuf bytes.Buffer + _, err = dataPayload.WritePayload(&dataBuf) + if err != nil { + panic(err) + } + + err = db.Table().BulkUpsert(ctx, "/local/bulk_upsert_example", table.BulkUpsertDataArrow( + dataBuf.Bytes(), + table.WithArrowSchema(schemaBuf.Bytes()), // schema is required + )) + if err != nil { + fmt.Printf("unexpected error: %v", err) + } + } + ``` + + {% endcut %} + - Go (database/sql) The implementation of {{ ydb-short-name }} `database/sql` doesn't support bulk nontransactional upsert of data. diff --git a/ydb/docs/ru/core/recipes/ydb-sdk/bulk-upsert.md b/ydb/docs/ru/core/recipes/ydb-sdk/bulk-upsert.md index d847415f973c..c04996f10f64 100644 --- a/ydb/docs/ru/core/recipes/ydb-sdk/bulk-upsert.md +++ b/ydb/docs/ru/core/recipes/ydb-sdk/bulk-upsert.md @@ -137,6 +137,90 @@ {% endcut %} + {% cut "Пакетная вставка `Apache Arrow`" %} + + В следующем примере для подготовки данных используется пакет [arrow](https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow). + + ```go + package main + + import ( + "bytes" + "context" + "fmt" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/ipc" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + ) + + func main() { + ctx := context.Background() + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + ydb.WithAccessTokenCredentials(os.Getenv("YDB_TOKEN")), + ) + if err != nil { + panic(err) + } + defer db.Close(ctx) // cleanup resources + + mem := memory.NewGoAllocator() + + schema := arrow.NewSchema([]arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64}, + {Name: "val", Type: arrow.BinaryTypes.String}, + }, nil) + + b := array.NewRecordBuilder(mem, schema) + defer b.Release() + + b.Field(0).(*array.Int64Builder).AppendValues( + []int64{123, 234}, nil) + + b.Field(1).(*array.StringBuilder).AppendValues( + []string{"data1", "data2"}, nil) + + rec := b.NewRecordBatch() + defer rec.Release() + + schemaPayload := ipc.GetSchemaPayload(rec.Schema(), mem) + defer schemaPayload.Release() + + dataPayload, err := ipc.GetRecordBatchPayload(rec) + if err != nil { + panic(err) + } + defer dataPayload.Release() + + var schemaBuf bytes.Buffer + _, err = schemaPayload.WritePayload(&schemaBuf) + if err != nil { + panic(err) + } + + var dataBuf bytes.Buffer + _, err = dataPayload.WritePayload(&dataBuf) + if err != nil { + panic(err) + } + + err = db.Table().BulkUpsert(ctx, "/local/bulk_upsert_example", table.BulkUpsertDataArrow( + dataBuf.Bytes(), + table.WithArrowSchema(schemaBuf.Bytes()), // schema is required + )) + if err != nil { + fmt.Printf("unexpected error: %v", err) + } + } + + ``` + + {% endcut %} + - Go (database/sql) Реализация `database/sql` драйвера для {{ ydb-short-name }} не поддерживает нетранзакционную пакетную вставку данных.