Skip to content

Commit

Permalink
Merge pull request #2401 from jmprusi/jmprusi/syncer-tunnels-pod-logs
Browse files Browse the repository at this point in the history
✨ Support for pod logs and other subresources
  • Loading branch information
openshift-merge-robot committed Jan 30, 2023
2 parents da4b8d7 + 4c4e687 commit f53db78
Show file tree
Hide file tree
Showing 12 changed files with 911 additions and 254 deletions.
6 changes: 6 additions & 0 deletions pkg/cliplugins/workload/plugin/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ func (o *SyncOptions) enableSyncerForWorkspace(ctx context.Context, config *rest
ResourceNames: []string{syncTargetName},
Resources: []string{"synctargets"},
},
{
Verbs: []string{"get"},
APIGroups: []string{workloadv1alpha1.SchemeGroupVersion.Group},
ResourceNames: []string{syncTargetName},
Resources: []string{"synctargets/tunnel"},
},
{
Verbs: []string{"get", "list", "watch"},
APIGroups: []string{workloadv1alpha1.SchemeGroupVersion.Group},
Expand Down
14 changes: 10 additions & 4 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,16 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
apiHandler = authorization.WithSubjectAccessReviewAuditAnnotations(apiHandler)
apiHandler = authorization.WithDeepSubjectAccessReview(apiHandler)

if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
tunneler := tunneler.NewTunneler()
apiHandler = tunneler.WithSyncerTunnelHandler(apiHandler)
apiHandler = tunneler.WithPodSubresourceProxying(
apiHandler,
c.DynamicClusterClient,
c.KcpSharedInformerFactory,
)
}

// The following ensures that only the default main api handler chain executes authorizers which log audit messages.
// All other invocations of the same authorizer chain still work but do not produce audit log entries.
// This compromises audit log size and information overflow vs. having audit reasons for the main api handler only.
Expand Down Expand Up @@ -419,10 +429,6 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
*c.preHandlerChainMux = append(*c.preHandlerChainMux, mux)
apiHandler = mux

if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
apiHandler = tunneler.WithSyncerTunnel(apiHandler)
}

apiHandler = kcpfilters.WithAuditEventClusterAnnotation(apiHandler)
apiHandler = WithAuditAnnotation(apiHandler) // Must run before any audit annotation is made
apiHandler = WithLocalProxy(apiHandler, opts.Extra.ShardName, opts.Extra.ShardBaseURL, c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters())
Expand Down
111 changes: 76 additions & 35 deletions pkg/tunneler/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tunneler

import (
"bytes"
"fmt"
"io"
"net/http"
Expand All @@ -26,9 +27,37 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
)

func setup(t *testing.T) (*http.Client, string, func()) {
// requestInfoHandler is a helping function to populate the requestInfo of a request as expected
// by the WithSyncerTunnelHandler.
func requestInfoHandler(handler http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if _, ok := genericapirequest.RequestInfoFrom(ctx); ok {
handler.ServeHTTP(w, r)
return
}
r = r.WithContext(genericapirequest.WithRequestInfo(ctx,
&genericapirequest.RequestInfo{
IsResourceRequest: true,
APIGroup: "workload.kcp.io",
APIVersion: "v1alpha1",
Resource: "synctargets",
Subresource: "tunnel",
Name: "d001",
},
))
r = r.WithContext(genericapirequest.WithCluster(r.Context(), genericapirequest.Cluster{Name: "ws"}))
handler.ServeHTTP(w, r)
})
}

func setup(t *testing.T) (string, *tunneler, func()) {
t.Helper()
backend := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello world")
Expand All @@ -38,17 +67,19 @@ func setup(t *testing.T) (*http.Client, string, func()) {

// public server
mux := http.NewServeMux()
apiHandler := WithSyncerTunnel(mux)
tunneler := NewTunneler()
apiHandler := tunneler.WithSyncerTunnelHandler(mux)
apiHandler = requestInfoHandler(apiHandler)
publicServer := httptest.NewUnstartedServer(apiHandler)
publicServer.EnableHTTP2 = true
publicServer.StartTLS()

// private server
dstUrl, err := SyncerTunnelURL(publicServer.URL, "ws", "d001")
dstURL, err := SyncerTunnelURL(publicServer.URL, "ws", "d001")
if err != nil {
t.Fatal(err)
}
l, err := NewListener(publicServer.Client(), dstUrl)
l, err := NewListener(publicServer.Client(), dstURL)
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,22 +104,25 @@ func setup(t *testing.T) (*http.Client, string, func()) {
publicServer.Close()
backend.Close()
}
return publicServer.Client(), dstUrl + "/" + cmdTunnelProxy + "/", stop
return dstURL, tunneler, stop
}

func Test_integration(t *testing.T) {
client, uri, stop := setup(t)
resp, err := client.Get(uri) //nolint:noctx
if err != nil {
t.Fatalf("Request Failed: %s", err)
}
defer resp.Body.Close()
uri, tunneler, stop := setup(t)
rw := httptest.NewRecorder()
b := &bytes.Buffer{}
req, err := http.NewRequest(http.MethodGet, uri, b) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw, req)
defer stop()

body, err := io.ReadAll(resp.Body)
response := rw.Result()
body, err := io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Fatalf("Reading body failed: %s", err)
}

