diff --git a/config/example.yaml b/config/example.yaml index ee79063..79035ba 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -10,7 +10,7 @@ service: org: DataBrew, Inc. bucket: databrew_ingestor group_name: org_11 - pipeline_id: 123 + pipeline_id: 1233 reload_on_restart: false stream_schema: - stream: flights diff --git a/internal/schema/schema.go b/internal/schema/schema.go index 860d87b..9399cff 100644 --- a/internal/schema/schema.go +++ b/internal/schema/schema.go @@ -37,8 +37,9 @@ func (s *StreamSchemaObj) GetLatestSchema() []StreamSchema { func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType) { streamSchema := s.streamSchemaVersions[s.lastVersion] - streamSchema = *&streamSchema - for idx, stream := range streamSchema { + var streamSchemaCopy = make([]StreamSchema, len(streamSchema)) + copy(streamSchemaCopy, streamSchema) + for idx, stream := range streamSchemaCopy { if stream.StreamName == streamName { arrowColumn := Column{ Name: name, @@ -49,12 +50,12 @@ func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.Data } stream.Columns = append(stream.Columns, arrowColumn) - streamSchema[idx] = stream + streamSchemaCopy[idx] = stream } } s.lastVersion += 1 - s.streamSchemaVersions[s.lastVersion] = streamSchema + s.streamSchemaVersions[s.lastVersion] = streamSchemaCopy } // TODO:: add columns removal diff --git a/internal/sinks/postgres/config.go b/internal/sinks/postgres/config.go new file mode 100644 index 0000000..5a1feff --- /dev/null +++ b/internal/sinks/postgres/config.go @@ -0,0 +1,11 @@ +package postgres + +type Config struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port" yaml:"port"` + Database string `json:"database" yaml:"database"` + User string `json:"user" yaml:"user"` + Schema string `json:"schema" yaml:"schema"` + Password string `json:"password" yaml:"password"` + SSLRequired bool `json:"ssl_required" yaml:"ssl_required"` +} diff --git a/internal/sinks/postgres/plugin.go b/internal/sinks/postgres/plugin.go new file mode 100644 index 0000000..3cfc692 --- /dev/null +++ b/internal/sinks/postgres/plugin.go @@ -0,0 +1,61 @@ +package postgres + +import ( + "astro/internal/message" + "astro/internal/schema" + "astro/internal/sinks" + "context" + "fmt" + "github.com/jackc/pgx/v5" +) + +type SinkPlugin struct { + ctx context.Context + config Config + streamSchema []schema.StreamSchema + conn *pgx.Conn +} + +func NewPostgresSinkPlugin(config Config, schema []schema.StreamSchema) sinks.DataSink { + return &SinkPlugin{ + config: config, + streamSchema: schema, + } +} + +func (s *SinkPlugin) Connect(context context.Context) error { + connStr := fmt.Sprintf("postgres://%s:%s@%s:%d/%s", + s.config.User, + s.config.Password, + s.config.Host, + s.config.Port, + s.config.Database, + ) + + conn, err := pgx.Connect(context, connStr) + s.conn = conn + + return err +} + +func (s *SinkPlugin) SetExpectedSchema(schema []schema.StreamSchema) { + s.streamSchema = schema + s.generateCreateTableStatements() +} + +func (s *SinkPlugin) GetType() sinks.SinkDriver { + return sinks.PostgresSinkType +} + +func (s *SinkPlugin) Write(m message.Message) error { + //TODO implement me + panic("implement me") +} + +func (s *SinkPlugin) Stop() { + s.conn.Close(s.ctx) +} + +func (s *SinkPlugin) generateCreateTableStatements() { + +} diff --git a/internal/sinks/sink_drivers.go b/internal/sinks/sink_drivers.go index 58dacd5..9d29ac8 100644 --- a/internal/sinks/sink_drivers.go +++ b/internal/sinks/sink_drivers.go @@ -6,4 +6,5 @@ const ( StdOutSinkType SinkDriver = "stdout" WebSocketSinkType SinkDriver = "websocket" KafkaSinkType SinkDriver = "kafka" + PostgresSinkType SinkDriver = "postgres" ) diff --git a/public/stream/stream.go b/public/stream/stream.go index 25e7117..8f9f8f7 100644 --- a/public/stream/stream.go +++ b/public/stream/stream.go @@ -64,8 +64,6 @@ func InitFromConfig(config config.Configuration) (*Stream, error) { Info("Loaded") } - s.evolveSchemaForSinks(s.schema) - streamContext.Logger.WithPrefix("Sinks").With( "driver", config.Sink.Driver, ).Info("Loading...") @@ -79,6 +77,7 @@ func InitFromConfig(config config.Configuration) (*Stream, error) { return nil, err } + s.evolveSchemaForSinks(s.schema) s.sinks[0].SetExpectedSchema(s.schema) s.dataStream = stream.OfChannel(s.source.Events())