Skip to content

Commit

Permalink
master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 21, 2023
2 parents 8a7d14d + 188b95c commit 09b9d6c
Show file tree
Hide file tree
Showing 39 changed files with 647 additions and 546 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ replace (
)

require (
cloud.google.com/go/bigquery v1.53.0
cloud.google.com/go/bigquery v1.54.0
cloud.google.com/go/pubsub v1.33.0
cloud.google.com/go/storage v1.32.0
github.com/Azure/azure-storage-blob-go v0.15.0
Expand All @@ -34,7 +34,7 @@ require (
github.com/allisson/go-pglock/v2 v2.0.1
github.com/apache/pulsar-client-go v0.11.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.44.324
github.com/aws/aws-sdk-go v1.44.326
github.com/bugsnag/bugsnag-go/v2 v2.2.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.1
Expand All @@ -60,7 +60,7 @@ require (
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.12.0
github.com/minio/minio-go v6.0.14+incompatible
github.com/minio/minio-go/v7 v7.0.61
github.com/minio/minio-go/v7 v7.0.62
github.com/mitchellh/mapstructure v1.5.0
github.com/mkmik/multierror v0.3.0
github.com/onsi/ginkgo/v2 v2.11.0
Expand All @@ -78,6 +78,7 @@ require (
github.com/snowflakedb/gosnowflake v1.6.23
github.com/sony/gobreaker v0.5.0
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cast v1.5.0
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.16.0
Expand Down Expand Up @@ -239,7 +240,6 @@ require (
github.com/shirou/gopsutil/v3 v3.23.4 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.3 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ cloud.google.com/go/bigquery v1.44.0/go.mod h1:0Y33VqXTEsbamHJvJHdFmtqHvMIY28aK1
cloud.google.com/go/bigquery v1.47.0/go.mod h1:sA9XOgy0A8vQK9+MWhEQTY6Tix87M/ZurWFIxmF9I/E=
cloud.google.com/go/bigquery v1.48.0/go.mod h1:QAwSz+ipNgfL5jxiaK7weyOhzdoAy1zFm0Nf1fysJac=
cloud.google.com/go/bigquery v1.49.0/go.mod h1:Sv8hMmTFFYBlt/ftw2uN6dFdQPzBlREY9yBh7Oy7/4Q=
cloud.google.com/go/bigquery v1.53.0 h1:K3wLbjbnSlxhuG5q4pntHv5AEbQM1QqHKGYgwFIqOTg=
cloud.google.com/go/bigquery v1.53.0/go.mod h1:3b/iXjRQGU4nKa87cXeg6/gogLjO8C6PmuM8i5Bi/u4=
cloud.google.com/go/bigquery v1.54.0 h1:ify6s7sy+kQuAimRnVTrPUzaeY0+X5GEsKt2C5CiA8w=
cloud.google.com/go/bigquery v1.54.0/go.mod h1:9Y5I3PN9kQWuid6183JFhOGOW3GcirA5LpsKCUn+2ec=
cloud.google.com/go/billing v1.4.0/go.mod h1:g9IdKBEFlItS8bTtlrZdVLWSSdSyFUZKXNS02zKMOZY=
cloud.google.com/go/billing v1.5.0/go.mod h1:mztb1tBc3QekhjSgmpf/CV4LzWXLzCArwpLmP2Gm88s=
cloud.google.com/go/billing v1.6.0/go.mod h1:WoXzguj+BeHXPbKfNWkqVtDdzORazmCjraY+vrxcyvI=
Expand Down Expand Up @@ -761,8 +761,8 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.324 h1:/uja9PtgeeqrZCPOJTenjMLNpciIMuzaRKooq+erG4A=
github.com/aws/aws-sdk-go v1.44.324/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.44.326 h1:/6xD/9mKZ2RMTDfbhh9qCxw+CaTbJRvfHJ/NHPFbI38=
github.com/aws/aws-sdk-go v1.44.326/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU=
github.com/aws/aws-sdk-go-v2 v1.17.7 h1:CLSjnhJSTSogvqUGhIC6LqFKATMRexcxLZ0i/Nzk9Eg=
github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
Expand Down Expand Up @@ -1559,8 +1559,8 @@ github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEp
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/minio/minio-go/v7 v7.0.34/go.mod h1:nCrRzjoSUQh8hgKKtu3Y708OLvRLtuASMg2/nvmbarw=
github.com/minio/minio-go/v7 v7.0.61 h1:87c+x8J3jxQ5VUGimV9oHdpjsAvy3fhneEBKuoKEVUI=
github.com/minio/minio-go/v7 v7.0.61/go.mod h1:BTu8FcrEw+HidY0zd/0eny43QnVNkXRPXrLXFuQBHXg=
github.com/minio/minio-go/v7 v7.0.62 h1:qNYsFZHEzl+NfH8UxW4jpmlKav1qUAgfY30YNRneVhc=
github.com/minio/minio-go/v7 v7.0.62/go.mod h1:Q6X7Qjb7WMhvG65qKf4gUgA5XaiSox74kR1uAEjxRS4=
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
Expand Down
2 changes: 0 additions & 2 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/warehouse"
"github.com/rudderlabs/rudder-server/warehouse/encoding"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)
Expand Down Expand Up @@ -330,7 +329,6 @@ func runAllInit() {
diagnostics.Init()
backendconfig.Init()
warehouseutils.Init()
encoding.Init()
pgnotifier.Init()
jobsdb.Init()
warehouse.Init4()
Expand Down
23 changes: 12 additions & 11 deletions warehouse/encoding/csvloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,56 +10,57 @@ import (
"github.com/rudderlabs/rudder-server/utils/misc"
)

// CsvLoader is common for non-BQ warehouses.
// csvLoader is common for non-BQ warehouses.
// If you need any custom logic, either extend this or use destType and if/else/switch.
type CsvLoader struct {
type csvLoader struct {
// destType is the destination type given to newCSVLoader; IsLoadTimeColumn
// passes it to warehouseutils.ToProviderCase.
destType string
// csvRow accumulates the values of the row being built (AddColumn and AddRow
// append to it, WriteToString hands it to csvWriter).
csvRow []string
// buff is the in-memory buffer that csvWriter writes into.
buff bytes.Buffer
// csvWriter encodes csvRow as CSV into buff.
csvWriter *csv.Writer
// fileWriter is the LoadFileWriter supplied to newCSVLoader.
fileWriter LoadFileWriter
}

func NewCSVLoader(destType string, writer LoadFileWriter) *CsvLoader {
loader := &CsvLoader{destType: destType, fileWriter: writer}
// newCSVLoader returns a csvLoader for destType that keeps writer as its
// fileWriter. The loader starts with an empty row, an empty buffer and a CSV
// writer that encodes into that buffer.
func newCSVLoader(writer LoadFileWriter, destType string) *csvLoader {
	l := &csvLoader{
		destType:   destType,
		csvRow:     []string{},
		buff:       bytes.Buffer{},
		fileWriter: writer,
	}
	// The CSV writer has to target the buffer stored inside the returned
	// struct, so it is wired up only once the struct exists.
	l.csvWriter = csv.NewWriter(&l.buff)
	return l
}

func (loader *CsvLoader) IsLoadTimeColumn(columnName string) bool {
// IsLoadTimeColumn reports whether columnName equals UUIDTsColumn after that
// constant has been passed through warehouseutils.ToProviderCase for the
// loader's destination type.
func (loader *csvLoader) IsLoadTimeColumn(columnName string) bool {
	loadTimeColumn := warehouseutils.ToProviderCase(loader.destType, UUIDTsColumn)
	return loadTimeColumn == columnName
}

func (*CsvLoader) GetLoadTimeFormat(string) string {
// GetLoadTimeFormat returns misc.RFC3339Milli regardless of the column name
// it is given.
func (*csvLoader) GetLoadTimeFormat(_ string) string {
	return misc.RFC3339Milli
}

func (loader *CsvLoader) AddColumn(_, _ string, val interface{}) {
// AddColumn appends val, rendered with fmt's default (%v) formatting, to the
// row being built. The column name and column type arguments are not used.
func (loader *csvLoader) AddColumn(_, _ string, val interface{}) {
	loader.csvRow = append(loader.csvRow, fmt.Sprint(val))
}

func (loader *CsvLoader) AddRow(_, row []string) {
// AddRow appends every value in row, in order, to the row being built. The
// first argument is not used.
func (loader *csvLoader) AddRow(_ []string, row []string) {
	loader.csvRow = append(loader.csvRow, row...)
}

func (loader *CsvLoader) AddEmptyColumn(columnName string) {
// AddEmptyColumn adds an empty string value for columnName by delegating to
// AddColumn with an empty column type and an empty value.
func (loader *csvLoader) AddEmptyColumn(columnName string) {
	const (
		noType  = ""
		noValue = ""
	)
	loader.AddColumn(columnName, noType, noValue)
}

func (loader *CsvLoader) WriteToString() (string, error) {
// WriteToString encodes the accumulated row as one CSV record into the
// loader's buffer, flushes the CSV writer and returns the buffer's contents.
//
// It returns an empty string and a wrapped error if the CSV writer rejects
// the row. This method does not itself clear the row or the buffer.
func (loader *csvLoader) WriteToString() (string, error) {
	if err := loader.csvWriter.Write(loader.csvRow); err != nil {
		return "", fmt.Errorf("csvWriter write: %w", err)
	}

	// Push the buffered record through to buff before reading it back.
	loader.csvWriter.Flush()

	encoded := loader.buff.String()
	return encoded, nil
}

func (loader *CsvLoader) Write() error {
func (loader *csvLoader) Write() error {
eventData, err := loader.WriteToString()
if err != nil {
return fmt.Errorf("writing to string: %w", err)
Expand Down
8 changes: 4 additions & 4 deletions warehouse/encoding/csvreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"io"
)

type CsvReader struct {
// csvReader wraps an encoding/csv Reader; its Read method returns the wrapped
// reader's next record.
type csvReader struct {
// reader is the CSV parser created by newCsvReader.
reader *csv.Reader
}

func (csv *CsvReader) Read([]string) (record []string, err error) {
// Read returns the next record from the wrapped CSV reader together with any
// error that reader reports. The slice argument is ignored.
func (r *csvReader) Read(_ []string) (record []string, err error) {
	return r.reader.Read()
}

func NewCsvReader(r io.Reader) *CsvReader {
return &CsvReader{reader: csv.NewReader(r)}
// newCsvReader returns a csvReader that parses CSV records from r.
func newCsvReader(r io.Reader) *csvReader {
	parser := csv.NewReader(r)
	return &csvReader{reader: parser}
}
72 changes: 63 additions & 9 deletions warehouse/encoding/encoding.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package encoding

import (
"io"
"os"

"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"github.com/rudderlabs/rudder-go-kit/config"
)

Expand All @@ -13,11 +18,23 @@ const (
BQUuidTSFormat = "2006-01-02 15:04:05 Z"
)

var (
maxStagingFileReadBufferCapacityInK int
parquetParallelWriters int64
)
// Factory creates load file writers, event loaders and event readers using
// the settings held in config.
type Factory struct {
config struct {
// maxStagingFileReadBufferCapacityInK is registered by NewFactory under
// "Warehouse.maxStagingFileReadBufferCapacityInK" (registered with 10240,
// presumably the default) and is passed to the JSON reader created by
// NewEventReader.
maxStagingFileReadBufferCapacityInK int
// parquetParallelWriters is registered by NewFactory under
// "Warehouse.parquetParallelWriters" (registered with 8, presumably the
// default) and is passed to the parquet writer created by NewLoadFileWriter.
parquetParallelWriters int64
}
}

// NewFactory returns a Factory whose settings are registered against conf:
//   - Warehouse.maxStagingFileReadBufferCapacityInK, registered with 10240
//   - Warehouse.parquetParallelWriters, registered with 8
//
// NOTE(review): the boolean and the literal 1 passed to each Register call are
// presumably the hot-reload flag and a value scale; confirm against the
// rudder-go-kit config package.
func NewFactory(conf *config.Config) *Factory {
	factory := new(Factory)
	settings := &factory.config

	conf.RegisterIntConfigVariable(10240, &settings.maxStagingFileReadBufferCapacityInK, false, 1, "Warehouse.maxStagingFileReadBufferCapacityInK")
	conf.RegisterInt64ConfigVariable(8, &settings.parquetParallelWriters, true, 1, "Warehouse.parquetParallelWriters")

	return factory
}

// LoadFileWriter is an interface for writing events to a load file
type LoadFileWriter interface {
WriteGZ(s string) error
Write(p []byte) (int, error)
Expand All @@ -26,11 +43,48 @@ type LoadFileWriter interface {
GetLoadFile() *os.File
}

func Init() {
loadConfig()
// NewLoadFileWriter returns the LoadFileWriter for loadFileType that writes to
// outputFilePath. For warehouseutils.LoadFileTypeParquet it builds a parquet
// writer from schema, destType and the configured parquetParallelWriters;
// every other load file type gets the writer returned by misc.CreateGZ.
// Errors from the underlying constructor are returned unchanged.
func (m *Factory) NewLoadFileWriter(loadFileType, outputFilePath string, schema model.TableSchema, destType string) (LoadFileWriter, error) {
	if loadFileType == warehouseutils.LoadFileTypeParquet {
		parallelWriters := m.config.parquetParallelWriters
		return createParquetWriter(outputFilePath, schema, destType, parallelWriters)
	}
	return misc.CreateGZ(outputFilePath)
}

// EventLoader is an interface for loading events into a load file.
// It's used to load singular BatchRouterEvent events into a load file.
// Factory.NewEventLoader picks the JSON, parquet or CSV implementation.
type EventLoader interface {
// IsLoadTimeColumn reports whether columnName is the load-time column
// (the CSV implementation compares against the provider-cased UUIDTsColumn).
IsLoadTimeColumn(columnName string) bool
// GetLoadTimeFormat returns the format string for the given column
// (the CSV implementation always returns misc.RFC3339Milli).
GetLoadTimeFormat(columnName string) string
// AddColumn adds val for the named column to the row being built.
AddColumn(columnName, columnType string, val interface{})
// AddRow adds values to the row being built.
AddRow(columnNames, values []string)
// AddEmptyColumn adds an empty value for columnName to the row being built.
AddEmptyColumn(columnName string)
// WriteToString encodes the row being built and returns it as a string.
WriteToString() (string, error)
// Write encodes the row being built and writes it out.
// NOTE(review): presumably to the LoadFileWriter the loader was created with; confirm in the implementations.
Write() error
}

// NewEventLoader returns the EventLoader matching loadFileType, bound to the
// writer w and to destinationType: a JSON loader for
// warehouseutils.LoadFileTypeJson, a parquet loader for
// warehouseutils.LoadFileTypeParquet, and a CSV loader for anything else.
func (m *Factory) NewEventLoader(w LoadFileWriter, loadFileType, destinationType string) EventLoader {
	if loadFileType == warehouseutils.LoadFileTypeJson {
		return newJSONLoader(w, destinationType)
	}
	if loadFileType == warehouseutils.LoadFileTypeParquet {
		return newParquetLoader(w, destinationType)
	}
	// CSV is the fallback for every other load file type.
	return newCSVLoader(w, destinationType)
}

// EventReader is an interface for reading events from a load file.
// Factory.NewEventReader picks the JSON implementation for BigQuery and the
// CSV implementation for every other destination type.
type EventReader interface {
// Read returns the next record and any error encountered while reading it.
// The CSV implementation ignores columnNames.
Read(columnNames []string) (record []string, err error)
}

func loadConfig() {
config.RegisterIntConfigVariable(10240, &maxStagingFileReadBufferCapacityInK, false, 1, "Warehouse.maxStagingFileReadBufferCapacityInK")
config.RegisterInt64ConfigVariable(8, &parquetParallelWriters, true, 1, "Warehouse.parquetParallelWriters")
// NewEventReader returns the EventReader used to read load files for
// destType from r. For warehouseutils.BQ it returns a JSON reader configured
// with maxStagingFileReadBufferCapacityInK; every other destination type gets
// a CSV reader.
func (m *Factory) NewEventReader(r io.Reader, destType string) EventReader {
	if destType == warehouseutils.BQ {
		bufferCapacityInK := m.config.maxStagingFileReadBufferCapacityInK
		return newJSONReader(r, bufferCapacityInK)
	}
	return newCsvReader(r)
}
Loading

0 comments on commit 09b9d6c

Please sign in to comment.