Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] NETOBSERV-844 Unable to have a working statusUrl in FlowCollector with Loki Operator 5.6 #314

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 20 additions & 16 deletions cmd/plugin-backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,25 @@
corsHeaders = flag.String("cors-headers", "Origin, X-Requested-With, Content-Type, Accept", "CORS allowed headers (default: Origin, X-Requested-With, Content-Type, Accept)")
corsMaxAge = flag.String("cors-max-age", "", "CORS allowed max age (default: unset)")
// todo: default value temporarily kept to make it work with older versions of the NOO. Remove default and force setup of loki url
lokiURL = flag.String("loki", "http://localhost:3100", "URL of the loki querier host")
lokiStatusURL = flag.String("loki-status", "", "URL for loki /ready /metrics /config endpoints. (default: loki flag value)")
lokiLabels = flag.String("loki-labels", "SrcK8S_Namespace,SrcK8S_OwnerName,DstK8S_Namespace,DstK8S_OwnerName,FlowDirection", "Loki labels, comma separated")
lokiTimeout = flag.Duration("loki-timeout", 30*time.Second, "Timeout of the Loki query to retrieve logs")
lokiTenantID = flag.String("loki-tenant-id", "netobserv", "Tenant organization ID for multi-tenant-loki (submitted as the X-Scope-OrgID HTTP header)")
lokiTokenPath = flag.String("loki-token-path", "", "Path to Bearer authorization header for loki gateway")
lokiForwardUserToken = flag.Bool("loki-forward-user-token", false, "Forward the user Bearer authorization header for loki gateway, this override loki-token-path option")
lokiCAPath = flag.String("loki-ca-path", "", "Path to loki CA certificate")
lokiSkipTLS = flag.Bool("loki-skip-tls", false, "Skip TLS checks for loki HTTPS connection")
lokiMock = flag.Bool("loki-mock", false, "Fake loki results using saved mocks")
logLevel = flag.String("loglevel", "info", "log level (default: info)")
frontendConfig = flag.String("frontend-config", "", "path to the console plugin config file")
authCheck = flag.String("auth-check", "auto", "type of authentication check: authenticated, admin, auto or none (default is auto, based on loki auth mode)")
versionFlag = flag.Bool("v", false, "print version")
log = logrus.WithField("module", "main")
lokiURL = flag.String("loki", "http://localhost:3100", "URL of the loki querier host")
lokiStatusURL = flag.String("loki-status", "", "URL for loki /ready /metrics /config endpoints. (default: loki flag value)")
lokiLabels = flag.String("loki-labels", "SrcK8S_Namespace,SrcK8S_OwnerName,DstK8S_Namespace,DstK8S_OwnerName,FlowDirection", "Loki labels, comma separated")
lokiTimeout = flag.Duration("loki-timeout", 30*time.Second, "Timeout of the Loki query to retrieve logs")
lokiTenantID = flag.String("loki-tenant-id", "netobserv", "Tenant organization ID for multi-tenant-loki (submitted as the X-Scope-OrgID HTTP header)")
lokiTokenPath = flag.String("loki-token-path", "", "Path to Bearer authorization header for loki gateway")
lokiForwardUserToken = flag.Bool("loki-forward-user-token", false, "Forward the user Bearer authorization header for loki gateway, this override loki-token-path option")
lokiCAPath = flag.String("loki-ca-path", "", "Path to loki CA certificate")
lokiSkipTLS = flag.Bool("loki-skip-tls", false, "Skip TLS checks for loki HTTPS connection")
lokiStatusCAPath = flag.String("loki-status-ca-path", "", "Path to loki status CA certificate")
lokiStatusUserCertPath = flag.String("loki-status-user-cert-path", "", "Path to loki status user cert for mTLS")
lokiStatusUserKeyPath = flag.String("loki-status-user-key-path", "", "Path to loki status user key for mTLS")
lokiStatusSkipTLS = flag.Bool("loki-status-skip-tls", false, "Skip TLS checks for loki status HTTPS connection")
lokiMock = flag.Bool("loki-mock", false, "Fake loki results using saved mocks")
logLevel = flag.String("loglevel", "info", "log level (default: info)")
frontendConfig = flag.String("frontend-config", "", "path to the console plugin config file")
authCheck = flag.String("auth-check", "auto", "type of authentication check: authenticated, admin, auto or none (default is auto, based on loki auth mode)")
versionFlag = flag.Bool("v", false, "print version")
log = logrus.WithField("module", "main")
)

