Skip to content

Commit

Permalink
support streaming insert;
Browse files Browse the repository at this point in the history
block in batch can be replaced by data generated outside.
  • Loading branch information
leo-cai-timeplus committed Oct 26, 2023
1 parent 86e77b8 commit 3f3b87d
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 0 deletions.
45 changes: 45 additions & 0 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package proton

import (
"context"
"errors"
"fmt"
"os"
"regexp"
Expand Down Expand Up @@ -153,6 +154,50 @@ func (b *batch) Send() (err error) {
return nil
}

func (b *batch) Close() (err error) {
return b.Send()
}

func (b *batch) StreamingSend() (err error) {
if b.sent {
return ErrBatchAlreadySent
}
if b.err != nil {
return b.err
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
return err
}
}
if err = b.conn.encoder.Flush(); err != nil {
return err
}
if err = b.block.Clear(); err != nil {
return err
}
return nil
}

func (b *batch) ReplaceBy(cols ...column.Interface) (err error) {
if len(b.block.Columns) != len(cols) {
return errors.New(fmt.Sprintf("colomn number is %d, not %d", len(b.block.Columns), len(cols)))
}
for i := 0; i < len(cols); i++ {
if b.block.Columns[i].Type() != cols[i].Type() {
return errors.New(fmt.Sprintf("type of colomn[%d] is %s, not %s", i, b.block.Columns[i].Type(), cols[i].Type()))
}
}
rows := cols[0].Rows()
for i := 1; i < len(cols); i++ {
if rows != cols[i].Rows() {
return errors.New("cols with different length")
}
}
b.block.Columns = cols
return nil
}

type batchColumn struct {
err error
batch *batch
Expand Down
69 changes: 69 additions & 0 deletions examples/streaming/batch_replace/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"fmt"
"github.com/timeplus-io/proton-go-driver/v2"
"github.com/timeplus-io/proton-go-driver/v2/lib/column"
"log"
"time"
)

func BatchReplaceExample() {
var (
ctx = context.Background()
conn, err = proton.Open(&proton.Options{
Addr: []string{"127.0.0.1:8463"},
Auth: proton.Auth{
Database: "default",
Username: "default",
Password: "",
},
//Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
})
)
if err != nil {
log.Fatal(err)
}
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
log.Fatal(err)
}
err = conn.Exec(ctx, `
CREATE STREAM IF NOT EXISTS example (
Col1 uint8
, Col2 string
)
`)
if err != nil {
log.Fatal(err)
}
const rows = 20
var (
col1 column.UInt8 = make([]uint8, rows)
col2 column.String = make([]string, rows)
)
for i := 0; i < rows; i++ {
col1[i] = uint8(i)
col2[i] = fmt.Sprintf("num%03d", i)
}
batch, err := conn.PrepareBatch(ctx, "INSERT INTO example (* except _tp_time)")
err = batch.ReplaceBy(
&col1,
&col2,
)
if err != nil {
return
}
err = batch.Send()
if err != nil {
return
}
}

func main() {
BatchReplaceExample()
}
72 changes: 72 additions & 0 deletions examples/streaming/streaming_send/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/timeplus-io/proton-go-driver/v2"
)

func example() error {
var (
ctx = context.Background()
conn, err = proton.Open(&proton.Options{
Addr: []string{"127.0.0.1:8463"},
Auth: proton.Auth{
Database: "default",
Username: "default",
Password: "",
},
//Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
})
)
if err != nil {
return err
}
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
return err
}
err = conn.Exec(ctx, `
CREATE STREAM IF NOT EXISTS example (
Col1 uint8
, Col2 string
)
`)
if err != nil {
return err
}

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example (* except _tp_time)")
if err != nil {
return err
}
for i := 0; i < 10; i++ {
for j := 0; j < 10; j++ {
err := batch.Append(
uint8(i*10+j),
fmt.Sprintf("num_%d_%d", j, i),
)
if err != nil {
return err
}
if err := batch.StreamingSend(); err != nil {
return err
}
}
}
return batch.Close()
}

func main() {
start := time.Now()
if err := example(); err != nil {
log.Fatal(err)
}
fmt.Println(time.Since(start))
}
4 changes: 4 additions & 0 deletions lib/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package driver

import (
"context"
"github.com/timeplus-io/proton-go-driver/v2/lib/column"
"reflect"

"github.com/timeplus-io/proton-go-driver/v2/lib/proto"
Expand Down Expand Up @@ -74,6 +75,9 @@ type (
AppendStruct(v interface{}) error
Column(int) BatchColumn
Send() error
Close() error
ReplaceBy(cols ...column.Interface) error
StreamingSend() error
}
BatchColumn interface {
Append(interface{}) error
Expand Down
10 changes: 10 additions & 0 deletions lib/proto/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func (b *Block) AddColumn(name string, ct column.Type) error {
return nil
}

func (b *Block) Clear() (err error) {
for i := range b.Columns {
b.Columns[i], err = b.Columns[i].Type().Column()
if err != nil {
return
}
}
return nil
}

func (b *Block) Append(v ...interface{}) (err error) {
columns := b.Columns
if len(columns) != len(v) {
Expand Down

0 comments on commit 3f3b87d

Please sign in to comment.