Skip to content

Commit

Permalink
chore: warehouse separate packages
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 4, 2023
1 parent dba45cd commit 2dbda7b
Show file tree
Hide file tree
Showing 50 changed files with 336 additions and 373 deletions.
7 changes: 4 additions & 3 deletions router/batchrouter/handle_observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-server/warehouse/router"

"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -13,7 +15,6 @@ import (
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/warehouse"
)

func (brt *Handle) collectMetrics(ctx context.Context) {
Expand Down Expand Up @@ -133,14 +134,14 @@ func (brt *Handle) recordDeliveryStatus(batchDestination Connection, output Uplo
jobState = jobsdb.Failed.State
errorCode = "500"
if isWarehouse {
jobState = warehouse.GeneratingStagingFileFailedState
jobState = router.GeneratingStagingFileFailedState
}
errorResp, _ = json.Marshal(ErrorResponse{Error: err.Error()})
} else {
jobState = jobsdb.Succeeded.State
errorCode = "200"
if isWarehouse {
jobState = warehouse.GeneratedStagingFileState
jobState = router.GeneratedStagingFileState
}
errorResp = []byte(`{"success":"OK"}`)
}
Expand Down
11 changes: 7 additions & 4 deletions warehouse/admin.go → warehouse/admin/admin.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package warehouse
package admin

import (
"context"
"errors"
"fmt"
"strings"

"github.com/rudderlabs/rudder-server/warehouse/backend_config"
"github.com/rudderlabs/rudder-server/warehouse/router"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -21,7 +24,7 @@ import (
)

type Admin struct {
bcManager *backendConfigManager
bcManager *backend_config.BackendConfigManager
logger logger.Logger
}

Expand All @@ -40,7 +43,7 @@ type ConfigurationTestOutput struct {
Error string
}

func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger) {
func RegisterAdmin(bcManager *backend_config.BackendConfigManager, logger logger.Logger) {
admin.RegisterAdminHandler("Warehouse", &Admin{
bcManager: bcManager,
logger: logger.Child("admin"),
Expand All @@ -49,7 +52,7 @@ func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger) {

// TriggerUpload sets uploads to start without delay
func (*Admin) TriggerUpload(off bool, reply *string) error {
startUploadAlways.Store(!off)
router.StartUploadAlways.Store(!off)
if off {
*reply = "Turned off explicit warehouse upload triggers.\nWarehouse uploads will continue to be done as per schedule in control plane."
} else {
Expand Down
8 changes: 5 additions & 3 deletions warehouse/grpc.go → warehouse/api/grpc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package warehouse
package api

import (
"context"
Expand All @@ -8,6 +8,8 @@ import (
"net/http"
"os"

"github.com/rudderlabs/rudder-server/warehouse/backend_config"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"golang.org/x/exp/slices"
Expand Down Expand Up @@ -53,7 +55,7 @@ type GRPC struct {
cpClient cpclient.InternalControlPlane
connectionManager *controlplane.ConnectionManager
tenantManager *multitenant.Manager
bcManager *backendConfigManager
bcManager *backend_config.BackendConfigManager
tableUploadsRepo *repo.TableUploads
stagingRepo *repo.StagingFiles
uploadRepo *repo.Uploads
Expand All @@ -78,7 +80,7 @@ func NewGRPCServer(
logger logger.Logger,
db *sqlmw.DB,
tenantManager *multitenant.Manager,
bcManager *backendConfigManager,
bcManager *backend_config.BackendConfigManager,
triggerStore *trigger.Store,
) (*GRPC, error) {
g := &GRPC{
Expand Down
6 changes: 4 additions & 2 deletions warehouse/grpc_test.go → warehouse/api/grpc_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package warehouse
package api

import (
"context"
Expand All @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/backend_config"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -146,7 +148,7 @@ func TestGRPC(t *testing.T) {

triggerStore := trigger.NewStore()
tenantManager := multitenant.New(c, mockBackendConfig)
bcManager := newBackendConfigManager(c, db, tenantManager, logger.NOP)
bcManager := backend_config.New(c, db, tenantManager, logger.NOP)
grpcServer, err := NewGRPCServer(c, logger.NOP, db, tenantManager, bcManager, triggerStore)
require.NoError(t, err)

Expand Down
19 changes: 11 additions & 8 deletions warehouse/http.go → warehouse/api/http.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package warehouse
package api

import (
"context"
Expand All @@ -12,6 +12,9 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-server/warehouse/backend_config"
"github.com/rudderlabs/rudder-server/warehouse/mode"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/rudderlabs/rudder-server/services/notifier"
Expand Down Expand Up @@ -71,7 +74,7 @@ type Api struct {
notifier *notifier.Notifier
bcConfig backendconfig.BackendConfig
tenantManager *multitenant.Manager
bcManager *backendConfigManager
bcManager *backend_config.BackendConfigManager
asyncManager *jobs.AsyncJobWh
stagingRepo *repo.StagingFiles
uploadRepo *repo.Uploads
Expand All @@ -96,7 +99,7 @@ func NewApi(
db *sqlmw.DB,
notifier *notifier.Notifier,
tenantManager *multitenant.Manager,
bcManager *backendConfigManager,
bcManager *backend_config.BackendConfigManager,
asyncManager *jobs.AsyncJobWh,
triggerStore *trigger.Store,
) *Api {
Expand Down Expand Up @@ -126,11 +129,11 @@ func NewApi(
func (a *Api) Start(ctx context.Context) error {
srvMux := chi.NewRouter()

if isStandAlone(a.mode) {
if mode.IsStandAlone(a.mode) {
srvMux.Get("/health", a.healthHandler)
}
if !isDegraded(a.config.runningMode) {
if isMaster(a.mode) {
if !mode.IsDegraded(a.config.runningMode) {
if mode.IsMaster(a.mode) {
a.addMasterEndpoints(ctx, srvMux)

a.logger.Infow("Starting warehouse master service on" + strconv.Itoa(a.config.webPort))
Expand Down Expand Up @@ -185,15 +188,15 @@ func (a *Api) healthHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), a.config.healthTimeout)
defer cancel()

if !isDegraded(a.config.runningMode) {
if !mode.IsDegraded(a.config.runningMode) {
if !checkHealth(ctx, a.notifier.GetDBHandle()) {
http.Error(w, "Cannot connect to notifierService", http.StatusInternalServerError)
return
}
notifierService = "UP"
}

if isMaster(a.mode) {
if mode.IsMaster(a.mode) {
if !checkHealth(ctx, a.db.DB) {
http.Error(w, "Cannot connect to dbService", http.StatusInternalServerError)
return
Expand Down
9 changes: 6 additions & 3 deletions warehouse/http_test.go → warehouse/api/http_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package warehouse
package api

import (
"bytes"
Expand All @@ -12,6 +12,9 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/warehouse/backend_config"
"github.com/rudderlabs/rudder-server/warehouse/mode"

"github.com/rudderlabs/rudder-server/warehouse/trigger"

"github.com/rudderlabs/rudder-server/services/notifier"
Expand Down Expand Up @@ -174,7 +177,7 @@ func TestHTTPApi(t *testing.T) {

tenantManager := multitenant.New(c, mockBackendConfig)

bcManager := newBackendConfigManager(config.Default, db, tenantManager, logger.NOP)
bcManager := backend_config.New(config.Default, db, tenantManager, logger.NOP)

triggerStore := trigger.NewStore()

Expand Down Expand Up @@ -938,7 +941,7 @@ func TestHTTPApi(t *testing.T) {

c := config.New()
c.Set("Warehouse.webPort", webPort)
c.Set("Warehouse.runningMode", degradedMode)
c.Set("Warehouse.runningMode", mode.DegradedMode)

srvCtx, stopServer := context.WithCancel(ctx)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package warehouse
package backend_config

import (
"context"
Expand All @@ -7,6 +7,9 @@ import (
"strings"
"sync"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/rudderlabs/rudder-server/warehouse/multitenant"

"github.com/rudderlabs/rudder-server/warehouse/logfield"
Expand All @@ -23,19 +26,19 @@ import (
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func newBackendConfigManager(
func New(
c *config.Config,
db *sqlquerywrapper.DB,
tenantManager *multitenant.Manager,
log logger.Logger,
) *backendConfigManager {
) *BackendConfigManager {
if c == nil {
c = config.Default
}
if log == nil {
log = logger.NOP
}
bcm := &backendConfigManager{
bcm := &BackendConfigManager{
conf: c,
db: db,
schema: repo.NewWHSchemas(db),
Expand All @@ -56,8 +59,8 @@ func newBackendConfigManager(
return bcm
}

// backendConfigManager is used to handle the backend configuration in the Warehouse
type backendConfigManager struct {
// BackendConfigManager is used to handle the backend configuration in the Warehouse
type BackendConfigManager struct {
conf *config.Config
db *sqlquerywrapper.DB
schema *repo.WHSchema
Expand All @@ -82,7 +85,11 @@ type backendConfigManager struct {
sourceIDsByWorkspaceMu sync.RWMutex
}

func (s *backendConfigManager) Start(ctx context.Context) {
func (s *BackendConfigManager) InitialConfigFetched() chan struct{} {
return s.initialConfigFetched
}

func (s *BackendConfigManager) Start(ctx context.Context) {
ch := s.tenantManager.WatchConfig(ctx)
for {
select {
Expand All @@ -97,7 +104,7 @@ func (s *backendConfigManager) Start(ctx context.Context) {
}
}

func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.Warehouse {
func (s *BackendConfigManager) Subscribe(ctx context.Context) <-chan []model.Warehouse {
s.subscriptionsMu.Lock()
defer s.subscriptionsMu.Unlock()

Expand Down Expand Up @@ -129,7 +136,7 @@ func (s *backendConfigManager) Subscribe(ctx context.Context) <-chan []model.War
return ch
}

func (s *backendConfigManager) processData(ctx context.Context, data map[string]backendconfig.ConfigT) {
func (s *BackendConfigManager) processData(ctx context.Context, data map[string]backendconfig.ConfigT) {
defer s.closeInitialConfigFetchedOnce.Do(func() {
close(s.initialConfigFetched)
})
Expand Down Expand Up @@ -209,7 +216,7 @@ func (s *backendConfigManager) processData(ctx context.Context, data map[string]
// 1. user set name from destinationConfig
// 2. from existing record in wh_schemas with same source + dest combo
// 3. convert source name
func (s *backendConfigManager) namespace(ctx context.Context, source backendconfig.SourceT, destination backendconfig.DestinationT) string {
func (s *BackendConfigManager) namespace(ctx context.Context, source backendconfig.SourceT, destination backendconfig.DestinationT) string {
destType := destination.DestinationDefinition.Name
destConfig := destination.Config

Expand Down Expand Up @@ -249,7 +256,7 @@ func (s *backendConfigManager) namespace(ctx context.Context, source backendconf
return namespace
}

func (s *backendConfigManager) IsInitialized() bool {
func (s *BackendConfigManager) IsInitialized() bool {
select {
case <-s.initialConfigFetched:
return true
Expand All @@ -258,27 +265,27 @@ func (s *backendConfigManager) IsInitialized() bool {
}
}

func (s *backendConfigManager) Connections() map[string]map[string]model.Warehouse {
func (s *BackendConfigManager) Connections() map[string]map[string]model.Warehouse {
s.connectionsMapMu.RLock()
defer s.connectionsMapMu.RUnlock()
return s.connectionsMap
}

func (s *backendConfigManager) ConnectionSourcesMap(destID string) (map[string]model.Warehouse, bool) {
func (s *BackendConfigManager) ConnectionSourcesMap(destID string) (map[string]model.Warehouse, bool) {
s.connectionsMapMu.RLock()
defer s.connectionsMapMu.RUnlock()
m, ok := s.connectionsMap[destID]
return m, ok
}

func (s *backendConfigManager) SourceIDsByWorkspace() map[string][]string {
func (s *BackendConfigManager) SourceIDsByWorkspace() map[string][]string {
s.sourceIDsByWorkspaceMu.RLock()
defer s.sourceIDsByWorkspaceMu.RUnlock()
return s.sourceIDsByWorkspace
}

// WarehousesBySourceID gets all WHs for the given source ID
func (s *backendConfigManager) WarehousesBySourceID(sourceID string) []model.Warehouse {
func (s *BackendConfigManager) WarehousesBySourceID(sourceID string) []model.Warehouse {
s.warehousesMu.RLock()
defer s.warehousesMu.RUnlock()

Expand All @@ -288,7 +295,7 @@ func (s *backendConfigManager) WarehousesBySourceID(sourceID string) []model.War
}

// WarehousesByDestID gets all WHs for the given destination ID
func (s *backendConfigManager) WarehousesByDestID(destID string) []model.Warehouse {
func (s *BackendConfigManager) WarehousesByDestID(destID string) []model.Warehouse {
s.warehousesMu.RLock()
defer s.warehousesMu.RUnlock()

Expand All @@ -297,7 +304,7 @@ func (s *backendConfigManager) WarehousesByDestID(destID string) []model.Warehou
})
}

func (s *backendConfigManager) attachSSHTunnellingInfo(
func (s *BackendConfigManager) attachSSHTunnellingInfo(
ctx context.Context,
upstream backendconfig.DestinationT,
) backendconfig.DestinationT {
Expand Down Expand Up @@ -331,3 +338,15 @@ func deepCopy(src, dest interface{}) error {
}
return json.Unmarshal(buf, dest)
}

func persistSSLFileErrorStat(workspaceID, destType, destName, destID, sourceName, sourceID, errTag string) {
tags := stats.Tags{
"workspaceId": workspaceID,
"module": "warehouse",
"destType": destType,
"warehouseID": misc.GetTagName(destID, sourceName, destName, misc.TailTruncateStr(sourceID, 6)),
"destinationID": destID,
"errTag": errTag,
}
stats.Default.NewTaggedStat("persist_ssl_file_failure", stats.CountType, tags).Count(1)
}
Loading

0 comments on commit 2dbda7b

Please sign in to comment.