Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove readonlyjobsdb and some status handlers #3168

Merged
merged 3 commits into from
Apr 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 2 additions & 107 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,12 @@ import (
"net/rpc"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"sync"
"time"

"github.com/spf13/viper"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/services/db"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
)
Expand All @@ -67,17 +62,8 @@ func RegisterAdminHandler(name string, handler interface{}) {
_ = instance.rpcServer.RegisterName(name, handler) // @TODO fix ignored error
}

// RegisterStatusHandler expects object implementing PackageStatusHandler interface
func RegisterStatusHandler(name string, handler PackageStatusHandler) {
instance.statusHandlersMutex.Lock()
instance.statusHandlers[strings.ToLower(name)] = handler
instance.statusHandlersMutex.Unlock()
}

type Admin struct {
statusHandlersMutex sync.RWMutex
statusHandlers map[string]PackageStatusHandler
rpcServer *rpc.Server
rpcServer *rpc.Server
}

var (
Expand All @@ -87,103 +73,12 @@ var (

func Init() {
instance = &Admin{
statusHandlers: make(map[string]PackageStatusHandler),
rpcServer: rpc.NewServer(),
rpcServer: rpc.NewServer(),
}
_ = instance.rpcServer.Register(instance) // @TODO fix ignored error
pkgLogger = logger.NewLogger().Child("admin")
}

// Status reports overall server status by fetching status of all registered admin handlers
func (a *Admin) Status(_ struct{}, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
statusObj := make(map[string]interface{})
statusObj["server-mode"] = db.CurrentMode

a.statusHandlersMutex.RLock()
for moduleName, handler := range a.statusHandlers {
statusObj[moduleName] = handler.Status()
}
a.statusHandlersMutex.RUnlock()
formattedOutput, err := json.MarshalIndent(statusObj, "", " ")
*reply = string(formattedOutput)
return err
}

// PrintStack fetches stack traces of all running goroutines
func (*Admin) PrintStack(_ struct{}, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
byteArr := make([]byte, 2048*1024)
n := runtime.Stack(byteArr, true)
*reply = string(byteArr[:n])
return nil
}

// HeapDump creates heap profile at given path using pprof
func (*Admin) HeapDump(path, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
f, err := os.OpenFile(*path, os.O_RDWR|os.O_CREATE, 0o755)
if err != nil {
return err
}
defer func() { _ = f.Close() }()
_ = pprof.Lookup("heap").WriteTo(f, 1)
*reply = "Heap profile written to " + *path
return nil
}

// StartCpuProfile starts writing cpu profile at given path using pprof
func (*Admin) StartCpuProfile(path, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
f, err := os.OpenFile(*path, os.O_RDWR|os.O_CREATE, 0o755)
if err != nil {
return err
}
pkgLogger.Info("Starting cpu profile. Writing to ", *path)
err = pprof.StartCPUProfile(f)
if err != nil {
pkgLogger.Info("StartCPUProfile threw error. Cpu profiling may already be running or some other error occurred.")
*reply = err.Error()
} else {
*reply = "Cpu profile is being written to " + *path
}
return nil
}

// StopCpuProfile stops writing already cpu profile
func (*Admin) StopCpuProfile(_ struct{}, reply *string) (err error) {
defer func() {
if r := recover(); r != nil {
pkgLogger.Error(r)
err = fmt.Errorf("internal Rudder server error: %v", r)
}
}()
pkgLogger.Info("Stopping cpu profile")
pprof.StopCPUProfile()
*reply = "Cpu profile stopped."
return nil
}

// ServerConfig fetches current configuration as set in viper
func (*Admin) ServerConfig(_ struct{}, reply *string) (err error) {
defer func() {
Expand Down
6 changes: 0 additions & 6 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
}
a.log.Info("Embedded mode: Starting Rudder Core")

readonlyGatewayDB, err := setupReadonlyDBs()
if err != nil {
return err
}

g, ctx := errgroup.WithContext(ctx)

deploymentType, err := deployment.GetFromEnv()
Expand Down Expand Up @@ -271,7 +266,6 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
}
defer gatewayDB.Stop()

gw.SetReadonlyDB(readonlyGatewayDB)
err = gw.Setup(
ctx,
a.app, backendconfig.DefaultBackendConfig, gatewayDB,
Expand Down
6 changes: 0 additions & 6 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
a.log.Info("Gateway starting")

readonlyGatewayDB, err := setupReadonlyDBs()
if err != nil {
return err
}

deploymentType, err := deployment.GetFromEnv()
if err != nil {
return fmt.Errorf("failed to get deployment type: %v", err)
Expand Down Expand Up @@ -113,7 +108,6 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
if err != nil {
return fmt.Errorf("failed to create rate limiter: %w", err)
}
gw.SetReadonlyDB(readonlyGatewayDB)
rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
Expand Down
3 changes: 0 additions & 3 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
}
a.log.Info("Processor starting")

if _, err := setupReadonlyDBs(); err != nil {
return err
}
g, ctx := errgroup.WithContext(ctx)

deploymentType, err := deployment.GetFromEnv()
Expand Down
34 changes: 0 additions & 34 deletions app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@ import (
"context"
"fmt"
"net/http"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/app/cluster/state"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/validators"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -56,35 +51,6 @@ func rudderCoreWorkSpaceTableSetup() error {
return validators.CheckAndValidateWorkspaceToken()
}

func setupReadonlyDBs() (gw *jobsdb.ReadonlyHandleT, err error) {
if diagnostics.EnableServerStartMetric {
diagnostics.Diagnostics.Track(diagnostics.ServerStart, map[string]interface{}{
diagnostics.ServerStart: fmt.Sprint(time.Unix(misc.AppStartTime, 0)),
})
}
var gwDB, rtDB, batchrtDB, procerrDB jobsdb.ReadonlyHandleT

if err := gwDB.Setup("gw"); err != nil {
return nil, fmt.Errorf("setting up gw readonly db: %w", err)
}
gw = &gwDB

if err := rtDB.Setup("rt"); err != nil {
return nil, fmt.Errorf("setting up gw readonly db: %w", err)
}
if err := batchrtDB.Setup("batch_rt"); err != nil {
return nil, fmt.Errorf("setting up batch_rt readonly db: %w", err)
}
router.RegisterAdminHandlers(&rtDB, &batchrtDB)

if err := procerrDB.Setup("proc_error"); err != nil {
return nil, fmt.Errorf("setting up proc_error readonly db: %w", err)
}
processor.RegisterAdminHandlers(&procerrDB)

return
}

// NewRsourcesService produces a rsources.JobService through environment configuration (env variables & config file)
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
Expand Down
3 changes: 0 additions & 3 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,9 @@ func initJobsDB() {
admin.Init()
jobsdb.Init()
jobsdb.Init2()
jobsdb.Init3()
archiver.Init()
router.Init()
router.InitRouterAdmin()
batchrouter.Init()
batchrouter.Init2()
Init()
}

Expand Down
93 changes: 0 additions & 93 deletions backend-config/admin.go

This file was deleted.

9 changes: 0 additions & 9 deletions backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
adminpkg "github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
Expand Down Expand Up @@ -245,13 +244,6 @@ func (bc *backendConfigImpl) pollConfigUpdate(ctx context.Context, workspaces st
}
}

func getConfig() map[string]ConfigT {
bc, _ := DefaultBackendConfig.(*backendConfigImpl)
bc.curSourceJSONLock.RLock()
defer bc.curSourceJSONLock.RUnlock()
return bc.curSourceJSON
}

/*
Subscribe subscribes a channel to a specific topic of backend config updates.

Expand Down Expand Up @@ -317,7 +309,6 @@ func Setup(configEnvHandler types.ConfigEnvI) (err error) {

DefaultBackendConfig = backendConfig

adminpkg.RegisterAdminHandler("BackendConfig", &admin{})
return nil
}

Expand Down
1 change: 0 additions & 1 deletion event-schema/event_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func jobsDBInit(es envSetter, pgResource *resource.PostgresResource) {

admin.Init()
jobsdb.Init()
jobsdb.Init3()
jobsdb.Init2()
}

Expand Down
Loading