Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: onboard klaviyo bulk upload destination #4682

Open
wants to merge 27 commits into
base: master
Choose a base branch
from

Conversation

yashasvibajpai
Copy link
Contributor

@yashasvibajpai yashasvibajpai commented May 15, 2024

Description

Onboarding a new batch router bulk upload destination Klaviyo Bulk Upload

Linear Ticket

https://linear.app/rudderstack/issue/INT-1957/klaviyo-implement-bulk-upload-api

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

Summary by CodeRabbit

  • New Features

    • Introduced functionality for handling bulk uploads to Klaviyo, including creating payloads, chunking data, polling job statuses, extracting profiles, uploading data, and transforming jobs.
  • Tests

    • Added comprehensive test suite for Klaviyo bulk upload functionality, covering uploading, polling, getting upload stats, and extracting profiles.
  • Chores

    • Updated batch destinations to include "KLAVIYO_BULK_UPLOAD".

@yashasvibajpai yashasvibajpai self-assigned this May 15, 2024
Copy link
Contributor

coderabbitai bot commented May 15, 2024

Walkthrough

The recent changes introduce comprehensive support for bulk uploads to Klaviyo, including creating payloads, chunking data, polling job statuses, extracting profiles, uploading data, and transforming jobs. The changes span across multiple files, adding new interfaces, structs, and methods, as well as corresponding mock implementations and tests. Additionally, the new functionality is integrated into the existing batch router framework.

Changes

File Path Summary
mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go Added GoMock mocks for Uploader, HttpClient, Poller, and UploadStats interfaces.
router/batchrouter/asyncdestinationmanager/common/utils.go Added "KLAVIYO_BULK_UPLOAD" to asyncDestinations slice.
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go Introduced Klaviyo bulk upload functionality: payload creation, data chunking, job polling, profile extraction, data uploading, and job transformation.
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_suite_test.go Added test suite setup for Klaviyo bulk upload functionality using Ginkgo and Gomega.
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go Added tests for Klaviyo bulk upload manager: uploading, polling, getting upload stats, and extracting profiles.
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go Introduced interfaces and data structures for uploading, polling, and extracting profiles for Klaviyo bulk uploads.
router/batchrouter/asyncdestinationmanager/manager.go Added import for klaviyobulkupload and case for "KLAVIYO_BULK_UPLOAD" in newRegularManager function.
utils/misc/misc.go Modified BatchDestinations function to include "KLAVIYO_BULK_UPLOAD" in the list of batch destinations.

Sequence Diagram(s) (Beta)

sequenceDiagram
    participant Client
    participant Manager
    participant KlaviyoBulkUploader
    participant KlaviyoAPI

    Client->>Manager: newRegularManager("KLAVIYO_BULK_UPLOAD")
    Manager->>KlaviyoBulkUploader: NewManager(destination)
    KlaviyoBulkUploader-->>Manager: KlaviyoBulkUploader instance

    Client->>KlaviyoBulkUploader: Upload(asyncDestStruct)
    KlaviyoBulkUploader->>KlaviyoAPI: Upload data
    KlaviyoAPI-->>KlaviyoBulkUploader: Upload response
    KlaviyoBulkUploader-->>Client: AsyncUploadOutput

    Client->>KlaviyoBulkUploader: Poll(pollInput)
    KlaviyoBulkUploader->>KlaviyoAPI: Poll job status
    KlaviyoAPI-->>KlaviyoBulkUploader: PollStatusResponse
    KlaviyoBulkUploader-->>Client: PollStatusResponse

    Client->>KlaviyoBulkUploader: GetUploadStats(UploadStatsInput)
    KlaviyoBulkUploader->>KlaviyoAPI: Get upload stats
    KlaviyoAPI-->>KlaviyoBulkUploader: UploadStatsResponse
    KlaviyoBulkUploader-->>Client: UploadStatsResponse

Poem

In the code, a rabbit hops with glee,
Bringing Klaviyo bulk uploads, oh so free!
With payloads and profiles, it does weave,
Data chunked and transformed, you'll believe.
Polling jobs, stats in sight,
Klaviyo's future, shining bright.
🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@yashasvibajpai yashasvibajpai changed the title feat: onboard klaviyo bulk upload destination, initial commitx2 feat: onboard klaviyo bulk upload destination May 21, 2024
@yashasvibajpai yashasvibajpai added WIP work in progress Do not merge labels May 24, 2024
@yashasvibajpai yashasvibajpai marked this pull request as ready for review May 24, 2024 19:36
Copy link

codecov bot commented May 25, 2024

Codecov Report

Attention: Patch coverage is 10.68966% with 259 lines in your changes missing coverage. Please review.

Project coverage is 74.27%. Comparing base (b124ce4) to head (f9b826c).
Report is 1 commits behind head on master.

Files Patch % Lines
...tionmanager/klaviyobulkupload/klaviyobulkupload.go 7.55% 256 Missing and 1 partial ⚠️
...ter/batchrouter/asyncdestinationmanager/manager.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #4682      +/-   ##
==========================================
- Coverage   74.63%   74.27%   -0.37%     
==========================================
  Files         414      415       +1     
  Lines       48632    48913     +281     
==========================================
+ Hits        36297    36328      +31     
- Misses       9957    10210     +253     
+ Partials     2378     2375       -3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@yashasvibajpai yashasvibajpai removed Do not merge WIP work in progress labels May 27, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between d3cc894 and 269a4cf.

Files selected for processing (8)
  • mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/common/utils.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_suite_test.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/manager.go (2 hunks)
  • utils/misc/misc.go (1 hunks)
Files skipped from review due to trivial changes (2)
  • mocks/router/klaviyobulkupload/klaviyobulkupload_mock.go
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_suite_test.go
Additional context used
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go

163-163: Error return value of json.Unmarshal is not checked (errcheck)

GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go

[failure] 163-163:
Error return value of json.Unmarshal is not checked (errcheck)

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

