Skip to content

Commit

Permalink
fix(schema/evolution): updated schema copying
Browse files Browse the repository at this point in the history
  • Loading branch information
le-vlad committed Nov 20, 2023
1 parent d95a107 commit 29e3f3e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 7 deletions.
2 changes: 1 addition & 1 deletion config/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ service:
org: DataBrew, Inc.
bucket: databrew_ingestor
group_name: org_11
pipeline_id: 123
pipeline_id: 1233
reload_on_restart: false
stream_schema:
- stream: flights
Expand Down
9 changes: 5 additions & 4 deletions internal/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func (s *StreamSchemaObj) GetLatestSchema() []StreamSchema {

func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType) {
streamSchema := s.streamSchemaVersions[s.lastVersion]
streamSchema = *&streamSchema
for idx, stream := range streamSchema {
var streamSchemaCopy = make([]StreamSchema, len(streamSchema))
copy(streamSchemaCopy, streamSchema)
for idx, stream := range streamSchemaCopy {
if stream.StreamName == streamName {
arrowColumn := Column{
Name: name,
Expand All @@ -49,12 +50,12 @@ func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.Data
}

stream.Columns = append(stream.Columns, arrowColumn)
streamSchema[idx] = stream
streamSchemaCopy[idx] = stream
}
}

s.lastVersion += 1
s.streamSchemaVersions[s.lastVersion] = streamSchema
s.streamSchemaVersions[s.lastVersion] = streamSchemaCopy
}

// TODO:: add columns removal
Expand Down
11 changes: 11 additions & 0 deletions internal/sinks/postgres/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package postgres

// Config holds the connection settings for the PostgreSQL sink. Each field can
// be populated from either JSON or YAML using the key named in its struct tag.
type Config struct {
	// Host is the server host name or address used in the connection URL.
	Host string `json:"host" yaml:"host"`
	// Port is the server TCP port used in the connection URL.
	Port int `json:"port" yaml:"port"`
	// Database is the name of the database to connect to.
	Database string `json:"database" yaml:"database"`
	// User is the role name used to authenticate.
	User string `json:"user" yaml:"user"`
	// Schema is presumably the target PostgreSQL schema for created tables;
	// no code in this package reads it yet — TODO confirm intended use.
	Schema string `json:"schema" yaml:"schema"`
	// Password is the credential used to authenticate User.
	Password string `json:"password" yaml:"password"`
	// SSLRequired presumably indicates that the connection must use TLS —
	// TODO confirm against the code that consumes it.
	SSLRequired bool `json:"ssl_required" yaml:"ssl_required"`
}
61 changes: 61 additions & 0 deletions internal/sinks/postgres/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package postgres

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"strconv"

	"astro/internal/message"
	"astro/internal/schema"
	"astro/internal/sinks"

	"github.com/jackc/pgx/v5"
)

// SinkPlugin is the PostgreSQL sink. NewPostgresSinkPlugin returns it as a
// sinks.DataSink.
type SinkPlugin struct {
	// ctx is the context Stop passes to conn.Close.
	ctx context.Context
	// config holds the connection settings supplied at construction time.
	config Config
	// streamSchema is the most recent schema given to the constructor or to
	// SetExpectedSchema.
	streamSchema []schema.StreamSchema
	// conn is the connection opened by Connect; it is nil before Connect runs.
	conn *pgx.Conn
}

// NewPostgresSinkPlugin builds a PostgreSQL sink from the given connection
// settings and initial stream schema. No connection is opened here; call
// Connect on the returned sink before using it.
func NewPostgresSinkPlugin(config Config, schema []schema.StreamSchema) sinks.DataSink {
	plugin := new(SinkPlugin)
	plugin.config = config
	plugin.streamSchema = schema

	return plugin
}

