Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/infra/internal/domain/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (d *domain) syncKloudliteDeviceOnCluster(ctx InfraContext, gvpnName string)
}

// 2. Grab wireguard config from that device
wgConfig, err := d.getGlobalVPNDeviceWgConfig(ctx, gv.Name, gv.KloudliteDevice.Name)
wgConfig, err := d.getGlobalVPNDeviceWgConfig(ctx, gv.Name, gv.KloudliteDevice.Name, nil)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion apps/infra/internal/domain/global-vpn-cluster-connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

const (
gvpnConnectionDeviceMethod = "gvpn-connection"
gvpnConnectionDeviceMethod = "gvpn-connection"
kloudliteGlobalVPNDeviceMethod = "kloudlite-global-vpn-device"
)

func (d *domain) getGlobalVPNConnectionPeers(vpns []*entities.GlobalVPNConnection) ([]wgv1.Peer, error) {
Expand Down
44 changes: 31 additions & 13 deletions apps/infra/internal/domain/global-vpn-devices.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,19 @@ func (d *domain) DeleteGlobalVPNDevice(ctx InfraContext, gvpn string, deviceName
}

func (d *domain) ListGlobalVPNDevice(ctx InfraContext, gvpn string, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.GlobalVPNDevice], error) {
filter := d.gvpnDevicesRepo.MergeMatchFilters(repos.Filter{
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
fc.GlobalVPNDeviceCreationMethod: map[string]any{"$ne": gvpnConnectionDeviceMethod},
}, search)
filter := d.gvpnDevicesRepo.MergeMatchFilters(
repos.Filter{
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
},
map[string]repos.MatchFilter{
fc.GlobalVPNDeviceCreationMethod: {
MatchType: repos.MatchTypeNotInArray,
NotInArray: []any{gvpnConnectionDeviceMethod, kloudliteGlobalVPNDevice},
},
},
search,
)
return d.gvpnDevicesRepo.FindPaginated(ctx, filter, pagination)
}

Expand Down Expand Up @@ -167,9 +175,11 @@ func (d *domain) createGlobalVPNDevice(ctx InfraContext, gvpnDevice entities.Glo
func (d *domain) buildPeersFromGlobalVPNDevices(ctx InfraContext, gvpn string) (publicPeers []wgv1.Peer, privatePeers []wgv1.Peer, err error) {
devices, err := d.gvpnDevicesRepo.Find(ctx, repos.Query{
Filter: map[string]any{
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
fc.GlobalVPNDeviceCreationMethod: map[string]any{"$ne": gvpnConnectionDeviceMethod},
fc.AccountName: ctx.AccountName,
fc.GlobalVPNDeviceGlobalVPNName: gvpn,
fc.GlobalVPNDeviceCreationMethod: map[string]any{
"$ne": gvpnConnectionDeviceMethod,
},
},
})
if err != nil {
Expand Down Expand Up @@ -202,14 +212,18 @@ func (d *domain) buildPeersFromGlobalVPNDevices(ctx InfraContext, gvpn string) (
}

func (d *domain) GetGlobalVPNDevice(ctx InfraContext, gvpn string, gvpnDevice string) (*entities.GlobalVPNDevice, error) {
if gvpn == "" || gvpnDevice == "" {
return nil, errors.New("invalid global vpn or device")
}

return d.findGlobalVPNDevice(ctx, gvpn, gvpnDevice)
}

func (d *domain) GetGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string) (string, error) {
return d.getGlobalVPNDeviceWgConfig(ctx, gvpn, gvpnDevice)
return d.getGlobalVPNDeviceWgConfig(ctx, gvpn, gvpnDevice, nil)
}

func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string) (string, error) {
func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string, postUp []string) (string, error) {
device, err := d.findGlobalVPNDevice(ctx, gvpn, gvpnDevice)
if err != nil {
return "", err
Expand Down Expand Up @@ -262,9 +276,13 @@ func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnD
}

config, err := wgutils.GenerateWireguardConfig(wgutils.WgConfigParams{
IPAddr: device.IPAddr,
PrivateKey: device.PrivateKey,
DNS: dnsServer,
IPAddr: device.IPAddr,
PrivateKey: device.PrivateKey,
DNS: dnsServer,
PostUp: postUp,
// PostUp: []string{
// "sudo iptables -A INPUT -i wg0 -j DROP",
// },
PublicPeers: publicPeers,
PrivatePeers: privatePeers,
})
Expand Down
4 changes: 2 additions & 2 deletions apps/infra/internal/domain/global-vpn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (d *domain) createGlobalVPN(ctx InfraContext, gvpn entities.GlobalVPN) (*en
Name: kloudliteGlobalVPNDevice,
},
ResourceMetadata: common.ResourceMetadata{
DisplayName: "kloudlite-platform-device",
DisplayName: kloudliteGlobalVPNDevice,
CreatedBy: common.CreatedOrUpdatedByKloudlite,
LastUpdatedBy: common.CreatedOrUpdatedByKloudlite,
},
AccountName: ctx.AccountName,
GlobalVPNName: gv.Name,
PublicEndpoint: nil,
CreationMethod: gvpnConnectionDeviceMethod,
CreationMethod: kloudliteGlobalVPNDevice,
})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ spec:
capabilities:
add:
- NET_ADMIN
- SYS_MODULE
privileged: true
volumeMounts:
- mountPath: /config/wg_confs/wg0.conf
Expand Down Expand Up @@ -114,7 +115,6 @@ spec:
memory: 100Mi

