Skip to content

Commit

Permalink
feat: auto-create 1.x DB or 2.x bucket for flux task logs (#2622)
Browse files Browse the repository at this point in the history
* feat: auto-create 1.x DB or 2.x bucket for flux task logs

Closes #2620

Flux task logs are stored in the "Analytic Store", which is backed
by an influxdb 1.x Database or 2.x Bucket.

Auto-creation gives an infinite retention policy, which can be adjusted later by the user.

Note that for 1.x, "database/rp" bucket names will not be autocreated, only
bare "database" bucket names. This could potentially be added in the future
but the assumption is if a user wants control over the retention policy name
they probably want to control the retention policy creation as well.

Tested manually against 1.x and 2.x.

* fix: succeed auto-creation if final try is successful
  • Loading branch information
lesam committed Sep 16, 2021
1 parent cbcd989 commit b498fce
Show file tree
Hide file tree
Showing 20 changed files with 322 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [#2605](https://github.com/influxdata/kapacitor/pull/2605): Updated jwt dependencies of libraries because of https://nvd.nist.gov/vuln/detail/CVE-2020-26160
- [#2601](https://github.com/influxdata/kapacitor/pull/2601): Switched to github.com/golang-jwt/jwt for kapacitor's use because of https://nvd.nist.gov/vuln/detail/CVE-2020-26160
- [#2618](https://github.com/influxdata/kapacitor/pull/2618): Switch task service to use Flux formatter that preserves comments
- [#2622](https://github.com/influxdata/kapacitor/pull/2622): auto-create 1.x DB or 2.x bucket for flux task logs

### Features

Expand Down
3 changes: 1 addition & 2 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package kapacitor

import (
"errors"
"time"

"sync"
"sync/atomic"
"time"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/models"
Expand Down
5 changes: 2 additions & 3 deletions http_post.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kapacitor

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -9,9 +11,6 @@ import (
"sync"
"time"

"bytes"
"context"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
Expand Down
79 changes: 72 additions & 7 deletions influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Client interface {
// if it exists. Unlike QueryFlux, this returns a *Response
// object.
QueryFluxResponse(q FluxQuery) (*Response, error)

// CreateBucketV2 uses the 2.x /api/v2/bucket api to create a bucket.
// Note that 1.x does not support this API
CreateBucketV2(bucket string, org string, orgID string) error
}

type ClientUpdater interface {
Expand Down Expand Up @@ -147,6 +151,69 @@ type HTTPClient struct {
compression string
}

func (c *HTTPClient) getOrgID(org string) (string, error) {
u := c.url()
u.Path = "/api/v2/orgs"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return "", err
}
reader, err := c.doHttpV2(req, 200)
if err != nil {
return "", err
}
var val struct {
Orgs []struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"orgs"`
}
err = json.NewDecoder(reader).Decode(&val)
if err != nil {
return "", fmt.Errorf("decoding json from org request: %w", err)
}
// Check for "org" matching id to account for odd flux query API that will accept an org ID as an org name
for _, o := range val.Orgs {
if o.ID == org {
return o.ID, nil
}
}
for _, o := range val.Orgs {
if o.Name == org {
return o.ID, nil
}
}
return "", fmt.Errorf("unknown organization name %s", org)
}

func (c *HTTPClient) CreateBucketV2(bucket, org, orgID string) error {
u := c.url()
u.Path = "/api/v2/buckets"

var err error
if orgID == "" {
orgID, err = c.getOrgID(org)
if err != nil {
return fmt.Errorf("attempting to get org id for org: %w", err)
}
}

body := fmt.Sprintf(`{"orgID": %q, "name": %q, "retentionRules": [] }`, orgID, bucket)

req, err := http.NewRequest("POST", u.String(), bytes.NewBufferString(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")
reader, err := c.doHttpV2(req, 201)
if err != nil {
return err
}
_, err = io.Copy(io.Discard, reader)
return err
}

// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient(conf Config) (*HTTPClient, error) {
Expand Down Expand Up @@ -367,7 +434,7 @@ func (r readClose) Close() error {

var _ io.ReadCloser = readClose{}

func (c *HTTPClient) doFlux(req *http.Request, codes ...int) (io.ReadCloser, error) {
func (c *HTTPClient) doHttpV2(req *http.Request, codes ...int) (io.ReadCloser, error) {
// Get current config
config := c.loadConfig()
// Set auth credentials
Expand Down Expand Up @@ -422,7 +489,7 @@ func (c *HTTPClient) doFlux(req *http.Request, codes ...int) (io.ReadCloser, err
Error string `json:"error"`
}{}
if err := d.Decode(&rp); err != nil {
return nil, err
return nil, fmt.Errorf("error attempting to decode http response, status %d: %w", resp.StatusCode, err)
}
if rp.Error != "" {
return nil, errors.New(rp.Error)
Expand Down Expand Up @@ -593,9 +660,7 @@ type Result struct {

func (c *HTTPClient) buildFluxRequest(q FluxQuery) (*http.Request, error) {
u := c.url()
if c.url().Path == "" || c.url().Path == "/" {
u.Path = "/api/v2/query"
}
u.Path = "/api/v2/query"

v := url.Values{}
if q.Org != "" {
Expand Down Expand Up @@ -645,7 +710,7 @@ func (c *HTTPClient) QueryFluxResponse(q FluxQuery) (*Response, error) {
if err != nil {
return nil, err
}
reader, err := c.doFlux(req, http.StatusOK)
reader, err := c.doHttpV2(req, http.StatusOK)
if err != nil {
return nil, err
}
Expand All @@ -663,7 +728,7 @@ func (c *HTTPClient) QueryFlux(q FluxQuery) (flux.ResultIterator, error) {
if err != nil {
return nil, err
}
reader, err := c.doFlux(req, http.StatusOK)
reader, err := c.doHttpV2(req, http.StatusOK)
if err != nil {
return nil, err
}
Expand Down
117 changes: 117 additions & 0 deletions influxdb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClient_Flux(t *testing.T) {
Expand Down Expand Up @@ -451,3 +453,118 @@ func TestBatchPoints_SettersGetters(t *testing.T) {
t.Errorf("Expected: %s, got %s", bp.WriteConsistency(), "wc2")
}
}

const cannedOrgResponse = `{
"links": {
"self": "/api/v2/orgs"
},
"orgs": [
{
"links": {
"buckets": "/api/v2/buckets?org=myorg",
"dashboards": "/api/v2/dashboards?org=myorg",
"labels": "/api/v2/orgs/e89732d2ac84d94a/labels",
"logs": "/api/v2/orgs/e89732d2ac84d94a/logs",
"members": "/api/v2/orgs/e89732d2ac84d94a/members",
"owners": "/api/v2/orgs/e89732d2ac84d94a/owners",
"secrets": "/api/v2/orgs/e89732d2ac84d94a/secrets",
"self": "/api/v2/orgs/e89732d2ac84d94a",
"tasks": "/api/v2/tasks?org=myorg"
},
"id": "e89732d2ac84d94a",
"name": "org1",
"description": "",
"createdAt": "2021-09-13T18:57:23.816229Z",
"updatedAt": "2021-09-13T18:57:23.816239Z"
},
{
"links": {
"buckets": "/api/v2/buckets?org=myorg",
"dashboards": "/api/v2/dashboards?org=myorg",
"labels": "/api/v2/orgs/0x0x0x0xac84d94a/labels",
"logs": "/api/v2/orgs/0x0x0x0xac84d94a/logs",
"members": "/api/v2/orgs/0x0x0x0xac84d94a/members",
"owners": "/api/v2/orgs/0x0x0x0xac84d94a/owners",
"secrets": "/api/v2/orgs/0x0x0x0xac84d94a/secrets",
"self": "/api/v2/orgs/0x0x0x0xac84d94a",
"tasks": "/api/v2/tasks?org=myorg"
},
"id": "0x0x0x0xac84d94a",
"name": "org2",
"description": "",
"createdAt": "2021-09-13T18:57:23.816229Z",
"updatedAt": "2021-09-13T18:57:23.816239Z"
}
]
}`

func TestHTTPClient_CreateBucketV2(t *testing.T) {

tests := []struct {
name string
org string
orgId string
bucket string
expectBucketBody string
wantErr string
}{
{
name: "basic",
orgId: "x0x0x0",
bucket: "mybucket",
expectBucketBody: `{"orgID": "x0x0x0", "name": "mybucket", "retentionRules": [] }`,
wantErr: "",
},
{
name: "with-unknown-org",
org: "some-other-org",
bucket: "mybucket",
expectBucketBody: "",
wantErr: "unknown organization name",
},
{
name: "with-org",
org: "org2",
bucket: "mybucket",
expectBucketBody: `{"orgID": "0x0x0x0xac84d94a", "name": "mybucket", "retentionRules": [] }`,
wantErr: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotBucketBody := false
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "Token mytoken", r.Header.Get("Authorization"))
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
switch r.URL.Path {
case "/api/v2/buckets":
assert.Equal(t, tt.expectBucketBody, string(body))
gotBucketBody = true
w.WriteHeader(201)
case "/api/v2/orgs":
w.WriteHeader(200)
w.Write([]byte(cannedOrgResponse))
default:
w.WriteHeader(404)
}
}))
c, err := NewHTTPClient(Config{
URLs: []string{ts.URL},
Credentials: Credentials{
Method: TokenAuthentication,
Token: "mytoken",
},
})
require.NoError(t, err)
err = c.CreateBucketV2(tt.bucket, tt.org, tt.orgId)
if tt.wantErr == "" {
assert.NoError(t, err)
} else {
assert.Contains(t, err.Error(), tt.wantErr)
}
assert.Equal(t, tt.expectBucketBody != "", gotBucketBody)
})
}
}
4 changes: 4 additions & 0 deletions influxdb/token_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (tc *tokenClient) QueryFluxResponse(q FluxQuery) (*Response, error) {
return tc.client.QueryFluxResponse(q)
}

func (tc *tokenClient) CreateBucketV2(bucket, org, orgID string) error {
return tc.client.CreateBucketV2(bucket, org, orgID)
}

func (tc *tokenClient) Open() error {
tc.mu.Lock()
defer tc.mu.Unlock()
Expand Down
1 change: 0 additions & 1 deletion services/auth/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/influxdata/kapacitor/services/storage"
"github.com/influxdata/kapacitor/tlsconfig"
"github.com/pkg/errors"

"golang.org/x/crypto/bcrypt"
)

Expand Down
5 changes: 3 additions & 2 deletions services/bigpanda/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package bigpanda

import (
"bytes"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/models"
"testing"
"time"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/models"
)

func TestService_SerializeEventData(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions services/discord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"io"
"io/ioutil"
"net/http"
"sync"
text "text/template"
"time"

"sync"

"github.com/influxdata/kapacitor/alert"
khttp "github.com/influxdata/kapacitor/http"
"github.com/influxdata/kapacitor/keyvalue"
Expand Down
17 changes: 14 additions & 3 deletions services/fluxtask/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fluxtask

import (
"context"
"fmt"
"time"

"github.com/influxdata/influxdb/v2/kit/platform"
Expand Down Expand Up @@ -42,9 +43,16 @@ func (s *Service) Open() error {
if s.config.Enabled {
// create the task stack
s.kvService = kv.New(s.StorageService)
s.kvService.Open()
if err := s.kvService.Open(); err != nil {
return err
}
bucket := s.config.TaskRunBucket
if bucket == "" {
// Deal with previous default of empty string
bucket = task.DefaultTaskRunBucket
}
dataDestination := backend.DataDestination{
Bucket: s.config.TaskRunBucket,
Bucket: bucket,
Org: s.config.TaskRunOrg,
OrgID: s.config.TaskRunOrgID,
Measurement: s.config.TaskRunMeasurement,
Expand All @@ -60,13 +68,16 @@ func (s *Service) Open() error {
if err != nil {
return err
}
combinedTaskService := backend.NewAnalyticalStorage(
combinedTaskService, err := backend.NewAnalyticalStorage(
s.logger.With(zap.String("service", "fluxtask-analytical-store")),
taskService,
taskControlService,
cli,
dataDestination,
)
if err != nil {
return fmt.Errorf("creating analytical store: %w", err)
}
taskService = combinedTaskService
taskControlService = combinedTaskService
}
Expand Down

0 comments on commit b498fce

Please sign in to comment.