Skip to content

Commit

Permalink
Merge pull request #24 from usedatabrew/dat-371
Browse files Browse the repository at this point in the history
Dat 371
  • Loading branch information
le-vlad committed Mar 5, 2024
2 parents d0afad8 + 2ce01b1 commit 782b772
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 15 deletions.
46 changes: 46 additions & 0 deletions examples/mysql_cdc/example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
service:
id: 32333
pipeline_id: 32333
enable_etcd_registry: true
etcd:
host: http://etcd-server.internal:2379
reload_on_restart: false
source:
driver: mysql_cdc
config:
host: 127.0.0.1
port: 3306
database: db
user: root
password: test
flavor: mysql
stream_snapshot: true
tables:
- flights
stream_schema:
- stream: flights
columns:
- name: id
databrewType: Int32
nativeConnectorType: integer
pk: true
nullable: false
- name: created_at
databrewType: String
nativeConnectorType: varchar
pk: false
nullable: true
- name: payment_method
databrewType: String
nativeConnectorType: character varying
pk: false
nullable: false
- name: duration
databrewType: Int32
nativeConnectorType: integer
pk: false
nullable: false

sink:
driver: stdout
config: { }
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba
github.com/charmbracelet/log v0.3.1
github.com/cloudquery/plugin-sdk/v4 v4.16.1
github.com/go-mysql-org/go-mysql v1.7.0
github.com/go-playground/validator/v10 v10.14.0
github.com/goccy/go-json v0.10.2
github.com/gorilla/websocket v1.5.0
Expand All @@ -39,8 +40,10 @@ require (
)