func main() {
Expand Down Expand Up @@ -110,7 +114,7 @@
CORSAllowMethods: *corsMethods,
CORSAllowHeaders: *corsHeaders,
CORSMaxAge: *corsMaxAge,
Loki: loki.NewConfig(lURL, lStatusURL, *lokiTimeout, *lokiTenantID, *lokiTokenPath, *lokiForwardUserToken, *lokiSkipTLS, *lokiCAPath, *lokiMock, strings.Split(lLabels, ",")),
Loki: loki.NewConfig(lURL, lStatusURL, *lokiTimeout, *lokiTenantID, *lokiTokenPath, *lokiForwardUserToken, *lokiSkipTLS, *lokiCAPath, *lokiStatusSkipTLS, *lokiStatusCAPath, *lokiStatusUserCertPath, *lokiStatusUserKeyPath, *lokiMock, strings.Split(lLabels, ",")),

Check warning on line 117 in cmd/plugin-backend.go

View check run for this annotation

Codecov / codecov/patch

cmd/plugin-backend.go#L117

Added line #L117 was not covered by tests
FrontendConfig: *frontendConfig,
}, checker)
}
2 changes: 1 addition & 1 deletion pkg/handler/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

func ExportFlows(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, false)

Check warning on line 21 in pkg/handler/export.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/export.go#L21

