-
Notifications
You must be signed in to change notification settings - Fork 311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: onboard klaviyo bulk upload destination #4682
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe recent changes introduce comprehensive support for bulk uploads to Klaviyo, including creating payloads, chunking data, polling job statuses, extracting profiles, uploading data, and transforming jobs. The changes span across multiple files, adding new interfaces, structs, and methods, as well as corresponding mock implementations and tests. Additionally, the new functionality is integrated into the existing batch router framework. Changes
Sequence Diagram(s) (Beta)sequenceDiagram
participant Client
participant Manager
participant KlaviyoBulkUploader
participant KlaviyoAPI
Client->>Manager: newRegularManager("KLAVIYO_BULK_UPLOAD")
Manager->>KlaviyoBulkUploader: NewManager(destination)
KlaviyoBulkUploader-->>Manager: KlaviyoBulkUploader instance
Client->>KlaviyoBulkUploader: Upload(asyncDestStruct)
KlaviyoBulkUploader->>KlaviyoAPI: Upload data
KlaviyoAPI-->>KlaviyoBulkUploader: Upload response
KlaviyoBulkUploader-->>Client: AsyncUploadOutput
Client->>KlaviyoBulkUploader: Poll(pollInput)
KlaviyoBulkUploader->>KlaviyoAPI: Poll job status
KlaviyoAPI-->>KlaviyoBulkUploader: PollStatusResponse
KlaviyoBulkUploader-->>Client: PollStatusResponse
Client->>KlaviyoBulkUploader: GetUploadStats(UploadStatsInput)
KlaviyoBulkUploader->>KlaviyoAPI: Get upload stats
KlaviyoAPI-->>KlaviyoBulkUploader: UploadStatsResponse
KlaviyoBulkUploader-->>Client: UploadStatsResponse
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #4682 +/- ##
==========================================
- Coverage 73.31% 72.94% -0.37%
==========================================
Files 416 417 +1
Lines 48763 49041 +278
==========================================
+ Hits 35749 35772 +23
- Misses 10678 10931 +253
- Partials 2336 2338 +2 ☔ View full report in Codecov by Sentry. |
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
Outdated
Show resolved
Hide resolved
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
Outdated
Show resolved
Hide resolved
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
Show resolved
Hide resolved
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
Show resolved
Hide resolved
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
Outdated
Show resolved
Hide resolved
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (8)
- mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/common/utils.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_suite_test.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/manager.go (2 hunks)
- utils/misc/misc.go (1 hunks)
Files skipped from review due to trivial changes (2)
- mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_suite_test.go
Additional context used
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go
163-163: Error return value of
json.Unmarshal
is not checked (errcheck)
GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go
[failure] 163-163:
Error return value ofjson.Unmarshal
is not checked (errcheck)router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
[failure] 28-28:
SEPERATOR
is a misspelling ofSEPARATOR
(misspell)
[failure] 103-103:
SEPERATOR
is a misspelling ofSEPARATOR
(misspell)
[failure] 176-176:
SEPERATOR
is a misspelling ofSEPARATOR
(misspell)
[failure] 184-184:
SEPERATOR
is a misspelling ofSEPARATOR
(misspell)
[failure] 331-331:
seperated
is a misspelling ofseparated
(misspell)
[failure] 377-377:
SEPERATOR
is a misspelling ofSEPARATOR
(misspell)
Additional comments not posted (7)
router/batchrouter/asyncdestinationmanager/common/utils.go (1)
6-6
: The addition of "KLAVIYO_BULK_UPLOAD" to theasyncDestinations
slice correctly registers Klaviyo as a new asynchronous destination.router/batchrouter/asyncdestinationmanager/manager.go (1)
11-11
: The import of the Klaviyo bulk upload package and the addition of the "KLAVIYO_BULK_UPLOAD" case in thenewRegularManager
function are correctly implemented to support the new destination type.Also applies to: 29-30
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go (2)
10-36
: The interfaces defined forUploader
,HttpClient
,Poller
,ProfileExtractor
, andUploadStats
are well-structured and essential for the operations of the Klaviyo bulk uploader.
38-158
: The data structuresUploadResp
,PollResp
,UploadStatusResp
,Payload
,Data
,Attributes
,Profiles
,Relationships
,Lists
,List
,Input
, andProfile
are correctly defined and align with the requirements for handling Klaviyo bulk upload data.router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go (1)
32-37
: The test functionsTestNewManagerSuccess
,TestUpload
,TestFileReadSuccess
,TestPoll
,TestGetUploadStats
, andTestExtractProfileValidInput
are comprehensive and correctly validate the functionalities of the Klaviyo bulk uploader.Also applies to: 39-58, 60-94, 96-121, 123-156, 158-169
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1)
61-67
: The functionscreateFinalPayload
,NewManager
,chunkBySizeAndElements
,Poll
,GetUploadStats
,generateKlaviyoErrorOutput
,ExtractProfile
,Upload
, andTransform
are well-implemented and crucial for the functionality of the Klaviyo bulk uploader.Also applies to: 69-97, 99-178, 180-254, 256-268, 270-293, 295-390, 392-394
utils/misc/misc.go (1)
102-102
: The addition of"KLAVIYO_BULK_UPLOAD"
to thebatchDestinations
slice correctly implements the required support for the new Klaviyo bulk upload functionality.
|
||
inputPayloadJSON := `{"message":{"body":{"FORM":{},"JSON":{"data":{"attributes":{"profiles":{"data":[{"attributes":{"anonymous_id":111222334,"email":"qwe122@mail.com","first_name":"Testqwe0122","jobIdentifier":"111222334:1","last_name":"user0122","location":{"city":"delhi","country":"India","ip":"213.5.6.41"},"phone_number":"+919912000123"},"id":111222334,"type":"profile"}]}},"relationships":{"lists":{"data":[{"id":"UKth4J","type":"list"}]}},"type":"profile-bulk-import-job"}},"JSON_ARRAY":{},"XML":{}},"endpoint":"","files":{},"headers":{},"method":"POST","params":{},"type":"REST","userId":"","version":"1"},"metadata":{"job_id":1}}` | ||
var inputPayload klaviyobulkupload.Input | ||
json.Unmarshal([]byte(inputPayloadJSON), &inputPayload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle the error returned by json.Unmarshal
to ensure robust error handling in tests.
- json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
+ err := json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
+ if err != nil {
+ t.Errorf("json.Unmarshal failed: %v", err)
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
json.Unmarshal([]byte(inputPayloadJSON), &inputPayload) | |
json.Unmarshal([]byte(inputPayloadJSON), &inputPayload) | |
err := json.Unmarshal([]byte(inputPayloadJSON), &inputPayload) | |
if err != nil { | |
t.Errorf("json.Unmarshal failed: %v", err) | |
} |
Tools
golangci-lint
163-163: Error return value of
json.Unmarshal
is not checked (errcheck)
GitHub Check: lint
[failure] 163-163:
Error return value ofjson.Unmarshal
is not checked (errcheck)
const ( | ||
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/" | ||
BATCHSIZE = 25 | ||
MAXPAYLOADSIZE = 5000000 | ||
IMPORT_ID_SEPERATOR = ":" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct the spelling of "SEPARATOR" in the constant names to maintain professionalism in the codebase.
- IMPORT_ID_SEPERATOR = ":"
+ IMPORT_ID_SEPARATOR = ":"
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
const ( | |
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/" | |
BATCHSIZE = 25 | |
MAXPAYLOADSIZE = 5000000 | |
IMPORT_ID_SEPERATOR = ":" | |
) | |
const ( | |
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/" | |
BATCHSIZE = 25 | |
MAXPAYLOADSIZE = 5000000 | |
IMPORT_ID_SEPARATOR = ":" | |
) |
Tools
GitHub Check: lint
[failure] 28-28:
SEPERATOR
is a misspelling ofSEPARATOR
(misspell)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 10
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go
Additional context used
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
33-33: undefined: Profile (typecheck)
61-61: undefined: KlaviyoBulkUploader (typecheck)
69-69: undefined: Profile (typecheck)
99-99: undefined: KlaviyoBulkUploader (typecheck)
180-180: undefined: KlaviyoBulkUploader (typecheck)
256-256: undefined: KlaviyoBulkUploader (typecheck)
270-270: undefined: KlaviyoBulkUploader (typecheck)
295-295: undefined: KlaviyoBulkUploader (typecheck)
392-392: undefined: KlaviyoBulkUploader (typecheck)
34-34: undefined: Payload (typecheck)
35-35: undefined: Data (typecheck)
37-37: undefined: Attributes (typecheck)
38-38: undefined: Profiles (typecheck)
46-46: undefined: Relationships (typecheck)
47-47: undefined: Lists (typecheck)
48-48: undefined: List (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
70-70: undefined: Profile (typecheck)
71-71: undefined: Profile (typecheck)
141-141: undefined: PollResp (typecheck)
218-218: undefined: UploadStatusResp (typecheck)
277-277: undefined: Profile (typecheck)
312-312: undefined: Profile (typecheck)
315-315: undefined: Input (typecheck)
322-322: undefined: Profile (typecheck)
369-369: undefined: UploadResp (typecheck)
GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
[failure] 331-331:
seperated
is a misspelling ofseparated
(misspell)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
Show resolved
Hide resolved
func (kbu *KlaviyoBulkUploader) Transform(job *jobsdb.JobT) (string, error) { | ||
return common.GetMarshalledData(string(job.EventPayload), job.JobID) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined type KlaviyoBulkUploader
.
The method Transform
uses KlaviyoBulkUploader
as a receiver type, which is undefined. Ensure this type is properly defined or imported.
Tools
golangci-lint
392-392: undefined: KlaviyoBulkUploader (typecheck)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KlaviyoBulkUploader is defined in types.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yashasvibajpai, thank you for pointing that out. I've verified that the KlaviyoBulkUploader
type is indeed defined in types.go
within the klaviyobulkupload
package. I appreciate your clarification.
If there's anything else you need, feel free to ask!
Analysis chain
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
rg "type KlaviyoBulkUploader" --glob "router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go"
Length of output: 212
Learnings added
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
Note: Learnings are effective only in the context of specific code segments. To apply general coding guidelines, please configure review instructions.
Learnings used
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is defined in types.go. can you recheck?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yashasvibajpai, I've rechecked the types.go
file within the klaviyobulkupload
package and can confirm that the KlaviyoBulkUploader
type is indeed defined there as a struct. Here's the relevant line from the file:
type KlaviyoBulkUploader struct {
// struct fields...
}
Thank you for your patience. If there's anything else you need, feel free to ask!
Analysis chain
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
rg "type KlaviyoBulkUploader" --glob "router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go"
Length of output: 212
Learnings used
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse { | ||
client := &http.Client{} | ||
destConfig := kbu.destinationConfig | ||
privateApiKey, _ := destConfig["privateApiKey"].(string) | ||
importIds := strings.Split(pollInput.ImportId, IMPORT_ID_SEPARATOR) | ||
importStatuses := make(map[string]string) | ||
failedImports := make([]string, 0) | ||
for _, importId := range importIds { | ||
importStatuses[importId] = "queued" | ||
} | ||
|
||
for { | ||
allComplete := true | ||
for importId, status := range importStatuses { | ||
if status != "complete" { | ||
allComplete = false | ||
pollUrl := KlaviyoAPIURL + importId | ||
req, err := http.NewRequest("GET", pollUrl, nil) | ||
if err != nil { | ||
return common.PollStatusResponse{ | ||
Complete: false, | ||
InProgress: false, | ||
HasFailed: true, | ||
Error: err.Error(), | ||
} | ||
} | ||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Authorization", "Klaviyo-API-Key "+privateApiKey) | ||
req.Header.Set("revision", "2024-05-15") | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
return common.PollStatusResponse{ | ||
Complete: false, | ||
InProgress: false, | ||
StatusCode: 0, | ||
HasFailed: true, | ||
Error: err.Error(), | ||
HasWarning: false, | ||
} | ||
} | ||
|
||
var pollBodyBytes []byte | ||
var pollresp PollResp | ||
pollBodyBytes, _ = io.ReadAll(resp.Body) | ||
defer func() { _ = resp.Body.Close() }() | ||
|
||
pollRespErr := json.Unmarshal(pollBodyBytes, &pollresp) | ||
if pollRespErr != nil { | ||
return common.PollStatusResponse{ | ||
Complete: false, | ||
InProgress: false, | ||
StatusCode: 0, | ||
HasFailed: true, | ||
Error: pollRespErr.Error(), | ||
HasWarning: false, | ||
} | ||
} | ||
// Update the status in the map | ||
importStatuses[importId] = pollresp.Data.Attributes.Status | ||
|
||
// If Failed_count > 0, add the importId to failedImports | ||
if pollresp.Data.Attributes.Failed_count > 0 { | ||
failedImports = append(failedImports, importId) | ||
} | ||
} | ||
} | ||
if allComplete { | ||
break | ||
} | ||
} | ||
|
||
return common.PollStatusResponse{ | ||
Complete: true, | ||
HasFailed: true, | ||
HasWarning: false, | ||
StatusCode: 200, | ||
InProgress: false, | ||
FailedJobURLs: strings.Join(failedImports, IMPORT_ID_SEPARATOR), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined type KlaviyoBulkUploader
.
The receiver type KlaviyoBulkUploader
for the method Poll
is undefined in this scope. Ensure this type is correctly defined or imported.
Tools
golangci-lint
99-99: undefined: KlaviyoBulkUploader (typecheck)
141-141: undefined: PollResp (typecheck)
func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput { | ||
startTime := time.Now() | ||
destination := asyncDestStruct.Destination | ||
filePath := asyncDestStruct.FileName | ||
importingJobIDs := asyncDestStruct.ImportingJobIDs | ||
destType := destination.DestinationDefinition.Name | ||
destinationID := destination.ID | ||
listId, _ := destination.Config["listId"].(string) | ||
statLabels := stats.Tags{ | ||
"module": "batch_router", | ||
"destType": destType, | ||
} | ||
file, err := os.Open(filePath) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while opening file. ", err, asyncDestStruct.ImportingJobIDs, destinationID) | ||
} | ||
defer file.Close() | ||
var combinedProfiles []Profile | ||
scanner := bufio.NewScanner(file) | ||
for scanner.Scan() { | ||
var input Input | ||
line := scanner.Text() | ||
err := json.Unmarshal([]byte(line), &input) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while parsing JSON.", err, importingJobIDs, destinationID) | ||
} | ||
profileStructure := kbu.ExtractProfile(input) | ||
if profileStructure == (Profile{}) { | ||
return kbu.generateKlaviyoErrorOutput("Error while extracting profile. No profile data passed", err, importingJobIDs, destinationID) | ||
} | ||
combinedProfiles = append(combinedProfiles, profileStructure) | ||
} | ||
|
||
chunks, _ := chunkBySizeAndElements(combinedProfiles, MAXPAYLOADSIZE, BATCHSIZE) | ||
eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, statLabels) | ||
|
||
var importIds []string // DelimitedImportIds is : seperated importIds | ||
var DelimitedUploadRespErr string | ||
for _, chunk := range chunks { | ||
combinedPayload := createFinalPayload(chunk, listId) | ||
|
||
// Convert combined payload to JSON | ||
outputJSON, err := json.MarshalIndent(combinedPayload, "", " ") | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while marshaling combined JSON.", err, importingJobIDs, destinationID) | ||
} | ||
uploadURL := KlaviyoAPIURL | ||
client := &http.Client{} | ||
req, err := http.NewRequest("POST", uploadURL, bytes.NewBuffer(outputJSON)) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while creating request.", err, importingJobIDs, destinationID) | ||
} | ||
|
||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Authorization", "Klaviyo-API-Key "+destination.Config["privateApiKey"].(string)) | ||
req.Header.Set("revision", "2024-05-15") | ||
|
||
uploadTimeStat := stats.Default.NewTaggedStat("async_upload_time", stats.TimerType, statLabels) | ||
payloadSizeStat := stats.Default.NewTaggedStat("payload_size", stats.HistogramType, statLabels) | ||
payloadSizeStat.Observe(float64(len(outputJSON))) | ||
|
||
resp, err := client.Do(req) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while sending request.", err, importingJobIDs, destinationID) | ||
} | ||
|
||
var bodyBytes []byte | ||
bodyBytes, _ = io.ReadAll(resp.Body) | ||
defer func() { _ = resp.Body.Close() }() | ||
uploadTimeStat.Since(startTime) | ||
|
||
if resp.StatusCode != 202 { | ||
return kbu.generateKlaviyoErrorOutput("Got non 202 as statusCode.", fmt.Errorf(string(bodyBytes)), importingJobIDs, destinationID) | ||
} | ||
var uploadresp UploadResp | ||
uploadRespErr := json.Unmarshal((bodyBytes), &uploadresp) | ||
if uploadRespErr != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while unmarshaling response.", uploadRespErr, importingJobIDs, destinationID) | ||
} | ||
importIds = append(importIds, uploadresp.Data.Id) | ||
} | ||
importParameters, err := json.Marshal(common.ImportParameters{ | ||
ImportId: strings.Join(importIds, IMPORT_ID_SEPARATOR), | ||
}) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while marshaling parameters.", err, importingJobIDs, destinationID) | ||
} | ||
eventsSuccessStat.Count(len(asyncDestStruct.ImportingJobIDs)) | ||
|
||
return common.AsyncUploadOutput{ | ||
ImportingParameters: importParameters, | ||
ImportingJobIDs: importingJobIDs, | ||
SuccessResponse: DelimitedUploadRespErr, | ||
DestinationID: destination.ID, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined type KlaviyoBulkUploader
.
The method Upload
uses KlaviyoBulkUploader
as a receiver type, which is not defined in this file or imported. This will cause a compilation error.
Tools
golangci-lint
295-295: undefined: KlaviyoBulkUploader (typecheck)
312-312: undefined: Profile (typecheck)
315-315: undefined: Input (typecheck)
322-322: undefined: Profile (typecheck)
369-369: undefined: UploadResp (typecheck)
GitHub Check: lint
[failure] 331-331:
seperated
is a misspelling ofseparated
(misspell)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
Additional context used
Learnings (1)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394 Timestamp: 2024-06-04T21:54:20.163Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67 Timestamp: 2024-06-04T21:50:56.257Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
33-33: undefined: Profile (typecheck)
61-61: undefined: KlaviyoBulkUploader (typecheck)
69-69: undefined: Profile (typecheck)
99-99: undefined: KlaviyoBulkUploader (typecheck)
180-180: undefined: KlaviyoBulkUploader (typecheck)
256-256: undefined: KlaviyoBulkUploader (typecheck)
270-270: undefined: KlaviyoBulkUploader (typecheck)
295-295: undefined: KlaviyoBulkUploader (typecheck)
398-398: undefined: KlaviyoBulkUploader (typecheck)
34-34: undefined: Payload (typecheck)
35-35: undefined: Data (typecheck)
37-37: undefined: Attributes (typecheck)
38-38: undefined: Profiles (typecheck)
46-46: undefined: Relationships (typecheck)
47-47: undefined: Lists (typecheck)
48-48: undefined: List (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
70-70: undefined: Profile (typecheck)
71-71: undefined: Profile (typecheck)
141-141: undefined: PollResp (typecheck)
218-218: undefined: UploadStatusResp (typecheck)
277-277: undefined: Profile (typecheck)
312-312: undefined: Profile (typecheck)
315-315: undefined: Input (typecheck)
322-322: undefined: Profile (typecheck)
374-374: undefined: UploadResp (typecheck)
GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
[failure] 335-335:
seperated
is a misspelling ofseparated
(misspell)
Additional comments not posted (7)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (7)
61-67
: EnsureKlaviyoBulkUploader
is defined.As previously noted, ensure the
KlaviyoBulkUploader
type is defined or imported to avoid compilation issues.Tools
golangci-lint
61-61: undefined: KlaviyoBulkUploader (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
99-178
: Verify the definition ofKlaviyoBulkUploader
andPollResp
.As previously noted, ensure the
KlaviyoBulkUploader
andPollResp
types are defined or imported to avoid compilation issues.Tools
golangci-lint
99-99: undefined: KlaviyoBulkUploader (typecheck)
141-141: undefined: PollResp (typecheck)
180-254
: Check for undefinedKlaviyoBulkUploader
andUploadStatusResp
.As previously noted, ensure the
KlaviyoBulkUploader
andUploadStatusResp
types are defined or imported to avoid compilation issues.Tools
golangci-lint
180-180: undefined: KlaviyoBulkUploader (typecheck)
218-218: undefined: UploadStatusResp (typecheck)
256-268
: Confirm the definition ofKlaviyoBulkUploader
.As previously noted, ensure the
KlaviyoBulkUploader
type is defined or imported to avoid compilation issues.Tools
golangci-lint
256-256: undefined: KlaviyoBulkUploader (typecheck)
295-396
: Review the use ofKlaviyoBulkUploader
,Profile
,Input
, andUploadResp
.As previously noted, ensure the
KlaviyoBulkUploader
,Profile
,Input
, andUploadResp
types are defined or imported to avoid compilation issues.Tools
golangci-lint
295-295: undefined: KlaviyoBulkUploader (typecheck)
312-312: undefined: Profile (typecheck)
315-315: undefined: Input (typecheck)
322-322: undefined: Profile (typecheck)
374-374: undefined: UploadResp (typecheck)
GitHub Check: lint
[failure] 335-335:
seperated
is a misspelling ofseparated
(misspell)
398-400
: EnsureKlaviyoBulkUploader
is defined forTransform
.As previously noted, ensure the
KlaviyoBulkUploader
type is defined or imported to avoid compilation issues.Tools
golangci-lint
398-398: undefined: KlaviyoBulkUploader (typecheck)
335-335
: Correct spelling mistake in comment.- var importIds []string // DelimitedImportIds is : seperated importIds + var importIds []string // DelimitedImportIds is : separated importIdsLikely invalid or redundant comment.
Tools
GitHub Check: lint
[failure] 335-335:
seperated
is a misspelling ofseparated
(misspell)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
Additional context used
Learnings (1)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394 Timestamp: 2024-06-04T21:54:20.163Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67 Timestamp: 2024-06-04T21:50:56.257Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
33-33: undefined: Profile (typecheck)
61-61: undefined: KlaviyoBulkUploader (typecheck)
69-69: undefined: Profile (typecheck)
99-99: undefined: KlaviyoBulkUploader (typecheck)
189-189: undefined: KlaviyoBulkUploader (typecheck)
265-265: undefined: KlaviyoBulkUploader (typecheck)
279-279: undefined: KlaviyoBulkUploader (typecheck)
304-304: undefined: KlaviyoBulkUploader (typecheck)
416-416: undefined: KlaviyoBulkUploader (typecheck)
34-34: undefined: Payload (typecheck)
35-35: undefined: Data (typecheck)
37-37: undefined: Attributes (typecheck)
38-38: undefined: Profiles (typecheck)
46-46: undefined: Relationships (typecheck)
47-47: undefined: Lists (typecheck)
48-48: undefined: List (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
70-70: undefined: Profile (typecheck)
71-71: undefined: Profile (typecheck)
141-141: undefined: PollResp (typecheck)
227-227: undefined: UploadStatusResp (typecheck)
286-286: undefined: Profile (typecheck)
323-323: undefined: Profile (typecheck)
326-326: undefined: Input (typecheck)
333-333: undefined: Profile (typecheck)
387-387: undefined: UploadResp (typecheck)
GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
[failure] 346-346:
seperated
is a misspelling ofseparated
(misspell)
Additional comments not posted (7)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (7)
346-346
: Correct spelling mistake.- var importIds []string // DelimitedImportIds is : seperated importIds + var importIds []string // DelimitedImportIds is : separated importIdsLikely invalid or redundant comment.
Tools
GitHub Check: lint
[failure] 346-346:
seperated
is a misspelling ofseparated
(misspell)
99-187
: Verify the definition ofKlaviyoBulkUploader
.The method
Poll
usesKlaviyoBulkUploader
as a receiver type, which is undefined. Ensure this type is properly defined or imported.Verification successful
The
KlaviyoBulkUploader
type is indeed defined in thetypes.go
file within theklaviyobulkupload
package. Here is the relevant definition:type KlaviyoBulkUploader struct { destName string destinationConfig map[string]interface{} logger logger.Logger Client *http.Client jobIdToIdentifierMap map[string]int64 }This confirms that the
KlaviyoBulkUploader
type is properly defined and thePoll
method can use it as a receiver.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'Length of output: 817
Tools
golangci-lint
99-99: undefined: KlaviyoBulkUploader (typecheck)
141-141: undefined: PollResp (typecheck)
304-414
: EnsureKlaviyoBulkUploader
is defined forUpload
.The method
Upload
usesKlaviyoBulkUploader
as a receiver type, which is not defined in this file or imported. This will cause a compilation error.Tools
golangci-lint
304-304: undefined: KlaviyoBulkUploader (typecheck)
323-323: undefined: Profile (typecheck)
326-326: undefined: Input (typecheck)
333-333: undefined: Profile (typecheck)
387-387: undefined: UploadResp (typecheck)
GitHub Check: lint
[failure] 346-346:
seperated
is a misspelling ofseparated
(misspell)
189-263
: EnsureKlaviyoBulkUploader
is defined forGetUploadStats
.The receiver type
KlaviyoBulkUploader
for the methodGetUploadStats
is undefined. This needs to be defined or imported to avoid compilation issues.Verification successful
The
KlaviyoBulkUploader
type is correctly defined intypes.go
within theklaviyobulkupload
package and is used as a receiver in theGetUploadStats
method.
types.go
:
type KlaviyoBulkUploader struct { ... }
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'Length of output: 817
Tools
golangci-lint
189-189: undefined: KlaviyoBulkUploader (typecheck)
227-227: undefined: UploadStatusResp (typecheck)
61-67
: EnsureKlaviyoBulkUploader
is defined.The type
KlaviyoBulkUploader
is used but not defined or imported. This will cause a compilation error. Ensure that this type is properly defined or imported.Verification successful
The
KlaviyoBulkUploader
type is properly defined intypes.go
within theklaviyobulkupload
package. The fieldsdestName
,destinationConfig
, andlogger
match the usage in theNewManager
function.
destName
is defined asstring
.destinationConfig
is defined asmap[string]interface{}
.logger
is defined aslogger.Logger
.This confirms that the type is correctly defined and used.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'Length of output: 817
Tools
golangci-lint
61-61: undefined: KlaviyoBulkUploader (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
265-277
: Verify the definition ofKlaviyobulkUploader
forgenerateKlaviyoErrorOutput
.The method
generateKlaviyoErrorOutput
usesKlaviyoBulkUploader
as a receiver type, which is not defined in this file or imported. This will cause a compilation error.Verification successful
The
KlaviyoBulkUploader
type is defined intypes.go
within theklaviyobulkupload
package and is used as a receiver in thegenerateKlaviyoErrorOutput
method. There is no compilation error related to the definition ofKlaviyoBulkUploader
.
KlaviyoBulkUploader
is defined inrouter/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go
.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'Length of output: 817
Tools
golangci-lint
265-265: undefined: KlaviyoBulkUploader (typecheck)
416-418
: Verify the definition ofKlaviyoBulkUploader
forTransform
.The method
Transform
usesKlaviyoBulkUploader
as a receiver type, which is undefined. Ensure this type is properly defined or imported.Tools
golangci-lint
416-416: undefined: KlaviyoBulkUploader (typecheck)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
Additional context used
Learnings (1)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394 Timestamp: 2024-06-04T21:54:20.163Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67 Timestamp: 2024-06-04T21:50:56.257Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
33-33: undefined: Profile (typecheck)
61-61: undefined: KlaviyoBulkUploader (typecheck)
69-69: undefined: Profile (typecheck)
99-99: undefined: KlaviyoBulkUploader (typecheck)
189-189: undefined: KlaviyoBulkUploader (typecheck)
265-265: undefined: KlaviyoBulkUploader (typecheck)
279-279: undefined: KlaviyoBulkUploader (typecheck)
304-304: undefined: KlaviyoBulkUploader (typecheck)
412-412: undefined: KlaviyoBulkUploader (typecheck)
34-34: undefined: Payload (typecheck)
35-35: undefined: Data (typecheck)
37-37: undefined: Attributes (typecheck)
38-38: undefined: Profiles (typecheck)
46-46: undefined: Relationships (typecheck)
47-47: undefined: Lists (typecheck)
48-48: undefined: List (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
70-70: undefined: Profile (typecheck)
71-71: undefined: Profile (typecheck)
141-141: undefined: PollResp (typecheck)
227-227: undefined: UploadStatusResp (typecheck)
286-286: undefined: Profile (typecheck)
323-323: undefined: Profile (typecheck)
326-326: undefined: Input (typecheck)
333-333: undefined: Profile (typecheck)
384-384: undefined: UploadResp (typecheck)
GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go
[failure] 343-343:
seperated
is a misspelling ofseparated
(misspell)
Additional comments not posted (2)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
61-67
: EnsureKlaviyoBulkUploader
is defined.The type
KlaviyoBulkUploader
is used but not defined or imported. This will cause a compilation error. Ensure that this type is properly defined or imported.Skipped due to learnings
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67 Timestamp: 2024-06-04T21:50:56.257Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394 Timestamp: 2024-06-04T21:54:20.163Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
Tools
golangci-lint
61-61: undefined: KlaviyoBulkUploader (typecheck)
62-62: undefined: KlaviyoBulkUploader (typecheck)
412-414
: Undefined typeKlaviyoBulkUploader
inTransform
.The method
Transform
usesKlaviyoBulkUploader
as a receiver type, which is undefined. Ensure this type is properly defined or imported.Skipped due to learnings
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394 Timestamp: 2024-06-04T21:54:20.163Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai PR: rudderlabs/rudder-server#4682 File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67 Timestamp: 2024-06-04T21:50:56.257Z Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
Tools
golangci-lint
412-412: undefined: KlaviyoBulkUploader (typecheck)
func createFinalPayload(combinedProfiles []Profile, listId string) Payload { | ||
payload := Payload{ | ||
Data: Data{ | ||
Type: "profile-bulk-import-job", | ||
Attributes: Attributes{ | ||
Profiles: Profiles{ | ||
Data: combinedProfiles, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
if listId != "" { | ||
payload.Data.Relationships = &Relationships{ | ||
Lists: Lists{ | ||
Data: []List{ | ||
{ | ||
Type: "list", | ||
ID: listId, | ||
}, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
return payload | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address undefined types in createFinalPayload
.
The types Profile
, Payload
, Data
, Attributes
, Profiles
, Relationships
, Lists
, and List
are used but not defined or imported in this file. This will cause compilation errors. Ensure these types are correctly defined or imported.
Tools
golangci-lint
33-33: undefined: Profile (typecheck)
34-34: undefined: Payload (typecheck)
35-35: undefined: Data (typecheck)
37-37: undefined: Attributes (typecheck)
38-38: undefined: Profiles (typecheck)
46-46: undefined: Relationships (typecheck)
47-47: undefined: Lists (typecheck)
48-48: undefined: List (typecheck)
func chunkBySizeAndElements(combinedProfiles []Profile, maxBytes, maxElements int) ([][]Profile, error) { | ||
var chunks [][]Profile | ||
var chunk []Profile | ||
var chunkSize int | ||
|
||
for _, profile := range combinedProfiles { | ||
profileJSON, err := json.Marshal(profile) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to marshal profile: %w", err) | ||
} | ||
|
||
profileSize := len(profileJSON) | ||
|
||
if (chunkSize+profileSize > maxBytes || len(chunk) == maxElements) && len(chunk) > 0 { | ||
chunks = append(chunks, chunk) | ||
chunk = nil | ||
chunkSize = 0 | ||
} | ||
|
||
chunk = append(chunk, profile) | ||
chunkSize += profileSize | ||
} | ||
|
||
if len(chunk) > 0 { | ||
chunks = append(chunks, chunk) | ||
} | ||
|
||
return chunks, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle undefined type Profile
in chunkBySizeAndElements
.
The type Profile
is used but not defined or imported in this file. This will lead to compilation errors. Ensure this type is correctly defined or imported.
Tools
golangci-lint
69-69: undefined: Profile (typecheck)
70-70: undefined: Profile (typecheck)
71-71: undefined: Profile (typecheck)
func (kbu *KlaviyoBulkUploader) ExtractProfile(input Input) Profile { | ||
Message := input.Message | ||
Body := Message.Body | ||
Json := Body.JSON | ||
Data := Json.Data | ||
Attributes := Data.Attributes | ||
if len(Attributes.Profiles.Data) == 0 { | ||
return Profile{} | ||
} | ||
profileObject := Attributes.Profiles.Data[0] | ||
|
||
jobIdentifier := profileObject.Attributes.JobIdentifier | ||
jobIdentifierArray := strings.Split(jobIdentifier, ":") | ||
jobIdentifierValue, _ := strconv.ParseInt(jobIdentifierArray[1], 10, 64) | ||
if kbu.jobIdToIdentifierMap == nil { | ||
kbu.jobIdToIdentifierMap = make(map[string]int64) | ||
} | ||
kbu.jobIdToIdentifierMap[jobIdentifierArray[0]] = jobIdentifierValue | ||
|
||
// delete jobIdentifier from the attributes map as it is not required in the final payload | ||
profileObject.Attributes.JobIdentifier = "" | ||
|
||
return profileObject | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Address undefined types Profile
and Input
in ExtractProfile
.
The types Profile
and Input
are used but not defined or imported in this file. This will lead to compilation errors. Ensure these types are correctly defined or imported.
Tools
golangci-lint
279-279: undefined: KlaviyoBulkUploader (typecheck)
286-286: undefined: Profile (typecheck)
destType := destination.DestinationDefinition.Name | ||
destinationID := destination.ID | ||
listId, _ := destination.Config["listId"].(string) | ||
statLabels := stats.Tags{ | ||
"module": "batch_router", | ||
"destType": destType, | ||
} | ||
file, err := os.Open(filePath) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while opening file. ", err, asyncDestStruct.ImportingJobIDs, destinationID) | ||
} | ||
defer file.Close() | ||
var combinedProfiles []Profile | ||
scanner := bufio.NewScanner(file) | ||
for scanner.Scan() { | ||
var input Input | ||
line := scanner.Text() | ||
err := json.Unmarshal([]byte(line), &input) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while parsing JSON.", err, importingJobIDs, destinationID) | ||
} | ||
profileStructure := kbu.ExtractProfile(input) | ||
if profileStructure == (Profile{}) { | ||
return kbu.generateKlaviyoErrorOutput("Error while extracting profile. No profile data passed", err, importingJobIDs, destinationID) | ||
} | ||
combinedProfiles = append(combinedProfiles, profileStructure) | ||
} | ||
|
||
chunks, _ := chunkBySizeAndElements(combinedProfiles, MAXPAYLOADSIZE, BATCHSIZE) | ||
|
||
eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, statLabels) | ||
|
||
var importIds []string // DelimitedImportIds is : seperated importIds | ||
var DelimitedUploadRespErr string | ||
|
||
for idx, chunk := range chunks { | ||
combinedPayload := createFinalPayload(chunk, listId) | ||
|
||
// Convert combined payload to JSON | ||
outputJSON, err := json.MarshalIndent(combinedPayload, "", " ") | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while marshaling combined JSON.", err, importingJobIDs, destinationID) | ||
} | ||
uploadURL := KlaviyoAPIURL | ||
client := &http.Client{} | ||
req, err := http.NewRequest("POST", uploadURL, bytes.NewBuffer(outputJSON)) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while creating request.", err, importingJobIDs, destinationID) | ||
} | ||
|
||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Authorization", "Klaviyo-API-Key "+destination.Config["privateApiKey"].(string)) | ||
req.Header.Set("revision", "2024-05-15") | ||
|
||
uploadTimeStat := stats.Default.NewTaggedStat("async_upload_time", stats.TimerType, statLabels) | ||
payloadSizeStat := stats.Default.NewTaggedStat("payload_size", stats.HistogramType, statLabels) | ||
payloadSizeStat.Observe(float64(len(outputJSON))) | ||
|
||
resp, err := client.Do(req) | ||
if err != nil { | ||
failedJobs = append(failedJobs, importingJobIDs[idx]) | ||
kbu.logger.Error("Error while sending request.", err) | ||
} | ||
|
||
var bodyBytes []byte | ||
bodyBytes, _ = io.ReadAll(resp.Body) | ||
defer func() { _ = resp.Body.Close() }() | ||
uploadTimeStat.Since(startTime) | ||
|
||
if resp.StatusCode != 202 { | ||
failedJobs = append(failedJobs, importingJobIDs[idx]) | ||
kbu.logger.Error("Got non 202 as statusCode.", fmt.Errorf(string(bodyBytes))) | ||
} | ||
var uploadresp UploadResp | ||
uploadRespErr := json.Unmarshal((bodyBytes), &uploadresp) | ||
if uploadRespErr != nil { | ||
failedJobs = append(failedJobs, importingJobIDs[idx]) | ||
kbu.logger.Error("Error while unmarshaling response.", uploadRespErr) | ||
} | ||
importIds = append(importIds, uploadresp.Data.Id) | ||
} | ||
importParameters, err := json.Marshal(common.ImportParameters{ | ||
ImportId: strings.Join(importIds, IMPORT_ID_SEPARATOR), | ||
}) | ||
if err != nil { | ||
return kbu.generateKlaviyoErrorOutput("Error while marshaling parameters.", err, importingJobIDs, destinationID) | ||
} | ||
successJobs, _ = lo.Difference(importingJobIDs, failedJobs) | ||
eventsSuccessStat.Count(len(asyncDestStruct.ImportingJobIDs)) | ||
|
||
return common.AsyncUploadOutput{ | ||
ImportingParameters: importParameters, | ||
SucceededJobIDs: successJobs, | ||
FailedJobIDs: failedJobs, | ||
FailedCount: len(failedJobs), | ||
ImportingJobIDs: successJobs, | ||
SuccessResponse: DelimitedUploadRespErr, | ||
DestinationID: destination.ID, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined type KlaviyoBulkUploader
in Upload
.
The method Upload
uses KlaviyoBulkUploader
as a receiver type, which is not defined in this file or imported. This will cause a compilation error.
Tools
golangci-lint
304-304: undefined: KlaviyoBulkUploader (typecheck)
323-323: undefined: Profile (typecheck)
326-326: undefined: Input (typecheck)
333-333: undefined: Profile (typecheck)
384-384: undefined: UploadResp (typecheck)
GitHub Check: lint
[failure] 343-343:
seperated
is a misspelling ofseparated
(misspell)
func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse { | ||
client := &http.Client{} | ||
destConfig := kbu.destinationConfig | ||
privateApiKey, _ := destConfig["privateApiKey"].(string) | ||
importIds := strings.Split(pollInput.ImportId, IMPORT_ID_SEPARATOR) | ||
importStatuses := make(map[string]string) | ||
failedImports := make([]string, 0) | ||
for _, importId := range importIds { | ||
importStatuses[importId] = "queued" | ||
} | ||
|
||
for { | ||
allComplete := true | ||
for importId, status := range importStatuses { | ||
if status != "complete" { | ||
allComplete = false | ||
pollUrl := KlaviyoAPIURL + importId | ||
req, err := http.NewRequest("GET", pollUrl, nil) | ||
if err != nil { | ||
return common.PollStatusResponse{ | ||
Complete: true, | ||
InProgress: false, | ||
HasFailed: true, | ||
Error: err.Error(), | ||
} | ||
} | ||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Authorization", "Klaviyo-API-Key "+privateApiKey) | ||
req.Header.Set("revision", "2024-05-15") | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
return common.PollStatusResponse{ | ||
Complete: true, | ||
InProgress: false, | ||
StatusCode: 0, | ||
HasFailed: true, | ||
Error: err.Error(), | ||
HasWarning: false, | ||
} | ||
} | ||
|
||
var pollBodyBytes []byte | ||
var pollresp PollResp | ||
pollBodyBytes, _ = io.ReadAll(resp.Body) | ||
defer func() { _ = resp.Body.Close() }() | ||
|
||
pollRespErr := json.Unmarshal(pollBodyBytes, &pollresp) | ||
if pollRespErr != nil { | ||
return common.PollStatusResponse{ | ||
Complete: true, | ||
InProgress: false, | ||
StatusCode: 0, | ||
HasFailed: true, | ||
Error: pollRespErr.Error(), | ||
HasWarning: false, | ||
} | ||
} | ||
// Update the status in the map | ||
importStatuses[importId] = pollresp.Data.Attributes.Status | ||
|
||
// If Failed_count > 0, add the importId to failedImports | ||
if pollresp.Data.Attributes.Failed_count > 0 { | ||
failedImports = append(failedImports, importId) | ||
} | ||
} | ||
} | ||
if allComplete { | ||
break | ||
} | ||
} | ||
if len(failedImports) == 0 { | ||
return common.PollStatusResponse{ | ||
Complete: true, | ||
HasFailed: false, | ||
HasWarning: false, | ||
StatusCode: 200, | ||
InProgress: false, | ||
FailedJobURLs: "", | ||
} | ||
} | ||
return common.PollStatusResponse{ | ||
Complete: true, | ||
HasFailed: true, | ||
HasWarning: false, | ||
StatusCode: 200, | ||
InProgress: false, | ||
FailedJobURLs: strings.Join(failedImports, IMPORT_ID_SEPARATOR), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Verify the use of KlaviyoBulkUploader
in Poll
.
The method Poll
uses KlaviyoBulkUploader
as a receiver type, which is undefined. Ensure this type is properly defined or imported.
Tools
golangci-lint
99-99: undefined: KlaviyoBulkUploader (typecheck)
141-141: undefined: PollResp (typecheck)
func (kbu *KlaviyoBulkUploader) generateKlaviyoErrorOutput(errorString string, err error, importingJobIds []int64, destinationID string) common.AsyncUploadOutput { | ||
eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{ | ||
"module": "batch_router", | ||
"destType": "KLAVIYO_BULK_UPLOAD", | ||
}) | ||
eventsAbortedStat.Count(len(importingJobIds)) | ||
return common.AsyncUploadOutput{ | ||
AbortCount: len(importingJobIds), | ||
DestinationID: destinationID, | ||
AbortJobIDs: importingJobIds, | ||
AbortReason: fmt.Sprintf("%s %v", errorString, err.Error()), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined type KlaviyoBulkUploader
in generateKlaviyoErrorOutput
.
The method generateKlaviyoErrorOutput
uses KlaviyoBulkUploader
as a receiver type, which is not defined in this file or imported. This will cause a compilation error.
Tools
golangci-lint
265-265: undefined: KlaviyoBulkUploader (typecheck)
|
||
eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, statLabels) | ||
|
||
var importIds []string // DelimitedImportIds is : seperated importIds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct spelling mistake.
- var importIds []string // DelimitedImportIds is : seperated importIds
+ var importIds []string // DelimitedImportIds is : separated importIds
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
var importIds []string // DelimitedImportIds is : seperated importIds | |
var importIds []string // DelimitedImportIds is : separated importIds |
Tools
GitHub Check: lint
[failure] 343-343:
seperated
is a misspelling ofseparated
(misspell)
func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUploadStatsInput) common.GetUploadStatsResponse { | ||
client := &http.Client{} | ||
destConfig := kbu.destinationConfig | ||
privateApiKey, _ := destConfig["privateApiKey"].(string) | ||
pollResultImportIds := strings.Split(UploadStatsInput.FailedJobURLs, IMPORT_ID_SEPARATOR) | ||
|
||
// make a map of jobId to error reason | ||
jobIdToErrorMap := make(map[int64]string) | ||
|
||
importingList := UploadStatsInput.ImportingList | ||
jobIDs := []int64{} | ||
for _, job := range importingList { | ||
jobIDs = append(jobIDs, job.JobID) | ||
} | ||
|
||
ErrorMap := kbu.jobIdToIdentifierMap | ||
var successKeys []int64 | ||
|
||
var failedJobIds []int64 | ||
for _, pollResultImportId := range pollResultImportIds { | ||
importErrorUrl := KlaviyoAPIURL + pollResultImportId + "/import-errors" | ||
req, err := http.NewRequest("GET", importErrorUrl, nil) | ||
if err != nil { | ||
return common.GetUploadStatsResponse{} | ||
} | ||
req.Header.Set("Content-Type", "application/json") | ||
req.Header.Set("Authorization", | ||
"Klaviyo-API-Key "+privateApiKey) | ||
req.Header.Set("revision", "2024-05-15") | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
return common.GetUploadStatsResponse{ | ||
StatusCode: 400, | ||
Error: err.Error(), | ||
} | ||
} | ||
|
||
var uploadStatsBodyBytes []byte | ||
var uploadStatsResp UploadStatusResp | ||
uploadStatsBodyBytes, _ = io.ReadAll(resp.Body) | ||
defer func() { _ = resp.Body.Close() }() | ||
|
||
uploadStatsBodyBytesErr := json.Unmarshal(uploadStatsBodyBytes, &uploadStatsResp) | ||
if uploadStatsBodyBytesErr != nil { | ||
return common.GetUploadStatsResponse{ | ||
StatusCode: 400, | ||
Error: uploadStatsBodyBytesErr.Error(), | ||
} | ||
} | ||
// Iterate over the Data array and get the jobId and error detail and store in jobIdToErrorMap | ||
for _, item := range uploadStatsResp.Data { | ||
orgPayload := item.Attributes.OriginalPayload | ||
var identifierId string | ||
if orgPayload.Id != "" { | ||
identifierId = orgPayload.Id | ||
} else { | ||
identifierId = orgPayload.AnonymousId | ||
} | ||
jobId := ErrorMap[identifierId] | ||
failedJobIds = append(failedJobIds, jobId) | ||
errorDetail := item.Attributes.Detail | ||
jobIdToErrorMap[jobId] = errorDetail | ||
} | ||
} | ||
successKeys, _ = lo.Difference(jobIDs, failedJobIds) | ||
return common.GetUploadStatsResponse{ | ||
StatusCode: 200, | ||
Error: "The import job failed", | ||
Metadata: common.EventStatMeta{ | ||
FailedKeys: failedJobIds, | ||
FailedReasons: jobIdToErrorMap, | ||
SucceededKeys: successKeys, | ||
}, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Undefined type KlaviyoBulkUploader
in GetUploadStats
.
The receiver type KlaviyoBulkUploader
for the method GetUploadStats
is undefined. This needs to be defined or imported to avoid compilation issues.
Tools
golangci-lint
189-189: undefined: KlaviyoBulkUploader (typecheck)
227-227: undefined: UploadStatusResp (typecheck)
* chore: update flaky test for error reporting aggregation test * chore: use ElementsMatch func in require for assertion * chore: update the logic of forming errors property * chore: update for better readability --------- Co-authored-by: Sai Sankeerth <sanpj2292@github.com>
4155fa2
to
b124ce4
Compare
Description
Onboarding a new batch router bulk upload destination Klaviyo Bulk Upload
Linear Ticket
https://linear.app/rudderstack/issue/INT-1957/klaviyo-implement-bulk-upload-api
Resolves INT-2056
Security
Summary by CodeRabbit
New Features
Tests
Chores