Skip to content

Commit

Permalink
sharded-test-server: refactor vw to start/wait
Browse files Browse the repository at this point in the history
Align with the shard WaitForReady pattern ref #2303 as a step
towards enabling multiple VW servers when there are multiple shards
  • Loading branch information
Steven Hardy committed Dec 13, 2022
1 parent 0bc848e commit 27a1c3e
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 49 deletions.
30 changes: 23 additions & 7 deletions cmd/sharded-test-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,22 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
shards = append(shards, shard)
}

// Start virtual-workspace servers
vwPort := "6444"
virtualWorkspacesErrCh := make(chan indexErrTuple)
var virtualWorkspaces []*VirtualWorkspace
if standaloneVW {
// TODO: support multiple virtual workspace servers (i.e. multiple ports)
vwPort = "7444"

for i := 0; i < numberOfShards; i++ {
virtualWorkspaceErrCh, err := startVirtual(ctx, i, servingCA, hostIP.String(), logDirPath, workDirPath, clientCA)
vw, err := newVirtualWorkspace(ctx, i, servingCA, hostIP.String(), logDirPath, workDirPath, clientCA)
if err != nil {
return fmt.Errorf("error starting virtual workspaces server %d: %w", i, err)
return err
}
if err := vw.start(ctx); err != nil {
return err
}
go func(vwIndex int, vwErrCh <-chan error) {
err := <-virtualWorkspaceErrCh
virtualWorkspacesErrCh <- indexErrTuple{vwIndex, err}
}(i, virtualWorkspaceErrCh)
virtualWorkspaces = append(virtualWorkspaces, vw)
}
}

Expand All @@ -222,6 +223,21 @@ func start(proxyFlags, shardFlags []string, logDirPath, workDirPath string, numb
return err
}

// Wait for virtual workspaces to be ready
virtualWorkspacesErrCh := make(chan indexErrTuple)
if standaloneVW {
for i, vw := range virtualWorkspaces {
terminatedCh, err := vw.waitForReady(ctx)
if err != nil {
return err
}
go func(i int, terminatedCh <-chan error) {
err := <-terminatedCh
virtualWorkspacesErrCh <- indexErrTuple{i, err}
}(i, terminatedCh)
}
}

