Skip to content

Commit

Permalink
Merge pull request #26 from usedatabrew/feat/dat-372
Browse files Browse the repository at this point in the history
feat(DAT-372): added clickhouse as a sink connector
  • Loading branch information
le-vlad committed Mar 20, 2024
2 parents d3fd4d7 + dc2f23a commit 53e2325
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 36 deletions.
31 changes: 20 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ replace github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
replace go.etcd.io/bbolt v1.3.4 => github.com/coreos/bbolt v1.3.4

require (
github.com/ClickHouse/clickhouse-go/v2 v2.20.0
github.com/InfluxCommunity/influxdb3-go v0.4.0
github.com/apache/arrow/go/v14 v14.0.1
github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df
Expand Down Expand Up @@ -41,9 +42,10 @@ require (

require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/ClickHouse/ch-go v0.61.3 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/arrow/go/v13 v13.0.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/charmbracelet/lipgloss v0.9.1 // indirect
Expand All @@ -53,14 +55,16 @@ require (
github.com/fatih/structs v1.1.0 // indirect
github.com/frankban/quicktest v1.14.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/ivancorrales/knoa v0.0.2 // indirect
Expand All @@ -69,7 +73,7 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/lib/pq v1.10.9 // indirect
Expand All @@ -83,16 +87,19 @@ require (
github.com/muesli/termenv v0.15.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7 // indirect
github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rabbitmq/amqp091-go v1.7.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand All @@ -105,15 +112,17 @@ require (
github.com/zeebo/xxh3 v1.0.2 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
72 changes: 49 additions & 23 deletions go.sum

Large diffs are not rendered by default.

46 changes: 44 additions & 2 deletions internal/helper/arrow_type_helper.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package helper

import (
"reflect"
"strconv"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/charmbracelet/log"
cqtypes "github.com/cloudquery/plugin-sdk/v4/types"
"reflect"
"strconv"
)

func IsPrimitiveType(fieldType string) bool {
Expand Down Expand Up @@ -229,3 +230,44 @@ func ArrowToCockroach(t arrow.DataType) string {
return "text"
}
}

func ArrowToClickHouse(t arrow.DataType) string {
switch t.(type) {
case *arrow.BooleanType:
return "bool"
case *arrow.Uint8Type:
return "UInt8"
case *arrow.Uint16Type:
return "UInt16"
case *arrow.Uint32Type:
return "UInt32"
case *arrow.Uint64Type:
return "UInt64"
case *arrow.Int8Type:
return "Int8"
case *arrow.Int16Type:
return "Int16"
case *arrow.Int32Type:
return "Int32"
case *arrow.Int64Type:
return "Int64"
case *arrow.Float32Type:
return "Float32"
case *arrow.Float64Type:
return "Float64"
case arrow.DecimalType:
return "Decimal"
case *arrow.StringType:
return "String"
case *arrow.Date32Type:
return "Date32"
case *arrow.Date64Type:
return "Date64"
case *arrow.TimestampType:
return "DateTime"
case *cqtypes.JSONType:
return "JSON"
default:
return "String"
}
}
9 changes: 9 additions & 0 deletions internal/sinks/clickhouse/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package clickhouse

type Config struct {
Host string `json:"host" yaml:"host"`
Port int `json:"port" yaml:"port"`
Database string `json:"database" yaml:"database"`
User string `json:"user" yaml:"user"`
Password string `json:"password" yaml:"password"`
}
65 changes: 65 additions & 0 deletions internal/sinks/clickhouse/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package clickhouse

import (
"fmt"
"slices"
"strings"

"github.com/usedatabrew/blink/internal/helper"
"github.com/usedatabrew/blink/internal/schema"
)

func generateCreateTableStatement(table string, columns []schema.Column) string {
statement := fmt.Sprintf("CREATE TABLE IF NOT EXISTS \"%s\" (\n", table)
var primaryKeys []string

for idx, column := range columns {
statement += fmt.Sprintf(" %s %s", column.Name, helper.ArrowToClickHouse(helper.MapPlainTypeToArrow(column.DatabrewType)))
if column.PK {
primaryKeys = append(primaryKeys, column.Name)
}
if !column.Nullable {
statement += " NOT NULL"
}
if idx < len(columns)-1 {
statement += ",\n"
}
}

statement += "\n)"

statement += " ENGINE = MergeTree()\n"
statement += fmt.Sprintf("PRIMARY KEY(%v)", primaryKeys)

return statement
}

func generateInsertStatement(table schema.StreamSchema) string {
columnNames := getColumnNames(table.Columns)
valuesPlaceholder := getValuesPlaceholder(len(table.Columns))

return fmt.Sprintf("INSERT INTO \"%s\" (%s) VALUES %s;", table.StreamName, columnNames, valuesPlaceholder)
}

func getColumnNames(columns []schema.Column) string {
return strings.Join(getColumnNamesSorted(columns), ", ")
}

func getColumnNamesSorted(columns []schema.Column) []string {
var columnNames []string
for _, column := range columns {
columnNames = append(columnNames, column.Name)
}

slices.Sort(columnNames)

return columnNames
}

func getValuesPlaceholder(numColumns int) string {
var valuePlaceholders []string
for i := 0; i < numColumns; i++ {
valuePlaceholders = append(valuePlaceholders, fmt.Sprintf("$%d", i+1))
}
return fmt.Sprintf("(%s )", strings.Join(valuePlaceholders, ", "))
}
144 changes: 144 additions & 0 deletions internal/sinks/clickhouse/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package clickhouse

import (
"context"
"fmt"
"net"
"time"

clickhouseClient "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/charmbracelet/log"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/sinks"
"github.com/usedatabrew/blink/internal/stream_context"
"github.com/usedatabrew/message"
)

type SinkPlugin struct {
ctx *stream_context.Context
config Config
inputSchema map[string]schema.StreamSchema
logger *log.Logger
connection driver.Conn
rowStatements map[string]map[message.Event]string
}

func NewClickHouseSinkPlugin(config Config, ctx *stream_context.Context) sinks.DataSink {
return &SinkPlugin{
ctx: ctx,
config: config,
logger: ctx.Logger.WithPrefix("[sink]: clickhouse"),
}
}

func (p *SinkPlugin) Connect(ctx context.Context) error {
host := fmt.Sprintf("%s:%d", p.config.Host, p.config.Port)

conn, err := clickhouseClient.Open(&clickhouseClient.Options{
Addr: []string{host},
Auth: clickhouseClient.Auth{
Database: p.config.Database,
Username: p.config.User,
Password: p.config.Password,
},
DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
return d.DialContext(ctx, "tcp", addr)
},
Settings: clickhouseClient.Settings{
"max_execution_time": 60,
},
Compression: &clickhouseClient.Compression{
Method: clickhouseClient.CompressionLZ4,
},
DialTimeout: time.Second * 30,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnOpenStrategy: clickhouseClient.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
})

if err != nil {
return err
}

conn.Ping(p.ctx.GetContext())

p.connection = conn

return nil
}

func (p *SinkPlugin) Write(m *message.Message) error {
event := m.GetEvent()

if event != message.Insert {
return nil
}

p.logger.Info("Applying operation", "op", m.GetEvent(), "stream", m.GetStream())

var colValues []interface{}
for _, col := range getColumnNamesSorted(p.inputSchema[m.GetStream()].Columns) {
colValues = append(colValues, m.Data.AccessProperty(col))
}

statement := p.rowStatements[m.GetStream()][m.GetEvent()]

err := p.connection.AsyncInsert(p.ctx.GetContext(), statement, false, colValues...)

if err != nil {
return err
}

return nil
}

func (p *SinkPlugin) GetType() sinks.SinkDriver {
return sinks.ClickHouse
}

func (p *SinkPlugin) SetExpectedSchema(sCh []schema.StreamSchema) {
iSchema := make(map[string]schema.StreamSchema)

for _, stream := range sCh {
iSchema[stream.StreamName] = stream
}

p.inputSchema = iSchema

p.createInitStatements()
}

func (p *SinkPlugin) Stop() {
p.connection.Close()
}

func (p *SinkPlugin) createInitStatements() {
dbCreateTableStatements := make(map[string]string)
var rowStatements = make(map[string]map[message.Event]string)

for _, stream := range p.inputSchema {
dbCreateTableStatements[stream.StreamName] = generateCreateTableStatement(stream.StreamName, stream.Columns)

insertStatement := generateInsertStatement(stream)

rowStatements[stream.StreamName] = map[message.Event]string{
message.Insert: insertStatement,
}
}

p.rowStatements = rowStatements

p.logger.Info("Generated init statements to create table for the sink database", "statements", dbCreateTableStatements)

for streamName, dbCreateTableStatement := range dbCreateTableStatements {
err := p.connection.Exec(p.ctx.GetContext(), dbCreateTableStatement)

if err != nil {
p.logger.Fatal("Failed to create table for stream", "stream", streamName, "error", err)
}
}
}
1 change: 1 addition & 0 deletions internal/sinks/sink_drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ const (
NatsSinkType SinkDriver = "nats"
RabbitMqSinkType SinkDriver = "rabbitmq"
RedisSinkType SinkDriver = "redis"
ClickHouse SinkDriver = "clickhouse"
)
Loading

0 comments on commit 53e2325

Please sign in to comment.