Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jul 4, 2023
1 parent dbf6865 commit 2e1883f
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 135 deletions.
160 changes: 86 additions & 74 deletions warehouse/slave.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ type uploadProcessingResult struct {
}

type uploadResult struct {
tableName string
location string
totalRows int
contentLength int64
stagingFileID int64
destinationRevisionID string
useRudderStorage bool
TableName string
Location string
TotalRows int
ContentLength int64
StagingFileID int64
DestinationRevisionID string
UseRudderStorage bool
}

type asyncJobRunResult struct {
result bool
id string
Result bool
Id string
}

// jobRun Temporary store for processing staging file to load file
Expand All @@ -76,12 +76,40 @@ type jobRun struct {
tableEventCountMap map[string]int
stagingFileReader *gzip.Reader
whIdentifier string
stats stats.Stats
since func(time.Time) time.Duration
numLoadFileUploadWorkers int
slaveUploadTimeout time.Duration
logger logger.Logger
loadObjectFolder string

stats stats.Stats
uploadTimeStat stats.Measurement
totalUploadTimeStat stats.Measurement
downloadStagingFileStat stats.Measurement
processingStagingFileStat stats.Measurement
bytesProcessedStagingFileStat stats.Measurement
}

func newJobRun(job Payload, conf *config.Config, log logger.Logger, stat stats.Stats) jobRun {
jr := jobRun{
job: job,
whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
stats: stat,
since: time.Since,
logger: log,
}

jr.numLoadFileUploadWorkers = conf.GetInt("Warehouse.numLoadFileUploadWorkers", 8)
jr.slaveUploadTimeout = conf.GetDuration("Warehouse.slaveUploadTimeout", 10, conf.GetDuration("Warehouse.slaveUploadTimeoutInMin", 10, time.Minute))
jr.loadObjectFolder = conf.GetString("WAREHOUSE_BUCKET_LOAD_OBJECTS_FOLDER_NAME", "rudder-warehouse-load-objects")

jr.uploadTimeStat = jr.timerStat("load_file_upload_time")
jr.totalUploadTimeStat = jr.timerStat("load_file_total_upload_time")
jr.downloadStagingFileStat = jr.timerStat("download_staging_file_time")
jr.processingStagingFileStat = jr.timerStat("process_staging_file_time")
jr.bytesProcessedStagingFileStat = jr.counterStat("bytes_processed_in_staging_file")

return jr
}