// Log the request body
bodyString := string(body)
if bodyString != "Hello world" {
Expand All @@ -97,23 +131,26 @@ func Test_integration(t *testing.T) {
}

func Test_integration_multiple_connections(t *testing.T) {
client, uri, stop := setup(t)
uri, tunneler, stop := setup(t)
defer stop()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := client.Get(uri) //nolint:noctx
if err != nil {
t.Errorf("Request Failed: %s", err)
}
defer resp.Body.Close()
rw := httptest.NewRecorder()
b := &bytes.Buffer{}
req, err := http.NewRequest(http.MethodGet, uri, b) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw, req)

body, err := io.ReadAll(resp.Body)
response := rw.Result()
body, err := io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Errorf("Reading body failed: %s", err)
}

// Log the request body
bodyString := string(body)
if bodyString != "Hello world" {
Expand All @@ -134,7 +171,9 @@ func Test_integration_listener_reconnect(t *testing.T) {

// public server
mux := http.NewServeMux()
apiHandler := WithSyncerTunnel(mux)
tunneler := NewTunneler()
apiHandler := tunneler.WithSyncerTunnelHandler(mux)
apiHandler = requestInfoHandler(apiHandler)
publicServer := httptest.NewUnstartedServer(apiHandler)
publicServer.EnableHTTP2 = true
publicServer.StartTLS()
Expand Down Expand Up @@ -166,16 +205,15 @@ func Test_integration_listener_reconnect(t *testing.T) {
// wait for the reverse connection to be established
time.Sleep(1 * time.Second)

client := publicServer.Client()
uri := dstURL + "/" + cmdTunnelProxy + "/"
rw := httptest.NewRecorder()
b := &bytes.Buffer{}
req, err := http.NewRequest(http.MethodGet, dstURL, b) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw, req)

resp, err := client.Get(uri) //nolint:noctx
if err != nil {
t.Fatalf("Request Failed: %s", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
response := rw.Result()
body, err := io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Fatalf("Reading body failed: %s", err)
}
Expand All @@ -201,16 +239,19 @@ func Test_integration_listener_reconnect(t *testing.T) {
go server2.Serve(l2)
defer server2.Close()

resp, err = client.Get(uri) //nolint:noctx
if err != nil {
t.Fatalf("Request Failed: %s", err)
}
defer resp.Body.Close()
rw2 := httptest.NewRecorder()
b2 := &bytes.Buffer{}
req2, err := http.NewRequest(http.MethodGet, dstURL, b2) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw2, req2)

body, err = io.ReadAll(resp.Body)
response = rw2.Result()
body, err = io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Fatalf("Reading body failed: %s", err)
}

// Log the request body
bodyString = string(body)
if bodyString != "Hello world" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tunneler/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (ln *Listener) sendMessage(m controlMsg) {
}

func (ln *Listener) dial() (net.Conn, error) {
connect := ln.url + "/" + cmdTunnelConnect
connect := ln.url + "/" + tunnelSubresourcePath
pr, pw := io.Pipe()
req, err := http.NewRequest(http.MethodGet, connect, pr) //nolint:noctx
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tunneler/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Test_SyncerTunnelURL(t *testing.T) {
host: "https://host:9443/base",
ws: "myws",
target: "syncer001",
want: "https://host:9443/base/services/syncer-tunnels/clusters/myws/apis/workload.kcp.io/v1alpha1/synctargets/syncer001",
want: "https://host:9443/base/clusters/myws/apis/workload.kcp.io/v1alpha1/synctargets/syncer001",
},
{
name: "invalid host scheme",
Expand Down

0 comments on commit f53db78

Please sign in to comment.