Skip to content

Commit

Permalink
feat(notebooks): notebooks database implementation (#21573)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamhbaker committed May 28, 2021
1 parent c267b31 commit ed629bf
Show file tree
Hide file tree
Showing 14 changed files with 469 additions and 168 deletions.
9 changes: 4 additions & 5 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,11 +940,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
)
}

notebookSvc, err := notebooks.NewService()
if err != nil {
m.log.Error("Failed to initialize notebook service", zap.Error(err))
return err
}
notebookSvc := notebooks.NewService(
m.log.With(zap.String("service", "notebooks")),
m.sqlStore,
)
notebookServer := notebookTransport.NewNotebookHandler(
m.log.With(zap.String("handler", "notebooks")),
authorizer.NewNotebookService(notebookSvc),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ require (
github.com/influxdata/pkg-config v0.2.7
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jessevdk/go-flags v1.4.0
github.com/jmoiron/sqlx v1.3.4
github.com/jsternberg/zap-logfmt v1.2.0
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
github.com/kevinburke/go-bindata v3.11.0+incompatible
github.com/lib/pq v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.7
github.com/matttproud/golang_protobuf_extensions v1.0.1
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGAR
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w=
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -413,6 +415,7 @@ github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA=
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104 h1:d8RFOZ2IiFtFWBcKEHAFYJcPTf0wY5q0exFNJZVWa1U=
Expand Down
18 changes: 18 additions & 0 deletions kit/platform/id.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package platform

import (
"database/sql/driver"
"encoding/binary"
"encoding/hex"
"strconv"
Expand Down Expand Up @@ -143,3 +144,20 @@ func (i ID) MarshalText() ([]byte, error) {
func (i *ID) UnmarshalText(b []byte) error {
return i.Decode(b)
}

// Value implements the database/sql Valuer interface for adding IDs to a sql database.
func (i ID) Value() (driver.Value, error) {
return i.String(), nil
}

// Scan implements the database/sql Scanner interface for retrieving IDs from a sql database.
func (i *ID) Scan(value interface{}) error {
switch v := value.(type) {
case int64:
return i.DecodeFromString(strconv.FormatInt(v, 10))
case string:
return i.DecodeFromString(v)
default:
return ErrInvalidID
}
}
36 changes: 30 additions & 6 deletions notebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package influxdb

import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/influxdata/influxdb/v2/kit/platform"
Expand Down Expand Up @@ -36,17 +39,38 @@ func fieldRequiredError(field string) error {

// Notebook represents all visual and query data for a notebook.
type Notebook struct {
OrgID platform.ID `json:"orgID"`
ID platform.ID `json:"id"`
Name string `json:"name"`
Spec NotebookSpec `json:"spec"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
OrgID platform.ID `json:"orgID" db:"org_id"`
ID platform.ID `json:"id" db:"id"`
Name string `json:"name" db:"name"`
Spec NotebookSpec `json:"spec" db:"spec"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
UpdatedAt time.Time `json:"updatedAt" db:"updated_at"`
}

// NotebookSpec is an abitrary JSON object provided by the client.
type NotebookSpec map[string]interface{}

// Value implements the database/sql Valuer interface for adding NotebookSpecs to the database.
func (s NotebookSpec) Value() (driver.Value, error) {
spec, err := json.Marshal(s)
if err != nil {
return nil, err
}

return string(spec), nil
}

// Scan implements the database/sql Scanner interface for retrieving NotebookSpecs from the database.
func (s *NotebookSpec) Scan(value interface{}) error {
var spec NotebookSpec
if err := json.NewDecoder(strings.NewReader(value.(string))).Decode(&spec); err != nil {
return err
}

*s = spec
return nil
}

// NotebookService is the service contract for Notebooks.
type NotebookService interface {
GetNotebook(ctx context.Context, id platform.ID) (*Notebook, error)
Expand Down
115 changes: 0 additions & 115 deletions notebooks/fake_store.go

This file was deleted.

152 changes: 152 additions & 0 deletions notebooks/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package notebooks

import (
"context"
"database/sql"
"errors"
"time"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/influxdata/influxdb/v2/snowflake"
"github.com/influxdata/influxdb/v2/sqlite"
"go.uber.org/zap"
)

var _ influxdb.NotebookService = (*Service)(nil)

type Service struct {
store *sqlite.SqlStore
log *zap.Logger
idGenerator platform.IDGenerator
}

func NewService(logger *zap.Logger, store *sqlite.SqlStore) *Service {
return &Service{
store: store,
log: logger,
idGenerator: snowflake.NewIDGenerator(),
}
}

func (s *Service) GetNotebook(ctx context.Context, id platform.ID) (*influxdb.Notebook, error) {
var n influxdb.Notebook

query := `
SELECT id, org_id, name, spec, created_at, updated_at
FROM notebooks WHERE id = $1`

if err := s.store.DB.GetContext(ctx, &n, query, id); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, influxdb.ErrNotebookNotFound
}

return nil, err
}

return &n, nil
}

// CreateNotebook creates a notebook. Note that this and all "write" operations on the database need to use the Mutex lock,
// since sqlite can only handle 1 concurrent write operation at a time.
func (s *Service) CreateNotebook(ctx context.Context, create *influxdb.NotebookReqBody) (*influxdb.Notebook, error) {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()

nowTime := time.Now().UTC()
n := influxdb.Notebook{
ID: s.idGenerator.ID(),
OrgID: create.OrgID,
Name: create.Name,
Spec: create.Spec,
CreatedAt: nowTime,
UpdatedAt: nowTime,
}

query := `
INSERT INTO notebooks (id, org_id, name, spec, created_at, updated_at)
VALUES (:id, :org_id, :name, :spec, :created_at, :updated_at)`

_, err := s.store.DB.NamedExecContext(ctx, query, &n)
if err != nil {
return nil, err
}

// Ideally, the create query would use "RETURNING" in order to avoid making a separate query.
// Unfortunately this breaks the scanning of values into the result struct, so we have to make a separate
// SELECT request to return the result from the database.
return s.GetNotebook(ctx, n.ID)
}

// UpdateNotebook updates a notebook.
func (s *Service) UpdateNotebook(ctx context.Context, id platform.ID, update *influxdb.NotebookReqBody) (*influxdb.Notebook, error) {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()

nowTime := time.Now().UTC()
n := influxdb.Notebook{
ID: id,
OrgID: update.OrgID,
Name: update.Name,
Spec: update.Spec,
UpdatedAt: nowTime,
}

query := `
UPDATE notebooks SET org_id = :org_id, name = :name, spec = :spec, updated_at = :updated_at
WHERE id = :id`

_, err := s.store.DB.NamedExecContext(ctx, query, &n)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, influxdb.ErrNotebookNotFound
}

return nil, err
}

return s.GetNotebook(ctx, n.ID)
}

// DeleteNotebook deletes a notebook.
func (s *Service) DeleteNotebook(ctx context.Context, id platform.ID) error {
s.store.Mu.Lock()
defer s.store.Mu.Unlock()

query := `
DELETE FROM notebooks
WHERE id = $1`

res, err := s.store.DB.ExecContext(ctx, query, id.String())
if err != nil {
return err
}

r, err := res.RowsAffected()
if err != nil {
return err
}

if r == 0 {
return influxdb.ErrNotebookNotFound
}

return nil
}

// ListNotebooks lists notebooks matching the provided filter. Currently, only org_id is used in the filter.
// Future uses may support pagination via this filter as well.
func (s *Service) ListNotebooks(ctx context.Context, filter influxdb.NotebookListFilter) ([]*influxdb.Notebook, error) {
var ns []*influxdb.Notebook

query := `
SELECT id, org_id, name, spec, created_at, updated_at
FROM notebooks
WHERE org_id = $1`

if err := s.store.DB.SelectContext(ctx, &ns, query, filter.OrgID); err != nil {
return nil, err
}

return ns, nil
}
Loading

0 comments on commit ed629bf

Please sign in to comment.