[failure] 28-28:
SEPERATOR is a misspelling of SEPARATOR (misspell)


[failure] 103-103:
SEPERATOR is a misspelling of SEPARATOR (misspell)


[failure] 176-176:
SEPERATOR is a misspelling of SEPARATOR (misspell)


[failure] 184-184:
SEPERATOR is a misspelling of SEPARATOR (misspell)


[failure] 331-331:
seperated is a misspelling of separated (misspell)


[failure] 377-377:
SEPERATOR is a misspelling of SEPARATOR (misspell)

Additional comments not posted (7)
router/batchrouter/asyncdestinationmanager/common/utils.go (1)

6-6: The addition of "KLAVIYO_BULK_UPLOAD" to the asyncDestinations slice correctly registers Klaviyo as a new asynchronous destination.

router/batchrouter/asyncdestinationmanager/manager.go (1)

11-11: The import of the Klaviyo bulk upload package and the addition of the "KLAVIYO_BULK_UPLOAD" case in the newRegularManager function are correctly implemented to support the new destination type.

Also applies to: 29-30

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go (2)

10-36: The interfaces defined for Uploader, HttpClient, Poller, ProfileExtractor, and UploadStats are well-structured and essential for the operations of the Klaviyo bulk uploader.


38-158: The data structures UploadResp, PollResp, UploadStatusResp, Payload, Data, Attributes, Profiles, Relationships, Lists, List, Input, and Profile are correctly defined and align with the requirements for handling Klaviyo bulk upload data.

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go (1)

32-37: The test functions TestNewManagerSuccess, TestUpload, TestFileReadSuccess, TestPoll, TestGetUploadStats, and TestExtractProfileValidInput are comprehensive and correctly validate the functionalities of the Klaviyo bulk uploader.

Also applies to: 39-58, 60-94, 96-121, 123-156, 158-169

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1)

61-67: The functions createFinalPayload, NewManager, chunkBySizeAndElements, Poll, GetUploadStats, generateKlaviyoErrorOutput, ExtractProfile, Upload, and Transform are well-implemented and crucial for the functionality of the Klaviyo bulk uploader.

Also applies to: 69-97, 99-178, 180-254, 256-268, 270-293, 295-390, 392-394

utils/misc/misc.go (1)

102-102: The addition of "KLAVIYO_BULK_UPLOAD" to the batchDestinations slice correctly implements the required support for the new Klaviyo bulk upload functionality.


inputPayloadJSON := `{"message":{"body":{"FORM":{},"JSON":{"data":{"attributes":{"profiles":{"data":[{"attributes":{"anonymous_id":111222334,"email":"qwe122@mail.com","first_name":"Testqwe0122","jobIdentifier":"111222334:1","last_name":"user0122","location":{"city":"delhi","country":"India","ip":"213.5.6.41"},"phone_number":"+919912000123"},"id":111222334,"type":"profile"}]}},"relationships":{"lists":{"data":[{"id":"UKth4J","type":"list"}]}},"type":"profile-bulk-import-job"}},"JSON_ARRAY":{},"XML":{}},"endpoint":"","files":{},"headers":{},"method":"POST","params":{},"type":"REST","userId":"","version":"1"},"metadata":{"job_id":1}}`
var inputPayload klaviyobulkupload.Input
json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle the error returned by json.Unmarshal to ensure robust error handling in tests.

- json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
+ err := json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
+ if err != nil {
+     t.Errorf("json.Unmarshal failed: %v", err)
+ }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
err := json.Unmarshal([]byte(inputPayloadJSON), &inputPayload)
if err != nil {
t.Errorf("json.Unmarshal failed: %v", err)
}
Tools
golangci-lint

163-163: Error return value of json.Unmarshal is not checked (errcheck)

GitHub Check: lint

[failure] 163-163:
Error return value of json.Unmarshal is not checked (errcheck)