dnsPolicy: ClusterFirst

volumes:
- name: wg-config
secret:
Expand Down
97 changes: 56 additions & 41 deletions apps/observability/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ var Module = fx.Module(
return infra.NewInfraClient(conn)
}),

fx.Provide(func(cfg *rest.Config) (k8s.Client, error) {
return k8s.NewClient(cfg, nil)
}),

fx.Invoke(func(infraCli infra.InfraClient, kcli k8s.Client, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) {
fx.Invoke(func(infraCli infra.InfraClient, kcfg *rest.Config, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) {
sessionMiddleware := httpServer.NewReadSessionMiddlewareHandler(sessStore, constants.CookieName, constants.CacheSessionPrefix)

loggingMiddleware := httpServer.NewLoggingMiddleware(logger)
Expand Down Expand Up @@ -116,10 +112,41 @@ var Module = fx.Module(
step = "15s"
}

if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]string{
"kl_account_name": accountName,
"kl_cluster_name": clusterName,
"kl_tracking_id": trackingId,
k8sCli, err := func() (k8s.Client, error) {
if strings.HasPrefix(trackingId, "clus-") {
return k8s.NewClient(kcfg, nil)
}

return k8s.NewClient(&rest.Config{
Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName),
WrapTransport: func(rt http.RoundTripper) http.RoundTripper {
return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{
"X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)},
})
},
}, nil)
}()
if err != nil {
http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError)
return
}

pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

podNames := make([]string, 0, len(pods))
for _, pod := range pods {
podNames = append(podNames, pod.Name)
}

if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]PromValue{
"kl_account_name": {Operator: PromOperatorEqual, Value: accountName},
"kl_cluster_name": {Operator: PromOperatorEqual, Value: clusterName},
"kl_tracking_id": {Operator: PromOperatorEqual, Value: trackingId},
"pod_name": {Operator: PromOperatorMatchRegex, Value: fmt.Sprintf("^(%s)$", strings.Join(podNames, ","))},
}, st, et, step, w); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -147,40 +174,26 @@ var Module = fx.Module(
clusterName := r.URL.Query().Get("cluster_name")
trackingId := r.URL.Query().Get("tracking_id")

if !strings.HasPrefix(trackingId, "clus-") {
cfg := &rest.Config{
Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName),
// Host: fmt.Sprintf("http://kube-access.test-kube-access-globalvpn.svc.cluster.local:8080/clusters/%s", clusterName),
// Host: fmt.Sprintf("http://kloudlite-device-proxy-default.kl-%s.svc.cluster.local:8080/clusters/%s", accountName, clusterName),
k8sCli, err := func() (k8s.Client, error) {
if strings.HasPrefix(trackingId, "clus-") {
return k8s.NewClient(kcfg, nil)
}

//out, err := infraCli.GetClusterKubeconfig(r.Context(), &infra.GetClusterIn{
// UserId: string(sess.UserId),
// UserName: sess.UserName,
// UserEmail: sess.UserEmail,
// AccountName: accountName,
// ClusterName: clusterName,
//})
//if err != nil {
// http.Error(w, err.Error(), 500)
// return
//}
//
//cfg, err := k8s.RestConfigFromKubeConfig(out.Kubeconfig)
//if err != nil {
// http.Error(w, err.Error(), 500)
// return
//}

var err error
kcli, err = k8s.NewClient(cfg, nil)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
return k8s.NewClient(&rest.Config{
Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName),
WrapTransport: func(rt http.RoundTripper) http.RoundTripper {
return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{
"X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)},
})
},
}, nil)
}()
if err != nil {
http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError)
return
}

pods, err := ListPods(r.Context(), kcli, map[string]string{constants.ObservabilityTrackingKey: trackingId})
pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -212,7 +225,9 @@ var Module = fx.Module(
msg, err := b.ReadBytes('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
http.Error(w, err.Error(), 500)
if !closed {
http.Error(w, err.Error(), 500)
}
}
return
}
Expand All @@ -221,7 +236,7 @@ var Module = fx.Module(
}
}()

if err := StreamLogs(r.Context(), kcli, pods, pw, logger); err != nil {
if err := StreamLogs(r.Context(), k8sCli, pods, pw, logger); err != nil {
http.Error(w, err.Error(), 500)
}
})))
Expand Down
23 changes: 18 additions & 5 deletions apps/observability/internal/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,26 @@ const (
WorkspaceTargetNs ObservabilityLabel = "kl_workspace_target_ns"
)

