diff --git a/cmd/http-tap/main.go b/cmd/http-tap/main.go index cbd340d..4acc04c 100644 --- a/cmd/http-tap/main.go +++ b/cmd/http-tap/main.go @@ -51,6 +51,7 @@ func execute(logger internal.Logger, apiUrl string, batchSize int, token string) stream *internal.Stream ) + recordCount := 0 batchWriter := internal.NewBatchWriter(batchSize, logger, apiUrl, apiToken) for scanner.Scan() { @@ -65,13 +66,18 @@ func execute(logger internal.Logger, apiUrl string, batchSize int, token string) if err := batchWriter.Flush(stream); err != nil { return err } + if recordCount > 0 { + logger.Info(fmt.Sprintf("Published [%v] records for stream %q", recordCount, stream.Name)) + } } // we retain the catalog so we can build a BatchMessage stream = s + recordCount = 0 } if r != nil { + recordCount += 1 if err := batchWriter.Send(r, stream); err != nil { return err } @@ -82,6 +88,9 @@ func execute(logger internal.Logger, apiUrl string, batchSize int, token string) return scanner.Err() } + if recordCount > 0 { + logger.Info(fmt.Sprintf("Published [%v] records for stream %q", recordCount, stream.Name)) + } return batchWriter.Flush(stream) } diff --git a/cmd/internal/sync.go b/cmd/internal/sync.go index 4dd3060..05cb4ba 100644 --- a/cmd/internal/sync.go +++ b/cmd/internal/sync.go @@ -28,7 +28,7 @@ func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDat } // if there is no last known state, start from the beginning. - if state == nil { + if state == nil || len(state.Streams) == 0 { state = beginningState } diff --git a/cmd/internal/types.go b/cmd/internal/types.go index 6bd1efe..1ee1eea 100644 --- a/cmd/internal/types.go +++ b/cmd/internal/types.go @@ -2,6 +2,10 @@ package internal import ( "encoding/base64" + "encoding/json" + "fmt" + "io/ioutil" + "github.com/pkg/errors" psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1" "github.com/planetscale/psdb/core/codec" @@ -406,6 +410,43 @@ type SerializedCursor struct { Cursor string `json:"cursor"` } +func ParseSavedState(stateFilePath string) (*State, error) { + var ( + state *State + wrappedState *WrappedState + err error + ) + + state, err = Parse(stateFilePath, state) + if err != nil { + return nil, fmt.Errorf("state file contents are invalid: %q", err) + } + + if state == nil || len(state.Streams) == 0 { + wrappedState, err = Parse(stateFilePath, wrappedState) + if err != nil { + return nil, fmt.Errorf("state file contents are invalid: %q", err) + } + if wrappedState != nil { + state = &wrappedState.Value + } + } + + return state, nil +} + +func Parse[T any](path string, obj T) (T, error) { + b, err := ioutil.ReadFile(path) + if err != nil { + return obj, errors.Wrapf(err, "unable to read file at path %v", path) + } + + if err = json.Unmarshal(b, &obj); err != nil { + return obj, err + } + return obj, nil +} + func (s SerializedCursor) SerializedCursorToTableCursor() (*psdbconnect.TableCursor, error) { var ( tc psdbconnect.TableCursor diff --git a/cmd/singer-tap/main.go b/cmd/singer-tap/main.go index 9837994..63454bd 100644 --- a/cmd/singer-tap/main.go +++ b/cmd/singer-tap/main.go @@ -2,10 +2,8 @@ package main import ( "context" - "encoding/json" "flag" "fmt" - "io/ioutil" "os" "strings" @@ -53,7 +51,6 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF sourceConfig internal.PlanetScaleSource catalog internal.Catalog state *internal.State - wrappedState *internal.WrappedState err error ) @@ -61,7 +58,7 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF return errors.New("Please specify path to a valid configuration file with the --config flag") } - sourceConfig, err = parse(configFilePath, sourceConfig) + sourceConfig, err = internal.Parse(configFilePath, sourceConfig) if err != nil { return fmt.Errorf("config file contents are invalid: %q", err) } @@ -84,25 +81,16 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF return errors.New("Please specify path to a valid catalog file with the --catalog flag") } - catalog, err = parse(catalogFilePath, catalog) + catalog, err = internal.Parse(catalogFilePath, catalog) if err != nil { return fmt.Errorf("catalog file contents are invalid: %q", err) } if len(stateFilePath) > 0 { - state, err = parse(stateFilePath, state) + state, err = internal.ParseSavedState(stateFilePath) if err != nil { return fmt.Errorf("state file contents are invalid: %q", err) } - if state == nil || len(state.Streams) == 0 { - wrappedState, err = parse(stateFilePath, wrappedState) - if err != nil { - return fmt.Errorf("state file contents are invalid: %q", err) - } - if wrappedState != nil { - state = &wrappedState.Value - } - } } return sync(context.Background(), logger, sourceConfig, catalog, state) @@ -135,15 +123,3 @@ func discover(ctx context.Context, logger internal.Logger, source internal.Plane return logger.Schema(catalog) } - -func parse[T any](path string, obj T) (T, error) { - b, err := ioutil.ReadFile(path) - if err != nil { - return obj, errors.Wrapf(err, "unable to read file at path %v", path) - } - - if err = json.Unmarshal(b, &obj); err != nil { - return obj, err - } - return obj, nil -}