Skip to content

Commit

Permalink
chore: introduce encoding manager
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 11, 2023
1 parent 09a3c13 commit f84572f
Show file tree
Hide file tree
Showing 36 changed files with 332 additions and 339 deletions.
2 changes: 0 additions & 2 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/warehouse"
"github.com/rudderlabs/rudder-server/warehouse/encoding"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)
Expand Down Expand Up @@ -331,7 +330,6 @@ func runAllInit() {
diagnostics.Init()
backendconfig.Init()
warehouseutils.Init()
encoding.Init()
archiver.Init()
pgnotifier.Init()
jobsdb.Init()
Expand Down
25 changes: 13 additions & 12 deletions warehouse/encoding/csvloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,56 +10,57 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
)

// CsvLoader is common for non-BQ warehouses.
// csvLoader is common for non-BQ warehouses.
// If you need any custom logic, either extend this or use destType and if/else/switch.
type CsvLoader struct {
type csvLoader struct {
destType string
csvRow []string
buff bytes.Buffer
csvWriter *csv.Writer
fileWriter LoadFileWriter
fileWriter Writer
}

func NewCSVLoader(destType string, writer LoadFileWriter) *CsvLoader {
loader := &CsvLoader{destType: destType, fileWriter: writer}
func newCSVLoader(destType string, writer Writer) *csvLoader {
loader := &csvLoader{destType: destType, fileWriter: writer}
loader.csvRow = []string{}
loader.buff = bytes.Buffer{}
loader.csvWriter = csv.NewWriter(&loader.buff)
return loader
}

func (loader *CsvLoader) IsLoadTimeColumn(columnName string) bool {
func (loader *csvLoader) IsLoadTimeColumn(columnName string) bool {
return columnName == warehouseutils.ToProviderCase(loader.destType, UUIDTsColumn)
}

func (*CsvLoader) GetLoadTimeFormat(string) string {
func (*csvLoader) GetLoadTimeFormat(string) string {
return misc.RFC3339Milli
}

func (loader *CsvLoader) AddColumn(_, _ string, val interface{}) {
func (loader *csvLoader) AddColumn(_, _ string, val interface{}) {
valString := fmt.Sprintf("%v", val)
loader.csvRow = append(loader.csvRow, valString)
}

func (loader *CsvLoader) AddRow(_, row []string) {
func (loader *csvLoader) AddRow(_, row []string) {
loader.csvRow = append(loader.csvRow, row...)
}

func (loader *CsvLoader) AddEmptyColumn(columnName string) {
func (loader *csvLoader) AddEmptyColumn(columnName string) {
loader.AddColumn(columnName, "", "")
}

func (loader *CsvLoader) WriteToString() (string, error) {
func (loader *csvLoader) WriteToString() (string, error) {
err := loader.csvWriter.Write(loader.csvRow)
if err != nil {
return "", fmt.Errorf("csvWriter write: %w", err)
}

loader.csvWriter.Flush()

return loader.buff.String(), nil
}

func (loader *CsvLoader) Write() error {
func (loader *csvLoader) Write() error {
eventData, err := loader.WriteToString()
if err != nil {
return fmt.Errorf("writing to string: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions warehouse/encoding/csvreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"io"
)

type CsvReader struct {
type csvReader struct {
reader *csv.Reader
}

func (csv *CsvReader) Read([]string) (record []string, err error) {
func (csv *csvReader) Read([]string) (record []string, err error) {
record, err = csv.reader.Read()
return
}

func NewCsvReader(r io.Reader) *CsvReader {
return &CsvReader{reader: csv.NewReader(r)}
func newCsvReader(r io.Reader) *csvReader {
return &csvReader{reader: csv.NewReader(r)}
}
70 changes: 60 additions & 10 deletions warehouse/encoding/encoding.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package encoding

import (
"io"
"os"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"github.com/rudderlabs/rudder-go-kit/config"
)

Expand All @@ -13,24 +18,69 @@ const (
BQUuidTSFormat = "2006-01-02 15:04:05 Z"
)

var (
maxStagingFileReadBufferCapacityInK int
parquetParallelWriters int64
)
type Manager struct {
config struct {
maxStagingFileReadBufferCapacityInK int
parquetParallelWriters int64
}
}

func NewManager(conf *config.Config) *Manager {
m := &Manager{}

conf.RegisterIntConfigVariable(10240, &m.config.maxStagingFileReadBufferCapacityInK, false, 1, "Warehouse.maxStagingFileReadBufferCapacityInK")
conf.RegisterInt64ConfigVariable(8, &m.config.parquetParallelWriters, true, 1, "Warehouse.parquetParallelWriters")

return m
}

type LoadFileWriter interface {
type Writer interface {
WriteGZ(s string) error
Write(p []byte) (int, error)
WriteRow(r []interface{}) error
Close() error
GetLoadFile() *os.File
}

func Init() {
loadConfig()
func (m *Manager) NewWriter(loadFileType, outputFilePath string, schema model.TableSchema, destType string) (Writer, error) {
switch loadFileType {
case warehouseutils.LoadFileTypeParquet:
return createParquetWriter(schema, outputFilePath, destType, m.config.parquetParallelWriters)
default:
return misc.CreateGZ(outputFilePath)
}
}

type Loader interface {
IsLoadTimeColumn(columnName string) bool
GetLoadTimeFormat(columnName string) string
AddColumn(columnName, columnType string, val interface{})
AddRow(columnNames, values []string)
AddEmptyColumn(columnName string)
WriteToString() (string, error)
Write() error
}

func (m *Manager) NewLoader(destinationType, loadFileType string, w Writer) Loader {
switch loadFileType {
case warehouseutils.LoadFileTypeJson:
return newJSONLoader(destinationType, w)
case warehouseutils.LoadFileTypeParquet:
return newParquetLoader(destinationType, w)
default:
return newCSVLoader(destinationType, w)
}
}

type Reader interface {
Read(columnNames []string) (record []string, err error)
}

func loadConfig() {
config.RegisterIntConfigVariable(10240, &maxStagingFileReadBufferCapacityInK, false, 1, "Warehouse.maxStagingFileReadBufferCapacityInK")
config.RegisterInt64ConfigVariable(8, &parquetParallelWriters, true, 1, "Warehouse.parquetParallelWriters")
func (m *Manager) NewReader(r io.Reader, provider string) Reader {
switch provider {
case warehouseutils.BQ:
return newJSONReader(r, m.config.maxStagingFileReadBufferCapacityInK)
default:
return newCsvReader(r)
}
}
24 changes: 15 additions & 9 deletions warehouse/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/config"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
Expand All @@ -25,7 +27,6 @@ import (

func TestReaderLoader(t *testing.T) {
misc.Init()
encoding.Init()

testCases := []struct {
name string
Expand Down Expand Up @@ -66,13 +67,16 @@ func TestReaderLoader(t *testing.T) {
writer, err := misc.CreateGZ(outputFilePath)
require.NoError(t, err)

em := encoding.NewManager(config.Default)

t.Cleanup(func() {
require.NoError(t, os.Remove(outputFilePath))
})

t.Run("add and write", func(t *testing.T) {
for i := 0; i < lines; i++ {
c := encoding.GetNewEventLoader(tc.destType, tc.loadFileType, writer)

c := em.NewLoader(tc.destType, tc.loadFileType, writer)

c.AddColumn("column1", "string", "value1")
c.AddColumn("column2", "string", "value2")
Expand Down Expand Up @@ -103,7 +107,7 @@ func TestReaderLoader(t *testing.T) {
require.NoError(t, gzipReader.Close())
})

r := encoding.NewEventReader(gzipReader, tc.provider)
r := em.NewReader(gzipReader, tc.provider)
for i := 0; i < lines; i++ {
output, err := r.Read([]string{"column1", "column2", "column3", "column4", "column5", "column6", "column7", "column8", "column9", "column10"})
require.NoError(t, err)
Expand All @@ -112,7 +116,7 @@ func TestReaderLoader(t *testing.T) {
})

t.Run("time column", func(t *testing.T) {
c := encoding.GetNewEventLoader(tc.destType, tc.loadFileType, writer)
c := em.NewLoader(tc.destType, tc.loadFileType, writer)

t.Run("GetLoadTimeFormat", func(t *testing.T) {
for column, format := range tc.timeColumnFormatMap {
Expand Down Expand Up @@ -156,7 +160,9 @@ func TestReaderLoader(t *testing.T) {
}
)

writer, err := encoding.CreateParquetWriter(schema, outputFilePath, destType)
em := encoding.NewManager(config.Default)

writer, err := em.NewWriter(loadFileType, outputFilePath, schema, destType)
require.NoError(t, err)

t.Cleanup(func() {
Expand All @@ -165,7 +171,7 @@ func TestReaderLoader(t *testing.T) {

t.Run("add and write", func(t *testing.T) {
for i := 0; i < lines; i++ {
c := encoding.GetNewEventLoader(destType, loadFileType, writer)
c := em.NewLoader(destType, loadFileType, writer)

// add columns in sorted order
c.AddColumn("column1", "bigint", 1234567890)
Expand Down Expand Up @@ -258,17 +264,17 @@ func TestReaderLoader(t *testing.T) {
})

t.Run("invalid file path", func(t *testing.T) {
_, err := encoding.CreateParquetWriter(schema, "", destType)
_, err := em.NewWriter(loadFileType, "", schema, destType)
require.EqualError(t, err, errors.New("open : no such file or directory").Error())
})

t.Run("unsupported dest type", func(t *testing.T) {
_, err := encoding.CreateParquetWriter(schema, outputFilePath, warehouseutils.BQ)
_, err := em.NewWriter(loadFileType, outputFilePath, schema, warehouseutils.BQ)
require.EqualError(t, err, errors.New("unsupported warehouse for parquet load files").Error())
})

t.Run("time column", func(t *testing.T) {
c := encoding.GetNewEventLoader(destType, loadFileType, writer)
c := em.NewLoader(destType, loadFileType, writer)

t.Run("GetLoadTimeFormat", func(t *testing.T) {
for column, format := range timeColumnFormatMap {
Expand Down
24 changes: 0 additions & 24 deletions warehouse/encoding/eventloader.go

This file was deleted.

18 changes: 0 additions & 18 deletions warehouse/encoding/eventreader.go

This file was deleted.

Loading

0 comments on commit f84572f

Please sign in to comment.