Skip to content

Commit

Permalink
chore: introduce encoding factory (#3740)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 21, 2023
1 parent 9fcead7 commit 188b95c
Show file tree
Hide file tree
Showing 37 changed files with 628 additions and 527 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ require (
github.com/snowflakedb/gosnowflake v1.6.23
github.com/sony/gobreaker v0.5.0
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cast v1.5.0
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.16.0
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
github.com/shirou/gopsutil/v3 v3.23.4 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
Expand Down
2 changes: 0 additions & 2 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/warehouse"
"github.com/rudderlabs/rudder-server/warehouse/encoding"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)
Expand Down Expand Up @@ -330,7 +329,6 @@ func runAllInit() {
diagnostics.Init()
backendconfig.Init()
warehouseutils.Init()
encoding.Init()
pgnotifier.Init()
jobsdb.Init()
warehouse.Init4()
Expand Down
23 changes: 12 additions & 11 deletions warehouse/encoding/csvloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,56 +10,57 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
)

// CsvLoader is common for non-BQ warehouses.
// csvLoader is common for non-BQ warehouses.
// If you need any custom logic, either extend this or use destType and if/else/switch.
type CsvLoader struct {
type csvLoader struct {
destType string
csvRow []string
buff bytes.Buffer
csvWriter *csv.Writer
fileWriter LoadFileWriter
}

func NewCSVLoader(destType string, writer LoadFileWriter) *CsvLoader {
loader := &CsvLoader{destType: destType, fileWriter: writer}
func newCSVLoader(writer LoadFileWriter, destType string) *csvLoader {
loader := &csvLoader{destType: destType, fileWriter: writer}
loader.csvRow = []string{}
loader.buff = bytes.Buffer{}
loader.csvWriter = csv.NewWriter(&loader.buff)
return loader
}

func (loader *CsvLoader) IsLoadTimeColumn(columnName string) bool {
func (loader *csvLoader) IsLoadTimeColumn(columnName string) bool {
return columnName == warehouseutils.ToProviderCase(loader.destType, UUIDTsColumn)
}

func (*CsvLoader) GetLoadTimeFormat(string) string {
func (*csvLoader) GetLoadTimeFormat(string) string {
return misc.RFC3339Milli
}

func (loader *CsvLoader) AddColumn(_, _ string, val interface{}) {
func (loader *csvLoader) AddColumn(_, _ string, val interface{}) {
valString := fmt.Sprintf("%v", val)
loader.csvRow = append(loader.csvRow, valString)
}

func (loader *CsvLoader) AddRow(_, row []string) {
func (loader *csvLoader) AddRow(_, row []string) {
loader.csvRow = append(loader.csvRow, row...)
}

func (loader *CsvLoader) AddEmptyColumn(columnName string) {
func (loader *csvLoader) AddEmptyColumn(columnName string) {
loader.AddColumn(columnName, "", "")
}

func (loader *CsvLoader) WriteToString() (string, error) {
func (loader *csvLoader) WriteToString() (string, error) {
err := loader.csvWriter.Write(loader.csvRow)
if err != nil {
return "", fmt.Errorf("csvWriter write: %w", err)
}

loader.csvWriter.Flush()

return loader.buff.String(), nil
}

func (loader *CsvLoader) Write() error {
func (loader *csvLoader) Write() error {
eventData, err := loader.WriteToString()
if err != nil {
return fmt.Errorf("writing to string: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions warehouse/encoding/csvreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"io"
)

type CsvReader struct {
type csvReader struct {
reader *csv.Reader
}

func (csv *CsvReader) Read([]string) (record []string, err error) {
func (csv *csvReader) Read([]string) (record []string, err error) {
record, err = csv.reader.Read()
return
}

func NewCsvReader(r io.Reader) *CsvReader {
return &CsvReader{reader: csv.NewReader(r)}
func newCsvReader(r io.Reader) *csvReader {
return &csvReader{reader: csv.NewReader(r)}
}
72 changes: 63 additions & 9 deletions warehouse/encoding/encoding.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package encoding

import (
"io"
"os"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"github.com/rudderlabs/rudder-go-kit/config"
)

Expand All @@ -13,11 +18,23 @@ const (
BQUuidTSFormat = "2006-01-02 15:04:05 Z"
)

var (
maxStagingFileReadBufferCapacityInK int
parquetParallelWriters int64
)
// Factory builds the load-file writers, event loaders and event readers
// used by the warehouse encoding layer, carrying the tunables that were
// previously package-level state.
type Factory struct {
	config struct {
		// Read-buffer capacity (in KB) handed to the JSON staging-file
		// reader; registered under Warehouse.maxStagingFileReadBufferCapacityInK.
		maxStagingFileReadBufferCapacityInK int
		// Parallelism passed to the parquet writer; registered under
		// Warehouse.parquetParallelWriters.
		parquetParallelWriters int64
	}
}

// NewFactory returns a Factory whose tunables are bound to conf:
//   - Warehouse.maxStagingFileReadBufferCapacityInK (default 10240)
//   - Warehouse.parquetParallelWriters (default 8)
//
// NOTE(review): the differing boolean flags (false vs true) presumably
// control hot-reloadability of the variable — confirm against the
// rudder-go-kit config API.
func NewFactory(conf *config.Config) *Factory {
	f := &Factory{}

	conf.RegisterIntConfigVariable(10240, &f.config.maxStagingFileReadBufferCapacityInK, false, 1, "Warehouse.maxStagingFileReadBufferCapacityInK")
	conf.RegisterInt64ConfigVariable(8, &f.config.parquetParallelWriters, true, 1, "Warehouse.parquetParallelWriters")

	return f
}

// LoadFileWriter is an interface for writing events to a load file
type LoadFileWriter interface {
WriteGZ(s string) error
Write(p []byte) (int, error)
Expand All @@ -26,11 +43,48 @@ type LoadFileWriter interface {
GetLoadFile() *os.File
}

func Init() {
loadConfig()
// NewLoadFileWriter opens a writer for a load file at outputFilePath.
// Parquet load files get a parquet writer configured with the factory's
// parallel-writer setting; every other load-file type is written as a
// gzip file via misc.CreateGZ (schema and destType are unused then).
func (m *Factory) NewLoadFileWriter(loadFileType, outputFilePath string, schema model.TableSchema, destType string) (LoadFileWriter, error) {
	if loadFileType == warehouseutils.LoadFileTypeParquet {
		return createParquetWriter(outputFilePath, schema, destType, m.config.parquetParallelWriters)
	}
	return misc.CreateGZ(outputFilePath)
}

// EventLoader is an interface for loading events into a load file.
// It's used to load singular BatchRouterEvent events into a load file.
type EventLoader interface {
	// IsLoadTimeColumn reports whether columnName is a load-time column
	// (e.g. the UUID-timestamp column) that the loader fills itself.
	IsLoadTimeColumn(columnName string) bool
	// GetLoadTimeFormat returns the timestamp format string to use for
	// the given load-time column.
	GetLoadTimeFormat(columnName string) string
	// AddColumn appends a single column value to the current row.
	AddColumn(columnName, columnType string, val interface{})
	// AddRow appends a full row of values keyed by columnNames.
	AddRow(columnNames, values []string)
	// AddEmptyColumn appends a column with an empty value.
	AddEmptyColumn(columnName string)
	// WriteToString renders the buffered row(s) as a string.
	WriteToString() (string, error)
	// Write flushes the buffered row(s) to the underlying load file writer.
	Write() error
}

// NewEventLoader returns the EventLoader matching loadFileType: a JSON
// loader for JSON load files, a parquet loader for parquet load files,
// and a CSV loader for everything else.
func (m *Factory) NewEventLoader(w LoadFileWriter, loadFileType, destinationType string) EventLoader {
	if loadFileType == warehouseutils.LoadFileTypeJson {
		return newJSONLoader(w, destinationType)
	}
	if loadFileType == warehouseutils.LoadFileTypeParquet {
		return newParquetLoader(w, destinationType)
	}
	return newCSVLoader(w, destinationType)
}

// EventReader is an interface for reading events from a load file.
type EventReader interface {
	// Read returns the next record; columnNames may be used by
	// implementations that need the column ordering (the CSV reader
	// ignores it).
	Read(columnNames []string) (record []string, err error)
}

func loadConfig() {
config.RegisterIntConfigVariable(10240, &maxStagingFileReadBufferCapacityInK, false, 1, "Warehouse.maxStagingFileReadBufferCapacityInK")
config.RegisterInt64ConfigVariable(8, &parquetParallelWriters, true, 1, "Warehouse.parquetParallelWriters")
// NewEventReader returns an EventReader for load files of destType.
// BigQuery staging files are read as JSON (with the factory's configured
// buffer capacity); all other destinations are read as CSV.
func (m *Factory) NewEventReader(r io.Reader, destType string) EventReader {
	if destType == warehouseutils.BQ {
		return newJSONReader(r, m.config.maxStagingFileReadBufferCapacityInK)
	}
	return newCsvReader(r)
}
Loading

0 comments on commit 188b95c

Please sign in to comment.