require (
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/apache/arrow/go/v13 v13.0.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/charmbracelet/lipgloss v0.9.1 // indirect
Expand Down Expand Up @@ -81,11 +84,17 @@ require (
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7 // indirect
github.com/pingcap/tidb/parser v0.0.0-20221126021158-6b02a5d8ba7d // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rabbitmq/amqp091-go v1.7.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/thedevsaddam/gojsonq/v2 v2.5.2 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
Expand Down Expand Up @@ -113,4 +122,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
56 changes: 56 additions & 0 deletions go.sum

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions internal/sources/mongo_stream/types.go

This file was deleted.

11 changes: 11 additions & 0 deletions internal/sources/mysql_cdc/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package mysql_cdc

// Config holds the settings of the mysql_cdc source. Field tags name the
// keys used when the configuration is decoded from JSON or YAML.
type Config struct {
// Host is the MySQL server host; it is joined with Port to form the
// address the canal connects to.
Host string `json:"host" yaml:"host"`
// Port is the MySQL server port.
Port uint16 `json:"port" yaml:"port"`
// Database is the schema to dump and the only schema whose row events
// are forwarded.
Database string `json:"database" yaml:"database"`
// User is the account used to connect.
User string `json:"user" yaml:"user"`
// Password is the password for User.
Password string `json:"password" yaml:"password"`
// Flavor is passed through to the canal configuration unchanged
// (the bundled example uses "mysql").
Flavor string `json:"flavor" yaml:"flavor"`
// StreamSnapshot selects the start mode: when true the canal is started
// with Run, otherwise with RunFrom at the current master position.
StreamSnapshot bool `json:"stream_snapshot" yaml:"stream_snapshot"`
}
89 changes: 89 additions & 0 deletions internal/sources/mysql_cdc/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package mysql_cdc

import (
"encoding/json"
"strings"
"time"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/schema"
)

const mysqlDateFormat = "2006-01-02"

// convertData normalizes a raw column value received from the canal into the
// representation emitted downstream. The conversion depends on both the MySQL
// column type and the dynamic Go type of value, because binlog events and
// dump rows encode some types differently:
//
//   - ENUM as int64 is an index (value-1) into col.EnumValues and is resolved
//     to its label; an out-of-range index yields "".
//   - SET as int64 is a bitmask over col.SetValues and is resolved to a
//     comma-separated list of the selected members.
//   - BIT as string becomes int64(1) for "\x01" and int64(0) otherwise.
//   - String columns delivered as []byte are converted to string.
//   - JSON delivered as string or []byte is decoded; if decoding fails or
//     yields nil, the raw value is returned.
//   - DATETIME/TIMESTAMP strings are parsed in the local time zone and
//     re-formatted as RFC 3339; unparsable or zero values become nil.
//   - DATE strings are validated and re-formatted as YYYY-MM-DD; unparsable
//     or zero values become nil.
//
// Every other combination is returned unchanged.
func convertData(col schema.TableColumn, value interface{}) interface{} {
	switch col.Type {
	case schema.TYPE_ENUM:
		// for binlog, ENUM may be int64, but for dump, enum is string
		if idx, ok := value.(int64); ok {
			idx--
			if idx < 0 || idx >= int64(len(col.EnumValues)) {
				return ""
			}

			return col.EnumValues[idx]
		}
	case schema.TYPE_SET:
		// for binlog, SET may be int64, but for dump, SET is string
		if bitmask, ok := value.(int64); ok {
			sets := make([]string, 0, len(col.SetValues))
			for i, s := range col.SetValues {
				// Test against zero, not "> 0": the 64th member maps to the
				// sign bit, which makes the masked value negative.
				if bitmask&(int64(1)<<uint(i)) != 0 {
					sets = append(sets, s)
				}
			}

			return strings.Join(sets, ",")
		}
	case schema.TYPE_BIT:
		// for binlog, BIT is int64, but for dump, BIT is string
		// for dump 0x01 is for 1, \0 is for 0
		if s, ok := value.(string); ok {
			if s == "\x01" {
				return int64(1)
			}

			return int64(0)
		}
	case schema.TYPE_STRING:
		if b, ok := value.([]byte); ok {
			return string(b)
		}
	case schema.TYPE_JSON:
		var (
			decoded interface{}
			err     error
		)
		switch v := value.(type) {
		case string:
			err = json.Unmarshal([]byte(v), &decoded)
		case []byte:
			err = json.Unmarshal(v, &decoded)
		}
		// Fall through to the raw value when decoding failed or produced nil.
		if err == nil && decoded != nil {
			return decoded
		}
	case schema.TYPE_DATETIME, schema.TYPE_TIMESTAMP:
		if s, ok := value.(string); ok {
			vt, err := time.ParseInLocation(mysql.TimeFormat, s, time.Local)
			if err != nil || vt.IsZero() { // failed to parse date or zero date
				return nil
			}

			return vt.Format(time.RFC3339)
		}
	case schema.TYPE_DATE:
		if s, ok := value.(string); ok {
			vt, err := time.Parse(mysqlDateFormat, s)
			if err != nil || vt.IsZero() { // failed to parse date or zero date
				return nil
			}

			return vt.Format(mysqlDateFormat)
		}
	}

	return value
}
189 changes: 189 additions & 0 deletions internal/sources/mysql_cdc/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package mysql_cdc

import (
"context"
"errors"
"fmt"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/cloudquery/plugin-sdk/v4/scalar"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/usedatabrew/blink/internal/helper"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/sources"
"github.com/usedatabrew/message"
)

// DataTableSchema pairs a table name with the Arrow schema used to build
// records for that table.
type DataTableSchema struct {
// TableName is the stream name the schema was built for.
TableName string
// Schema is the Arrow schema derived from the stream's column definitions.
Schema *arrow.Schema
}

// ProcessEventParams controls which entries of a rows event are processed:
// iteration starts at index initValue and advances by incrementValue.
// Inserts and deletes use (0, 1); updates use (1, 2) — presumably because
// update events carry before/after pairs and only the "after" image is
// wanted (confirm against the canal documentation).
type ProcessEventParams struct {
initValue, incrementValue int
}

// SourcePlugin is the MySQL CDC data source. It registers itself as the
// canal event handler and publishes row changes on messagesStream.
type SourcePlugin struct {
// config holds the connection and start-mode settings.
config Config
// inputSchema maps stream (table) name to its configured stream schema.
inputSchema map[string]schema.StreamSchema
// outputSchema maps stream (table) name to the Arrow schema built from it.
outputSchema map[string]DataTableSchema
// messagesStream is the unbuffered channel returned by Events.
messagesStream chan sources.MessageEvent
// canal is created by Connect and is nil until then.
canal *canal.Canal
// Embedded handler; presumably supplies default implementations for the
// canal callbacks this type does not define itself (only OnRow is
// defined here) — confirm against the canal package.
canal.DummyEventHandler
}

// NewMysqlSourcePlugin creates a MySQL CDC source for the given configuration
// and stream schemas. Streams are indexed by name, the matching Arrow output
// schemas are built eagerly, and the events channel is created unbuffered.
// No connection is opened here; call Connect before Start.
func NewMysqlSourcePlugin(config Config, sCh []schema.StreamSchema) sources.DataSource {
	streamsByName := make(map[string]schema.StreamSchema)
	for idx := range sCh {
		streamsByName[sCh[idx].StreamName] = sCh[idx]
	}

	plugin := &SourcePlugin{
		config:         config,
		inputSchema:    streamsByName,
		messagesStream: make(chan sources.MessageEvent),
	}
	plugin.buildOutputSchema()

	return plugin
}

// Connect builds the canal configuration from the plugin settings and creates
// the canal instance. The dump is restricted to the configured database and
// to the tables named by the input streams. The ctx argument is currently
// unused. On failure the error from canal.NewCanal is returned and the
// plugin's canal is left untouched.
func (p *SourcePlugin) Connect(ctx context.Context) error {
	canalCfg := canal.NewDefaultConfig()
	canalCfg.Addr = fmt.Sprintf("%s:%d", p.config.Host, p.config.Port)
	canalCfg.User, canalCfg.Password = p.config.User, p.config.Password
	canalCfg.Flavor = p.config.Flavor
	canalCfg.Dump.TableDB = p.config.Database

	// Every configured stream corresponds to one table to dump.
	var dumpTables []string
	for _, stream := range p.inputSchema {
		dumpTables = append(dumpTables, stream.StreamName)
	}
	canalCfg.Dump.Tables = dumpTables

	instance, err := canal.NewCanal(canalCfg)
	if err != nil {
		return err
	}
	p.canal = instance

	return nil
}

// Start registers the plugin as the canal event handler and runs the canal.
// With StreamSnapshot enabled the canal is started with Run; otherwise it is
// started with RunFrom at the current master position.
//
// Start returns only after the canal call returns, so callers are expected to
// run it on its own goroutine. Connect must have succeeded first.
//
// Failures are not dropped: if the master position cannot be read, the canal
// is not started (rather than starting from a zero position), and any error
// returned by the canal is published on the events channel as a MessageEvent
// with Err set. That send blocks until a consumer receives it, like every
// other send on the channel.
func (p *SourcePlugin) Start() {
	p.canal.SetEventHandler(p)

	var err error
	if p.config.StreamSnapshot {
		err = p.canal.Run()
	} else {
		coords, posErr := p.canal.GetMasterPos()
		if posErr != nil {
			err = fmt.Errorf("reading master position: %w", posErr)
		} else {
			err = p.canal.RunFrom(coords)
		}
	}

	if err != nil {
		p.messagesStream <- sources.MessageEvent{
			Err: fmt.Errorf("mysql_cdc: %w", err),
		}
	}
}

// Stop closes the canal. It is a no-op when no canal exists, which is the
// case if Connect was never called or returned an error.
func (p *SourcePlugin) Stop() {
	if p.canal == nil {
		return
	}

	p.canal.Close()
}

// Events returns the channel on which the plugin publishes message events.
// The channel is created unbuffered by NewMysqlSourcePlugin, so each send
// waits for a receiver.
func (p *SourcePlugin) Events() chan sources.MessageEvent {
return p.messagesStream
}

// OnRow is the canal callback for row changes. Events for other databases or
// for tables without a configured stream are ignored. Inserts and deletes are
// processed row by row; updates are processed starting at index 1 with a step
// of 2. Any other action is rejected with an error.
func (p *SourcePlugin) OnRow(e *canal.RowsEvent) error {
	_, tracked := p.inputSchema[e.Table.Name]
	if e.Table.Schema != p.config.Database || !tracked {
		return nil
	}

	var params ProcessEventParams
	switch e.Action {
	case canal.InsertAction, canal.DeleteAction:
		params = ProcessEventParams{initValue: 0, incrementValue: 1}
	case canal.UpdateAction:
		params = ProcessEventParams{initValue: 1, incrementValue: 2}
	default:
		return errors.New("invalid rows action")
	}

	return p.processEvent(e, params)
}

// processEvent converts the selected rows of a canal rows event into a single
// Arrow record, encodes it as JSON and publishes it as one message on the
// events channel.
//
// params selects the rows: iteration starts at params.initValue and advances
// by params.incrementValue. Only columns named in the stream's input schema
// are emitted; each value is normalized with convertData and stored using the
// Arrow type of the corresponding output field.
//
// A schema column that is absent from the table metadata, or for which the
// row carries no value, is emitted as null so that all fields keep the same
// length.
//
// An error is returned (instead of panicking) when the table has no output
// schema, params are not usable, a value cannot be stored in its field type,
// or the record cannot be encoded. The send on the events channel blocks
// until a consumer receives the message.
func (p *SourcePlugin) processEvent(e *canal.RowsEvent, params ProcessEventParams) error {
	inputSchema := p.inputSchema[e.Table.Name]
	outputSchema := p.outputSchema[e.Table.Name]

	if outputSchema.Schema == nil {
		return fmt.Errorf("mysql_cdc: no output schema for table %q", e.Table.Name)
	}
	// A non-positive step would never terminate; a negative start would index
	// out of range.
	if params.initValue < 0 || params.incrementValue <= 0 {
		return fmt.Errorf("mysql_cdc: invalid row iteration (start %d, step %d)",
			params.initValue, params.incrementValue)
	}

	// Resolve table column positions by name once per event instead of
	// searching the schema for every cell.
	tableIndex := make(map[string]int, len(e.Table.Columns))
	for i := range e.Table.Columns {
		tableIndex[e.Table.Columns[i].Name] = i
	}

	builder := array.NewRecordBuilder(memory.DefaultAllocator, outputSchema.Schema)
	defer builder.Release()

	for rowIdx := params.initValue; rowIdx < len(e.Rows); rowIdx += params.incrementValue {
		row := e.Rows[rowIdx]

		for fieldIdx, schemaCol := range inputSchema.Columns {
			colIdx, found := tableIndex[schemaCol.Name]
			if !found || colIdx >= len(row) {
				// Keep every field builder at the same length.
				builder.Field(fieldIdx).AppendNull()
				continue
			}

			s := scalar.NewScalar(outputSchema.Schema.Field(fieldIdx).Type)
			if err := s.Set(convertData(e.Table.Columns[colIdx], row[colIdx])); err != nil {
				return fmt.Errorf("mysql_cdc: converting column %q of table %q: %w",
					schemaCol.Name, e.Table.Name, err)
			}

			scalar.AppendToBuilder(builder.Field(fieldIdx), s)
		}
	}

	record := builder.NewRecord()
	defer record.Release()

	payload, err := record.MarshalJSON()
	if err != nil {
		return fmt.Errorf("mysql_cdc: encoding rows of table %q: %w", e.Table.Name, err)
	}

	p.messagesStream <- sources.MessageEvent{
		Message: message.NewMessage(message.Event(e.Action), e.Table.Name, payload),
		Err:     nil,
	}

	return nil
}

// buildOutputSchema derives one Arrow schema per input stream and stores the
// result, keyed by stream name, in p.outputSchema. Each stream column becomes
// an Arrow field with the same name and nullability, typed via
// helper.MapPlainTypeToArrow.
func (p *SourcePlugin) buildOutputSchema() {
	tables := make(map[string]DataTableSchema)

	for _, stream := range p.inputSchema {
		var fields []arrow.Field
		for _, col := range stream.Columns {
			field := arrow.Field{
				Name:     col.Name,
				Type:     helper.MapPlainTypeToArrow(col.DatabrewType),
				Nullable: col.Nullable,
				Metadata: arrow.Metadata{},
			}
			fields = append(fields, field)
		}

		tables[stream.StreamName] = DataTableSchema{
			TableName: stream.StreamName,
			Schema:    arrow.NewSchema(fields, nil),
		}
	}

	p.outputSchema = tables
}
1 change: 1 addition & 0 deletions internal/sources/source_drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ const (
WebSockets SourceDriver = "websocket"
AirTable SourceDriver = "airtable"
Playground SourceDriver = "playground"
MysqlCDC SourceDriver = "mysql_cdc"
)
Loading

0 comments on commit 782b772

Please sign in to comment.