Skip to content

Commit

Permalink
chore: uploader can append (#3805)
Browse files Browse the repository at this point in the history
* chore: uploader can append

* chore: - clickhouse custom mockUploader

* chore: - datalake schema repo custom mockUploader

* chore: - postgres integration custom mockUploader

* chore: - custom mockUploaders

* fix: snowflake test

* fix: restoring dummyUploader

* chore: deltalake append update

* fix: snowflake test

* chore: temp kit to fix data race

* chore: update of temp kit

* chore: go-kit v0.15.9

* chore: go mod tidy
  • Loading branch information
fracasula committed Sep 14, 2023
1 parent 60749af commit 56ab090
Show file tree
Hide file tree
Showing 14 changed files with 648 additions and 390 deletions.
162 changes: 70 additions & 92 deletions warehouse/integrations/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,28 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/compose-test/compose"

"github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse"

"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"

"github.com/rudderlabs/compose-test/testcompose"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/validations"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/stats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse"
"github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper"
mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"github.com/rudderlabs/rudder-server/warehouse/validations"
)

func TestIntegration(t *testing.T) {
Expand Down Expand Up @@ -358,48 +353,11 @@ func TestClickhouse_UseS3CopyEngineForLoading(t *testing.T) {
}
}

type mockUploader struct {
minioPort string
tableSchema model.TableSchema
metadata []warehouseutils.LoadFile
}

func (*mockUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} }
func (*mockUploader) GetLocalSchema(context.Context) (model.Schema, error) {
return model.Schema{}, nil
}
func (*mockUploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil }
func (*mockUploader) ShouldOnDedupUseNewRecord() bool { return false }
func (*mockUploader) UseRudderStorage() bool { return false }
func (*mockUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} }
func (*mockUploader) GetLoadFileType() string { return "JSON" }
func (*mockUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} }
func (*mockUploader) GetTableSchemaInWarehouse(_ string) model.TableSchema {
return model.TableSchema{}
}

func (*mockUploader) GetSingleLoadFile(_ context.Context, _ string) (warehouseutils.LoadFile, error) {
return warehouseutils.LoadFile{}, nil
}

func (m *mockUploader) GetSampleLoadFileLocation(_ context.Context, _ string) (string, error) {
minioHostPort := fmt.Sprintf("localhost:%s", m.minioPort)

sampleLocation := m.metadata[0].Location
sampleLocation = strings.Replace(sampleLocation, minioHostPort, "minio:9000", 1)
return sampleLocation, nil
}

func (m *mockUploader) GetTableSchemaInUpload(string) model.TableSchema {
return m.tableSchema
}

func (m *mockUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile {
return m.metadata
}

func TestClickhouse_LoadTableRoundTrip(t *testing.T) {
c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.clickhouse.yml", "../testdata/docker-compose.minio.yml"}))
c := testcompose.New(t, compose.FilePaths([]string{
"testdata/docker-compose.clickhouse.yml",
"../testdata/docker-compose.minio.yml",
}))
c.Start(context.Background())

misc.Init()
Expand Down Expand Up @@ -485,28 +443,6 @@ func TestClickhouse_LoadTableRoundTrip(t *testing.T) {
},
},
}
mockUploader := &mockUploader{
tableSchema: model.TableSchema{
"alter_test_bool": "boolean",
"alter_test_datetime": "datetime",
"alter_test_float": "float",
"alter_test_int": "int",
"alter_test_string": "string",
"id": "string",
"received_at": "datetime",
"test_array_bool": "array(boolean)",
"test_array_datetime": "array(datetime)",
"test_array_float": "array(float)",
"test_array_int": "array(int)",
"test_array_string": "array(string)",
"test_bool": "boolean",
"test_datetime": "datetime",
"test_float": "float",
"test_int": "int",
"test_string": "string",
},
minioPort: strconv.Itoa(minioPort),
}

t.Log("Preparing load files metadata")
f, err := os.Open(tc.fileName)
Expand All @@ -530,13 +466,32 @@ func TestClickhouse_LoadTableRoundTrip(t *testing.T) {
require.NoError(t, err)

ctx := context.Background()

uploadOutput, err := fm.Upload(ctx, f, fmt.Sprintf("test_prefix_%d", i))
require.NoError(t, err)

mockUploader.metadata = append(mockUploader.metadata, warehouseutils.LoadFile{
Location: uploadOutput.Location,
})
mockUploader := newMockUploader(t,
strconv.Itoa(minioPort),
model.TableSchema{
"alter_test_bool": "boolean",
"alter_test_datetime": "datetime",
"alter_test_float": "float",
"alter_test_int": "int",
"alter_test_string": "string",
"id": "string",
"received_at": "datetime",
"test_array_bool": "array(boolean)",
"test_array_datetime": "array(datetime)",
"test_array_float": "array(float)",
"test_array_int": "array(int)",
"test_array_string": "array(string)",
"test_bool": "boolean",
"test_datetime": "datetime",
"test_float": "float",
"test_int": "int",
"test_string": "string",
},
[]warehouseutils.LoadFile{{Location: uploadOutput.Location}},
)

t.Log("Setting up clickhouse")
err = ch.Setup(ctx, warehouse, mockUploader)
Expand Down Expand Up @@ -742,7 +697,7 @@ func TestClickhouse_TestConnection(t *testing.T) {
},
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

ch.SetConnectionTimeout(tc.timeout)
Expand Down Expand Up @@ -842,7 +797,7 @@ func TestClickhouse_LoadTestTable(t *testing.T) {
payload[k] = v
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

err = ch.CreateSchema(ctx)
Expand Down Expand Up @@ -905,7 +860,7 @@ func TestClickhouse_FetchSchema(t *testing.T) {
},
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

err = ch.CreateSchema(ctx)
Expand Down Expand Up @@ -950,7 +905,7 @@ func TestClickhouse_FetchSchema(t *testing.T) {
},
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

schema, unrecognizedSchema, err := ch.FetchSchema(ctx)
Expand All @@ -976,7 +931,7 @@ func TestClickhouse_FetchSchema(t *testing.T) {
},
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

schema, unrecognizedSchema, err := ch.FetchSchema(ctx)
Expand All @@ -1002,7 +957,7 @@ func TestClickhouse_FetchSchema(t *testing.T) {
},
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

err = ch.CreateSchema(ctx)
Expand Down Expand Up @@ -1031,7 +986,7 @@ func TestClickhouse_FetchSchema(t *testing.T) {
},
}

err := ch.Setup(ctx, warehouse, &mockUploader{})
err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil))
require.NoError(t, err)

