Skip to content

Commit

Permalink
fix(streams): added stream name normalizer
Browse files Browse the repository at this point in the history
  • Loading branch information
le-vlad committed Feb 13, 2024
1 parent 1bf6f87 commit 63f02fa
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 4 deletions.
12 changes: 12 additions & 0 deletions internal/helper/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package helper

import "strings"

func NormalizeStreamName(stream string) string {
var normalizedName = stream
if len(strings.Split(stream, ".")) == 2 {
normalizedName = strings.Split(stream, ".")[1]
}

return normalizedName
}
3 changes: 2 additions & 1 deletion internal/processors/sql/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/barkimedes/go-deepcopy"
"github.com/blastrain/vitess-sqlparser/sqlparser"
"github.com/charmbracelet/log"
"github.com/usedatabrew/blink/internal/helper"
"github.com/usedatabrew/blink/internal/schema"
"github.com/usedatabrew/blink/internal/stream_context"
"github.com/usedatabrew/message"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (p *Plugin) EvolveSchema(streamSchema *schema.StreamSchemaObj) error {

var streamToProcess *schema.StreamSchema
for _, stream := range streamSchema.GetLatestSchema() {
if stream.StreamName == streamName {
if helper.NormalizeStreamName(stream.StreamName) == streamName {
streamToProcess = &stream
}
}
Expand Down
5 changes: 3 additions & 2 deletions internal/schema/schema_obj.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schema
import (
"github.com/apache/arrow/go/v14/arrow"
"github.com/barkimedes/go-deepcopy"
"github.com/usedatabrew/blink/internal/helper"
"slices"
)

Expand All @@ -27,7 +28,7 @@ func (s *StreamSchemaObj) GetLatestSchema() []StreamSchema {
func (s *StreamSchemaObj) AddField(streamName, name string, fieldType arrow.DataType, driverType string) {
var streamSchemaCopy = s.getLastSchemaDeepCopy()
for idx, stream := range streamSchemaCopy {
if stream.StreamName == streamName {
if helper.NormalizeStreamName(stream.StreamName) == streamName {
arrowColumn := Column{
Name: name,
DatabrewType: fieldType.String(),
Expand All @@ -53,7 +54,7 @@ func (s *StreamSchemaObj) FakeEvolve() {
func (s *StreamSchemaObj) RemoveField(streamName, columnName string) {
var streamSchemaCopy = s.getLastSchemaDeepCopy()
for streamIndex, stream := range streamSchemaCopy {
if stream.StreamName == streamName {
if helper.NormalizeStreamName(stream.StreamName) == streamName {
for colIdx, column := range stream.Columns {
if column.Name == columnName {
streamSchemaCopy[streamIndex].Columns = remove(stream.Columns, colIdx)
Expand Down
3 changes: 2 additions & 1 deletion public/stream/config_loader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"fmt"
"github.com/usedatabrew/blink/config"
"os"
"regexp"
Expand All @@ -23,7 +24,7 @@ func ReadInitConfigFromYaml(configBytes []byte) (config.Configuration, error) {
for _, ss := range conf.Source.StreamSchema {
ss.SortColumnsAsc()
}

fmt.Println(string(configBytes))
config.ValidateConfigSchema(conf)
return conf, err
}
Expand Down

0 comments on commit 63f02fa

Please sign in to comment.