Skip to content

Commit

Permalink
chore: some more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 4, 2023
1 parent 2865a89 commit dba45cd
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 63 deletions.
3 changes: 2 additions & 1 deletion warehouse/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"net/http"
"os"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"golang.org/x/exp/slices"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc"
Expand Down
3 changes: 2 additions & 1 deletion warehouse/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"encoding/json"
"errors"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"net"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/golang/mock/gomock"
"github.com/ory/dockertest/v3"
"github.com/samber/lo"
Expand Down
7 changes: 4 additions & 3 deletions warehouse/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/rudderlabs/rudder-server/services/notifier"

"github.com/bugsnag/bugsnag-go/v2"
Expand Down Expand Up @@ -128,7 +129,7 @@ func (a *Api) Start(ctx context.Context) error {
if isStandAlone(a.mode) {
srvMux.Get("/health", a.healthHandler)
}
if a.config.runningMode != degradedMode {
if !isDegraded(a.config.runningMode) {
if isMaster(a.mode) {
a.addMasterEndpoints(ctx, srvMux)

Expand Down Expand Up @@ -184,7 +185,7 @@ func (a *Api) healthHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), a.config.healthTimeout)
defer cancel()

if a.config.runningMode != degradedMode {
if !isDegraded(a.config.runningMode) {
if !checkHealth(ctx, a.notifier.GetDBHandle()) {
http.Error(w, "Cannot connect to notifierService", http.StatusInternalServerError)
return
Expand Down
15 changes: 8 additions & 7 deletions warehouse/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/rudderlabs/rudder-server/services/notifier"

"github.com/rudderlabs/rudder-server/utils/httputil"
Expand Down Expand Up @@ -735,11 +736,11 @@ func TestHTTPApi(t *testing.T) {

t.Run("endpoints", func(t *testing.T) {
t.Run("normal mode", func(t *testing.T) {
wenPort, err := kithelper.GetFreePort()
webPort, err := kithelper.GetFreePort()
require.NoError(t, err)

c := config.New()
c.Set("Warehouse.webPort", wenPort)
c.Set("Warehouse.webPort", webPort)

srvCtx, stopServer := context.WithCancel(ctx)

Expand All @@ -752,7 +753,7 @@ func TestHTTPApi(t *testing.T) {
close(serverSetupCh)
}()

serverURL := fmt.Sprintf("http://localhost:%d", wenPort)
serverURL := fmt.Sprintf("http://localhost:%d", webPort)

t.Run("health", func(t *testing.T) {
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -932,11 +933,11 @@ func TestHTTPApi(t *testing.T) {
})

t.Run("degraded mode", func(t *testing.T) {
wenPort, err := kithelper.GetFreePort()
webPort, err := kithelper.GetFreePort()
require.NoError(t, err)

c := config.New()
c.Set("Warehouse.webPort", wenPort)
c.Set("Warehouse.webPort", webPort)
c.Set("Warehouse.runningMode", degradedMode)

srvCtx, stopServer := context.WithCancel(ctx)
Expand All @@ -950,7 +951,7 @@ func TestHTTPApi(t *testing.T) {
close(serverSetupCh)
}()

serverURL := fmt.Sprintf("http://localhost:%d", wenPort)
serverURL := fmt.Sprintf("http://localhost:%d", webPort)

t.Run("health endpoint should work", func(t *testing.T) {
require.Eventually(t, func() bool {
Expand Down
1 change: 0 additions & 1 deletion warehouse/internal/repo/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package repo_test

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync/atomic"
Expand Down
4 changes: 4 additions & 0 deletions warehouse/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ func isSlave(mode string) bool {
func isStandAloneSlave(mode string) bool {
return mode == config.SlaveMode
}

func isDegraded(runningMode string) bool {
return runningMode == degradedMode
}
8 changes: 4 additions & 4 deletions warehouse/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/lib/pq"

"github.com/rudderlabs/rudder-server/services/notifier"
Expand Down Expand Up @@ -717,11 +718,10 @@ func (r *router) uploadFrequencyExceeded(warehouse model.Warehouse, syncFrequenc
defer r.createJobMarkerMapLock.RUnlock()

lastCreatedAt, ok := r.createJobMarkerMap[warehouse.Identifier]
if ok && r.now().Sub(lastCreatedAt) > time.Duration(freqInS)*time.Second {
if !ok {
return true
}

return false
return r.now().Sub(lastCreatedAt) > time.Duration(freqInS)*time.Second
}

func (r *router) uploadFreqInS(syncFrequency string) int64 {
Expand Down
8 changes: 4 additions & 4 deletions warehouse/router_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func (r *router) canCreateUpload(ctx context.Context, warehouse model.Warehouse)

if r.config.warehouseSyncFreqIgnore {
if r.uploadFrequencyExceeded(warehouse, "") {
return false, fmt.Errorf("ignore sync freq: upload frequency exceeded")
return true, nil
}
return true, nil
return false, fmt.Errorf("ignore sync freq: upload frequency exceeded")
}

// gets exclude window start time and end time
Expand All @@ -58,9 +58,9 @@ func (r *router) canCreateUpload(ctx context.Context, warehouse model.Warehouse)
syncStartAt := warehouseutils.GetConfigValue(warehouseutils.SyncStartAt, warehouse)
if syncFrequency == "" || syncStartAt == "" {
if r.uploadFrequencyExceeded(warehouse, syncFrequency) {
return false, fmt.Errorf("upload frequency exceeded")
} else {
return true, nil
} else {
return false, fmt.Errorf("upload frequency exceeded")
}
}

Expand Down
41 changes: 28 additions & 13 deletions warehouse/router_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"strconv"
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/ory/dockertest/v3"

"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource"
Expand Down Expand Up @@ -232,8 +233,8 @@ func TestRouter_CanCreateUpload(t *testing.T) {
r.updateCreateJobMarker(w, now.Add(-time.Hour))

canCreate, err := r.canCreateUpload(context.Background(), w)
require.EqualError(t, err, "ignore sync freq: upload frequency exceeded")
require.False(t, canCreate)
require.NoError(t, err)
require.True(t, canCreate)
})

t.Run("upload frequency not exceeded", func(t *testing.T) {
Expand All @@ -251,11 +252,12 @@ func TestRouter_CanCreateUpload(t *testing.T) {
r.config.warehouseSyncFreqIgnore = true
r.createJobMarkerMap = make(map[string]time.Time)
r.triggerStore = trigger.NewStore()
r.updateCreateJobMarker(w, now.Add(-time.Hour))

r.updateCreateJobMarker(w, now)

canCreate, err := r.canCreateUpload(context.Background(), w)
require.NoError(t, err)
require.True(t, canCreate)
require.EqualError(t, err, "ignore sync freq: upload frequency exceeded")
require.False(t, canCreate)
})
})

Expand Down Expand Up @@ -291,13 +293,21 @@ func TestRouter_CanCreateUpload(t *testing.T) {
},
}

now := time.Now()

r := router{}
r.now = func() time.Time {
return now
}
r.config.uploadFreqInS = 1800
r.createJobMarkerMap = make(map[string]time.Time)
r.triggerStore = trigger.NewStore()
r.now = time.Now

r.updateCreateJobMarker(w, time.Now())

canCreate, err := r.canCreateUpload(context.Background(), w)
require.Nil(t, err)
require.True(t, canCreate)
require.EqualError(t, err, "upload frequency exceeded")
require.False(t, canCreate)
})

t.Run("no sync start at and frequency exceeded", func(t *testing.T) {
Expand All @@ -308,16 +318,21 @@ func TestRouter_CanCreateUpload(t *testing.T) {
},
}

now := time.Now()

r := router{}
r.now = time.Now
r.now = func() time.Time {
return now
}
r.config.uploadFreqInS = 1800
r.triggerStore = trigger.NewStore()
r.createJobMarkerMap = make(map[string]time.Time)

r.updateCreateJobMarker(w, time.Now())
r.updateCreateJobMarker(w, now.Add(-time.Hour))

canCreate, err := r.canCreateUpload(context.Background(), w)
require.EqualError(t, err, "upload frequency exceeded")
require.False(t, canCreate)
require.NoError(t, err)
require.True(t, canCreate)
})

t.Run("last created at", func(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions warehouse/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package warehouse
import (
"context"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/services/notifier"
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestRouter(t *testing.T) {

db := sqlmiddleware.New(pgResource.DB)

now := time.Date(2021, 1, 1, 0, 0, 3, 0, time.UTC)
now := time.Now()

repoUpload := repo.NewUploads(db, repo.WithNow(func() time.Time {
return now
Expand Down Expand Up @@ -185,7 +186,9 @@ func TestRouter(t *testing.T) {
}

r := router{}
r.now = time.Now
r.now = func() time.Time {
return now
}
r.dbHandle = db
r.uploadRepo = repoUpload
r.stagingRepo = repoStaging
Expand Down Expand Up @@ -244,7 +247,6 @@ func TestRouter(t *testing.T) {
})

t.Run("merge existing upload", func(t *testing.T) {
r.setDestInProgress(warehouse, 1)
r.updateCreateJobMarker(warehouse, now.Add(-time.Hour))

stagingFiles := append(stagingFiles, createStagingFiles(t, ctx, repoStaging, workspaceID, sourceID, destinationID)...)
Expand Down
6 changes: 4 additions & 2 deletions warehouse/trigger/trigger_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package trigger_test

import (
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"github.com/stretchr/testify/require"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/warehouse/trigger"
)

func TestTrigger(t *testing.T) {
Expand Down
15 changes: 6 additions & 9 deletions warehouse/warehouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/rudderlabs/rudder-server/warehouse/trigger"
"os"
"time"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/rudderlabs/rudder-server/services/notifier"

"github.com/rudderlabs/rudder-server/warehouse/encoding"
Expand Down Expand Up @@ -121,8 +122,8 @@ func NewApp(
}

func (a *App) Setup(ctx context.Context) error {
// do not setup warehouse service if rudder core is not in normal mode and warehouse is running in same process as rudder core
if !isStandAlone(a.config.runningMode) && !db.IsNormalMode() {
// do not set up warehouse service if rudder core is not in normal mode and warehouse is running in same process as rudder core
if !isStandAlone(a.config.warehouseMode) && !db.IsNormalMode() {
return nil
}

Expand Down Expand Up @@ -289,12 +290,8 @@ func (a *App) setupTables() error {

// Start starts the warehouse service
func (a *App) Start(ctx context.Context) error {
if !isStandAloneSlave(a.config.runningMode) {
return errors.New("warehouse service cannot start, database connection is not setup")
}

// do not start warehouse service if rudder core is not in normal mode and warehouse is running in same process as rudder core
if !isStandAlone(a.config.runningMode) && !db.IsNormalMode() {
if !isStandAlone(a.config.warehouseMode) && !db.IsNormalMode() {
a.logger.Infof("Skipping start of warehouse service...")
return nil
}
Expand All @@ -320,7 +317,7 @@ func (a *App) Start(ctx context.Context) error {
return nil
})

if a.config.runningMode == degradedMode {
if isDegraded(a.config.runningMode) {
a.logger.Infof("WH: Running warehouse service in degraded mode...")

g.Go(func() error {
Expand Down
Loading

0 comments on commit dba45cd

Please sign in to comment.