// Wait for shards to be ready
shardsErrCh := make(chan indexErrTuple)
for i, shard := range shards {
Expand Down
117 changes: 75 additions & 42 deletions cmd/sharded-test-server/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
Expand All @@ -37,19 +38,27 @@ import (

"github.com/kcp-dev/kcp/cmd/sharded-test-server/third_party/library-go/crypto"
"github.com/kcp-dev/kcp/cmd/test-server/helpers"
"github.com/kcp-dev/kcp/test/e2e/framework"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
"github.com/kcp-dev/kcp/test/e2e/framework"
)

func startVirtual(ctx context.Context, index int, servingCA *crypto.CA, hostIP string, logDirPath, workDirPath string, clientCA *crypto.CA) (<-chan error, error) {
logger := klog.FromContext(ctx)
type headWriter interface {
io.Writer
StopOut()
}

prefix := fmt.Sprintf("VW-%d", index)
yellow := color.New(color.BgYellow, color.FgHiWhite).SprintFunc()
out := lineprefix.New(
lineprefix.Prefix(yellow(prefix)),
lineprefix.Color(color.New(color.FgHiYellow)),
)
type VirtualWorkspace struct {
index int
workDirPath string
logDirPath string
args []string

terminatedCh <-chan error
writer headWriter
}

func newVirtualWorkspace(ctx context.Context, index int, servingCA *crypto.CA, hostIP string, logDirPath, workDirPath string, clientCA *crypto.CA) (*VirtualWorkspace, error) {
logger := klog.FromContext(ctx)

// create serving cert
hostnames := sets.NewString("localhost", hostIP)
Expand All @@ -70,24 +79,20 @@ func startVirtual(ctx context.Context, index int, servingCA *crypto.CA, hostIP s
shardUser := &user.DefaultInfo{Name: fmt.Sprintf("kcp-vw-%d", index), Groups: []string{"system:masters"}}
_, err = clientCA.MakeClientCertificate(vwClientCert, vwClientCertKey, shardUser, 365)
if err != nil {
fmt.Printf("failed to create vw client cert: %v\n", err)
os.Exit(1)
return nil, fmt.Errorf("failed to create vw client cert: %w", err)
}

servingCAPath, err := filepath.Abs(filepath.Join(workDirPath, ".kcp/serving-ca.crt"))
if err != nil {
fmt.Printf("error getting absolute path for %q: %v\n", filepath.Join(workDirPath, ".kcp/serving-ca.crt"), err)
os.Exit(1)
return nil, fmt.Errorf("error getting absolute path for %q: %w", filepath.Join(workDirPath, ".kcp/serving-ca.crt"), err)
}
vwClientCertPath, err := filepath.Abs(vwClientCert)
if err != nil {
fmt.Printf("error getting absolute path for %q: %v\n", vwClientCert, err)
os.Exit(1)
return nil, fmt.Errorf("error getting absolute path for %q: %w", vwClientCert, err)
}
vwClientCertKeyPath, err := filepath.Abs(vwClientCertKey)
if err != nil {
fmt.Printf("error getting absolute path for %q: %v\n", vwClientCertKey, err)
os.Exit(1)
return nil, fmt.Errorf("error getting absolute path for %q: %w", vwClientCertKey, err)
}

virtualWorkspaceKubeConfig := clientcmdapi.Config{
Expand Down Expand Up @@ -121,71 +126,99 @@ func startVirtual(ctx context.Context, index int, servingCA *crypto.CA, hostIP s
authenticationKubeconfigPath := filepath.Join(workDirPath, fmt.Sprintf(".kcp-%d", index), "admin.kubeconfig")
clientCAFilePath := filepath.Join(workDirPath, ".kcp", "client-ca.crt")

commandLine := framework.DirectOrGoRunCommand("virtual-workspaces")
commandLine = append(
commandLine,
args := []string{}
args = append(args,
fmt.Sprintf("--kubeconfig=%s", kubeconfigPath),
fmt.Sprintf("--authentication-kubeconfig=%s", authenticationKubeconfigPath),
"--authentication-skip-lookup",
fmt.Sprintf("--client-ca-file=%s", clientCAFilePath),
fmt.Sprintf("--tls-private-key-file=%s", servingKeyFile),
fmt.Sprintf("--tls-cert-file=%s", servingCertFile),
fmt.Sprintf("--secure-port=%d", 7444+index),
)

return &VirtualWorkspace{
index: index,
workDirPath: workDirPath,
logDirPath: logDirPath,
args: args,
}, nil
}

func (v *VirtualWorkspace) start(ctx context.Context) error {
prefix := fmt.Sprintf("VW-%d", v.index)
yellow := color.New(color.BgYellow, color.FgHiWhite).SprintFunc()
out := lineprefix.New(
lineprefix.Prefix(yellow(prefix)),
lineprefix.Color(color.New(color.FgHiYellow)),
)

commandLine := framework.DirectOrGoRunCommand("virtual-workspaces")
commandLine = append(commandLine, v.args...)
commandLine = append(
commandLine,
"--authentication-skip-lookup",
"--requestheader-username-headers=X-Remote-User",
"--requestheader-group-headers=X-Remote-Group",
fmt.Sprintf("--requestheader-client-ca-file=%s", filepath.Join(workDirPath, ".kcp/requestheader-ca.crt")),
fmt.Sprintf("--requestheader-client-ca-file=%s", filepath.Join(v.workDirPath, ".kcp/requestheader-ca.crt")),
"--v=4",
)
fmt.Fprintf(out, "running: %v\n", strings.Join(commandLine, " "))

cmd := exec.CommandContext(ctx, commandLine[0], commandLine[1:]...)

logFilePath := filepath.Join(workDirPath, fmt.Sprintf(".kcp-virtual-workspaces-%d/virtualworkspace.log", index))
if logDirPath != "" {
logFilePath = filepath.Join(logDirPath, fmt.Sprintf("kcp-virtual-workspaces-%d.log", index))
logFilePath := filepath.Join(v.workDirPath, fmt.Sprintf(".kcp-virtual-workspaces-%d/virtualworkspace.log", v.index))
if v.logDirPath != "" {
logFilePath = filepath.Join(v.logDirPath, fmt.Sprintf("kcp-virtual-workspaces-%d.log", v.index))
}

if err := os.MkdirAll(filepath.Dir(logFilePath), 0755); err != nil {
return nil, err
return err
}
logFile, err := os.OpenFile(logFilePath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
return err
}

writer := helpers.NewHeadWriter(logFile, out)
cmd.Stdout = writer
v.writer = helpers.NewHeadWriter(logFile, out)
cmd.Stdout = v.writer
cmd.Stdin = os.Stdin
cmd.Stderr = writer
cmd.Stderr = v.writer

if err := cmd.Start(); err != nil {
return nil, err
return err
}

terminatedCh := make(chan error, 1)
v.terminatedCh = terminatedCh
go func() {
terminatedCh <- cmd.Wait()
}()

return nil
}

func (v *VirtualWorkspace) waitForReady(ctx context.Context) (<-chan error, error) {
// wait for readiness
logger.WithValues("virtual-workspaces", index).Info("Waiting for virtual-workspaces /readyz to succeed")
logger := klog.FromContext(ctx)
logger.WithValues("virtual-workspaces", v.index).Info("Waiting for virtual-workspaces /readyz to succeed")
for {
time.Sleep(100 * time.Millisecond)

select {
case <-ctx.Done():
return nil, fmt.Errorf("context canceled")
case rc := <-terminatedCh:
return nil, fmt.Errorf("virtual-workspaces terminated with exit code %d", rc)
case rc := <-v.terminatedCh:
return nil, fmt.Errorf("virtual-workspaces terminated with exit code %w", rc)
default:
}

vwHost := fmt.Sprintf("https://localhost:%d", 7444+index)
vwHost := fmt.Sprintf("https://localhost:%d", 7444+v.index)
kubeconfigPath := filepath.Join(v.workDirPath, fmt.Sprintf(".kcp-virtual-workspaces-%d/virtualworkspace.kubeconfig", v.index))
vwConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
// We override the Server here because virtualworkspace.kubeconfig is
// for VW->shard but we want to poll the VW endpoint readyz
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: vwHost}}).ClientConfig()
&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
// We override the Server here because virtualworkspace.kubeconfig is
// for VW->shard but we want to poll the VW endpoint readyz
&clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: vwHost}}).ClientConfig()
if err != nil {
logger.Error(err, "failed to create vw config")
continue
Expand Down Expand Up @@ -214,11 +247,11 @@ func startVirtual(ctx context.Context, index int, servingCA *crypto.CA, hostIP s
}
}

logger.WithValues("virtual-workspaces", index).Info("virtual-workspaces ready")
logger.WithValues("virtual-workspaces", v.index).Info("virtual-workspaces ready")

if !logger.V(3).Enabled() {
writer.StopOut()
v.writer.StopOut()
}

return terminatedCh, nil
return v.terminatedCh, nil
}

0 comments on commit 27a1c3e

Please sign in to comment.