Permalink
Cannot retrieve contributors at this time
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
320 lines (273 sloc)
7.68 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Package dsstate implements the IPFS Cluster state interface using | |
| // an underlying go-datastore. | |
| package dsstate | |
| import ( | |
| "context" | |
| "io" | |
| "github.com/ipfs/ipfs-cluster/api" | |
| "github.com/ipfs/ipfs-cluster/state" | |
| cid "github.com/ipfs/go-cid" | |
| ds "github.com/ipfs/go-datastore" | |
| query "github.com/ipfs/go-datastore/query" | |
| dshelp "github.com/ipfs/go-ipfs-ds-help" | |
| logging "github.com/ipfs/go-log/v2" | |
| codec "github.com/ugorji/go/codec" | |
| trace "go.opencensus.io/trace" | |
| ) | |
| var _ state.State = (*State)(nil) | |
| var _ state.BatchingState = (*BatchingState)(nil) | |
| var logger = logging.Logger("dsstate") | |
| // State implements the IPFS Cluster "state" interface by wrapping | |
| // a go-datastore and choosing how api.Pin objects are stored | |
| // in it. It also provides serialization methods for the whole | |
| // state which are datastore-independent. | |
| type State struct { | |
| dsRead ds.Read | |
| dsWrite ds.Write | |
| codecHandle codec.Handle | |
| namespace ds.Key | |
| // version int | |
| } | |
| // DefaultHandle returns the codec handler of choice (Msgpack). | |
| func DefaultHandle() codec.Handle { | |
| h := &codec.MsgpackHandle{} | |
| return h | |
| } | |
| // New returns a new state using the given datastore. | |
| // | |
| // All keys are namespaced with the given string when written. Thus the same | |
| // go-datastore can be sharded for different uses. | |
| // | |
| // The Handle controls options for the serialization of the full state | |
| // (marshaling/unmarshaling). | |
| func New(dstore ds.Datastore, namespace string, handle codec.Handle) (*State, error) { | |
| if handle == nil { | |
| handle = DefaultHandle() | |
| } | |
| st := &State{ | |
| dsRead: dstore, | |
| dsWrite: dstore, | |
| codecHandle: handle, | |
| namespace: ds.NewKey(namespace), | |
| } | |
| return st, nil | |
| } | |
| // Add adds a new Pin or replaces an existing one. | |
| func (st *State) Add(ctx context.Context, c *api.Pin) error { | |
| _, span := trace.StartSpan(ctx, "state/dsstate/Add") | |
| defer span.End() | |
| ps, err := st.serializePin(c) | |
| if err != nil { | |
| return err | |
| } | |
| return st.dsWrite.Put(st.key(c.Cid), ps) | |
| } | |
| // Rm removes an existing Pin. It is a no-op when the | |
| // item does not exist. | |
| func (st *State) Rm(ctx context.Context, c cid.Cid) error { | |
| _, span := trace.StartSpan(ctx, "state/dsstate/Rm") | |
| defer span.End() | |
| err := st.dsWrite.Delete(st.key(c)) | |
| if err == ds.ErrNotFound { | |
| return nil | |
| } | |
| return err | |
| } | |
| // Get returns a Pin from the store and whether it | |
| // was present. When not present, a default pin | |
| // is returned. | |
| func (st *State) Get(ctx context.Context, c cid.Cid) (*api.Pin, error) { | |
| _, span := trace.StartSpan(ctx, "state/dsstate/Get") | |
| defer span.End() | |
| v, err := st.dsRead.Get(st.key(c)) | |
| if err != nil { | |
| if err == ds.ErrNotFound { | |
| return nil, state.ErrNotFound | |
| } | |
| return nil, err | |
| } | |
| p, err := st.deserializePin(c, v) | |
| if err != nil { | |
| return nil, err | |
| } | |
| return p, nil | |
| } | |
| // Has returns whether a Cid is stored. | |
| func (st *State) Has(ctx context.Context, c cid.Cid) (bool, error) { | |
| _, span := trace.StartSpan(ctx, "state/dsstate/Has") | |
| defer span.End() | |
| ok, err := st.dsRead.Has(st.key(c)) | |
| if err != nil { | |
| return false, err | |
| } | |
| return ok, nil | |
| } | |
| // List returns the unsorted list of all Pins that have been added to the | |
| // datastore. | |
| func (st *State) List(ctx context.Context) ([]*api.Pin, error) { | |
| _, span := trace.StartSpan(ctx, "state/dsstate/List") | |
| defer span.End() | |
| q := query.Query{ | |
| Prefix: st.namespace.String(), | |
| } | |
| results, err := st.dsRead.Query(q) | |
| if err != nil { | |
| return nil, err | |
| } | |
| defer results.Close() | |
| var pins []*api.Pin | |
| for r := range results.Next() { | |
| if r.Error != nil { | |
| logger.Errorf("error in query result: %s", r.Error) | |
| return pins, r.Error | |
| } | |
| k := ds.NewKey(r.Key) | |
| ci, err := st.unkey(k) | |
| if err != nil { | |
| logger.Warn("bad key (ignoring). key: ", k, "error: ", err) | |
| continue | |
| } | |
| p, err := st.deserializePin(ci, r.Value) | |
| if err != nil { | |
| logger.Errorf("error deserializing pin (%s): %s", r.Key, err) | |
| continue | |
| } | |
| pins = append(pins, p) | |
| } | |
| return pins, nil | |
| } | |
| // Migrate migrates an older state version to the current one. | |
| // This is a no-op for now. | |
| func (st *State) Migrate(ctx context.Context, r io.Reader) error { | |
| return nil | |
| } | |
| type serialEntry struct { | |
| Key string `codec:"k"` | |
| Value []byte `codec:"v"` | |
| } | |
| // Marshal dumps the state to a writer. It does this by encoding every | |
| // key/value in the store. The keys are stored without the namespace part to | |
| // reduce the size of the snapshot. | |
| func (st *State) Marshal(w io.Writer) error { | |
| q := query.Query{ | |
| Prefix: st.namespace.String(), | |
| } | |
| results, err := st.dsRead.Query(q) | |
| if err != nil { | |
| return err | |
| } | |
| defer results.Close() | |
| enc := codec.NewEncoder(w, st.codecHandle) | |
| for r := range results.Next() { | |
| if r.Error != nil { | |
| logger.Errorf("error in query result: %s", r.Error) | |
| return r.Error | |
| } | |
| k := ds.NewKey(r.Key) | |
| // reduce snapshot size by not storing the prefix | |
| err := enc.Encode(serialEntry{ | |
| Key: k.BaseNamespace(), | |
| Value: r.Value, | |
| }) | |
| if err != nil { | |
| logger.Error(err) | |
| return err | |
| } | |
| } | |
| return nil | |
| } | |
| // Unmarshal reads and parses a previous dump of the state. | |
| // All the parsed key/values are added to the store. As of now, | |
| // Unmarshal does not empty the existing store from any values | |
| // before unmarshaling from the given reader. | |
| func (st *State) Unmarshal(r io.Reader) error { | |
| dec := codec.NewDecoder(r, st.codecHandle) | |
| for { | |
| var entry serialEntry | |
| if err := dec.Decode(&entry); err == io.EOF { | |
| break | |
| } else if err != nil { | |
| return err | |
| } | |
| k := st.namespace.Child(ds.NewKey(entry.Key)) | |
| err := st.dsWrite.Put(k, entry.Value) | |
| if err != nil { | |
| logger.Error("error adding unmarshaled key to datastore:", err) | |
| return err | |
| } | |
| } | |
| return nil | |
| } | |
| // used to be on go-ipfs-ds-help | |
| func cidToDsKey(c cid.Cid) ds.Key { | |
| return dshelp.NewKeyFromBinary(c.Bytes()) | |
| } | |
| // used to be on go-ipfs-ds-help | |
| func dsKeyToCid(k ds.Key) (cid.Cid, error) { | |
| kb, err := dshelp.BinaryFromDsKey(k) | |
| if err != nil { | |
| return cid.Undef, err | |
| } | |
| return cid.Cast(kb) | |
| } | |
| // convert Cid to /namespace/cid1Key | |
| func (st *State) key(c cid.Cid) ds.Key { | |
| k := cidToDsKey(c) | |
| return st.namespace.Child(k) | |
| } | |
| // convert /namespace/cidKey to Cid | |
| func (st *State) unkey(k ds.Key) (cid.Cid, error) { | |
| return dsKeyToCid(ds.NewKey(k.BaseNamespace())) | |
| } | |
| // this decides how a Pin object is serialized to be stored in the | |
| // datastore. Changing this may require a migration! | |
| func (st *State) serializePin(c *api.Pin) ([]byte, error) { | |
| return c.ProtoMarshal() | |
| } | |
| // this deserializes a Pin object from the datastore. It should be | |
| // the exact opposite from serializePin. | |
| func (st *State) deserializePin(c cid.Cid, buf []byte) (*api.Pin, error) { | |
| p := &api.Pin{} | |
| err := p.ProtoUnmarshal(buf) | |
| p.Cid = c | |
| return p, err | |
| } | |
| // BatchingState implements the IPFS Cluster "state" interface by wrapping a | |
| // batching go-datastore. All writes are batched and only written disk | |
| // when Commit() is called. | |
| type BatchingState struct { | |
| *State | |
| batch ds.Batch | |
| } | |
| // NewBatching returns a new batching statate using the given datastore. | |
| // | |
| // All keys are namespaced with the given string when written. Thus the same | |
| // go-datastore can be sharded for different uses. | |
| // | |
| // The Handle controls options for the serialization of the full state | |
| // (marshaling/unmarshaling). | |
| func NewBatching(dstore ds.Batching, namespace string, handle codec.Handle) (*BatchingState, error) { | |
| if handle == nil { | |
| handle = DefaultHandle() | |
| } | |
| batch, err := dstore.Batch() | |
| if err != nil { | |
| return nil, err | |
| } | |
| st := &State{ | |
| dsRead: dstore, | |
| dsWrite: batch, | |
| codecHandle: handle, | |
| namespace: ds.NewKey(namespace), | |
| } | |
| bst := &BatchingState{} | |
| bst.State = st | |
| bst.batch = batch | |
| return bst, nil | |
| } | |
| // Commit persists the batched write operations. | |
| func (bst *BatchingState) Commit(ctx context.Context) error { | |
| _, span := trace.StartSpan(ctx, "state/dsstate/Commit") | |
| defer span.End() | |
| return bst.batch.Commit() | |
| } |