Skip to content
This repository has been archived by the owner on Jan 19, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1133 from wwitzel3/issue-1129
Browse files Browse the repository at this point in the history
use options.KubeConfig to check if loading API is needed
  • Loading branch information
Sam Foo committed Jul 20, 2020
2 parents 1c3adf6 + d365645 commit 07e7f69
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 49 deletions.
2 changes: 2 additions & 0 deletions changelogs/unreleased/1129-wwitzel3
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed bug where the client would request from the loading API with a valid kubeconfig.
Fixed bug where --kubeconfig flag would not work.
123 changes: 74 additions & 49 deletions pkg/dash/dash.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/soheilhy/cmux"
"github.com/spf13/viper"
"go.opencensus.io/trace"
"k8s.io/client-go/tools/clientcmd"

"github.com/vmware-tanzu/octant/internal/api"
"github.com/vmware-tanzu/octant/internal/cluster"
Expand Down Expand Up @@ -68,6 +67,7 @@ type Runner struct {
moduleManager *module.Manager
actionManager *action.Manager
websocketClientManager *api.WebsocketClientManager
apiCreated bool
}

func NewRunner(ctx context.Context, logger log.Logger, options Options) (*Runner, error) {
Expand All @@ -92,13 +92,25 @@ func NewRunner(ctx context.Context, logger log.Logger, options Options) (*Runner
return nil, fmt.Errorf("use OCTANT_LISTENER_ADDR to set host:port: %w", err)
}

// Initialize the API
apiService, err := r.initLoadingAPI(ctx, logger, options)
if err != nil {
return nil, fmt.Errorf("failed to start loading api: %w", err)
var pluginService *pluginAPI.GRPCService
var apiService api.Service
var apiErr error

if err := findKubeConfig(logger, options.KubeConfig); err == nil {
apiService, pluginService, apiErr = r.initAPI(ctx, logger, options)
if apiErr != nil {
return nil, fmt.Errorf("failed to start service api: %w", apiErr)
}
} else {
logger.Infof("no valid kube config found, initializing loading API")
// Initialize the API
apiService, apiErr = r.initLoadingAPI(ctx, logger, options)
if apiErr != nil {
return nil, fmt.Errorf("failed to start loading api: %w", err)
}
}

d, err := newDash(listener, options.Namespace, options.FrontendURL, options.BrowserPath, apiService, logger)
d, err := newDash(listener, options.Namespace, options.FrontendURL, options.BrowserPath, apiService, pluginService, logger)
if err != nil {
return nil, fmt.Errorf("failed to create dash instance: %w", err)
}
Expand All @@ -121,16 +133,27 @@ func (r *Runner) Start(ctx context.Context, logger log.Logger, options Options,
return
}()

go func() {
if r.dash != nil {
options.KubeConfig = findKubeConfig(logger, kubeConfigPath)
if options.KubeConfig != "" {
logger.Debugf("Loading configuration: %v", options.KubeConfig)
go r.startAPIService(ctx, logger, options)
return
if !r.apiCreated {
go func() {
if r.dash != nil {
if err := findKubeConfig(logger, options.KubeConfig); err != nil {
logger.Infof("waiting for kube config ...")
options.KubeConfig = <-kubeConfigPath
}

if options.KubeConfig != "" {
logger.Debugf("Loading configuration: %v", options.KubeConfig)
r.startAPIService(ctx, logger, options)
return
} else {
logger.Errorf("unexpected empty kube config")
return
}
}
}
}()
}()
} else {
r.startAPIService(ctx, logger, options)
}

<-ctx.Done()

Expand All @@ -149,7 +172,7 @@ func (r *Runner) initLoadingAPI(ctx context.Context, logger log.Logger, _ Option
return apiService, nil
}

func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options) (*api.API, error) {
func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options) (*api.API, *pluginAPI.GRPCService, error) {
frontendProxy := pluginAPI.FrontendProxy{}

restConfigOptions := cluster.RESTConfigOptions{
Expand All @@ -159,19 +182,19 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options
}
clusterClient, err := cluster.FromKubeConfig(ctx, options.KubeConfig, options.Context, options.Namespace, options.Namespaces, restConfigOptions)
if err != nil {
return nil, fmt.Errorf("failed to init cluster client: %w", err)
return nil, nil, fmt.Errorf("failed to init cluster client, does your kube config have a current-context set?: %w", err)
}

if options.EnableOpenCensus {
if err := enableOpenCensus(); err != nil {
logger.Infof("Enabling OpenCensus")
return nil, fmt.Errorf("enabling open census: %w", err)
return nil, nil, fmt.Errorf("enabling open census: %w", err)
}
}

nsClient, err := clusterClient.NamespaceClient()
if err != nil {
return nil, fmt.Errorf("failed to create namespace client: %w", err)
return nil, nil, fmt.Errorf("failed to create namespace client: %w", err)
}

// If not overridden, use initial namespace from current context in KUBECONFIG
Expand All @@ -183,12 +206,12 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options

appObjectStore, err := initObjectStore(ctx, clusterClient)
if err != nil {
return nil, fmt.Errorf("initializing store: %w", err)
return nil, nil, fmt.Errorf("initializing store: %w", err)
}

errorStore, err := oerrors.NewErrorStore()
if err != nil {
return nil, fmt.Errorf("initializing error store: %w", err)
return nil, nil, fmt.Errorf("initializing error store: %w", err)
}

crdWatcher, err := describer.NewDefaultCRDWatcher(ctx, clusterClient, appObjectStore, errorStore)
Expand All @@ -199,13 +222,13 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options
logger.Warnf("skipping CRD watcher due to access denied error starting watcher")
}
} else {
return nil, fmt.Errorf("initializing CRD watcher: %w", err)
return nil, nil, fmt.Errorf("initializing CRD watcher: %w", err)
}
}