func buildPromQuery(resType PromMetricsType, filters map[string]string) (string, error) {
type PromOperator string

const (
PromOperatorEqual = PromOperator("=")
PromOperatorNotEqual = PromOperator("!=")
PromOperatorMatchRegex = PromOperator("=~")
PromOperatorNotMatchRegex = PromOperator("!~")
)

type PromValue struct {
Operator PromOperator
// Value must be a VALID prometheus value suitable for the specified PromOperator
Value any
}

func buildPromQuery(resType PromMetricsType, filters map[string]PromValue) (string, error) {
tags := make([]string, 0, len(filters))

for k, v := range filters {
if v != "" {
tags = append(tags, fmt.Sprintf(`%s=%q`, k, v))
}
tags = append(tags, fmt.Sprintf(`%s%s%q`, k, v.Operator, v.Value))
}

switch resType {
Expand All @@ -115,7 +128,7 @@ func buildPromQuery(resType PromMetricsType, filters map[string]string) (string,
}
}

func queryProm(promAddr string, resType PromMetricsType, filters map[string]string, startTime string, endTime string, step string, writer io.Writer) error {
func queryProm(promAddr string, resType PromMetricsType, filters map[string]PromValue, startTime string, endTime string, step string, writer io.Writer) error {
promQuery, err := buildPromQuery(resType, filters)
if err != nil {
return errors.NewE(err)
Expand Down
2 changes: 2 additions & 0 deletions apps/observability/internal/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Env struct {
IsDev bool

KubernetesApiProxy string `env:"KUBERNETES_API_PROXY"`

GlobalVPNAuthzSecret string `env:"GLOBAL_VPN_AUTHZ_SECRET" required:"true"`
}

func LoadEnv() (*Env, error) {
Expand Down
17 changes: 17 additions & 0 deletions cmd/global-vpn-kube-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,34 @@ func main() {
var addr string
var proxyAddr string
var debug bool
var authz string

flag.BoolVar(&debug, "debug", false, "--debug")
flag.StringVar(&addr, "addr", ":8080", "--addr <host:port>")
flag.StringVar(&proxyAddr, "proxy-addr", "", "--proxy-addr <host:port>")
flag.StringVar(&authz, "authz", "", "--authz <authz-token>")
flag.Parse()

if authz == "" {
log.Fatal("authz token, must be provided")
}

reverseProxyMap := make(map[string]*httputil.ReverseProxy)

mux := http.NewServeMux()

kloudliteAuthzHeader := "X-Kloudlite-Authz"

counter := 1
mux.HandleFunc("/clusters/", func(w http.ResponseWriter, req *http.Request) {
token := strings.TrimPrefix(req.Header.Get(kloudliteAuthzHeader), "Bearer ")
fmt.Println("HERE", token)

if len(token) != len(authz) || token != authz {
http.Error(w, "UnAuthorized", http.StatusUnauthorized)
return
}

sp := strings.Split(strings.TrimPrefix(req.URL.Path, "/clusters/"), "/")
if len(sp) <= 1 {
http.Error(w, "invalid request", http.StatusForbidden)
Expand Down Expand Up @@ -55,6 +71,7 @@ func main() {
req.URL.Scheme = "http"
req.URL.Host = strings.ReplaceAll(proxyAddr, "{{.CLUSTER_NAME}}", clusterName)
req.URL.Path = fmt.Sprintf("/%s", strings.Join(sp[1:], "/"))
req.Header.Del(kloudliteAuthzHeader)
},
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/functions/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package functions

func Reduce[T any, V any](items []T, reducerFn func(V, T), value V) V {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Consider adding type constraints to the generic parameters.

Adding type constraints to the generic parameters would make the function signature more explicit and prevent potential misuse.

Suggested change
func Reduce[T any, V any](items []T, reducerFn func(V, T), value V) V {
func Reduce[T any, V any](items []T, reducerFn func(V, T) V, value V) V {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment correct?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment helpful?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the comment type correct?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the comment area correct?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What type of LLM test could this comment become?

  • 👍 - this comment is really good/important and we should always make it
  • 👎 - this comment is really bad and we should never make it
  • no reaction - don't turn this comment into an LLM test

for i := range items {
item := items[i]
reducerFn(value, item)
}

return value
}
6 changes: 2 additions & 4 deletions pkg/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func New(options *Options) (Logger, error) {
return cfg
}
pcfg := zap.NewProductionEncoderConfig()
pcfg.EncodeLevel = zapcore.CapitalColorLevelEncoder
pcfg.TimeKey = ""
pcfg.LineEnding = "\n"
return pcfg
Expand Down Expand Up @@ -100,9 +101,6 @@ func New(options *Options) (Logger, error) {
}

lgr := zap.New(zapcore.NewCore(zapcore.NewConsoleEncoder(cfg), os.Stdout, loglevel), zapOpts...)

cLogger := &logger{
zapLogger: lgr.Sugar(),
}
cLogger := &logger{zapLogger: lgr.Sugar()}
return cLogger, nil
}
Loading