Comment on lines 24 to 29
const (
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/"
BATCHSIZE = 25
MAXPAYLOADSIZE = 5000000
IMPORT_ID_SEPERATOR = ":"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct the spelling of "SEPARATOR" in the constant names to maintain professionalism in the codebase.

- IMPORT_ID_SEPERATOR = ":"
+ IMPORT_ID_SEPARATOR = ":"
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
const (
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/"
BATCHSIZE = 25
MAXPAYLOADSIZE = 5000000
IMPORT_ID_SEPERATOR = ":"
)
const (
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/"
BATCHSIZE = 25
MAXPAYLOADSIZE = 5000000
IMPORT_ID_SEPARATOR = ":"
)
Tools
GitHub Check: lint

[failure] 28-28:
SEPERATOR is a misspelling of SEPARATOR (misspell)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 269a4cf and d27701d.

Files selected for processing (2)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload_test.go
Additional context used
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

33-33: undefined: Profile (typecheck)


61-61: undefined: KlaviyoBulkUploader (typecheck)


69-69: undefined: Profile (typecheck)


99-99: undefined: KlaviyoBulkUploader (typecheck)


180-180: undefined: KlaviyoBulkUploader (typecheck)


256-256: undefined: KlaviyoBulkUploader (typecheck)


270-270: undefined: KlaviyoBulkUploader (typecheck)


295-295: undefined: KlaviyoBulkUploader (typecheck)


392-392: undefined: KlaviyoBulkUploader (typecheck)


34-34: undefined: Payload (typecheck)


35-35: undefined: Data (typecheck)


37-37: undefined: Attributes (typecheck)


38-38: undefined: Profiles (typecheck)


46-46: undefined: Relationships (typecheck)


47-47: undefined: Lists (typecheck)


48-48: undefined: List (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


70-70: undefined: Profile (typecheck)


71-71: undefined: Profile (typecheck)


141-141: undefined: PollResp (typecheck)


218-218: undefined: UploadStatusResp (typecheck)


277-277: undefined: Profile (typecheck)


312-312: undefined: Profile (typecheck)


315-315: undefined: Input (typecheck)


322-322: undefined: Profile (typecheck)


369-369: undefined: UploadResp (typecheck)

GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

[failure] 331-331:
seperated is a misspelling of separated (misspell)

Comment on lines +392 to +394
func (kbu *KlaviyoBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(string(job.EventPayload), job.JobID)
}
Copy link
Contributor

@coderabbitai coderabbitai bot Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined type KlaviyoBulkUploader.

The method Transform uses KlaviyoBulkUploader as a receiver type, which is undefined. Ensure this type is properly defined or imported.

Tools
golangci-lint

392-392: undefined: KlaviyoBulkUploader (typecheck)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KlaviyoBulkUploader is defined in types.go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yashasvibajpai, thank you for pointing that out. I've verified that the KlaviyoBulkUploader type is indeed defined in types.go within the klaviyobulkupload package. I appreciate your clarification.

If there's anything else you need, feel free to ask!

Analysis chain
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
rg "type KlaviyoBulkUploader" --glob "router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go"

Length of output: 212


Learnings added
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.

Note: Learnings are effective only in the context of specific code segments. To apply general coding guidelines, please configure review instructions.


Learnings used
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is defined in types.go. can you recheck?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yashasvibajpai, I've rechecked the types.go file within the klaviyobulkupload package and can confirm that the KlaviyoBulkUploader type is indeed defined there as a struct. Here's the relevant line from the file:

type KlaviyoBulkUploader struct {
    // struct fields...
}

Thank you for your patience. If there's anything else you need, feel free to ask!

Analysis chain
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
rg "type KlaviyoBulkUploader" --glob "router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go"

Length of output: 212


Learnings used
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.

User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.

Comment on lines 99 to 178
func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
client := &http.Client{}
destConfig := kbu.destinationConfig
privateApiKey, _ := destConfig["privateApiKey"].(string)
importIds := strings.Split(pollInput.ImportId, IMPORT_ID_SEPARATOR)
importStatuses := make(map[string]string)
failedImports := make([]string, 0)
for _, importId := range importIds {
importStatuses[importId] = "queued"
}

for {
allComplete := true
for importId, status := range importStatuses {
if status != "complete" {
allComplete = false
pollUrl := KlaviyoAPIURL + importId
req, err := http.NewRequest("GET", pollUrl, nil)
if err != nil {
return common.PollStatusResponse{
Complete: false,
InProgress: false,
HasFailed: true,
Error: err.Error(),
}
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Klaviyo-API-Key "+privateApiKey)
req.Header.Set("revision", "2024-05-15")
resp, err := client.Do(req)
if err != nil {
return common.PollStatusResponse{
Complete: false,
InProgress: false,
StatusCode: 0,
HasFailed: true,
Error: err.Error(),
HasWarning: false,
}
}

var pollBodyBytes []byte
var pollresp PollResp
pollBodyBytes, _ = io.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()

pollRespErr := json.Unmarshal(pollBodyBytes, &pollresp)
if pollRespErr != nil {
return common.PollStatusResponse{
Complete: false,
InProgress: false,
StatusCode: 0,
HasFailed: true,
Error: pollRespErr.Error(),
HasWarning: false,
}
}
// Update the status in the map
importStatuses[importId] = pollresp.Data.Attributes.Status

// If Failed_count > 0, add the importId to failedImports
if pollresp.Data.Attributes.Failed_count > 0 {
failedImports = append(failedImports, importId)
}
}
}
if allComplete {
break
}
}

return common.PollStatusResponse{
Complete: true,
HasFailed: true,
HasWarning: false,
StatusCode: 200,
InProgress: false,
FailedJobURLs: strings.Join(failedImports, IMPORT_ID_SEPARATOR),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined type KlaviyoBulkUploader.

The receiver type KlaviyoBulkUploader for the method Poll is undefined in this scope. Ensure this type is correctly defined or imported.

Tools
golangci-lint

99-99: undefined: KlaviyoBulkUploader (typecheck)


141-141: undefined: PollResp (typecheck)

Comment on lines 295 to 390
func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationStruct) common.AsyncUploadOutput {
startTime := time.Now()
destination := asyncDestStruct.Destination
filePath := asyncDestStruct.FileName
importingJobIDs := asyncDestStruct.ImportingJobIDs
destType := destination.DestinationDefinition.Name
destinationID := destination.ID
listId, _ := destination.Config["listId"].(string)
statLabels := stats.Tags{
"module": "batch_router",
"destType": destType,
}
file, err := os.Open(filePath)
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while opening file. ", err, asyncDestStruct.ImportingJobIDs, destinationID)
}
defer file.Close()
var combinedProfiles []Profile
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var input Input
line := scanner.Text()
err := json.Unmarshal([]byte(line), &input)
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while parsing JSON.", err, importingJobIDs, destinationID)
}
profileStructure := kbu.ExtractProfile(input)
if profileStructure == (Profile{}) {
return kbu.generateKlaviyoErrorOutput("Error while extracting profile. No profile data passed", err, importingJobIDs, destinationID)
}
combinedProfiles = append(combinedProfiles, profileStructure)
}

chunks, _ := chunkBySizeAndElements(combinedProfiles, MAXPAYLOADSIZE, BATCHSIZE)
eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, statLabels)

var importIds []string // DelimitedImportIds is : seperated importIds
var DelimitedUploadRespErr string
for _, chunk := range chunks {
combinedPayload := createFinalPayload(chunk, listId)

// Convert combined payload to JSON
outputJSON, err := json.MarshalIndent(combinedPayload, "", " ")
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while marshaling combined JSON.", err, importingJobIDs, destinationID)
}
uploadURL := KlaviyoAPIURL
client := &http.Client{}
req, err := http.NewRequest("POST", uploadURL, bytes.NewBuffer(outputJSON))
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while creating request.", err, importingJobIDs, destinationID)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Klaviyo-API-Key "+destination.Config["privateApiKey"].(string))
req.Header.Set("revision", "2024-05-15")

