Skip to content

Commit

Permalink
fix: marketo bulk upload's upload url preparation fix (#4358)
Browse files Browse the repository at this point in the history
  • Loading branch information
chandumlg committed Jan 31, 2024
1 parent 5d78ab6 commit f6396f9
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,18 @@ func (b *MarketoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatus
HasFailed: true,
}
}
bodyBytes, transformerConnectionStatus := misc.HTTPCallWithRetryWithTimeout(b.transformUrl+b.pollUrl, payload, b.timeout)

pollURL, err := url.JoinPath(b.transformUrl, b.pollUrl)
if err != nil {
b.logger.Errorf("Error in preparing poll url: %v", err)
return common.PollStatusResponse{
StatusCode: 500,
HasFailed: true,
Error: err.Error(),
}
}

bodyBytes, transformerConnectionStatus := misc.HTTPCallWithRetryWithTimeout(pollURL, payload, b.timeout)
if transformerConnectionStatus != 200 {
return common.PollStatusResponse{
StatusCode: transformerConnectionStatus,
Expand Down Expand Up @@ -218,27 +229,24 @@ func extractJobStats(keyMap map[string]interface{}, importingJobIDs []int64, sta

func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
destination := asyncDestStruct.Destination
resolveURL := func(base, relative string) string {
baseURL, err := url.Parse(base)
if err != nil {
b.logger.Error("Error in Parsing Base URL: %w", err)
}
relURL, err := url.Parse(relative)
if err != nil {
b.logger.Error("Error in Parsing Relative URL: %w", err)
}
destURL := baseURL.ResolveReference(relURL).String()
return destURL
}
destinationID := destination.ID
destinationUploadUrl := asyncDestStruct.DestinationUploadURL
url := resolveURL(b.transformUrl, destinationUploadUrl)
filePath := asyncDestStruct.FileName
destConfig := destination.Config
destType := destination.DestinationDefinition.Name
failedJobIDs := asyncDestStruct.FailedJobIDs
importingJobIDs := asyncDestStruct.ImportingJobIDs

destinationUploadUrl := asyncDestStruct.DestinationUploadURL
uploadURL, err := url.JoinPath(b.transformUrl, destinationUploadUrl)
if err != nil {
return common.AsyncUploadOutput{
FailedJobIDs: append(failedJobIDs, importingJobIDs...),
FailedReason: "BRT: Failed to prepare upload url " + err.Error(),
FailedCount: len(failedJobIDs) + len(importingJobIDs),
DestinationID: destinationID,
}
}

file, err := os.Open(filePath)
if err != nil {
panic("BRT: Read File Failed" + err.Error())
Expand Down Expand Up @@ -287,7 +295,7 @@ func (b *MarketoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStr
startTime := time.Now()
payloadSizeStat.Observe(float64(len(payload)))
b.logger.Debugf("[Async Destination Manager] File Upload Started for Dest Type %v", destType)
responseBody, statusCodeHTTP := misc.HTTPCallWithRetryWithTimeout(url, payload, config.GetDuration("HttpClient.marketoBulkUpload.timeout", 10, time.Minute))
responseBody, statusCodeHTTP := misc.HTTPCallWithRetryWithTimeout(uploadURL, payload, config.GetDuration("HttpClient.marketoBulkUpload.timeout", 10, time.Minute))
b.logger.Debugf("[Async Destination Manager] File Upload Finished for Dest Type %v", destType)
uploadTimeStat.Since(startTime)
var bodyBytes []byte
Expand Down

0 comments on commit f6396f9

Please sign in to comment.