chore: reduce parquet file size datalake (#3035)
cisse21 committed Mar 7, 2023
1 parent a4243de commit 4cb5907
Showing 16 changed files with 177 additions and 333 deletions.
11 changes: 5 additions & 6 deletions services/filemanager/digitalOceanSpacesManager.go
@@ -15,6 +15,8 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/samber/lo"

SpacesManager "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/rudderlabs/rudder-server/utils/misc"
)
@@ -179,15 +181,12 @@ func (manager *DOSpacesManager) DeleteObjects(ctx context.Context, keys []string
svc := s3.New(sess)

batchSize := 1000 // max accepted by DeleteObjects API
for i := 0; i < len(objects); i += batchSize {
j := i + batchSize
if j > len(objects) {
j = len(objects)
}
chunks := lo.Chunk(objects, batchSize)
for _, chunk := range chunks {
input := &s3.DeleteObjectsInput{
Bucket: aws.String(manager.Config.Bucket),
Delete: &s3.Delete{
Objects: objects[i:j],
Objects: chunk,
},
}

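The change above (and the identical one in s3manager.go below) swaps manual index arithmetic for `lo.Chunk`. As a minimal, standalone sketch — not code from this commit — of what that batching looks like, with hypothetical object keys:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Pretend these are object keys queued for deletion.
	keys := make([]string, 2500)
	for i := range keys {
		keys[i] = fmt.Sprintf("datalake/file-%d.parquet", i)
	}

	const batchSize = 1000 // max accepted by the S3 DeleteObjects API

	// lo.Chunk splits the slice into batches of at most batchSize,
	// replacing the old i/j index bookkeeping.
	for _, chunk := range lo.Chunk(keys, batchSize) {
		fmt.Printf("would issue DeleteObjects for %d keys\n", len(chunk))
	}
}
```

lo.Chunk always includes the final partial batch, which is exactly what the old `if j > len(objects)` guard handled by hand.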
11 changes: 5 additions & 6 deletions services/filemanager/s3manager.go
@@ -16,6 +16,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
awsS3Manager "github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/mitchellh/mapstructure"
"github.com/samber/lo"

appConfig "github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/utils/awsutils"
)
@@ -122,15 +124,12 @@ func (manager *S3Manager) DeleteObjects(ctx context.Context, keys []string) (err
svc := s3.New(sess)

batchSize := 1000 // max accepted by DeleteObjects API
for i := 0; i < len(objects); i += batchSize {
j := i + batchSize
if j > len(objects) {
j = len(objects)
}
chunks := lo.Chunk(objects, batchSize)
for _, chunk := range chunks {
input := &s3.DeleteObjectsInput{
Bucket: aws.String(manager.Config.Bucket),
Delete: &s3.Delete{
Objects: objects[i:j],
Objects: chunk,
},
}

46 changes: 22 additions & 24 deletions warehouse/internal/loadfiles/loadfiles.go
@@ -6,7 +6,10 @@ import (
"strings"
"time"

schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository"
"github.com/samber/lo"

"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

jsoniter "github.com/json-iterator/go"
"github.com/rudderlabs/rudder-server/config"
@@ -15,11 +18,11 @@ import (
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -109,7 +112,7 @@ func WithConfig(ld *LoadFileGenerator, config *config.Config) {
}

// CreateLoadFiles for the staging files that have not been successfully processed.
func (lf *LoadFileGenerator) CreateLoadFiles(ctx context.Context, job model.UploadJob) (int64, int64, error) {
func (lf *LoadFileGenerator) CreateLoadFiles(ctx context.Context, job *model.UploadJob) (int64, int64, error) {
stagingFiles := job.StagingFiles

var toProcessStagingFiles []*model.StagingFile
@@ -124,11 +127,11 @@ func (lf *LoadFileGenerator) CreateLoadFiles(ctx context.Context, job model.Uplo
}

// ForceCreateLoadFiles creates load files for the staging files, regardless if they are already successfully processed.
func (lf *LoadFileGenerator) ForceCreateLoadFiles(ctx context.Context, job model.UploadJob) (int64, int64, error) {
func (lf *LoadFileGenerator) ForceCreateLoadFiles(ctx context.Context, job *model.UploadJob) (int64, int64, error) {
return lf.createFromStaging(ctx, job, job.StagingFiles)
}

func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.UploadJob, toProcessStagingFiles []*model.StagingFile) (int64, int64, error) {
func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.UploadJob, toProcessStagingFiles []*model.StagingFile) (int64, int64, error) {
destID := job.Upload.DestinationID
destType := job.Upload.DestinationType

@@ -153,7 +156,6 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.Up
}

stagingFileIDs := repo.StagingFileIDs(toProcessStagingFiles)

err = lf.LoadRepo.DeleteByStagingFiles(ctx, stagingFileIDs)
if err != nil {
return 0, 0, fmt.Errorf("deleting previous load files: %w", err)
@@ -177,15 +179,11 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.Up
var g errgroup.Group

var sampleError error
for i := 0; i < len(toProcessStagingFiles); i += publishBatchSize {
j := i + publishBatchSize
if j > len(toProcessStagingFiles) {
j = len(toProcessStagingFiles)
}

chunks := lo.Chunk(toProcessStagingFiles, publishBatchSize)
for _, chunk := range chunks {
// td : add prefix to payload for s3 dest
var messages []pgnotifier.JobPayload
for _, stagingFile := range toProcessStagingFiles[i:j] {
for _, stagingFile := range chunk {
payload := WorkerJobRequest{
UploadID: job.Upload.ID,
StagingFileID: stagingFile.ID,
@@ -221,9 +219,6 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.Up
}

schema := &job.Upload.UploadSchema
if job.Upload.LoadFileType == warehouseutils.LOAD_FILE_TYPE_PARQUET {
schema = &job.Upload.MergedSchema
}

lf.Logger.Infof("[WH]: Publishing %d staging files for %s:%s to PgNotifier", len(messages), destType, destID)
messagePayload := pgnotifier.MessagePayload{
@@ -237,13 +232,16 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.Up
}
// set messages to nil to release mem allocated
messages = nil
batchStartIdx := i
batchEndIdx := j

startId := chunk[0].ID
endId := chunk[len(chunk)-1].ID
g.Go(func() error {
responses := <-ch
lf.Logger.Infof("[WH]: Received responses for staging files %d:%d for %s:%s from PgNotifier", toProcessStagingFiles[batchStartIdx].ID, toProcessStagingFiles[batchEndIdx-1].ID, destType, destID)

lf.Logger.Infow("Received responses for staging files %d:%d for %s:%s from PgNotifier",
"startId", startId,
"endID", endId,
logfield.DestinationID, destType,
logfield.DestinationType, destID,
)
var loadFiles []model.LoadFile
var successfulStagingFileIDs []int64
for _, resp := range responses {
@@ -303,7 +301,7 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.Up
return 0, 0, err
}

loadFiles, err := lf.LoadRepo.GetByStagingFiles(ctx, repo.StagingFileIDs(toProcessStagingFiles))
loadFiles, err := lf.LoadRepo.GetByStagingFiles(ctx, stagingFileIDs)
if err != nil {
return 0, 0, fmt.Errorf("getting load files: %w", err)
}
@@ -330,7 +328,7 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job model.Up
return loadFiles[0].ID, loadFiles[len(loadFiles)-1].ID, nil
}

func (lf *LoadFileGenerator) destinationRevisionIDMap(ctx context.Context, job model.UploadJob) (revisionIDMap map[string]backendconfig.DestinationT, err error) {
func (lf *LoadFileGenerator) destinationRevisionIDMap(ctx context.Context, job *model.UploadJob) (revisionIDMap map[string]backendconfig.DestinationT, err error) {
revisionIDMap = make(map[string]backendconfig.DestinationT)

// TODO: ensure DestinationRevisionID is populated
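Beyond chunking, the generator still fans each batch out to PgNotifier and collects the responses on an errgroup. The following is a simplified, hypothetical sketch of that pattern, not the real pgnotifier API; `stagingFile` and `publish` are stand-ins:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
	"golang.org/x/sync/errgroup"
)

type stagingFile struct{ ID int64 }

// publish is a stand-in for pgnotifier's Publish: it returns a channel that
// eventually delivers the responses for one batch of jobs.
func publish(batch []stagingFile) <-chan []string {
	ch := make(chan []string, 1)
	ch <- []string{fmt.Sprintf("processed %d files", len(batch))}
	return ch
}

func main() {
	files := make([]stagingFile, 25)
	for i := range files {
		files[i] = stagingFile{ID: int64(i + 1)}
	}

	var g errgroup.Group
	for _, chunk := range lo.Chunk(files, 10) {
		ch := publish(chunk)

		// Capture the chunk boundaries before g.Go runs, mirroring how the
		// refactored generator records startID/endID per batch for logging.
		startID, endID := chunk[0].ID, chunk[len(chunk)-1].ID
		g.Go(func() error {
			responses := <-ch
			fmt.Printf("staging files %d-%d: %v\n", startID, endID, responses)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```

Capturing `startID` and `endID` in locals before launching the goroutine keeps the per-batch boundaries available without depending on the loop variable, which is the same move the diff makes with `chunk[0].ID` and `chunk[len(chunk)-1].ID`.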
14 changes: 7 additions & 7 deletions warehouse/internal/loadfiles/loadfiles_test.go
@@ -91,7 +91,7 @@ func TestCreateLoadFiles(t *testing.T) {
StagingFiles: stagingFiles,
}

startID, endID, err := lf.CreateLoadFiles(ctx, job)
startID, endID, err := lf.CreateLoadFiles(ctx, &job)
require.NoError(t, err)
require.Equal(t, int64(1), startID)
require.Equal(t, int64(20), endID)
@@ -126,7 +126,7 @@ func TestCreateLoadFiles(t *testing.T) {
}
stagingFiles[0].Status = warehouseutils.StagingFileFailedState

startID, endID, err := lf.CreateLoadFiles(ctx, job)
startID, endID, err := lf.CreateLoadFiles(ctx, &job)
require.NoError(t, err)
require.Equal(t, int64(21), startID)
require.Equal(t, int64(22), endID)
@@ -139,7 +139,7 @@ func TestCreateLoadFiles(t *testing.T) {
stagingFile.Status = warehouseutils.StagingFileSucceededState
}

startID, endID, err := lf.ForceCreateLoadFiles(ctx, job)
startID, endID, err := lf.ForceCreateLoadFiles(ctx, &job)
require.NoError(t, err)
require.Equal(t, int64(23), startID)
require.Equal(t, int64(42), endID)
@@ -198,7 +198,7 @@ func TestCreateLoadFiles_Failure(t *testing.T) {
t.Log("empty location should cause worker failure")
stagingFiles[0].Location = ""

startID, endID, err := lf.CreateLoadFiles(ctx, model.UploadJob{
startID, endID, err := lf.CreateLoadFiles(ctx, &model.UploadJob{
Warehouse: warehouse,
Upload: upload,
StagingFiles: stagingFiles,
@@ -242,7 +242,7 @@ func TestCreateLoadFiles_Failure(t *testing.T) {
stagingFiles[i].Location = ""
}

startID, endID, err := lf.CreateLoadFiles(ctx, model.UploadJob{
startID, endID, err := lf.CreateLoadFiles(ctx, &model.UploadJob{
Warehouse: warehouse,
Upload: upload,
StagingFiles: stagingFiles,
@@ -323,7 +323,7 @@ func TestCreateLoadFiles_DestinationHistory(t *testing.T) {
},
}

startID, endID, err := lf.CreateLoadFiles(ctx, job)
startID, endID, err := lf.CreateLoadFiles(ctx, &job)
require.NoError(t, err)
require.Equal(t, int64(1), startID)
require.Equal(t, int64(2), endID)
@@ -354,7 +354,7 @@ func TestCreateLoadFiles_DestinationHistory(t *testing.T) {
t.Run("invalid revision ID", func(t *testing.T) {
stagingFile.DestinationRevisionID = "invalid_revision_id"

startID, endID, err := lf.CreateLoadFiles(ctx, job)
startID, endID, err := lf.CreateLoadFiles(ctx, &job)
require.EqualError(t, err, "populating destination revision ID: revision \"invalid_revision_id\" not found")
require.Zero(t, startID)
require.Zero(t, endID)
2 changes: 0 additions & 2 deletions warehouse/internal/model/upload.go
@@ -19,11 +19,9 @@ const (
GeneratedLoadFiles = "generated_load_files"
UpdatedTableUploadsCounts = "updated_table_uploads_counts"
CreatedRemoteSchema = "created_remote_schema"
ExportedUserTables = "exported_user_tables"
ExportedData = "exported_data"
ExportingData = "exporting_data"
ExportingDataFailed = "exporting_data_failed"
ExportedIdentities = "exported_identities"
Aborted = "aborted"
Failed = "failed"
)
16 changes: 8 additions & 8 deletions warehouse/internal/repo/load.go
@@ -17,13 +17,13 @@ const (
loadTableName = warehouseutils.WarehouseLoadFilesTable
loadTableColumns = `
id,
staging_file_id,
location,
source_id,
destination_id,
destination_type,
table_name,
total_events,
staging_file_id,
location,
source_id,
destination_id,
destination_type,
table_name,
total_events,
metadata
`
)
@@ -70,7 +70,7 @@ func (repo *LoadFiles) Insert(ctx context.Context, loadFiles []model.LoadFile) (
if err != nil {
return fmt.Errorf(`inserting load files: CopyIn: %w`, err)
}
defer stmt.Close()
defer func() { _ = stmt.Close() }()

for _, loadFile := range loadFiles {
metadata := fmt.Sprintf(`{"content_length": %d, "destination_revision_id": %q, "use_rudder_storage": %t}`, loadFile.ContentLength, loadFile.DestinationRevisionID, loadFile.UseRudderStorage)
3 changes: 2 additions & 1 deletion warehouse/internal/repo/load_test.go
@@ -8,9 +8,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/stretchr/testify/require"
)

func Test_LoadFiles(t *testing.T) {
1 change: 0 additions & 1 deletion warehouse/internal/repo/staging_test.go
@@ -522,7 +522,6 @@ func TestStagingFileIDs(t *testing.T) {
ID: 3,
},
}

ids := repo.StagingFileIDs(sfs)
require.Equal(t, []int64{1, 2, 3}, ids)
}
6 changes: 2 additions & 4 deletions warehouse/internal/repo/upload_test.go
@@ -54,7 +54,6 @@ func TestUploads_Get(t *testing.T) {
LastAttemptAt: time.Time{},
Attempts: 0,
UploadSchema: model.Schema{},
MergedSchema: model.Schema{},
}

files := []model.StagingFile{
@@ -92,6 +91,7 @@ func TestUploads_Get(t *testing.T) {
ogUpload.SourceTaskRunID = "source_task_run_id"
ogUpload.SourceJobID = "source_job_id"
ogUpload.SourceJobRunID = "source_job_run_id"
ogUpload.MergedSchema = warehouseutils.SchemaT{}

t.Run("Get", func(t *testing.T) {
upload, err := repoUpload.Get(ctx, id)
@@ -197,7 +197,6 @@ func TestUploads_GetToProcess(t *testing.T) {
LastAttemptAt: time.Time{},
Attempts: 0,
UploadSchema: model.Schema{},
MergedSchema: model.Schema{},

UseRudderStorage: true,
SourceTaskRunID: sourceTaskRunID,
@@ -571,10 +570,10 @@ func TestUploads_Processing(t *testing.T) {
uploads[i].ID = id
uploads[i].Error = []byte("{}")
uploads[i].UploadSchema = model.Schema{}
uploads[i].MergedSchema = model.Schema{}
uploads[i].LoadFileType = "csv"
uploads[i].StagingFileStartID = int64(i + 1)
uploads[i].StagingFileEndID = int64(i + 1)
uploads[i].MergedSchema = warehouseutils.SchemaT{}
require.NoError(t, err)
}

@@ -674,7 +673,6 @@ func TestUploads_UploadMetadata(t *testing.T) {
LastAttemptAt: time.Time{},
Attempts: 0,
UploadSchema: nil,
MergedSchema: nil,
}
metadata := repo.ExtractUploadMetadata(upload)

39 changes: 0 additions & 39 deletions warehouse/schema.go
@@ -448,42 +448,3 @@ func getTableSchemaDiff(tableName string, currentSchema, uploadSchema warehouseu
}
return diff
}

// returns the merged schema(uploadSchema+schemaInWarehousePreUpload) for all tables in uploadSchema
func mergeUploadAndLocalSchemas(uploadSchema, schemaInWarehousePreUpload warehouseutils.SchemaT) warehouseutils.SchemaT {
mergedSchema := warehouseutils.SchemaT{}
// iterate over all tables in uploadSchema
for uploadTableName, uploadTableSchema := range uploadSchema {
if _, ok := mergedSchema[uploadTableName]; !ok {
// init map if it does not exist
mergedSchema[uploadTableName] = map[string]string{}
}

// uploadSchema becomes the merged schema if the table does not exist in local Schema
localTableSchema, ok := schemaInWarehousePreUpload[uploadTableName]
if !ok {
mergedSchema[uploadTableName] = uploadTableSchema
continue
}

// iterate over all columns in localSchema and add them to merged schema
for localColName, localColType := range localTableSchema {
mergedSchema[uploadTableName][localColName] = localColType
}

// iterate over all columns in uploadSchema and add them to merged schema if required
for uploadColName, uploadColType := range uploadTableSchema {
localColType, ok := localTableSchema[uploadColName]
// add uploadCol to mergedSchema if the col does not exist in localSchema
if !ok {
mergedSchema[uploadTableName][uploadColName] = uploadColType
continue
}
// change type of uploadCol to text if it was string in localSchema
if uploadColType == "text" && localColType == "string" {
mergedSchema[uploadTableName][uploadColName] = uploadColType
}
}
}
return mergedSchema
}

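Since the merged-schema path is gone for parquet load files, the removed function's semantics are worth keeping on record. The following small, self-contained reconstruction of its behaviour (using a local `SchemaT` alias rather than the warehouseutils type) is an illustration, not code from the repository:

```go
package main

import "fmt"

type SchemaT map[string]map[string]string

// mergeUploadAndLocalSchemas reproduces the behaviour of the function removed
// above: local columns win, upload-only columns are added, and a local "string"
// column is widened to "text" when the upload schema says so.
func mergeUploadAndLocalSchemas(uploadSchema, local SchemaT) SchemaT {
	merged := SchemaT{}
	for table, uploadCols := range uploadSchema {
		merged[table] = map[string]string{}
		localCols, ok := local[table]
		if !ok {
			// Table only exists in the upload schema: take it as-is.
			merged[table] = uploadCols
			continue
		}
		for col, typ := range localCols {
			merged[table][col] = typ
		}
		for col, typ := range uploadCols {
			if localType, ok := localCols[col]; !ok || (typ == "text" && localType == "string") {
				merged[table][col] = typ
			}
		}
	}
	return merged
}

func main() {
	upload := SchemaT{"users": {"id": "int", "name": "text"}}
	local := SchemaT{"users": {"name": "string", "email": "string"}}
	fmt.Println(mergeUploadAndLocalSchemas(upload, local))
	// map[users:map[email:string id:int name:text]]
}
```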