Skip to content

Commit

Permalink
Merge branch 'master' into fix.redshift-create-table
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Apr 7, 2023
2 parents d3502a4 + 7d82354 commit 16f495e
Show file tree
Hide file tree
Showing 31 changed files with 41 additions and 1,948 deletions.
109 changes: 2 additions & 107 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,12 @@ import (
"net/rpc"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"

"github.com/spf13/viper"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/services/db"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
)
Expand All @@ -67,17 +62,8 @@ func RegisterAdminHandler(name string, handler interface{}) {
_ = instance.rpcServer.RegisterName(name, handler) // @TODO fix ignored error
}

// RegisterStatusHandler expects object implementing PackageStatusHandler interface
func RegisterStatusHandler(name string, handler PackageStatusHandler) {
instance.statusHandlersMutex.Lock()
instance.statusHandlers[strings.ToLower(name)] = handler
instance.statusHandlersMutex.Unlock()
}

type Admin struct {
statusHandlersMutex sync.RWMutex
statusHandlers map[string]PackageStatusHandler
rpcServer *rpc.Server
rpcServer *rpc.Server
}

var (
Expand All @@ -87,103 +73,12 @@ var (

func Init() {
instance = &Admin{
statusHandlers: make(map[string]PackageStatusHandler),
rpcServer: rpc.NewServer(),
rpcServer: rpc.NewServer(),
}
_ = instance.rpcServer.Register(instance) // @TODO fix ignored error
pkgLogger = logger.NewLogger().Child("admin")
}

// Status reports overall server status by fetching status of all registered admin handlers
func (a *Admin) Status(_ struct{}, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
statusObj := make(map[string]interface{})
statusObj["server-mode"] = db.CurrentMode

a.statusHandlersMutex.RLock()
for moduleName, handler := range a.statusHandlers {
statusObj[moduleName] = handler.Status()
}
a.statusHandlersMutex.RUnlock()
formattedOutput, err := json.MarshalIndent(statusObj, "", " ")
*reply = string(formattedOutput)
return err
}

// PrintStack fetches stack traces of all running goroutines
func (*Admin) PrintStack(_ struct{}, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
byteArr := make([]byte, 2048*1024)
n := runtime.Stack(byteArr, true)
*reply = string(byteArr[:n])
return nil
}

// HeapDump creates heap profile at given path using pprof
func (*Admin) HeapDump(path, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
f, err := os.OpenFile(*path, os.O_RDWR|os.O_CREATE, 0o755)
if err != nil {
return err
}
defer func() { _ = f.Close() }()
_ = pprof.Lookup("heap").WriteTo(f, 1)
*reply = "Heap profile written to " + *path
return nil
}

// StartCpuProfile starts writing cpu profile at given path using pprof
func (*Admin) StartCpuProfile(path, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
f, err := os.OpenFile(*path, os.O_RDWR|os.O_CREATE, 0o755)
if err != nil {
return err
}
pkgLogger.Info("Starting cpu profile. Writing to ", *path)
err = pprof.StartCPUProfile(f)
if err != nil {
pkgLogger.Info("StartCPUProfile threw error. Cpu profiling may already be running or some other error occurred.")
*reply = err.Error()
} else {
*reply = "Cpu profile is being written to " + *path
}
return nil
}

// StopCpuProfile stops writing already cpu profile
func (*Admin) StopCpuProfile(_ struct{}, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
pkgLogger.Info("Stopping cpu profile")
pprof.StopCPUProfile()
*reply = "Cpu profile stopped."
return nil
}

// ServerConfig fetches current configuration as set in viper
func (*Admin) ServerConfig(_ struct{}, reply *string) (err error) {
defer func() {
Expand Down
6 changes: 0 additions & 6 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
}
a.log.Info("Embedded mode: Starting Rudder Core")

readonlyGatewayDB, err := setupReadonlyDBs()
if err != nil {
return err
}

g, ctx := errgroup.WithContext(ctx)

deploymentType, err := deployment.GetFromEnv()
Expand Down Expand Up @@ -271,7 +266,6 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
}
defer gatewayDB.Stop()

gw.SetReadonlyDB(readonlyGatewayDB)
err = gw.Setup(
ctx,
a.app, backendconfig.DefaultBackendConfig, gatewayDB,
Expand Down
6 changes: 0 additions & 6 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
a.log.Info("Gateway starting")

readonlyGatewayDB, err := setupReadonlyDBs()
if err != nil {
return err
}

deploymentType, err := deployment.GetFromEnv()
if err != nil {
return fmt.Errorf("failed to get deployment type: %v", err)
Expand Down Expand Up @@ -113,7 +108,6 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
if err != nil {
return fmt.Errorf("failed to create rate limiter: %w", err)
}
gw.SetReadonlyDB(readonlyGatewayDB)
rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
Expand Down
3 changes: 0 additions & 3 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
}
a.log.Info("Processor starting")

if _, err := setupReadonlyDBs(); err != nil {
return err
}
g, ctx := errgroup.WithContext(ctx)

deploymentType, err := deployment.GetFromEnv()
Expand Down
34 changes: 0 additions & 34 deletions app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/app/cluster/state"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/validators"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -56,35 +51,6 @@ func rudderCoreWorkSpaceTableSetup() error {
return validators.CheckAndValidateWorkspaceToken()
}

func setupReadonlyDBs() (gw *jobsdb.ReadonlyHandleT, err error) {
if diagnostics.EnableServerStartMetric {
diagnostics.Diagnostics.Track(diagnostics.ServerStart, map[string]interface{}{
diagnostics.ServerStart: fmt.Sprint(time.Unix(misc.AppStartTime, 0)),
})
}
var gwDB, rtDB, batchrtDB, procerrDB jobsdb.ReadonlyHandleT

if err := gwDB.Setup("gw"); err != nil {
return nil, fmt.Errorf("setting up gw readonly db: %w", err)
}
gw = &gwDB

if err := rtDB.Setup("rt"); err != nil {
return nil, fmt.Errorf("setting up gw readonly db: %w", err)
}
if err := batchrtDB.Setup("batch_rt"); err != nil {
return nil, fmt.Errorf("setting up batch_rt readonly db: %w", err)
}
router.RegisterAdminHandlers(&rtDB, &batchrtDB)

if err := procerrDB.Setup("proc_error"); err != nil {
return nil, fmt.Errorf("setting up proc_error readonly db: %w", err)
}
processor.RegisterAdminHandlers(&procerrDB)

return
}

// NewRsourcesService produces a rsources.JobService through environment configuration (env variables & config file)
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
Expand Down
3 changes: 0 additions & 3 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,9 @@ func initJobsDB() {
admin.Init()
jobsdb.Init()
jobsdb.Init2()
jobsdb.Init3()
archiver.Init()
router.Init()
router.InitRouterAdmin()
batchrouter.Init()
batchrouter.Init2()
Init()
}

Expand Down
93 changes: 0 additions & 93 deletions backend-config/admin.go

This file was deleted.

9 changes: 0 additions & 9 deletions backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
adminpkg "github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
Expand Down Expand Up @@ -245,13 +244,6 @@ func (bc *backendConfigImpl) pollConfigUpdate(ctx context.Context, workspaces st
}
}

func getConfig() map[string]ConfigT {
bc, _ := DefaultBackendConfig.(*backendConfigImpl)
bc.curSourceJSONLock.RLock()
defer bc.curSourceJSONLock.RUnlock()
return bc.curSourceJSON
}

/*
Subscribe subscribes a channel to a specific topic of backend config updates.
Expand Down Expand Up @@ -317,7 +309,6 @@ func Setup(configEnvHandler types.ConfigEnvI) (err error) {

DefaultBackendConfig = backendConfig

adminpkg.RegisterAdminHandler("BackendConfig", &admin{})
return nil
}

Expand Down
1 change: 0 additions & 1 deletion event-schema/event_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func jobsDBInit(es envSetter, pgResource *resource.PostgresResource) {

admin.Init()
jobsdb.Init()
jobsdb.Init3()
jobsdb.Init2()
}

Expand Down

0 comments on commit 16f495e

Please sign in to comment.