// Connect opens the PostgreSQL connection described by the plugin's Config and
// stores it on the plugin.
//
// The connection URL is assembled with net/url rather than string formatting
// so that the user name, password and database name are percent-encoded;
// characters such as '@', ':' or '/' in a password would otherwise corrupt the
// URL. When Config.SSLRequired is true, sslmode=require is added so the
// connection fails instead of falling back to plaintext; when it is false the
// driver's default SSL behavior is left untouched.
//
// ctx bounds the connection attempt and is retained so Stop can pass it when
// closing the connection. On failure the plugin's connection is left unchanged
// and the driver error is returned wrapped with the target host.
func (s *SinkPlugin) Connect(ctx context.Context) error {
	connURL := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(s.config.User, s.config.Password),
		// JoinHostPort also brackets IPv6 literals correctly.
		Host: net.JoinHostPort(s.config.Host, strconv.Itoa(s.config.Port)),
		Path: "/" + s.config.Database,
	}
	if s.config.SSLRequired {
		connURL.RawQuery = url.Values{"sslmode": {"require"}}.Encode()
	}

	conn, err := pgx.Connect(ctx, connURL.String())
	if err != nil {
		// Only the host is included in the message; the full URL carries credentials.
		return fmt.Errorf("connecting to postgres at %s: %w", connURL.Host, err)
	}

	s.ctx = ctx
	s.conn = conn

	return nil
}

// SetExpectedSchema replaces the plugin's stored stream schema with the given
// one and then invokes generateCreateTableStatements.
func (s *SinkPlugin) SetExpectedSchema(schema []schema.StreamSchema) {
	s.streamSchema = schema
	// NOTE(review): generateCreateTableStatements is currently an empty stub,
	// so this call has no effect yet.
	s.generateCreateTableStatements()
}

// GetType reports the driver identifier of this sink, which is always
// sinks.PostgresSinkType.
func (s *SinkPlugin) GetType() sinks.SinkDriver {
	return sinks.PostgresSinkType
}

// Write is meant to persist a single message to PostgreSQL. It is not
// implemented yet: every call returns an error rather than panicking, so a
// pipeline configured with this sink fails through its normal error path
// instead of crashing the process.
func (s *SinkPlugin) Write(m message.Message) error {
	// TODO: implement me
	return fmt.Errorf("postgres sink: write is not implemented")
}

// Stop closes the PostgreSQL connection if one has been opened. It is a no-op
// when no connection is stored on the plugin, so it is safe to call before
// Connect or after a failed Connect.
func (s *SinkPlugin) Stop() {
	if s.conn == nil {
		return
	}

	// The stored context may never have been set; closing with a nil context
	// is not valid, so fall back to a background context.
	ctx := s.ctx
	if ctx == nil {
		ctx = context.Background()
	}

	// Stop has no error return, so a failure to close cleanly is
	// deliberately discarded here.
	_ = s.conn.Close(ctx)
}

// generateCreateTableStatements is called by SetExpectedSchema after the
// stored schema changes. It is currently an empty placeholder and does nothing.
// NOTE(review): judging by the name, it is presumably intended to derive
// CREATE TABLE statements from streamSchema — confirm before implementing.
func (s *SinkPlugin) generateCreateTableStatements() {

}
1 change: 1 addition & 0 deletions internal/sinks/sink_drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ const (
StdOutSinkType SinkDriver = "stdout"
WebSocketSinkType SinkDriver = "websocket"
KafkaSinkType SinkDriver = "kafka"
PostgresSinkType SinkDriver = "postgres"
)
3 changes: 1 addition & 2 deletions public/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ func InitFromConfig(config config.Configuration) (*Stream, error) {
Info("Loaded")
}

s.evolveSchemaForSinks(s.schema)

streamContext.Logger.WithPrefix("Sinks").With(
"driver", config.Sink.Driver,
).Info("Loading...")
Expand All @@ -79,6 +77,7 @@ func InitFromConfig(config config.Configuration) (*Stream, error) {
return nil, err
}

s.evolveSchemaForSinks(s.schema)
s.sinks[0].SetExpectedSchema(s.schema)

s.dataStream = stream.OfChannel(s.source.Events())
Expand Down

0 comments on commit 29e3f3e

Please sign in to comment.