diff --git a/apps/infra/internal/domain/clusters.go b/apps/infra/internal/domain/clusters.go index d2955250f..5aa38fc9f 100644 --- a/apps/infra/internal/domain/clusters.go +++ b/apps/infra/internal/domain/clusters.go @@ -399,7 +399,7 @@ func (d *domain) syncKloudliteDeviceOnCluster(ctx InfraContext, gvpnName string) } // 2. Grab wireguard config from that device - wgConfig, err := d.getGlobalVPNDeviceWgConfig(ctx, gv.Name, gv.KloudliteDevice.Name) + wgConfig, err := d.getGlobalVPNDeviceWgConfig(ctx, gv.Name, gv.KloudliteDevice.Name, nil) if err != nil { return err } diff --git a/apps/infra/internal/domain/global-vpn-cluster-connection.go b/apps/infra/internal/domain/global-vpn-cluster-connection.go index ab92f7310..db3441bc0 100644 --- a/apps/infra/internal/domain/global-vpn-cluster-connection.go +++ b/apps/infra/internal/domain/global-vpn-cluster-connection.go @@ -21,7 +21,8 @@ import ( ) const ( - gvpnConnectionDeviceMethod = "gvpn-connection" + gvpnConnectionDeviceMethod = "gvpn-connection" + kloudliteGlobalVPNDeviceMethod = "kloudlite-global-vpn-device" ) func (d *domain) getGlobalVPNConnectionPeers(vpns []*entities.GlobalVPNConnection) ([]wgv1.Peer, error) { diff --git a/apps/infra/internal/domain/global-vpn-devices.go b/apps/infra/internal/domain/global-vpn-devices.go index f3742c005..2a555faaf 100644 --- a/apps/infra/internal/domain/global-vpn-devices.go +++ b/apps/infra/internal/domain/global-vpn-devices.go @@ -116,11 +116,19 @@ func (d *domain) DeleteGlobalVPNDevice(ctx InfraContext, gvpn string, deviceName } func (d *domain) ListGlobalVPNDevice(ctx InfraContext, gvpn string, search map[string]repos.MatchFilter, pagination repos.CursorPagination) (*repos.PaginatedRecord[*entities.GlobalVPNDevice], error) { - filter := d.gvpnDevicesRepo.MergeMatchFilters(repos.Filter{ - fc.AccountName: ctx.AccountName, - fc.GlobalVPNDeviceGlobalVPNName: gvpn, - fc.GlobalVPNDeviceCreationMethod: map[string]any{"$ne": gvpnConnectionDeviceMethod}, - }, search) + filter := d.gvpnDevicesRepo.MergeMatchFilters( + repos.Filter{ + fc.AccountName: ctx.AccountName, + fc.GlobalVPNDeviceGlobalVPNName: gvpn, + }, + map[string]repos.MatchFilter{ + fc.GlobalVPNDeviceCreationMethod: { + MatchType: repos.MatchTypeNotInArray, + NotInArray: []any{gvpnConnectionDeviceMethod, kloudliteGlobalVPNDevice}, + }, + }, + search, + ) return d.gvpnDevicesRepo.FindPaginated(ctx, filter, pagination) } @@ -167,9 +175,11 @@ func (d *domain) createGlobalVPNDevice(ctx InfraContext, gvpnDevice entities.Glo func (d *domain) buildPeersFromGlobalVPNDevices(ctx InfraContext, gvpn string) (publicPeers []wgv1.Peer, privatePeers []wgv1.Peer, err error) { devices, err := d.gvpnDevicesRepo.Find(ctx, repos.Query{ Filter: map[string]any{ - fc.AccountName: ctx.AccountName, - fc.GlobalVPNDeviceGlobalVPNName: gvpn, - fc.GlobalVPNDeviceCreationMethod: map[string]any{"$ne": gvpnConnectionDeviceMethod}, + fc.AccountName: ctx.AccountName, + fc.GlobalVPNDeviceGlobalVPNName: gvpn, + fc.GlobalVPNDeviceCreationMethod: map[string]any{ + "$ne": gvpnConnectionDeviceMethod, + }, }, }) if err != nil { @@ -202,14 +212,18 @@ func (d *domain) buildPeersFromGlobalVPNDevices(ctx InfraContext, gvpn string) ( } func (d *domain) GetGlobalVPNDevice(ctx InfraContext, gvpn string, gvpnDevice string) (*entities.GlobalVPNDevice, error) { + if gvpn == "" || gvpnDevice == "" { + return nil, errors.New("invalid global vpn or device") + } + return d.findGlobalVPNDevice(ctx, gvpn, gvpnDevice) } func (d *domain) GetGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string) (string, error) { - return d.getGlobalVPNDeviceWgConfig(ctx, gvpn, gvpnDevice) + return d.getGlobalVPNDeviceWgConfig(ctx, gvpn, gvpnDevice, nil) } -func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string) (string, error) { +func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnDevice string, postUp []string) (string, error) { device, err := d.findGlobalVPNDevice(ctx, gvpn, gvpnDevice) if err != nil { return "", err @@ -262,9 +276,13 @@ func (d *domain) getGlobalVPNDeviceWgConfig(ctx InfraContext, gvpn string, gvpnD } config, err := wgutils.GenerateWireguardConfig(wgutils.WgConfigParams{ - IPAddr: device.IPAddr, - PrivateKey: device.PrivateKey, - DNS: dnsServer, + IPAddr: device.IPAddr, + PrivateKey: device.PrivateKey, + DNS: dnsServer, + PostUp: postUp, + // PostUp: []string{ + // "sudo iptables -A INPUT -i wg0 -j DROP", + // }, PublicPeers: publicPeers, PrivatePeers: privatePeers, }) diff --git a/apps/infra/internal/domain/global-vpn.go b/apps/infra/internal/domain/global-vpn.go index 08d3732b3..7cbae8503 100644 --- a/apps/infra/internal/domain/global-vpn.go +++ b/apps/infra/internal/domain/global-vpn.go @@ -48,14 +48,14 @@ func (d *domain) createGlobalVPN(ctx InfraContext, gvpn entities.GlobalVPN) (*en Name: kloudliteGlobalVPNDevice, }, ResourceMetadata: common.ResourceMetadata{ - DisplayName: "kloudlite-platform-device", + DisplayName: kloudliteGlobalVPNDevice, CreatedBy: common.CreatedOrUpdatedByKloudlite, LastUpdatedBy: common.CreatedOrUpdatedByKloudlite, }, AccountName: ctx.AccountName, GlobalVPNName: gv.Name, PublicEndpoint: nil, - CreationMethod: gvpnConnectionDeviceMethod, + CreationMethod: kloudliteGlobalVPNDevice, }) if err != nil { return nil, err diff --git a/apps/infra/internal/domain/templates/global-vpn-kloudlite-device.yml.tpl b/apps/infra/internal/domain/templates/global-vpn-kloudlite-device.yml.tpl index 0a2034ad4..8a6683beb 100644 --- a/apps/infra/internal/domain/templates/global-vpn-kloudlite-device.yml.tpl +++ b/apps/infra/internal/domain/templates/global-vpn-kloudlite-device.yml.tpl @@ -68,6 +68,7 @@ spec: capabilities: add: - NET_ADMIN + - SYS_MODULE privileged: true volumeMounts: - mountPath: /config/wg_confs/wg0.conf @@ -114,7 +115,6 @@ spec: memory: 100Mi dnsPolicy: ClusterFirst - volumes: - name: wg-config secret: diff --git a/apps/observability/internal/app/app.go b/apps/observability/internal/app/app.go index f75ad05da..996ade768 100644 --- a/apps/observability/internal/app/app.go +++ b/apps/observability/internal/app/app.go @@ -44,11 +44,7 @@ var Module = fx.Module( return infra.NewInfraClient(conn) }), - fx.Provide(func(cfg *rest.Config) (k8s.Client, error) { - return k8s.NewClient(cfg, nil) - }), - - fx.Invoke(func(infraCli infra.InfraClient, kcli k8s.Client, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) { + fx.Invoke(func(infraCli infra.InfraClient, kcfg *rest.Config, iamCli iam.IAMClient, mux *http.ServeMux, sessStore SessionStore, ev *env.Env, logger logging.Logger) { sessionMiddleware := httpServer.NewReadSessionMiddlewareHandler(sessStore, constants.CookieName, constants.CacheSessionPrefix) loggingMiddleware := httpServer.NewLoggingMiddleware(logger) @@ -116,10 +112,41 @@ var Module = fx.Module( step = "15s" } - if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]string{ - "kl_account_name": accountName, - "kl_cluster_name": clusterName, - "kl_tracking_id": trackingId, + k8sCli, err := func() (k8s.Client, error) { + if strings.HasPrefix(trackingId, "clus-") { + return k8s.NewClient(kcfg, nil) + } + + return k8s.NewClient(&rest.Config{ + Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName), + WrapTransport: func(rt http.RoundTripper) http.RoundTripper { + return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{ + "X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)}, + }) + }, + }, nil) + }() + if err != nil { + http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError) + return + } + + pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId}) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + podNames := make([]string, 0, len(pods)) + for _, pod := range pods { + podNames = append(podNames, pod.Name) + } + + if err := queryProm(ev.PromHttpAddr, PromMetricsType(metricsType), map[string]PromValue{ + "kl_account_name": {Operator: PromOperatorEqual, Value: accountName}, + "kl_cluster_name": {Operator: PromOperatorEqual, Value: clusterName}, + "kl_tracking_id": {Operator: PromOperatorEqual, Value: trackingId}, + "pod_name": {Operator: PromOperatorMatchRegex, Value: fmt.Sprintf("^(%s)$", strings.Join(podNames, ","))}, }, st, et, step, w); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -147,40 +174,26 @@ var Module = fx.Module( clusterName := r.URL.Query().Get("cluster_name") trackingId := r.URL.Query().Get("tracking_id") - if !strings.HasPrefix(trackingId, "clus-") { - cfg := &rest.Config{ - Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName), - // Host: fmt.Sprintf("http://kube-access.test-kube-access-globalvpn.svc.cluster.local:8080/clusters/%s", clusterName), - // Host: fmt.Sprintf("http://kloudlite-device-proxy-default.kl-%s.svc.cluster.local:8080/clusters/%s", accountName, clusterName), + k8sCli, err := func() (k8s.Client, error) { + if strings.HasPrefix(trackingId, "clus-") { + return k8s.NewClient(kcfg, nil) } - //out, err := infraCli.GetClusterKubeconfig(r.Context(), &infra.GetClusterIn{ - // UserId: string(sess.UserId), - // UserName: sess.UserName, - // UserEmail: sess.UserEmail, - // AccountName: accountName, - // ClusterName: clusterName, - //}) - //if err != nil { - // http.Error(w, err.Error(), 500) - // return - //} - // - //cfg, err := k8s.RestConfigFromKubeConfig(out.Kubeconfig) - //if err != nil { - // http.Error(w, err.Error(), 500) - // return - //} - - var err error - kcli, err = k8s.NewClient(cfg, nil) - if err != nil { - http.Error(w, err.Error(), 500) - return - } + return k8s.NewClient(&rest.Config{ + Host: fmt.Sprintf("http://kloudlite-device-proxy-%s.kl-account-%s.svc.cluster.local:8080/clusters/%s", "default", accountName, clusterName), + WrapTransport: func(rt http.RoundTripper) http.RoundTripper { + return httpServer.NewRoundTripperWithHeaders(rt, map[string][]string{ + "X-Kloudlite-Authz": {fmt.Sprintf("Bearer %s", ev.GlobalVPNAuthzSecret)}, + }) + }, + }, nil) + }() + if err != nil { + http.Error(w, fmt.Sprintf("failed to create k8s client: %v", err), http.StatusInternalServerError) + return } - pods, err := ListPods(r.Context(), kcli, map[string]string{constants.ObservabilityTrackingKey: trackingId}) + pods, err := ListPods(r.Context(), k8sCli, map[string]string{constants.ObservabilityTrackingKey: trackingId}) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -212,7 +225,9 @@ var Module = fx.Module( msg, err := b.ReadBytes('\n') if err != nil { if !errors.Is(err, io.EOF) { - http.Error(w, err.Error(), 500) + if !closed { + http.Error(w, err.Error(), 500) + } } return } @@ -221,7 +236,7 @@ var Module = fx.Module( } }() - if err := StreamLogs(r.Context(), kcli, pods, pw, logger); err != nil { + if err := StreamLogs(r.Context(), k8sCli, pods, pw, logger); err != nil { http.Error(w, err.Error(), 500) } }))) diff --git a/apps/observability/internal/app/metrics.go b/apps/observability/internal/app/metrics.go index d75a48b0e..2d6742969 100644 --- a/apps/observability/internal/app/metrics.go +++ b/apps/observability/internal/app/metrics.go @@ -87,13 +87,26 @@ const ( WorkspaceTargetNs ObservabilityLabel = "kl_workspace_target_ns" ) -func buildPromQuery(resType PromMetricsType, filters map[string]string) (string, error) { +type PromOperator string + +const ( + PromOperatorEqual = PromOperator("=") + PromOperatorNotEqual = PromOperator("!=") + PromOperatorMatchRegex = PromOperator("=~") + PromOperatorNotMatchRegex = PromOperator("!~") +) + +type PromValue struct { + Operator PromOperator + // Value must be a VALID prometheus value suitable for the specified PromOperator + Value any +} + +func buildPromQuery(resType PromMetricsType, filters map[string]PromValue) (string, error) { tags := make([]string, 0, len(filters)) for k, v := range filters { - if v != "" { - tags = append(tags, fmt.Sprintf(`%s=%q`, k, v)) - } + tags = append(tags, fmt.Sprintf(`%s%s%q`, k, v.Operator, v.Value)) } switch resType { @@ -115,7 +128,7 @@ func buildPromQuery(resType PromMetricsType, filters map[string]string) (string, } } -func queryProm(promAddr string, resType PromMetricsType, filters map[string]string, startTime string, endTime string, step string, writer io.Writer) error { +func queryProm(promAddr string, resType PromMetricsType, filters map[string]PromValue, startTime string, endTime string, step string, writer io.Writer) error { promQuery, err := buildPromQuery(resType, filters) if err != nil { return errors.NewE(err) diff --git a/apps/observability/internal/env/env.go b/apps/observability/internal/env/env.go index 8acd6dddb..db31fd1a0 100644 --- a/apps/observability/internal/env/env.go +++ b/apps/observability/internal/env/env.go @@ -19,6 +19,8 @@ type Env struct { IsDev bool KubernetesApiProxy string `env:"KUBERNETES_API_PROXY"` + + GlobalVPNAuthzSecret string `env:"GLOBAL_VPN_AUTHZ_SECRET" required:"true"` } func LoadEnv() (*Env, error) { diff --git a/cmd/global-vpn-kube-proxy/main.go b/cmd/global-vpn-kube-proxy/main.go index 6b3146d09..734bf2a41 100644 --- a/cmd/global-vpn-kube-proxy/main.go +++ b/cmd/global-vpn-kube-proxy/main.go @@ -14,18 +14,34 @@ func main() { var addr string var proxyAddr string var debug bool + var authz string flag.BoolVar(&debug, "debug", false, "--debug") flag.StringVar(&addr, "addr", ":8080", "--addr ") flag.StringVar(&proxyAddr, "proxy-addr", "", "--proxy-addr ") + flag.StringVar(&authz, "authz", "", "--authz ") flag.Parse() + if authz == "" { + log.Fatal("authz token, must be provided") + } + reverseProxyMap := make(map[string]*httputil.ReverseProxy) mux := http.NewServeMux() + kloudliteAuthzHeader := "X-Kloudlite-Authz" + counter := 1 mux.HandleFunc("/clusters/", func(w http.ResponseWriter, req *http.Request) { + token := strings.TrimPrefix(req.Header.Get(kloudliteAuthzHeader), "Bearer ") + fmt.Println("HERE", token) + + if len(token) != len(authz) || token != authz { + http.Error(w, "UnAuthorized", http.StatusUnauthorized) + return + } + sp := strings.Split(strings.TrimPrefix(req.URL.Path, "/clusters/"), "/") if len(sp) <= 1 { http.Error(w, "invalid request", http.StatusForbidden) @@ -55,6 +71,7 @@ func main() { req.URL.Scheme = "http" req.URL.Host = strings.ReplaceAll(proxyAddr, "{{.CLUSTER_NAME}}", clusterName) req.URL.Path = fmt.Sprintf("/%s", strings.Join(sp[1:], "/")) + req.Header.Del(kloudliteAuthzHeader) }, } } diff --git a/pkg/functions/list.go b/pkg/functions/list.go new file mode 100644 index 000000000..3ddec825e --- /dev/null +++ b/pkg/functions/list.go @@ -0,0 +1,10 @@ +package functions + +func Reduce[T any, V any](items []T, reducerFn func(V, T), value V) V { + for i := range items { + item := items[i] + reducerFn(value, item) + } + + return value +} diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go index a5e0d8290..52902be6e 100644 --- a/pkg/logging/logger.go +++ b/pkg/logging/logger.go @@ -72,6 +72,7 @@ func New(options *Options) (Logger, error) { return cfg } pcfg := zap.NewProductionEncoderConfig() + pcfg.EncodeLevel = zapcore.CapitalColorLevelEncoder pcfg.TimeKey = "" pcfg.LineEnding = "\n" return pcfg @@ -100,9 +101,6 @@ func New(options *Options) (Logger, error) { } lgr := zap.New(zapcore.NewCore(zapcore.NewConsoleEncoder(cfg), os.Stdout, loglevel), zapOpts...) - - cLogger := &logger{ - zapLogger: lgr.Sugar(), - } + cLogger := &logger{zapLogger: lgr.Sugar()} return cLogger, nil } diff --git a/pkg/repos/db-repo-mongo.go b/pkg/repos/db-repo-mongo.go index aea4c2b81..372df73d5 100644 --- a/pkg/repos/db-repo-mongo.go +++ b/pkg/repos/db-repo-mongo.go @@ -630,24 +630,32 @@ func (repo *dbRepo[T]) ErrAlreadyExists(err error) bool { return mongo.IsDuplicateKeyError(err) } -func (repo *dbRepo[T]) MergeMatchFilters(filter Filter, mFilter map[string]MatchFilter) Filter { +func (repo *dbRepo[T]) MergeMatchFilters(filter Filter, matchFilters ...map[string]MatchFilter) Filter { if filter == nil { filter = map[string]any{} } - for k, v := range mFilter { - _, ok := filter[k] - if ok { - fmt.Printf("skipping search filter field %q, as it is already specified in filter", k) - continue - } - switch v.MatchType { - case MatchTypeExact: - filter[k] = v.Exact - case MatchTypeArray: - filter[k] = bson.M{"$in": v.Array} - case MatchTypeRegex: - filter[k] = bson.M{"$regex": primitive.Regex{Pattern: *v.Regex, Options: "i"}} + for _, mfilter := range matchFilters { + for k, v := range mfilter { + _, ok := filter[k] + if ok { + fmt.Printf("skipping search filter field %q, as it is already specified in filter", k) + continue + } + switch v.MatchType { + case MatchTypeExact: + filter[k] = v.Exact + case MatchTypeArray: + filter[k] = bson.M{"$in": v.Array} + case MatchTypeNotInArray: + filter[k] = bson.M{"$nin": v.NotInArray} + case MatchTypeRegex: + filter[k] = bson.M{"$regex": primitive.Regex{Pattern: *v.Regex, Options: "i"}} + default: + { + fmt.Printf("[WARN, repo, mongo]: unknown match type: %q, supported ones: %+v\n", v.MatchType, []string{MatchTypeExact, MatchTypeArray, MatchTypeNotInArray, MatchTypeRegex}) + } + } } } return filter diff --git a/pkg/repos/db-repo.go b/pkg/repos/db-repo.go index 020cd835f..1b238cad6 100644 --- a/pkg/repos/db-repo.go +++ b/pkg/repos/db-repo.go @@ -45,16 +45,19 @@ type Query struct { type MatchType string const ( - MatchTypeExact = "exact" - MatchTypeArray = "array" - MatchTypeRegex = "regex" + MatchTypeExact = "exact" + MatchTypeArray = "array" + MatchTypeNotInArray = "not-in-array" + MatchTypeRegex = "regex" ) type MatchFilter struct { - MatchType MatchType `json:"matchType" graphql:"enum=exact;array;regex;"` - Exact any `json:"exact,omitempty"` - Array []any `json:"array,omitempty"` - Regex *string `json:"regex,omitempty"` + // MatchType MatchType `json:"matchType" graphql:"enum=exact;array;regex;"` + MatchType MatchType `json:"matchType"` + Exact any `json:"exact,omitempty"` + Array []any `json:"array,omitempty"` + NotInArray []any `json:"notInArray,omitempty"` + Regex *string `json:"regex,omitempty"` } type ID string @@ -113,7 +116,7 @@ type DbRepo[T Entity] interface { DeleteOne(ctx context.Context, filter Filter) error ErrAlreadyExists(err error) bool - MergeMatchFilters(filter Filter, matchFilters map[string]MatchFilter) Filter + MergeMatchFilters(filter Filter, matchFilters ...map[string]MatchFilter) Filter } type indexOrder bool diff --git a/pkg/wgutils/peer-config.go b/pkg/wgutils/peer-config.go index fc1767cb7..e15d97c47 100644 --- a/pkg/wgutils/peer-config.go +++ b/pkg/wgutils/peer-config.go @@ -21,6 +21,9 @@ type WgConfigParams struct { PrivateKey string DNS string + PostUp []string + PostDown []string + PublicPeers []PublicPeer PrivatePeers []PrivatePeer } @@ -33,6 +36,14 @@ Address = {{.IPAddr}}/32 PrivateKey = {{.PrivateKey}} DNS = {{.DNS}} +{{- range .PostUp -}} +PostUp = {{.}} +{{- end -}} + +{{- range .PostDown -}} +PostDown = {{.}} +{{- end -}} + {{- range .PublicPeers}} {{- with .}} [Peer]