Skip to content

Commit

Permalink
chore: lint fixes for build
Browse files Browse the repository at this point in the history
  • Loading branch information
yashasvibajpai committed May 24, 2024
1 parent 50d4f60 commit b36167e
Showing 1 changed file with 28 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/k0kubun/pp"
"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
"github.com/samber/lo"
)

var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary

type KlaviyoBulkUploader struct {
destName string
Expand Down Expand Up @@ -92,7 +90,7 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat
destConfig := kbu.destinationConfig
privateApiKey, _ := destConfig["privateApiKey"].(string)
importId := pollInput.ImportId
pp.Println("ImportId: ", importId)
// pp.Println("ImportId: ", importId)
// pp.Println("PrivateApiKey: ", privateApiKey)
pollUrl := "https://a.klaviyo.com/api/profile-bulk-import-jobs/" + importId
req, err := http.NewRequest("GET", pollUrl, nil)
Expand Down Expand Up @@ -137,28 +135,28 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat
HasWarning: false,
}
}
totalCount := pollresp.Data.Attributes.Total_count
completedCount := pollresp.Data.Attributes.Completed_count
// totalCount := pollresp.Data.Attributes.Total_count
// completedCount := pollresp.Data.Attributes.Completed_count
failCount := pollresp.Data.Attributes.Failed_count
status := pollresp.Data.Attributes.Status
fmt.Println("Status: ", status)
fmt.Println("TotalCount: ", totalCount)
fmt.Println("CompletedCount: ", completedCount)
fmt.Println("FailCount: ", failCount)
// fmt.Println("Status: ", status)
// fmt.Println("TotalCount: ", totalCount)
// fmt.Println("CompletedCount: ", completedCount)
// fmt.Println("FailCount: ", failCount)

defer func() { _ = resp.Body.Close() }()

switch status {
case "queued", "processing":
pp.Println("inside processing")
// pp.Println("inside processing")
return common.PollStatusResponse{
Complete: false,
InProgress: true,
}
case "complete":
pp.Println("inside complete")
// pp.Println("inside complete")
if failCount > 0 {
pp.Println("inside fail")
// pp.Println("inside fail")
// pp.Println(importId)
return common.PollStatusResponse{
Complete: true,
Expand All @@ -169,7 +167,7 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat
FailedJobURLs: importId,
}
} else {
pp.Println("inside success")
// pp.Println("inside success")
return common.PollStatusResponse{
Complete: true,
InProgress: false,
Expand All @@ -179,7 +177,7 @@ func (kbu *KlaviyoBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStat
}
}
default:
pp.Print("inside default")
// pp.Print("inside default")
return common.PollStatusResponse{
Complete: false,
StatusCode: 500,
Expand All @@ -202,13 +200,13 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload
for _, job := range importingList {
jobIDs = append(jobIDs, job.JobID)
}
pp.Println("JobIDs: ", jobIDs)
// pp.Println("JobIDs: ", jobIDs)

// pp.Println(privateApiKey)
// pp.Println(pollResult)
// pp.Println(importingList)
ErrorMap := kbu.jobIdToIdentifierMap
pp.Println("ErrorMap: ", ErrorMap)
// pp.Println("ErrorMap: ", ErrorMap)

importErrorUrl := "https://a.klaviyo.com/api/profile-bulk-import-jobs/" + pollResultImportId + "/import-errors"
req, err := http.NewRequest("GET", importErrorUrl, nil)
Expand Down Expand Up @@ -240,7 +238,7 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload

uploadStatsBodyBytesErr := json.Unmarshal(uploadStatsBodyBytes, &uploadStatsResp)
if uploadStatsBodyBytesErr != nil {
fmt.Println("Reached here!!!")
// fmt.Println("Reached here!!!")
return common.GetUploadStatsResponse{
StatusCode: 400,
Error: uploadStatsBodyBytesErr.Error(),
Expand All @@ -249,9 +247,9 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload
// Iterate over the Data array and get the jobId and error detail and store in jobIdToErrorMap
x := 1
for _, item := range uploadStatsResp.Data {
pp.Println("loop counter", x)
// pp.Println("loop counter", x)
x++
pp.Println("Item: ", item)
// pp.Println("Item: ", item)
orgPayload := item.Attributes.OriginalPayload
var identifierId string
if orgPayload.Id != "" {
Expand All @@ -262,8 +260,8 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload
jobId := ErrorMap[identifierId]
failedJobIds = append(failedJobIds, jobId)
errorDetail := item.Attributes.Detail
fmt.Print("JobId: ", jobId)
fmt.Println("--Error Detail: ", errorDetail)
// fmt.Print("JobId: ", jobId)
// fmt.Println("--Error Detail: ", errorDetail)

// Store the jobId and errorDetail in the map
jobIdToErrorMap[jobId] = errorDetail
Expand All @@ -286,9 +284,9 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload
// pp.Println("Failed Job Ids: ", failedJobIds)

// var uploadStatusResponse common.GetUploadStatsResponse
fmt.Println("Failed Job Ids: ", failedJobIds)
fmt.Println("JobIdToErrorMap: ", jobIdToErrorMap)
fmt.Println("SucceededKeys: ", successKeys)
// fmt.Println("Failed Job Ids: ", failedJobIds)
// fmt.Println("JobIdToErrorMap: ", jobIdToErrorMap)
// fmt.Println("SucceededKeys: ", successKeys)
return common.GetUploadStatsResponse{
StatusCode: 200,
Error: "The import job failed",
Expand All @@ -299,6 +297,7 @@ func (kbu *KlaviyoBulkUploader) GetUploadStats(UploadStatsInput common.GetUpload
},
}
}

func (kbu *KlaviyoBulkUploader) generateKlaviyoErrorOutput(errorString string, err error, importingJobIds []int64, destinationID string) common.AsyncUploadOutput {
eventsAbortedStat := stats.Default.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
"module": "batch_router",
Expand Down Expand Up @@ -349,7 +348,7 @@ func (kbu *KlaviyoBulkUploader) ExtractProfiles(input map[string]interface{}) ([
}
attributes, _ := profileMap["attributes"].(map[string]interface{})
jobIdentifier := attributes["jobIdentifier"].(string)
pp.Println("Job Identifier:", jobIdentifier)
// pp.Println("Job Identifier:", jobIdentifier)
// split the jobIdentifier by : and store before : into jobId and after : into Identifier into the jobIdToIdentifierMap
jobIdentifierArray := strings.Split(jobIdentifier, ":")
jobIdentifierValue, _ := strconv.ParseInt(jobIdentifierArray[1], 10, 64)
Expand Down Expand Up @@ -473,7 +472,7 @@ func (kbu *KlaviyoBulkUploader) Upload(asyncDestStruct *common.AsyncDestinationS
return kbu.generateKlaviyoErrorOutput("Error while marshaling combined JSON.", err, importingJobIDs, destinationID)
}
outputFilePath := "combined_payload.json"
if err := os.WriteFile(outputFilePath, outputJSON, 0644); err != nil {
if err := os.WriteFile(outputFilePath, outputJSON, 0o644); err != nil {
return kbu.generateKlaviyoErrorOutput("Error while writing JSON to file.", err, importingJobIDs, destinationID)
}
// pp.Println(filePath)
Expand Down

0 comments on commit b36167e

Please sign in to comment.