uploadTimeStat := stats.Default.NewTaggedStat("async_upload_time", stats.TimerType, statLabels)
payloadSizeStat := stats.Default.NewTaggedStat("payload_size", stats.HistogramType, statLabels)
payloadSizeStat.Observe(float64(len(outputJSON)))

resp, err := client.Do(req)
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while sending request.", err, importingJobIDs, destinationID)
}

var bodyBytes []byte
bodyBytes, _ = io.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()
uploadTimeStat.Since(startTime)

if resp.StatusCode != 202 {
return kbu.generateKlaviyoErrorOutput("Got non 202 as statusCode.", fmt.Errorf(string(bodyBytes)), importingJobIDs, destinationID)
}
var uploadresp UploadResp
uploadRespErr := json.Unmarshal((bodyBytes), &uploadresp)
if uploadRespErr != nil {
return kbu.generateKlaviyoErrorOutput("Error while unmarshaling response.", uploadRespErr, importingJobIDs, destinationID)
}
importIds = append(importIds, uploadresp.Data.Id)
}
importParameters, err := json.Marshal(common.ImportParameters{
ImportId: strings.Join(importIds, IMPORT_ID_SEPARATOR),
})
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while marshaling parameters.", err, importingJobIDs, destinationID)
}
eventsSuccessStat.Count(len(asyncDestStruct.ImportingJobIDs))

