Skip to content

Commit

Permalink
chore: improve error logs during processor stash test (#4317)
Browse files Browse the repository at this point in the history
chore: improve error logs during processor stash test:
  • Loading branch information
achettyiitr committed Jan 16, 2024
1 parent 0771999 commit a09f39f
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions processor/stash/stash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"

"github.com/google/uuid"
Expand All @@ -21,7 +23,6 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/testhelper"
)

var prefix = "proc_error_jobs_"
Expand All @@ -35,12 +36,10 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
// running minio container on docker
pool, err := dockertest.NewPool("")
require.NoError(t, err, "Failed to create docker pool")
cleanup := &testhelper.Cleanup{}
defer cleanup.Run()

minioResource := make([]*resource.MinioResource, uniqueWorkspaces)
for i := 0; i < uniqueWorkspaces; i++ {
minioResource[i], err = resource.SetupMinio(pool, cleanup)
minioResource[i], err = resource.SetupMinio(pool, t)
require.NoError(t, err)
}

Expand Down Expand Up @@ -107,7 +106,7 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
},
}

fileuploaderProvider := fileuploader.NewStaticProvider(storageSettings)
fileUploaderProvider := fileuploader.NewStaticProvider(storageSettings)

jobs := []*jobsdb.JobT{
{
Expand Down Expand Up @@ -137,7 +136,7 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
}

st := New()
st.fileuploader = fileuploaderProvider
st.fileuploader = fileUploaderProvider
st.logger = logger.NOP

jobsCount := countJobsByWorkspace(jobs)
Expand All @@ -156,16 +155,18 @@ func TestStoreErrorsToObjectStorage(t *testing.T) {
return true
}
if len(file) != 1 {
t.Log("file list: ", file, " err: ", err, "len: ", len(file))
fm, err = fileuploaderProvider.GetFileManager(workspace)
t.Logf("file list: %+v err: %v", lo.Map(file, func(item *filemanager.FileInfo, _ int) string {
return item.Key
}), err)
fm, err = fileUploaderProvider.GetFileManager(workspace)
require.NoError(t, err)
return false
}
return true
}, 20*time.Second, 1*time.Second, "no backup files found in backup store: ", err)

if storageSettings[workspace].Preferences.ProcErrors {
f := downloadFile(t, fm, file[0].Key, cleanup)
f := downloadFile(t, fm, file[0].Key)
jobsFromFile, err := readGzipJobFile(f.Name())
require.NoError(t, err)
require.NotZero(t, jobsCount[workspace], "jobsCount for workspace: ", workspace, " is zero")
Expand Down Expand Up @@ -203,7 +204,7 @@ func readGzipJobFile(filename string) ([]*jobsdb.JobT, error) {
if err != nil {
return []*jobsdb.JobT{}, err
}
defer gz.Close()
defer func() { _ = gz.Close() }()

sc := bufio.NewScanner(gz)
// default scanner buffer maxCapacity is 64K
Expand All @@ -212,12 +213,11 @@ func readGzipJobFile(filename string) ([]*jobsdb.JobT, error) {
buf := make([]byte, maxCapacity)
sc.Buffer(buf, maxCapacity)

jobs := []*jobsdb.JobT{}
var jobs []*jobsdb.JobT
for sc.Scan() {
lineByte := sc.Bytes()
uuid := uuid.MustParse("69359037-9599-48e7-b8f2-48393c019135")
job := &jobsdb.JobT{
UUID: uuid,
UUID: uuid.MustParse("69359037-9599-48e7-b8f2-48393c019135"),
JobID: gjson.GetBytes(lineByte, "job_id").Int(),
UserID: gjson.GetBytes(lineByte, "user_id").String(),
CustomVal: gjson.GetBytes(lineByte, "custom_val").String(),
Expand All @@ -231,7 +231,7 @@ func readGzipJobFile(filename string) ([]*jobsdb.JobT, error) {
return jobs, nil
}

func downloadFile(t *testing.T, fm filemanager.FileManager, fileToDownload string, cleanup *testhelper.Cleanup) *os.File {
func downloadFile(t *testing.T, fm filemanager.FileManager, fileToDownload string) *os.File {
file, err := os.CreateTemp("", "backedupfile")
require.NoError(t, err, "expected no error while creating temporary file")

Expand All @@ -240,12 +240,11 @@ func downloadFile(t *testing.T, fm filemanager.FileManager, fileToDownload strin

// reopening the file so to reset the pointer
// since file.Seek(0, io.SeekStart) doesn't work
file.Close()
_ = file.Close()
file, err = os.Open(file.Name())
require.NoError(t, err, "expected no error while reopening downloaded file")

require.NoError(t, err)
cleanup.Cleanup(func() {
t.Cleanup(func() {
_ = file.Close()
_ = os.Remove(file.Name())
})
Expand Down

0 comments on commit a09f39f

Please sign in to comment.