err = ch.CreateSchema(ctx)
Expand Down Expand Up @@ -1233,3 +1188,26 @@ func initializeClickhouseClusterMode(t testing.TB, clusterDBs []*sql.DB, tables
}
}
}

func newMockUploader(
t testing.TB,
minioPort string,
tableSchema model.TableSchema,
metadata []warehouseutils.LoadFile,
) *mockuploader.MockUploader {
var sampleLocation string
if len(metadata) > 0 {
minioHostPort := fmt.Sprintf("localhost:%s", minioPort)
sampleLocation = strings.Replace(metadata[0].Location, minioHostPort, "minio:9000", 1)
}

ctrl := gomock.NewController(t)
u := mockuploader.NewMockUploader(ctrl)
u.EXPECT().GetSampleLoadFileLocation(gomock.Any(), gomock.Any()).Return(sampleLocation, nil).AnyTimes()
u.EXPECT().GetTableSchemaInUpload(gomock.Any()).Return(tableSchema).AnyTimes()
u.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(metadata).AnyTimes()
u.EXPECT().UseRudderStorage().Return(false).AnyTimes()
u.EXPECT().GetSchemaInWarehouse().Return(nil).AnyTimes()

return u
}
69 changes: 18 additions & 51 deletions warehouse/integrations/datalake/schema-repository/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,16 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"

schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository"
mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type mockUploader struct {
mockError error
localSchema model.Schema
}

func (*mockUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} }
func (*mockUploader) ShouldOnDedupUseNewRecord() bool { return false }
func (*mockUploader) UseRudderStorage() bool { return false }
func (*mockUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} }
func (*mockUploader) GetLoadFileType() string { return "JSON" }
func (*mockUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} }
func (*mockUploader) GetTableSchemaInUpload(string) model.TableSchema { return nil }
func (*mockUploader) GetSampleLoadFileLocation(context.Context, string) (string, error) {
return "", nil
}

func (*mockUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile {
return nil
}

func (*mockUploader) GetTableSchemaInWarehouse(string) model.TableSchema {
return model.TableSchema{}
}

func (*mockUploader) GetSingleLoadFile(context.Context, string) (warehouseutils.LoadFile, error) {
return warehouseutils.LoadFile{}, nil
}

func (m *mockUploader) GetLocalSchema(context.Context) (model.Schema, error) {
return m.localSchema, nil
}

func (m *mockUploader) UpdateLocalSchema(context.Context, model.Schema) error {
return m.mockError
}

func TestLocalSchemaRepository_CreateTable(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -87,10 +51,7 @@ func TestLocalSchemaRepository_CreateTable(t *testing.T) {
t.Parallel()

warehouse := model.Warehouse{}
uploader := &mockUploader{
mockError: tc.mockError,
localSchema: tc.localSchema,
}
uploader := newMockUploader(t, tc.mockError, tc.localSchema)

s, err := schemarepository.NewLocalSchemaRepository(warehouse, uploader)
require.NoError(t, err)
Expand Down Expand Up @@ -149,10 +110,7 @@ func TestLocalSchemaRepository_AddColumns(t *testing.T) {
t.Parallel()

warehouse := model.Warehouse{}
uploader := &mockUploader{
mockError: tc.mockError,
localSchema: tc.localSchema,
}
uploader := newMockUploader(t, tc.mockError, tc.localSchema)

s, err := schemarepository.NewLocalSchemaRepository(warehouse, uploader)
require.NoError(t, err)
Expand Down Expand Up @@ -223,10 +181,7 @@ func TestLocalSchemaRepository_AlterColumn(t *testing.T) {
t.Parallel()

warehouse := model.Warehouse{}
uploader := &mockUploader{
mockError: tc.mockError,
localSchema: tc.localSchema,
}
uploader := newMockUploader(t, tc.mockError, tc.localSchema)

s, err := schemarepository.NewLocalSchemaRepository(warehouse, uploader)
require.NoError(t, err)
Expand All @@ -242,3 +197,15 @@ func TestLocalSchemaRepository_AlterColumn(t *testing.T) {
})
}
}

func newMockUploader(
t testing.TB,
updateLocalSchemaErr error,
localSchema model.Schema,
) *mockuploader.MockUploader {
ctrl := gomock.NewController(t)
u := mockuploader.NewMockUploader(ctrl)
u.EXPECT().UpdateLocalSchema(gomock.Any(), gomock.Any()).Return(updateLocalSchemaErr).AnyTimes()
u.EXPECT().GetLocalSchema(gomock.Any()).Return(localSchema, nil).AnyTimes()
return u
}

0 comments on commit 56ab090

Please sign in to comment.