Skip to content

Commit

Permalink
add more storage for metadata (#207)
Browse files Browse the repository at this point in the history
* add more storage for metadata

* fix pebble storage
  • Loading branch information
hengfeiyang committed Jun 14, 2022
1 parent 42a3e38 commit 2a2f918
Show file tree
Hide file tree
Showing 11 changed files with 733 additions and 7 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/blugelabs/query_string v0.3.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/bwmarrin/snowflake v0.3.0
github.com/cockroachdb/pebble v0.0.0-20220613151633-4a952c0d3bdd
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/getsentry/sentry-go v0.13.0
github.com/gin-contrib/cors v1.3.1
Expand All @@ -37,6 +38,7 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zsais/go-gin-prometheus v0.0.0-20200217150448-2199a42d96c1
go.etcd.io/bbolt v1.3.6
go.etcd.io/etcd/client/v3 v3.5.4
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
Expand Down
83 changes: 82 additions & 1 deletion go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/auth/firststart.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func init() {
}
if firstStart {
if err := initFirstUser(); err != nil {
log.Fatal().Msg(err.Error())
log.Fatal().Err(err).Msg("init first user")
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type config struct {
ServerMode string `env:"ZINC_SERVER_MODE,default=node"`
NodeID string `env:"ZINC_NODE_ID,default=1"`
DataPath string `env:"ZINC_DATA_PATH,default=./data"`
MetadataStorage string `env:"ZINC_METADATA_STORAGE,default=bolt"`
IceCompressor string `env:"ZINC_ICE_COMPRESSOR"`
SentryEnable bool `env:"ZINC_SENTRY,default=true"`
SentryDSN string `env:"ZINC_SENTRY_DSN,default=https://15b6d9b8be824b44896f32b0234c32b7@o1218932.ingest.sentry.io/6360942"`
Expand Down
11 changes: 10 additions & 1 deletion pkg/metadata/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"github.com/zinclabs/zinc/pkg/config"
"github.com/zinclabs/zinc/pkg/metadata/storage"
"github.com/zinclabs/zinc/pkg/metadata/storage/badger"
"github.com/zinclabs/zinc/pkg/metadata/storage/bolt"
"github.com/zinclabs/zinc/pkg/metadata/storage/etcd"
"github.com/zinclabs/zinc/pkg/metadata/storage/pebble"
)

var ErrorKeyNotExists = errors.New("key not exists")
Expand All @@ -33,7 +35,14 @@ func init() {
if strings.ToLower(config.Global.ServerMode) == "cluster" {
db = etcd.New(config.Global.Etcd.Prefix + "/metadata")
} else {
db = badger.New("_metadata.db")
switch strings.ToLower(config.Global.MetadataStorage) {
case "badger":
db = badger.New("_metadata.db")
case "pebble":
db = pebble.New("_metadata.peb")
default:
db = bolt.New("_metadata.bolt")
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/metadata/storage/badger/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ func Test_badgerStorage_List(t *testing.T) {
{
name: "normal",
args: args{
prefix: "/test",
prefix: "/test/",
},
wantNum: 1,
wantErr: false,
},
{
name: "empty",
args: args{
prefix: "/notexist",
prefix: "/notexist/",
},
wantNum: 0,
wantErr: false,
Expand Down
131 changes: 131 additions & 0 deletions pkg/metadata/storage/bolt/bolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/* Copyright 2022 Zinc Labs Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bolt

import (
"bytes"
"os"
"path"

"github.com/rs/zerolog/log"
"go.etcd.io/bbolt"

"github.com/zinclabs/zinc/pkg/config"
"github.com/zinclabs/zinc/pkg/errors"
"github.com/zinclabs/zinc/pkg/metadata/storage"
)

type boltStorage struct {
db *bbolt.DB
}

func New(dbpath string) storage.Storager {
db, err := openbboltDB(path.Join(config.Global.DataPath, dbpath), false)
if err != nil {
log.Fatal().Err(err).Msg("open bbolt db for metadata failed")
}
return &boltStorage{db}
}

func openbboltDB(dbpath string, readOnly bool) (*bbolt.DB, error) {
opt := &bbolt.Options{
Timeout: 0,
NoGrowSync: false,
FreelistType: bbolt.FreelistArrayType,
}
if err := os.MkdirAll(path.Dir(dbpath), 0755); err != nil {
return nil, err
}
return bbolt.Open(dbpath, 0666, opt)
}

func (t *boltStorage) List(prefix string, _, _ int) ([][]byte, error) {
data := make([][]byte, 0)
bucket, _ := t.splitBucketAndKey(prefix)
err := t.db.View(func(txn *bbolt.Tx) error {
b := txn.Bucket(bucket)
if b == nil {
return nil
}
c := b.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
valCopy := make([]byte, len(v))
copy(valCopy, v)
data = append(data, valCopy)
}
return nil
})
return data, err
}

func (t *boltStorage) Get(key string) ([]byte, error) {
var data []byte
bucket, name := t.splitBucketAndKey(key)
err := t.db.View(func(txn *bbolt.Tx) error {
b := txn.Bucket(bucket)
if b == nil {
return errors.ErrKeyNotFound
}
v := b.Get(name)
if v == nil {
return errors.ErrKeyNotFound
}
data = make([]byte, len(v))
copy(data, v)
return nil
})
return data, err
}

func (t *boltStorage) Set(key string, value []byte) error {
if key == "" {
return errors.ErrEmptyKey
}
bucket, name := t.splitBucketAndKey(key)
return t.db.Update(func(txn *bbolt.Tx) error {
b, err := txn.CreateBucketIfNotExists(bucket)
if err != nil {
return err
}
return b.Put(name, value)
})
}

func (t *boltStorage) Delete(key string) error {
if key == "" {
return errors.ErrEmptyKey
}
bucket, name := t.splitBucketAndKey(key)
return t.db.Update(func(Tx *bbolt.Tx) error {
b := Tx.Bucket(bucket)
if b == nil {
return nil
}
return b.Delete(name)
})
}

func (t *boltStorage) Close() error {
return t.db.Close()
}

func (t *boltStorage) splitBucketAndKey(key string) ([]byte, []byte) {
if key == "" {
return nil, nil
}
p := bytes.LastIndex([]byte(key), []byte("/"))
return []byte(key[:p]), []byte(key[p+1:])
}

0 comments on commit 2a2f918

Please sign in to comment.