func (jr *jobRun) setStagingFileReader() (reader *gzip.Reader, endOfFile bool) {
Expand Down Expand Up @@ -149,7 +177,8 @@ func (jr *jobRun) downloadStagingFile(ctx context.Context) error {
return err
}
file.Close()
jr.timerStat("download_staging_file_time").Since(downloadStart)

jr.downloadStagingFileStat.Since(downloadStart)

fi, err := os.Stat(filePath)
if err != nil {
Expand Down Expand Up @@ -201,7 +230,7 @@ func (jr *jobRun) uploadLoadFiles(ctx context.Context) ([]uploadResult, error) {

defer func() {
if err == nil {
jr.timerStat("load_file_total_upload_time").SendTiming(totalUploadTime.Load())
jr.totalUploadTimeStat.SendTiming(totalUploadTime.Load())
}
}()

Expand All @@ -216,22 +245,30 @@ func (jr *jobRun) uploadLoadFiles(ctx context.Context) ([]uploadResult, error) {
}

Check warning on line 245 in warehouse/slave.go

View check run for this annotation

Codecov / codecov/patch

warehouse/slave.go#L244-L245

Added lines #L244 - L245 were not covered by tests
defer func() { _ = file.Close() }()

var uploadLocation filemanager.UploadedFile

uploadStart := time.Now()

if slices.Contains(warehouseutils.TimeWindowDestinations, jr.job.DestinationType) {
return uploader.Upload(
uploadLocation, err = uploader.Upload(
ctx,
file,
warehouseutils.GetTablePathInObjectStorage(jr.job.DestinationNamespace, tableName),
jr.job.LoadFilePrefix,
)
} else {
uploadLocation, err = uploader.Upload(
ctx,
file,
jr.loadObjectFolder,
tableName,
jr.job.SourceID,
getBucketFolder(jr.job.UniqueLoadGenID, tableName),
)
}
return uploader.Upload(
ctx,
file,
jr.loadObjectFolder,
tableName,
jr.job.SourceID,
getBucketFolder(jr.job.UniqueLoadGenID, tableName),
)
jr.uploadTimeStat.SendTiming(jr.since(uploadStart))

return uploadLocation, err
}

process := func() <-chan *uploadProcessingResult {
Expand Down Expand Up @@ -272,13 +309,13 @@ func (jr *jobRun) uploadLoadFiles(ctx context.Context) ([]uploadResult, error) {

processStream <- &uploadProcessingResult{
result: uploadResult{
tableName: tableName,
location: uploadOutput.Location,
contentLength: loadFileStats.Size(),
totalRows: jr.tableEventCountMap[tableName],
stagingFileID: jr.job.StagingFileID,
destinationRevisionID: jr.job.DestinationRevisionID,
useRudderStorage: jr.job.UseRudderStorage,
TableName: tableName,
Location: uploadOutput.Location,
ContentLength: loadFileStats.Size(),
TotalRows: jr.tableEventCountMap[tableName],
StagingFileID: jr.job.StagingFileID,
DestinationRevisionID: jr.job.DestinationRevisionID,
UseRudderStorage: jr.job.UseRudderStorage,
},
}
return nil
Expand All @@ -295,16 +332,17 @@ func (jr *jobRun) uploadLoadFiles(ctx context.Context) ([]uploadResult, error) {
}

processStream := process()
output := make([]uploadResult, len(jr.outputFileWritersMap))

for i := 0; i < len(jr.outputFileWritersMap); i++ {
result := <-processStream
output := make([]uploadResult, 0, len(jr.outputFileWritersMap))

if err := result.err; err != nil {
for processedJob := range processStream {
if err := processedJob.err; err != nil {
return nil, fmt.Errorf("uploading load file to object storage: %w", err)
}

output[i] = result.result
output = append(output, processedJob.result)
}
if len(output) != len(jr.outputFileWritersMap) {
return nil, fmt.Errorf("matching number of load file upload outputs: expected %d, got %d", len(jr.outputFileWritersMap), len(output))
}

Check warning on line 346 in warehouse/slave.go

View check run for this annotation

Codecov / codecov/patch

warehouse/slave.go#L345-L346

Added lines #L345 - L346 were not covered by tests

return output, nil
Expand All @@ -315,7 +353,7 @@ func (jr *jobRun) GetWriter(tableName string) (encoding.LoadFileWriter, error) {
if !ok {
var err error
outputFilePath := jr.getLoadFilePath(tableName)
if jr.job.LoadFileType == warehouseutils.LOAD_FILE_TYPE_PARQUET {
if jr.job.LoadFileType == warehouseutils.LoadFileTypeParquet {
writer, err = encoding.CreateParquetWriter(jr.job.UploadSchema[tableName], outputFilePath, jr.job.DestinationType)
} else {
writer, err = misc.CreateGZ(outputFilePath)
Expand Down Expand Up @@ -408,25 +446,6 @@ func (job *Payload) getSortedColumnMapForAllTables() map[string][]string {
return sortedTableColumnMap
}

func (jobRun *JobRun) GetWriter(tableName string) (encoding.LoadFileWriter, error) {
writer, ok := jobRun.outputFileWritersMap[tableName]
if !ok {
var err error
outputFilePath := jobRun.getLoadFilePath(tableName)
if jobRun.job.LoadFileType == warehouseutils.LoadFileTypeParquet {
writer, err = encoding.CreateParquetWriter(jobRun.job.UploadSchema[tableName], outputFilePath, jobRun.job.DestinationType)
} else {
writer, err = misc.CreateGZ(outputFilePath)
}
if err != nil {
return nil, err
}
jobRun.outputFileWritersMap[tableName] = writer
jobRun.tableEventCountMap[tableName] = 0
}
return writer, nil
}

func (job *Payload) sendDownloadStagingFileFailedStat() {
tags := []warehouseutils.Tag{
{
Expand Down Expand Up @@ -552,22 +571,14 @@ func processClaimedUploadJob(ctx context.Context, claimedJob pgnotifier.Claim, w
// 5. Delete the staging and load files from tmp directory
func processStagingFile(ctx context.Context, job Payload, workerIndex int) (loadFileUploadOutputs []uploadResult, err error) {
processStartTime := time.Now()
jr := jobRun{
job: job,
whIdentifier: warehouseutils.GetWarehouseIdentifier(job.DestinationType, job.SourceID, job.DestinationID),
stats: stats.Default,
since: time.Since,
numLoadFileUploadWorkers: config.GetInt("Warehouse.numLoadFileUploadWorkers", 8),
slaveUploadTimeout: config.GetDuration("Warehouse.slaveUploadTimeout", 10, config.GetDuration("Warehouse.slaveUploadTimeoutInMin", 10, time.Minute)),
logger: pkgLogger,
loadObjectFolder: config.GetString("WAREHOUSE_BUCKET_LOAD_OBJECTS_FOLDER_NAME", "rudder-warehouse-load-objects"),
}

defer jr.counterStat("staging_files_processed", warehouseutils.Tag{Name: "worker_id", Value: strconv.Itoa(workerIndex)}).Count(1)
jr := newJobRun(job, config.Default, pkgLogger, stats.Default)

defer func() {
jr.counterStat("staging_files_processed", warehouseutils.Tag{Name: "worker_id", Value: strconv.Itoa(workerIndex)}).Count(1)
jr.timerStat("staging_files_total_processing_time", warehouseutils.Tag{Name: "worker_id", Value: strconv.Itoa(workerIndex)}).Since(processStartTime)
jr.cleanup()
}()
defer jr.cleanup()

jr.logger.Debugf("[WH]: Starting processing staging file: %v at %s for %s", job.StagingFileID, job.StagingFileLocation, jr.whIdentifier)

Expand Down Expand Up @@ -746,10 +757,11 @@ func processStagingFile(ctx context.Context, job Payload, workerIndex int) (load
}
jr.tableEventCountMap[tableName]++
}
jr.timerStat("process_staging_file_time").Since(processingStart)
jr.processingStagingFileStat.Since(processingStart)

jr.logger.Debugf("[WH]: Process %v bytes from downloaded staging file: %s", lineBytesCounter, job.StagingFileLocation)
jr.counterStat("bytes_processed_in_staging_file").Count(lineBytesCounter)
jr.bytesProcessedStagingFileStat.Count(lineBytesCounter)

for _, loadFile := range jr.outputFileWritersMap {
err = loadFile.Close()
if err != nil {
Expand Down Expand Up @@ -796,26 +808,26 @@ func processClaimedAsyncJob(ctx context.Context, claimedJob pgnotifier.Claim) {
func runAsyncJob(ctx context.Context, asyncjob jobs.AsyncJobPayload) (asyncJobRunResult, error) {
warehouse, err := getDestinationFromSlaveConnectionMap(asyncjob.DestinationID, asyncjob.SourceID)
if err != nil {
return asyncJobRunResult{id: asyncjob.Id, result: false}, err
return asyncJobRunResult{Id: asyncjob.Id, Result: false}, err

Check warning on line 811 in warehouse/slave.go

View check run for this annotation

Codecov / codecov/patch

warehouse/slave.go#L811

Added line #L811 was not covered by tests
}
destType := warehouse.Destination.DestinationDefinition.Name
whManager, err := manager.NewWarehouseOperations(destType)
if err != nil {
return asyncJobRunResult{id: asyncjob.Id, result: false}, err
return asyncJobRunResult{Id: asyncjob.Id, Result: false}, err

Check warning on line 816 in warehouse/slave.go

View check run for this annotation

Codecov / codecov/patch

warehouse/slave.go#L816

Added line #L816 was not covered by tests
}
whasyncjob := &jobs.WhAsyncJob{}

var metadata warehouseutils.DeleteByMetaData
err = json.Unmarshal(asyncjob.MetaData, &metadata)
if err != nil {
return asyncJobRunResult{id: asyncjob.Id, result: false}, err
return asyncJobRunResult{Id: asyncjob.Id, Result: false}, err

Check warning on line 823 in warehouse/slave.go

View check run for this annotation

Codecov / codecov/patch

warehouse/slave.go#L823

Added line #L823 was not covered by tests
}
whManager.SetConnectionTimeout(warehouseutils.GetConnectionTimeout(
destType, warehouse.Destination.ID,
))
err = whManager.Setup(ctx, warehouse, whasyncjob)
if err != nil {
return asyncJobRunResult{id: asyncjob.Id, result: false}, err
return asyncJobRunResult{Id: asyncjob.Id, Result: false}, err

Check warning on line 830 in warehouse/slave.go

View check run for this annotation

Codecov / codecov/patch

warehouse/slave.go#L830

Added line #L830 was not covered by tests
}
defer whManager.Cleanup(ctx)
tableNames := []string{asyncjob.TableName}
Expand All @@ -831,8 +843,8 @@ func runAsyncJob(ctx context.Context, asyncjob jobs.AsyncJobPayload) (asyncJobRu
err = whManager.DeleteBy(ctx, tableNames, params)
}
asyncJobRunResult := asyncJobRunResult{
result: err == nil,
id: asyncjob.Id,
Result: err == nil,
Id: asyncjob.Id,
}
return asyncJobRunResult, err
}
Loading

0 comments on commit 2e1883f

Please sign in to comment.