return common.AsyncUploadOutput{
ImportingParameters: importParameters,
ImportingJobIDs: importingJobIDs,
SuccessResponse: DelimitedUploadRespErr,
DestinationID: destination.ID,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined type KlaviyoBulkUploader.

The method Upload uses KlaviyoBulkUploader as a receiver type, which is not defined in this file or imported. This will cause a compilation error.

Tools
golangci-lint

295-295: undefined: KlaviyoBulkUploader (typecheck)


312-312: undefined: Profile (typecheck)


315-315: undefined: Input (typecheck)


322-322: undefined: Profile (typecheck)


369-369: undefined: UploadResp (typecheck)

GitHub Check: lint

[failure] 331-331:
seperated is a misspelling of separated (misspell)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between d27701d and 7fc0a12.

Files selected for processing (1)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
Additional context used
Learnings (1)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

33-33: undefined: Profile (typecheck)


61-61: undefined: KlaviyoBulkUploader (typecheck)


69-69: undefined: Profile (typecheck)


99-99: undefined: KlaviyoBulkUploader (typecheck)


180-180: undefined: KlaviyoBulkUploader (typecheck)


256-256: undefined: KlaviyoBulkUploader (typecheck)


270-270: undefined: KlaviyoBulkUploader (typecheck)


295-295: undefined: KlaviyoBulkUploader (typecheck)


398-398: undefined: KlaviyoBulkUploader (typecheck)


34-34: undefined: Payload (typecheck)


35-35: undefined: Data (typecheck)


37-37: undefined: Attributes (typecheck)


38-38: undefined: Profiles (typecheck)


46-46: undefined: Relationships (typecheck)


47-47: undefined: Lists (typecheck)


48-48: undefined: List (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


70-70: undefined: Profile (typecheck)


71-71: undefined: Profile (typecheck)


141-141: undefined: PollResp (typecheck)


218-218: undefined: UploadStatusResp (typecheck)


277-277: undefined: Profile (typecheck)


312-312: undefined: Profile (typecheck)


315-315: undefined: Input (typecheck)


322-322: undefined: Profile (typecheck)


374-374: undefined: UploadResp (typecheck)

GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

[failure] 335-335:
seperated is a misspelling of separated (misspell)

Additional comments not posted (7)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (7)

61-67: Ensure KlaviyoBulkUploader is defined.

As previously noted, ensure the KlaviyoBulkUploader type is defined or imported to avoid compilation issues.

Tools
golangci-lint

61-61: undefined: KlaviyoBulkUploader (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


99-178: Verify the definition of KlaviyoBulkUploader and PollResp.

As previously noted, ensure the KlaviyoBulkUploader and PollResp types are defined or imported to avoid compilation issues.

Tools
golangci-lint

99-99: undefined: KlaviyoBulkUploader (typecheck)


141-141: undefined: PollResp (typecheck)


180-254: Check for undefined KlaviyoBulkUploader and UploadStatusResp.

As previously noted, ensure the KlaviyoBulkUploader and UploadStatusResp types are defined or imported to avoid compilation issues.

Tools
golangci-lint

180-180: undefined: KlaviyoBulkUploader (typecheck)


218-218: undefined: UploadStatusResp (typecheck)


256-268: Confirm the definition of KlaviyoBulkUploader.

As previously noted, ensure the KlaviyoBulkUploader type is defined or imported to avoid compilation issues.

Tools
golangci-lint

256-256: undefined: KlaviyoBulkUploader (typecheck)


295-396: Review the use of KlaviyoBulkUploader, Profile, Input, and UploadResp.

As previously noted, ensure the KlaviyoBulkUploader, Profile, Input, and UploadResp types are defined or imported to avoid compilation issues.

Tools
golangci-lint

295-295: undefined: KlaviyoBulkUploader (typecheck)


312-312: undefined: Profile (typecheck)


315-315: undefined: Input (typecheck)


322-322: undefined: Profile (typecheck)


374-374: undefined: UploadResp (typecheck)

GitHub Check: lint

[failure] 335-335:
seperated is a misspelling of separated (misspell)


398-400: Ensure KlaviyoBulkUploader is defined for Transform.

As previously noted, ensure the KlaviyoBulkUploader type is defined or imported to avoid compilation issues.

Tools
golangci-lint

398-398: undefined: KlaviyoBulkUploader (typecheck)


335-335: Correct spelling mistake in comment.

- var importIds []string // DelimitedImportIds is : seperated importIds
+ var importIds []string // DelimitedImportIds is : separated importIds

Likely invalid or redundant comment.

Tools
GitHub Check: lint

[failure] 335-335:
seperated is a misspelling of separated (misspell)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 7fc0a12 and e0dd47a.

Files selected for processing (1)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
Additional context used
Learnings (1)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

33-33: undefined: Profile (typecheck)


61-61: undefined: KlaviyoBulkUploader (typecheck)


69-69: undefined: Profile (typecheck)


99-99: undefined: KlaviyoBulkUploader (typecheck)


189-189: undefined: KlaviyoBulkUploader (typecheck)


265-265: undefined: KlaviyoBulkUploader (typecheck)


279-279: undefined: KlaviyoBulkUploader (typecheck)


304-304: undefined: KlaviyoBulkUploader (typecheck)


416-416: undefined: KlaviyoBulkUploader (typecheck)


34-34: undefined: Payload (typecheck)


35-35: undefined: Data (typecheck)


37-37: undefined: Attributes (typecheck)


38-38: undefined: Profiles (typecheck)


46-46: undefined: Relationships (typecheck)


47-47: undefined: Lists (typecheck)


48-48: undefined: List (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


70-70: undefined: Profile (typecheck)


71-71: undefined: Profile (typecheck)


141-141: undefined: PollResp (typecheck)


227-227: undefined: UploadStatusResp (typecheck)


286-286: undefined: Profile (typecheck)


323-323: undefined: Profile (typecheck)


326-326: undefined: Input (typecheck)


333-333: undefined: Profile (typecheck)


387-387: undefined: UploadResp (typecheck)

GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

[failure] 346-346:
seperated is a misspelling of separated (misspell)

Additional comments not posted (7)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (7)

346-346: Correct spelling mistake.

- var importIds []string // DelimitedImportIds is : seperated importIds
+ var importIds []string // DelimitedImportIds is : separated importIds

Likely invalid or redundant comment.

Tools
GitHub Check: lint

[failure] 346-346:
seperated is a misspelling of separated (misspell)


99-187: Verify the definition of KlaviyoBulkUploader.

The method Poll uses KlaviyoBulkUploader as a receiver type, which is undefined. Ensure this type is properly defined or imported.

Verification successful

The KlaviyoBulkUploader type is indeed defined in the types.go file within the klaviyobulkupload package. Here is the relevant definition:

type KlaviyoBulkUploader struct {
	destName             string
	destinationConfig    map[string]interface{}
	logger               logger.Logger
	Client               *http.Client
	jobIdToIdentifierMap map[string]int64
}

This confirms that the KlaviyoBulkUploader type is properly defined and the Poll method can use it as a receiver.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'

Length of output: 817

Tools
golangci-lint

99-99: undefined: KlaviyoBulkUploader (typecheck)


141-141: undefined: PollResp (typecheck)


304-414: Ensure KlaviyoBulkUploader is defined for Upload.

The method Upload uses KlaviyoBulkUploader as a receiver type, which is not defined in this file or imported. This will cause a compilation error.

Tools
golangci-lint

304-304: undefined: KlaviyoBulkUploader (typecheck)


323-323: undefined: Profile (typecheck)


326-326: undefined: Input (typecheck)


333-333: undefined: Profile (typecheck)


387-387: undefined: UploadResp (typecheck)

GitHub Check: lint

[failure] 346-346:
seperated is a misspelling of separated (misspell)


189-263: Ensure KlaviyoBulkUploader is defined for GetUploadStats.

The receiver type KlaviyoBulkUploader for the method GetUploadStats is undefined. This needs to be defined or imported to avoid compilation issues.

Verification successful

The KlaviyoBulkUploader type is correctly defined in types.go within the klaviyobulkupload package and is used as a receiver in the GetUploadStats method.

  • types.go:
    • type KlaviyoBulkUploader struct { ... }
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'

Length of output: 817

Tools
golangci-lint

189-189: undefined: KlaviyoBulkUploader (typecheck)


227-227: undefined: UploadStatusResp (typecheck)


61-67: Ensure KlaviyoBulkUploader is defined.

The type KlaviyoBulkUploader is used but not defined or imported. This will cause a compilation error. Ensure that this type is properly defined or imported.

Verification successful

The KlaviyoBulkUploader type is properly defined in types.go within the klaviyobulkupload package. The fields destName, destinationConfig, and logger match the usage in the NewManager function.

  • destName is defined as string.
  • destinationConfig is defined as map[string]interface{}.
  • logger is defined as logger.Logger.

This confirms that the type is correctly defined and used.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'

Length of output: 817

Tools
golangci-lint

61-61: undefined: KlaviyoBulkUploader (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


265-277: Verify the definition of KlaviyobulkUploader for generateKlaviyoErrorOutput.

The method generateKlaviyoErrorOutput uses KlaviyoBulkUploader as a receiver type, which is not defined in this file or imported. This will cause a compilation error.

Verification successful

The KlaviyoBulkUploader type is defined in types.go within the klaviyobulkupload package and is used as a receiver in the generateKlaviyoErrorOutput method. There is no compilation error related to the definition of KlaviyoBulkUploader.

  • KlaviyoBulkUploader is defined in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/types.go.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Search for the definition of KlaviyoBulkUploader in types.go within the klaviyobulkupload package
ast-grep --lang go --pattern $'type KlaviyoBulkUploader struct { $$$ }'

Length of output: 817

Tools
golangci-lint

265-265: undefined: KlaviyoBulkUploader (typecheck)


416-418: Verify the definition of KlaviyoBulkUploader for Transform.

The method Transform uses KlaviyoBulkUploader as a receiver type, which is undefined. Ensure this type is properly defined or imported.

Tools
golangci-lint

416-416: undefined: KlaviyoBulkUploader (typecheck)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between e0dd47a and f575e6a.

Files selected for processing (1)
  • router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (1 hunks)
Additional context used
Learnings (1)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
golangci-lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

33-33: undefined: Profile (typecheck)


61-61: undefined: KlaviyoBulkUploader (typecheck)


69-69: undefined: Profile (typecheck)


99-99: undefined: KlaviyoBulkUploader (typecheck)


189-189: undefined: KlaviyoBulkUploader (typecheck)


265-265: undefined: KlaviyoBulkUploader (typecheck)


279-279: undefined: KlaviyoBulkUploader (typecheck)


304-304: undefined: KlaviyoBulkUploader (typecheck)


412-412: undefined: KlaviyoBulkUploader (typecheck)


34-34: undefined: Payload (typecheck)


35-35: undefined: Data (typecheck)


37-37: undefined: Attributes (typecheck)


38-38: undefined: Profiles (typecheck)


46-46: undefined: Relationships (typecheck)


47-47: undefined: Lists (typecheck)


48-48: undefined: List (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


70-70: undefined: Profile (typecheck)


71-71: undefined: Profile (typecheck)


141-141: undefined: PollResp (typecheck)


227-227: undefined: UploadStatusResp (typecheck)


286-286: undefined: Profile (typecheck)


323-323: undefined: Profile (typecheck)


326-326: undefined: Input (typecheck)


333-333: undefined: Profile (typecheck)


384-384: undefined: UploadResp (typecheck)

GitHub Check: lint
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

[failure] 343-343:
seperated is a misspelling of separated (misspell)

Additional comments not posted (2)
router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go (2)

61-67: Ensure KlaviyoBulkUploader is defined.

The type KlaviyoBulkUploader is used but not defined or imported. This will cause a compilation error. Ensure that this type is properly defined or imported.

Skipped due to learnings
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
Tools
golangci-lint

61-61: undefined: KlaviyoBulkUploader (typecheck)


62-62: undefined: KlaviyoBulkUploader (typecheck)


412-414: Undefined type KlaviyoBulkUploader in Transform.

The method Transform uses KlaviyoBulkUploader as a receiver type, which is undefined. Ensure this type is properly defined or imported.

Skipped due to learnings
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:392-394
Timestamp: 2024-06-04T21:54:20.163Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package and is used as a receiver in various methods within the `klaviyobulkupload.go` file.
User: yashasvibajpai
PR: rudderlabs/rudder-server#4682
File: router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go:61-67
Timestamp: 2024-06-04T21:50:56.257Z
Learning: The `KlaviyoBulkUploader` type is defined in `types.go` within the `klaviyobulkupload` package.
Tools
golangci-lint

412-412: undefined: KlaviyoBulkUploader (typecheck)

Comment on lines +33 to +59
func createFinalPayload(combinedProfiles []Profile, listId string) Payload {
payload := Payload{
Data: Data{
Type: "profile-bulk-import-job",
Attributes: Attributes{
Profiles: Profiles{
Data: combinedProfiles,
},
},
},
}

if listId != "" {
payload.Data.Relationships = &Relationships{
Lists: Lists{
Data: []List{
{
Type: "list",
ID: listId,
},
},
},
}
}

return payload
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address undefined types in createFinalPayload.

The types Profile, Payload, Data, Attributes, Profiles, Relationships, Lists, and List are used but not defined or imported in this file. This will cause compilation errors. Ensure these types are correctly defined or imported.

Tools
golangci-lint

33-33: undefined: Profile (typecheck)


34-34: undefined: Payload (typecheck)


35-35: undefined: Data (typecheck)


37-37: undefined: Attributes (typecheck)


38-38: undefined: Profiles (typecheck)


46-46: undefined: Relationships (typecheck)


47-47: undefined: Lists (typecheck)


48-48: undefined: List (typecheck)

Comment on lines +69 to +97
func chunkBySizeAndElements(combinedProfiles []Profile, maxBytes, maxElements int) ([][]Profile, error) {
var chunks [][]Profile
var chunk []Profile
var chunkSize int

for _, profile := range combinedProfiles {
profileJSON, err := json.Marshal(profile)
if err != nil {
return nil, fmt.Errorf("failed to marshal profile: %w", err)
}

profileSize := len(profileJSON)

if (chunkSize+profileSize > maxBytes || len(chunk) == maxElements) && len(chunk) > 0 {
chunks = append(chunks, chunk)
chunk = nil
chunkSize = 0
}

chunk = append(chunk, profile)
chunkSize += profileSize
}

if len(chunk) > 0 {
chunks = append(chunks, chunk)
}

return chunks, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle undefined type Profile in chunkBySizeAndElements.

The type Profile is used but not defined or imported in this file. This will lead to compilation errors. Ensure this type is correctly defined or imported.

Tools
golangci-lint

69-69: undefined: Profile (typecheck)


70-70: undefined: Profile (typecheck)


71-71: undefined: Profile (typecheck)

Comment on lines +279 to +302
func (kbu *KlaviyoBulkUploader) ExtractProfile(input Input) Profile {
Message := input.Message
Body := Message.Body
Json := Body.JSON
Data := Json.Data
Attributes := Data.Attributes
if len(Attributes.Profiles.Data) == 0 {
return Profile{}
}
profileObject := Attributes.Profiles.Data[0]

jobIdentifier := profileObject.Attributes.JobIdentifier
jobIdentifierArray := strings.Split(jobIdentifier, ":")
jobIdentifierValue, _ := strconv.ParseInt(jobIdentifierArray[1], 10, 64)
if kbu.jobIdToIdentifierMap == nil {
kbu.jobIdToIdentifierMap = make(map[string]int64)
}
kbu.jobIdToIdentifierMap[jobIdentifierArray[0]] = jobIdentifierValue

// delete jobIdentifier from the attributes map as it is not required in the final payload
profileObject.Attributes.JobIdentifier = ""

return profileObject
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address undefined types Profile and Input in ExtractProfile.

The types Profile and Input are used but not defined or imported in this file. This will lead to compilation errors. Ensure these types are correctly defined or imported.

Tools
golangci-lint

279-279: undefined: KlaviyoBulkUploader (typecheck)


286-286: undefined: Profile (typecheck)

Comment on lines +304 to +410
destType := destination.DestinationDefinition.Name
destinationID := destination.ID
listId, _ := destination.Config["listId"].(string)
statLabels := stats.Tags{
"module": "batch_router",
"destType": destType,
}
file, err := os.Open(filePath)
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while opening file. ", err, asyncDestStruct.ImportingJobIDs, destinationID)
}
defer file.Close()
var combinedProfiles []Profile
scanner := bufio.NewScanner(file)
for scanner.Scan() {
var input Input
line := scanner.Text()
err := json.Unmarshal([]byte(line), &input)
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while parsing JSON.", err, importingJobIDs, destinationID)
}
profileStructure := kbu.ExtractProfile(input)
if profileStructure == (Profile{}) {
return kbu.generateKlaviyoErrorOutput("Error while extracting profile. No profile data passed", err, importingJobIDs, destinationID)
}
combinedProfiles = append(combinedProfiles, profileStructure)
}

chunks, _ := chunkBySizeAndElements(combinedProfiles, MAXPAYLOADSIZE, BATCHSIZE)

eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, statLabels)

var importIds []string // DelimitedImportIds is : seperated importIds
var DelimitedUploadRespErr string

for idx, chunk := range chunks {
combinedPayload := createFinalPayload(chunk, listId)

// Convert combined payload to JSON
outputJSON, err := json.MarshalIndent(combinedPayload, "", " ")
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while marshaling combined JSON.", err, importingJobIDs, destinationID)
}
uploadURL := KlaviyoAPIURL
client := &http.Client{}
req, err := http.NewRequest("POST", uploadURL, bytes.NewBuffer(outputJSON))
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while creating request.", err, importingJobIDs, destinationID)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Klaviyo-API-Key "+destination.Config["privateApiKey"].(string))
req.Header.Set("revision", "2024-05-15")

uploadTimeStat := stats.Default.NewTaggedStat("async_upload_time", stats.TimerType, statLabels)
payloadSizeStat := stats.Default.NewTaggedStat("payload_size", stats.HistogramType, statLabels)
payloadSizeStat.Observe(float64(len(outputJSON)))

resp, err := client.Do(req)
if err != nil {
failedJobs = append(failedJobs, importingJobIDs[idx])
kbu.logger.Error("Error while sending request.", err)
}

var bodyBytes []byte
bodyBytes, _ = io.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()
uploadTimeStat.Since(startTime)

if resp.StatusCode != 202 {
failedJobs = append(failedJobs, importingJobIDs[idx])
kbu.logger.Error("Got non 202 as statusCode.", fmt.Errorf(string(bodyBytes)))
}
var uploadresp UploadResp
uploadRespErr := json.Unmarshal((bodyBytes), &uploadresp)
if uploadRespErr != nil {
failedJobs = append(failedJobs, importingJobIDs[idx])
kbu.logger.Error("Error while unmarshaling response.", uploadRespErr)
}
importIds = append(importIds, uploadresp.Data.Id)
}
importParameters, err := json.Marshal(common.ImportParameters{
ImportId: strings.Join(importIds, IMPORT_ID_SEPARATOR),
})
if err != nil {
return kbu.generateKlaviyoErrorOutput("Error while marshaling parameters.", err, importingJobIDs, destinationID)
}
successJobs, _ = lo.Difference(importingJobIDs, failedJobs)
eventsSuccessStat.Count(len(asyncDestStruct.ImportingJobIDs))

return common.AsyncUploadOutput{
ImportingParameters: importParameters,
SucceededJobIDs: successJobs,
FailedJobIDs: failedJobs,
FailedCount: len(failedJobs),
ImportingJobIDs: successJobs,
SuccessResponse: DelimitedUploadRespErr,
DestinationID: destination.ID,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined type KlaviyoBulkUploader in Upload.

The method Upload uses KlaviyoBulkUploader as a receiver type, which is not defined in this file or imported. This will cause a compilation error.

Tools
golangci-lint

304-304: undefined: KlaviyoBulkUploader (typecheck)


323-323: undefined: Profile (typecheck)


326-326: undefined: Input (typecheck)


333-333: undefined: Profile (typecheck)


384-384: undefined: UploadResp (typecheck)

GitHub Check: lint

[failure] 343-343:
seperated is a misspelling of separated (misspell)

Comment on lines +99 to +187
func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatusResponse {
client := &http.Client{}
destConfig := kbu.destinationConfig
privateApiKey, _ := destConfig["privateApiKey"].(string)
importIds := strings.Split(pollInput.ImportId, IMPORT_ID_SEPARATOR)
importStatuses := make(map[string]string)
failedImports := make([]string, 0)
for _, importId := range importIds {
importStatuses[importId] = "queued"
}

for {
allComplete := true
for importId, status := range importStatuses {
if status != "complete" {
allComplete = false
pollUrl := KlaviyoAPIURL + importId
req, err := http.NewRequest("GET", pollUrl, nil)
if err != nil {
return common.PollStatusResponse{
Complete: true,
InProgress: false,
HasFailed: true,
Error: err.Error(),
}
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Klaviyo-API-Key "+privateApiKey)
req.Header.Set("revision", "2024-05-15")
resp, err := client.Do(req)
if err != nil {
return common.PollStatusResponse{
Complete: true,
InProgress: false,
StatusCode: 0,
HasFailed: true,
Error: err.Error(),
HasWarning: false,
}
}

var pollBodyBytes []byte
var pollresp PollResp
pollBodyBytes, _ = io.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()

pollRespErr := json.Unmarshal(pollBodyBytes, &pollresp)
if pollRespErr != nil {
return common.PollStatusResponse{
Complete: true,
InProgress: false,
StatusCode: 0,
HasFailed: true,
Error: pollRespErr.Error(),
HasWarning: false,
}
}
// Update the status in the map
importStatuses[importId] = pollresp.Data.Attributes.Status

// If Failed_count > 0, add the importId to failedImports
if pollresp.Data.Attributes.Failed_count > 0 {
failedImports = append(failedImports, importId)
}
}
}
if allComplete {
break
}
}
if len(failedImports) == 0 {
return common.PollStatusResponse{
Complete: true,
HasFailed: false,
HasWarning: false,
StatusCode: 200,
InProgress: false,
FailedJobURLs: "",
}
}
return common.PollStatusResponse{
Complete: true,
HasFailed: true,
HasWarning: false,
StatusCode: 200,
InProgress: false,
FailedJobURLs: strings.Join(failedImports, IMPORT_ID_SEPARATOR),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify the use of KlaviyoBulkUploader in Poll.

The method Poll uses KlaviyoBulkUploader as a receiver type, which is undefined. Ensure this type is properly defined or imported.

Tools
golangci-lint

99-99: undefined: KlaviyoBulkUploader (typecheck)


141-141: undefined: PollResp (typecheck)

Comment on lines +265 to +277
func (kbu *KlaviyoBulkUploader) generateKlaviyoErrorOutput(errorString string, err error, importingJobIds []int64, destinationID string) common.AsyncUploadOutput {
eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
"module": "batch_router",
"destType": "KLAVIYO_BULK_UPLOAD",
})
eventsAbortedStat.Count(len(importingJobIds))
return common.AsyncUploadOutput{
AbortCount: len(importingJobIds),
DestinationID: destinationID,
AbortJobIDs: importingJobIds,
AbortReason: fmt.Sprintf("%s %v", errorString, err.Error()),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined type KlaviyoBulkUploader in generateKlaviyoErrorOutput.

The method generateKlaviyoErrorOutput uses KlaviyoBulkUploader as a receiver type, which is not defined in this file or imported. This will cause a compilation error.

Tools
golangci-lint

265-265: undefined: KlaviyoBulkUploader (typecheck)


eventsSuccessStat := stats.Default.NewTaggedStat("success_job_count", stats.CountType, statLabels)

var importIds []string // DelimitedImportIds is : seperated importIds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct spelling mistake.

- var importIds []string // DelimitedImportIds is : seperated importIds
+ var importIds []string // DelimitedImportIds is : separated importIds
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
var importIds []string // DelimitedImportIds is : seperated importIds
var importIds []string // DelimitedImportIds is : separated importIds
Tools
GitHub Check: lint

[failure] 343-343:
seperated is a misspelling of separated (misspell)

Comment on lines +189 to +263
func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUploadStatsInput) common.GetUploadStatsResponse {
client := &http.Client{}
destConfig := kbu.destinationConfig
privateApiKey, _ := destConfig["privateApiKey"].(string)
pollResultImportIds := strings.Split(UploadStatsInput.FailedJobURLs, IMPORT_ID_SEPARATOR)

// make a map of jobId to error reason
jobIdToErrorMap := make(map[int64]string)

importingList := UploadStatsInput.ImportingList
jobIDs := []int64{}
for _, job := range importingList {
jobIDs = append(jobIDs, job.JobID)
}

ErrorMap := kbu.jobIdToIdentifierMap
var successKeys []int64

var failedJobIds []int64
for _, pollResultImportId := range pollResultImportIds {
importErrorUrl := KlaviyoAPIURL + pollResultImportId + "/import-errors"
req, err := http.NewRequest("GET", importErrorUrl, nil)
if err != nil {
return common.GetUploadStatsResponse{}
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization",
"Klaviyo-API-Key "+privateApiKey)
req.Header.Set("revision", "2024-05-15")
resp, err := client.Do(req)
if err != nil {
return common.GetUploadStatsResponse{
StatusCode: 400,
Error: err.Error(),
}
}

var uploadStatsBodyBytes []byte
var uploadStatsResp UploadStatusResp
uploadStatsBodyBytes, _ = io.ReadAll(resp.Body)
defer func() { _ = resp.Body.Close() }()

uploadStatsBodyBytesErr := json.Unmarshal(uploadStatsBodyBytes, &uploadStatsResp)
if uploadStatsBodyBytesErr != nil {
return common.GetUploadStatsResponse{
StatusCode: 400,
Error: uploadStatsBodyBytesErr.Error(),
}
}
// Iterate over the Data array and get the jobId and error detail and store in jobIdToErrorMap
for _, item := range uploadStatsResp.Data {
orgPayload := item.Attributes.OriginalPayload
var identifierId string
if orgPayload.Id != "" {
identifierId = orgPayload.Id
} else {
identifierId = orgPayload.AnonymousId
}
jobId := ErrorMap[identifierId]
failedJobIds = append(failedJobIds, jobId)
errorDetail := item.Attributes.Detail
jobIdToErrorMap[jobId] = errorDetail
}
}
successKeys, _ = lo.Difference(jobIDs, failedJobIds)
return common.GetUploadStatsResponse{
StatusCode: 200,
Error: "The import job failed",
Metadata: common.EventStatMeta{
FailedKeys: failedJobIds,
FailedReasons: jobIdToErrorMap,
SucceededKeys: successKeys,
},
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined type KlaviyoBulkUploader in GetUploadStats.

The receiver type KlaviyoBulkUploader for the method GetUploadStats is undefined. This needs to be defined or imported to avoid compilation issues.

Tools
golangci-lint

189-189: undefined: KlaviyoBulkUploader (typecheck)


227-227: undefined: UploadStatusResp (typecheck)

sanpj2292 and others added 2 commits June 6, 2024 17:47
* chore: update flaky test for error reporting aggregation test

* chore: use ElementsMatch func in require for assertion

* chore: update the logic of forming errors property

* chore: update for better readability

---------

Co-authored-by: Sai Sankeerth <sanpj2292@github.com>
@lvrach lvrach force-pushed the master branch 2 times, most recently from 4155fa2 to b124ce4 Compare June 6, 2024 15:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants