Skip to content

Commit

Permalink
move all state parsing to types.go
Browse files Browse the repository at this point in the history
Print the number of rows synced in each session
  • Loading branch information
Phani Raj committed Aug 4, 2022
1 parent f5685d3 commit 8946b72
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 28 deletions.
9 changes: 9 additions & 0 deletions cmd/http-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func execute(logger internal.Logger, apiUrl string, batchSize int, token string)
stream *internal.Stream
)

recordCount := 0
batchWriter := internal.NewBatchWriter(batchSize, logger, apiUrl, apiToken)

for scanner.Scan() {
Expand All @@ -65,13 +66,18 @@ func execute(logger internal.Logger, apiUrl string, batchSize int, token string)
if err := batchWriter.Flush(stream); err != nil {
return err
}
if recordCount > 0 {
logger.Info(fmt.Sprintf("Published [%v] records for stream %q", recordCount, stream.Name))
}
}

// we retain the catalog so we can build a BatchMessage
stream = s
recordCount = 0
}

if r != nil {
recordCount += 1
if err := batchWriter.Send(r, stream); err != nil {
return err
}
Expand All @@ -82,6 +88,9 @@ func execute(logger internal.Logger, apiUrl string, batchSize int, token string)
return scanner.Err()
}

if recordCount > 0 {
logger.Info(fmt.Sprintf("Published [%v] records for stream %q", recordCount, stream.Name))
}
return batchWriter.Flush(stream)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Sync(ctx context.Context, mysqlDatabase PlanetScaleEdgeMysqlAccess, edgeDat
}

// if there is no last known state, start from the beginning.
if state == nil {
if state == nil || len(state.Streams) == 0 {
state = beginningState
}

Expand Down
41 changes: 41 additions & 0 deletions cmd/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package internal

import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"

"github.com/pkg/errors"
psdbconnect "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/planetscale/psdb/core/codec"
Expand Down Expand Up @@ -406,6 +410,43 @@ type SerializedCursor struct {
Cursor string `json:"cursor"`
}

func ParseSavedState(stateFilePath string) (*State, error) {
var (
state *State
wrappedState *WrappedState
err error
)

state, err = Parse(stateFilePath, state)
if err != nil {
return nil, fmt.Errorf("state file contents are invalid: %q", err)
}

if state == nil || len(state.Streams) == 0 {
wrappedState, err = Parse(stateFilePath, wrappedState)
if err != nil {
return nil, fmt.Errorf("state file contents are invalid: %q", err)
}
if wrappedState != nil {
state = &wrappedState.Value
}
}

return state, nil
}

func Parse[T any](path string, obj T) (T, error) {
b, err := ioutil.ReadFile(path)
if err != nil {
return obj, errors.Wrapf(err, "unable to read file at path %v", path)
}

if err = json.Unmarshal(b, &obj); err != nil {
return obj, err
}
return obj, nil
}

func (s SerializedCursor) SerializedCursorToTableCursor() (*psdbconnect.TableCursor, error) {
var (
tc psdbconnect.TableCursor
Expand Down
30 changes: 3 additions & 27 deletions cmd/singer-tap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"strings"

Expand Down Expand Up @@ -53,15 +51,14 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF
sourceConfig internal.PlanetScaleSource
catalog internal.Catalog
state *internal.State
wrappedState *internal.WrappedState
err error
)

if len(configFilePath) == 0 {
return errors.New("Please specify path to a valid configuration file with the --config flag")
}

sourceConfig, err = parse(configFilePath, sourceConfig)
sourceConfig, err = internal.Parse(configFilePath, sourceConfig)
if err != nil {
return fmt.Errorf("config file contents are invalid: %q", err)
}
Expand All @@ -84,25 +81,16 @@ func execute(discoverMode bool, logger internal.Logger, configFilePath, catalogF
return errors.New("Please specify path to a valid catalog file with the --catalog flag")
}

catalog, err = parse(catalogFilePath, catalog)
catalog, err = internal.Parse(catalogFilePath, catalog)
if err != nil {
return fmt.Errorf("catalog file contents are invalid: %q", err)
}

if len(stateFilePath) > 0 {
state, err = parse(stateFilePath, state)
state, err = internal.ParseSavedState(stateFilePath)
if err != nil {
return fmt.Errorf("state file contents are invalid: %q", err)
}
if state == nil || len(state.Streams) == 0 {
wrappedState, err = parse(stateFilePath, wrappedState)
if err != nil {
return fmt.Errorf("state file contents are invalid: %q", err)
}
if wrappedState != nil {
state = &wrappedState.Value
}
}
}

return sync(context.Background(), logger, sourceConfig, catalog, state)
Expand Down Expand Up @@ -135,15 +123,3 @@ func discover(ctx context.Context, logger internal.Logger, source internal.Plane

return logger.Schema(catalog)
}

func parse[T any](path string, obj T) (T, error) {
b, err := ioutil.ReadFile(path)
if err != nil {
return obj, errors.Wrapf(err, "unable to read file at path %v", path)
}

if err = json.Unmarshal(b, &obj); err != nil {
return obj, err
}
return obj, nil
}

0 comments on commit 8946b72

Please sign in to comment.