portForwarder, err := initPortForwarder(ctx, clusterClient, appObjectStore)
if err != nil {
return nil, fmt.Errorf("initializing port forwarder: %w", err)
return nil, nil, fmt.Errorf("initializing port forwarder: %w", err)
}

mo := &moduleOptions{
Expand All @@ -216,7 +239,7 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options
}
moduleManager, err := initModuleManager(mo)
if err != nil {
return nil, fmt.Errorf("init module manager: %w", err)
return nil, nil, fmt.Errorf("init module manager: %w", err)
}

r.moduleManager = moduleManager
Expand All @@ -230,7 +253,7 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options

pluginManager, err := initPlugin(moduleManager, r.actionManager, pluginDashboardService)
if err != nil {
return nil, fmt.Errorf("initializing plugin manager: %w", err)
return nil, nil, fmt.Errorf("initializing plugin manager: %w", err)
}

r.pluginManager = pluginManager
Expand All @@ -256,36 +279,34 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, options Options
buildInfo)

if err := watchConfigs(ctx, dashConfig, options.KubeConfig); err != nil {
return nil, fmt.Errorf("set up config watcher: %w", err)
return nil, nil, fmt.Errorf("set up config watcher: %w", err)
}

moduleList, err := initModules(ctx, dashConfig, options.Namespace, options)
if err != nil {
return nil, fmt.Errorf("initializing modules: %w", err)
return nil, nil, fmt.Errorf("initializing modules: %w", err)
}

for _, mod := range moduleList {
if err := moduleManager.Register(mod); err != nil {
return nil, fmt.Errorf("loading module %s: %w", mod.Name(), err)
return nil, nil, fmt.Errorf("loading module %s: %w", mod.Name(), err)
}
}

if err := pluginManager.Start(ctx); err != nil {
return nil, fmt.Errorf("start plugin manager: %w", err)
return nil, nil, fmt.Errorf("start plugin manager: %w", err)
}

// Watch for CRDs after modules initialized
if err := crdWatcher.Watch(ctx); err != nil {
return nil, fmt.Errorf("unable to start CRD watcher: %w", err)
return nil, nil, fmt.Errorf("unable to start CRD watcher: %w", err)
}

apiService := api.New(ctx, api.PathPrefix, r.actionManager, r.websocketClientManager, dashConfig)
frontendProxy.FrontendUpdateController = apiService

r.dash.apiHandler = apiService
r.dash.pluginService = pluginDashboardService

return apiService, nil
r.apiCreated = true
return apiService, pluginDashboardService, nil
}

// initObjectStore initializes the cluster object store interface
Expand Down Expand Up @@ -415,7 +436,7 @@ type dash struct {
pluginService pluginAPI.Service
}

func newDash(listener net.Listener, namespace, uiURL string, browserPath string, apiHandler api.Service, logger log.Logger) (*dash, error) {
func newDash(listener net.Listener, namespace, uiURL string, browserPath string, apiHandler api.Service, pluginHandler pluginAPI.Service, logger log.Logger) (*dash, error) {
hf := octant.NewHandlerFactory(
octant.BackendHandler(apiHandler.Handler),
octant.FrontendURL(viper.GetString("proxy-frontend")))
Expand All @@ -430,6 +451,7 @@ func newDash(listener net.Listener, namespace, uiURL string, browserPath string,
defaultHandler: web.Handler,
willOpenBrowser: true,
apiHandler: apiHandler,
pluginService: pluginHandler,
logger: logger,
}, nil
}
Expand All @@ -448,8 +470,10 @@ func (d *dash) Run(ctx context.Context, startupCh chan bool) error {

// Enable serving the plugin API on the same endpoint as the Octant streaming API.
// This enables remote gRPC plugins.
// grpcl := d.mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
// go serveGRPC(grpcl, d.pluginService)
// if d.pluginService != nil {
// grpcl := d.mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
// go serveGRPC(grpcl, d.pluginService)
// }

http1 := d.mux.Match(cmux.Any())
go func() {
Expand Down Expand Up @@ -518,32 +542,33 @@ func enableOpenCensus() error {
}

// findKubeConfig looks for kube config from .kube or provided by user
func findKubeConfig(logger log.Logger, kubeConfigPath chan string) string {
kubeconfig := clientcmd.NewDefaultClientConfigLoadingRules().GetDefaultFilename()

if _, err := os.Stat(kubeconfig); err == nil {
logger.Infof("using kube config: %v", kubeconfig)
return kubeconfig
func findKubeConfig(logger log.Logger, kubeConfig string) error {
if _, err := os.Stat(kubeConfig); err == nil {
logger.Infof("using kube config: %v", kubeConfig)
return nil
}

return <-kubeConfigPath
return fmt.Errorf("no kubeconfig found")
}

func (r *Runner) startAPIService(ctx context.Context, logger log.Logger, options Options) {
_, err := r.initAPI(ctx, logger, options)
if err != nil {
logger.Errorf("cannot create api: %v", err)
if r.apiCreated == false {
apiService, pluginService, err := r.initAPI(ctx, logger, options)
if err != nil {
logger.Errorf("cannot create api: %v", err)
}
r.dash.apiHandler = apiService
r.dash.pluginService = pluginService
}

hf := octant.NewHandlerFactory(
octant.BackendHandler(r.dash.apiHandler.Handler),
octant.FrontendURL(viper.GetString("proxy-frontend")))

var err error
r.dash.server.Handler, err = hf.Handler(ctx)
if err != nil {
logger.Errorf("cannot create handler: %v", err)
}

logger.Infof("using api service")
return
}

0 comments on commit 07e7f69

Please sign in to comment.