Skip to content

Commit

Permalink
tests: gather metrics
Browse files Browse the repository at this point in the history
test-server and sharded-test-server now gather metrics from all the
shards when shutting down.

Metrics are also gathered when a PrivateKcpServer() shuts down.

Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
  • Loading branch information
ncdc committed Jan 30, 2023
1 parent f53db78 commit d5374d5
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 6 deletions.
22 changes: 19 additions & 3 deletions cmd/sharded-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
machineryutilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
kuser "k8s.io/apiserver/pkg/authentication/user"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -68,8 +69,13 @@ func main() {
}

func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numberOfShards int, quiet bool) error {
ctx, cancelFn := context.WithCancel(genericapiserver.SetupSignalContext())
defer cancelFn()
// We use a shutdown context to know that it's time to gather metrics, before stopping the shards, proxy, etc.
shutdownCtx, shutdownCancel := context.WithCancel(genericapiserver.SetupSignalContext())
defer shutdownCancel()

// This context controls the life of the shards, proxy, etc.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create request header CA and client cert for front-proxy to connect to shards
requestHeaderCA, err := crypto.MakeSelfSignedCA(
Expand Down Expand Up @@ -299,15 +305,25 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
}
}

// Wait for either a premature termination error from the shards/proxy/etc, or for the test server process to shut down
select {
case shardIndexErr := <-shardsErrCh:
return fmt.Errorf("shard %d exited: %w", shardIndexErr.index, shardIndexErr.error)
case vwIndexErr := <-virtualWorkspacesErrCh:
return fmt.Errorf("virtual workspaces %d exited: %w", vwIndexErr.index, vwIndexErr.error)
case cacheErr := <-cacheServerErrCh:
return fmt.Errorf("cache server exited: %w", cacheErr.error)
case <-ctx.Done():
case <-shutdownCtx.Done():
}

// We've received the notice to shut down, so try to gather metrics. Use a new context with a fixed timeout.
metricxCtx, metricsCancel := context.WithTimeout(ctx, wait.ForeverTestTimeout)
defer metricsCancel()

for _, shard := range shards {
shard.GatherMetrics(metricxCtx)
}

return nil
}

Expand Down
34 changes: 34 additions & 0 deletions cmd/test-server/kcp/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,40 @@ func (s *Shard) WaitForReady(ctx context.Context) (<-chan error, error) {
return s.terminatedCh, nil
}

// GatherMetrics scrapes the shard's /metrics endpoint using the shard's
// admin kubeconfig and writes the raw payload to <logDir>/<shardName>-metrics.txt,
// where logDir is the directory containing the shard's log file.
// Gathering is best-effort: every failure is logged and the function returns
// without error, so a metrics problem never aborts shutdown.
func (s *Shard) GatherMetrics(ctx context.Context) {
	logger := klog.FromContext(ctx).WithValues("shard", s.name)
	logger.Info("gathering shard metrics")

	// Build a client from the shard's admin kubeconfig, selecting the
	// system:admin context so the request is authorized to read /metrics.
	kubeconfigPath := filepath.Join(s.runtimeDir, "admin.kubeconfig")
	configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
		&clientcmd.ConfigOverrides{CurrentContext: "system:admin"},
	)
	config, err := configLoader.ClientConfig()
	if err != nil {
		logger.Error(err, "unable to collect metrics: error getting client config")
		return
	}
	kcpClient, err := kcpclientset.NewForConfig(config)
	if err != nil {
		logger.Error(err, "unable to collect metrics: failed to create kcp client")
		return
	}
	raw, err := kcpClient.RESTClient().Get().RequestURI("/metrics").DoRaw(ctx)
	if err != nil {
		logger.Error(err, "error getting metrics for shard")
		return
	}

	// Store the scraped metrics next to the shard's log file.
	logDir := filepath.Dir(s.logFilePath)
	metricsFile := filepath.Join(logDir, fmt.Sprintf("%s-metrics.txt", s.name))
	logger.Info("writing metrics file", "path", metricsFile)
	if err := os.WriteFile(metricsFile, raw, 0o644); err != nil {
		logger.Error(err, "error writing metrics file", "path", metricsFile)
		// Fix: bail out here — previously we fell through and logged
		// "wrote metrics file" even though the write had just failed.
		return
	}

	logger.Info("wrote metrics file", "path", metricsFile)
}