Added line #L21 was not covered by tests
var code int
startTime := time.Now()
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func getLimit(params url.Values) (string, int, error) {

func GetFlows(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, false)
var code int
startTime := time.Now()
defer func() {
Expand Down
23 changes: 17 additions & 6 deletions pkg/handler/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
lokiOrgIDHeader = "X-Scope-OrgID"
)

func newLokiClient(cfg *loki.Config, requestHeader http.Header) httpclient.Caller {
func newLokiClient(cfg *loki.Config, requestHeader http.Header, useStatusConfig bool) httpclient.Caller {
headers := map[string][]string{}
if cfg.TenantID != "" {
headers[lokiOrgIDHeader] = []string{cfg.TenantID}
Expand All @@ -57,8 +57,19 @@
return new(lokiclientmock.LokiClientMock)
}

skipTLS := cfg.SkipTLS
caPath := cfg.CAPath
userCertPath := ""
userKeyPath := ""
if useStatusConfig {
skipTLS = cfg.StatusSkipTLS
caPath = cfg.StatusCAPath
userCertPath = cfg.StatusUserCertPath
userKeyPath = cfg.StatusUserKeyPath
}

Check warning on line 69 in pkg/handler/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/loki.go#L65-L69

Added lines #L65 - L69 were not covered by tests

// TODO: loki with auth
return httpclient.NewHTTPClient(cfg.Timeout, headers, cfg.SkipTLS, cfg.CAPath)
return httpclient.NewHTTPClient(cfg.Timeout, headers, skipTLS, caPath, userCertPath, userKeyPath)
}

/* loki query will fail if spaces or quotes are not encoded
Expand Down Expand Up @@ -185,7 +196,7 @@

func LokiReady(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, true)

Check warning on line 199 in pkg/handler/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/loki.go#L199

Added line #L199 was not covered by tests
baseURL := strings.TrimRight(cfg.StatusURL.String(), "/")

resp, code, err := executeLokiQuery(fmt.Sprintf("%s/%s", baseURL, "ready"), lokiClient)
Expand All @@ -207,7 +218,7 @@

func LokiMetrics(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, true)

Check warning on line 221 in pkg/handler/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/loki.go#L221

Added line #L221 was not covered by tests
baseURL := strings.TrimRight(cfg.StatusURL.String(), "/")

resp, code, err := executeLokiQuery(fmt.Sprintf("%s/%s", baseURL, "metrics"), lokiClient)
Expand All @@ -222,7 +233,7 @@

func LokiBuildInfos(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, true)

Check warning on line 236 in pkg/handler/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/loki.go#L236

Added line #L236 was not covered by tests
baseURL := strings.TrimRight(cfg.StatusURL.String(), "/")

resp, code, err := executeLokiQuery(fmt.Sprintf("%s/%s", baseURL, "loki/api/v1/status/buildinfo"), lokiClient)
Expand All @@ -237,7 +248,7 @@

func LokiConfig(cfg *loki.Config, param string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, true)

Check warning on line 251 in pkg/handler/loki.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/loki.go#L251

Added line #L251 was not covered by tests
baseURL := strings.TrimRight(cfg.StatusURL.String(), "/")

resp, code, err := executeLokiQuery(fmt.Sprintf("%s/%s", baseURL, "config"), lokiClient)
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

func GetNamespaces(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, false)

Check warning on line 24 in pkg/handler/resources.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/resources.go#L24

Added line #L24 was not covered by tests
var code int
startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -75,7 +75,7 @@

func GetNames(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, false)

Check warning on line 78 in pkg/handler/resources.go

View check run for this annotation

Codecov / codecov/patch

pkg/handler/resources.go#L78

Added line #L78 was not covered by tests
var code int
startTime := time.Now()
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (

func GetTopology(cfg *loki.Config) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
lokiClient := newLokiClient(cfg, r.Header)
lokiClient := newLokiClient(cfg, r.Header, false)
var code int
startTime := time.Now()
defer func() {
Expand Down
31 changes: 22 additions & 9 deletions pkg/httpclient/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

var slog = logrus.WithField("module", "server")

func NewHTTPClient(timeout time.Duration, overrideHeaders map[string][]string, skipTLS bool, capath string) Caller {
func NewHTTPClient(timeout time.Duration, overrideHeaders map[string][]string, skipTLS bool, capath string, userCertPath string, userKeyPath string) Caller {
transport := &http.Transport{
DialContext: (&net.Dialer{Timeout: timeout}).DialContext,
IdleConnTimeout: timeout,
Expand All @@ -33,14 +33,27 @@
if skipTLS {
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
slog.Warn("skipping TLS checks. SSL certificate verification is now disabled !")
} else if capath != "" {
caCert, err := os.ReadFile(capath)
if err != nil {
slog.Errorf("Cannot load loki ca certificate: %v", err)
} else {
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
transport.TLSClientConfig = &tls.Config{RootCAs: pool}
} else if capath != "" || userCertPath != "" {
transport.TLSClientConfig = &tls.Config{}

if capath != "" {
caCert, err := os.ReadFile(capath)
if err != nil {
slog.Errorf("Cannot load loki ca certificate: %v", err)
} else {
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
transport.TLSClientConfig.RootCAs = pool
}

Check warning on line 47 in pkg/httpclient/http_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/httpclient/http_client.go#L37-L47

Added lines #L37 - L47 were not covered by tests
}

if userCertPath != "" {
cert, err := tls.LoadX509KeyPair(userCertPath, userKeyPath)
if err != nil {
slog.Errorf("Cannot load loki user certificate: %v", err)
} else {
transport.TLSClientConfig.Certificates = []tls.Certificate{cert}
}

Check warning on line 56 in pkg/httpclient/http_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/httpclient/http_client.go#L50-L56

Added lines #L50 - L56 were not covered by tests
}
}

Expand Down
45 changes: 27 additions & 18 deletions pkg/loki/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,39 @@ import (
)

type Config struct {
URL *url.URL
StatusURL *url.URL
Timeout time.Duration
TenantID string
TokenPath string
SkipTLS bool
CAPath string
URL *url.URL
StatusURL *url.URL
Timeout time.Duration
TenantID string
TokenPath string
SkipTLS bool
CAPath string
StatusSkipTLS bool
StatusCAPath string
StatusUserCertPath string
StatusUserKeyPath string

UseMocks bool
ForwardUserToken bool
Labels map[string]struct{}
}

func NewConfig(url *url.URL, statusURL *url.URL, timeout time.Duration, tenantID string, tokenPath string, forwardUserToken bool, skipTLS bool, capath string, useMocks bool, labels []string) Config {
func NewConfig(url *url.URL, statusURL *url.URL, timeout time.Duration, tenantID string, tokenPath string, forwardUserToken bool, skipTLS bool, capath string, statusSkipTLS bool, statusCapath string, statusUserCertPath string, statusUserKeyPath string, useMocks bool, labels []string) Config {
return Config{
URL: url,
StatusURL: statusURL,
Timeout: timeout,
TenantID: tenantID,
TokenPath: tokenPath,
SkipTLS: skipTLS,
CAPath: capath,
UseMocks: useMocks,
ForwardUserToken: forwardUserToken,
Labels: utils.GetMapInterface(labels),
URL: url,
StatusURL: statusURL,
Timeout: timeout,
TenantID: tenantID,
TokenPath: tokenPath,
SkipTLS: skipTLS,
CAPath: capath,
StatusSkipTLS: statusSkipTLS,
StatusCAPath: statusCapath,
StatusUserCertPath: statusUserCertPath,
StatusUserKeyPath: statusUserKeyPath,
UseMocks: useMocks,
ForwardUserToken: forwardUserToken,
Labels: utils.GetMapInterface(labels),
}
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/loki/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestFlowQuery_AddLabelFilters(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{"foo", "flis"})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis"})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
require.NoError(t, err)
Expand All @@ -26,15 +26,15 @@ func TestFlowQuery_AddLabelFilters(t *testing.T) {
func TestQuery_BackQuote_Error(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{"lab1", "lab2"})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"lab1", "lab2"})
query := NewFlowQueryBuilderWithDefaults(&cfg)
assert.Error(t, query.addFilter(filters.NewMatch("key", "backquoted`val")))
}

func TestFlowQuery_AddNotLabelFilters(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{"foo", "flis"})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis"})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
require.NoError(t, err)
Expand All @@ -51,7 +51,7 @@ func backtick(str string) string {
func TestFlowQuery_AddLineFilterMultipleValues(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `bar,baz`))
require.NoError(t, err)
Expand All @@ -62,7 +62,7 @@ func TestFlowQuery_AddLineFilterMultipleValues(t *testing.T) {
func TestFlowQuery_AddNotLineFilters(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
require.NoError(t, err)
Expand All @@ -75,7 +75,7 @@ func TestFlowQuery_AddNotLineFilters(t *testing.T) {
func TestFlowQuery_AddLineFiltersWithEmpty(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
require.NoError(t, err)
Expand All @@ -88,7 +88,7 @@ func TestFlowQuery_AddLineFiltersWithEmpty(t *testing.T) {
func TestFlowQuery_AddRecordTypeLabelFilter(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{"foo", "flis", "_RecordType"})
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, "", "", "", false, []string{"foo", "flis", "_RecordType"})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/server_flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func TestLokiFiltering(t *testing.T) {
false,
"",
false,
"",
"",
"",
false,
[]string{"SrcK8S_Namespace", "SrcK8S_OwnerName", "DstK8S_Namespace", "DstK8S_OwnerName", "FlowDirection"},
),
}, &authM)
Expand Down