// there doesn't seem to be any simple way to get a metav1.Status from the Go client, so we get
// the content in a string-formatted error, unfortunately.
func unreadyComponentsFromError(err error) sets.String {
Expand Down
25 changes: 22 additions & 3 deletions cmd/test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"path/filepath"
"strings"

"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"

"github.com/kcp-dev/kcp/cmd/sharded-test-server/third_party/library-go/crypto"
Expand Down Expand Up @@ -77,8 +78,13 @@ func main() {
}

func start(shardFlags []string, quiet bool) error {
ctx, cancelFn := context.WithCancel(genericapiserver.SetupSignalContext())
defer cancelFn()
// We use a shutdown context to know that it's time to gather metrics, before stopping the shard
shutdownCtx, shutdownCancel := context.WithCancel(genericapiserver.SetupSignalContext())
defer shutdownCancel()

// This context controls the life of the shard
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create client CA and kcp-admin client cert to connect through front-proxy
_, err := crypto.MakeSelfSignedCA(
Expand Down Expand Up @@ -111,5 +117,18 @@ func start(shardFlags []string, quiet bool) error {
return err
}

return <-errCh
// Wait for either a premature termination error from the shard, or for the test server process to shut down
select {
case err := <-errCh:
return err
case <-shutdownCtx.Done():
}

// We've received the notice to shut down, so try to gather metrics. Use a new context with a fixed timeout.
metricsCtx, metricsCancel := context.WithTimeout(ctx, wait.ForeverTestTimeout)
defer metricsCancel()

shard.GatherMetrics(metricsCtx)

return nil
}
32 changes: 32 additions & 0 deletions test/e2e/framework/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (

"github.com/kcp-dev/kcp/cmd/sharded-test-server/third_party/library-go/crypto"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
"github.com/kcp-dev/kcp/pkg/embeddedetcd"
"github.com/kcp-dev/kcp/pkg/server"
"github.com/kcp-dev/kcp/pkg/server/options"
Expand Down Expand Up @@ -177,6 +178,28 @@ func SharedKcpServer(t *testing.T) RunningServer {
return f.Servers[serverName]
}

// GatherMetrics scrapes /metrics from the given server's root shard using its
// system:master base config and writes the raw payload to
// <directory>/<serverName>-metrics.txt. Gathering is best-effort: every
// failure is logged via t.Logf and never fails the test.
func GatherMetrics(ctx context.Context, t *testing.T, server RunningServer, directory string) {
	cfg := server.RootShardSystemMasterBaseConfig(t)
	client, err := kcpclientset.NewForConfig(cfg)
	if err != nil {
		// Don't fail the test if we couldn't scrape metrics
		t.Logf("error creating metrics client for server %s: %v", server.Name(), err)
		// Fix: return here — without it, client can be nil and the
		// RESTClient() call below panics, which would fail the test
		// despite the explicit intent not to.
		return
	}

	raw, err := client.RESTClient().Get().RequestURI("/metrics").DoRaw(ctx)
	if err != nil {
		// Don't fail the test if we couldn't scrape metrics
		t.Logf("error getting metrics for server %s: %v", server.Name(), err)
		return
	}

	metricsFile := filepath.Join(directory, fmt.Sprintf("%s-metrics.txt", server.Name()))
	if err := os.WriteFile(metricsFile, raw, 0o644); err != nil {
		// Don't fail the test if we couldn't scrape metrics
		t.Logf("error writing metrics file %s: %v", metricsFile, err)
	}
}

func CreateClientCA(t *testing.T) (string, string) {
clientCADir := t.TempDir()
_, err := crypto.MakeSelfSignedCA(
Expand Down Expand Up @@ -242,6 +265,15 @@ func newKcpFixture(t *testing.T, cfgs ...kcpConfig) *kcpFixture {
t.Fatal("Fixture setup failed: one or more servers did not become ready")
}

t.Cleanup(func() {
ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
defer cancel()

for _, server := range servers {
GatherMetrics(ctx, t, server, server.artifactDir)
}
})

t.Logf("Started kcp servers after %s", time.Since(start))

return f
Expand Down

0 comments on commit d5374d5

Please sign in to comment.